From a03180996020109a5965f20e4d451024fa3b67f8 Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Wed, 12 Feb 2025 16:25:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E8=87=AA=E5=8A=A8=E7=BB=AD?= =?UTF-8?q?=E6=9C=9F=EF=BC=8C=E8=A7=A3=E5=86=B3=E6=AD=BB=E9=94=81=E5=92=8C?= =?UTF-8?q?=E7=BB=AD=E6=9C=9F=E6=8A=A5=E9=94=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 2 +- redisLock/redis.go | 30 +++++++++++++++++++++--------- worker/rpc.go | 6 +++--- worker/service/main.go | 16 ++++++++++++++++ 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index 25f926f..b1722bb 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,7 @@ func main() { Work: client.CreateRpcWorker(createRpcPoll(*host)), } - e.Run(10) + e.Run(5000) } func createRpcPoll(port string) chan *rpc.Client { diff --git a/redisLock/redis.go b/redisLock/redis.go index cf87254..6f5150b 100644 --- a/redisLock/redis.go +++ b/redisLock/redis.go @@ -33,7 +33,7 @@ func (r *RedisLock) Lock() { script := `if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) - redis.call('expire',KEYS[1],ARGV[2]) + redis.call('pexpire',KEYS[1],ARGV[2]) return 1 else return 0 @@ -47,7 +47,7 @@ func (r *RedisLock) Lock() { case r.lockKeepAliveCh <- struct{}{}: go r.autoReNewExpire() default: - + log.Printf("start renew expore err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) } log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) @@ -65,8 +65,17 @@ func (r *RedisLock) Unlock() { elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end` - if r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value).Val().(int64) == 1 { - r.lockKeepDoneCh <- struct{}{} + r.lockKeepDoneCh <- struct{}{} //这里一定要放到if外面,并且进来就要退掉定时线程,开始死锁是因为续期失败 + //导致续期协程return掉,再往lockKeepDoneCh写数据时就会死锁。而且不能只在判断解锁成功时给该管道发送数据。如果解锁失败不掉用写数据 + //会导致续期协程永不退出。ttl时间不能太短,太短可能会导致续期还没有完成key过期。30ms会有问题。网络环境差该值还要继续增大。 + + //for len(r.lockKeepAliveCh) > 0 { + // //<-r.lockKeepAliveCh + //} + + val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value).Val() + + if i, ok := val.(int64); ok && i == 1 { log.Printf("unlock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) } else { log.Printf("unlock err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) @@ -78,19 +87,22 @@ func (r *RedisLock) autoReNewExpire() { <-r.lockKeepAliveCh }() - tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Second / 3) + log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) + + tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5) defer tc.Stop() for { select { case <-tc.C: - script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) + script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('pexpire',KEYS[1],ARGV[2]) else return 0 end` - i, ok := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val().(int64) - if !ok || i == 0 { + val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val() + + if i, ok := val.(int64); !ok || i == 0 { log.Printf("auto renew err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - return + //return } case <-r.lockKeepDoneCh: log.Printf("auto renew down: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) diff --git a/worker/rpc.go b/worker/rpc.go index 82325ca..ac63ba4 100644 --- a/worker/rpc.go +++ b/worker/rpc.go @@ -7,6 +7,7 @@ import ( "goRedisDLM/engine" "goRedisDLM/redisLock" "log" + "math/rand" "strconv" "time" @@ -23,12 +24,11 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) - lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 3) + lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 300) lock.Lock() - time.Sleep(time.Second * 5) + time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) - log.Printf("uuid: %s get dlock\n", value.String()) get := i.Client.Get(context.Background(), "inventory") log.Printf("%s\n", get) diff --git a/worker/service/main.go b/worker/service/main.go index cf4d649..daffe54 100644 --- a/worker/service/main.go +++ b/worker/service/main.go @@ -5,6 +5,7 @@ import ( "goRedisDLM/rpcSupport" "goRedisDLM/worker" "log" + _ "net/http/pprof" "github.com/go-redis/redis/v8" ) @@ -21,6 +22,21 @@ func main() { return } + /* + pprof, err := strconv.Atoi(strings.Trim(*port, ":")) + if err != nil { + log.Println("pprof port error") + + return + } + + pprofPort := fmt.Sprintf(":%d", pprof+100) + + go func() { + log.Println("pprof listen on: ", pprofPort) + log.Fatal(http.ListenAndServe(pprofPort, nil)) + }() + */ redisClient := redis.NewClient(&redis.Options{ Addr: *redisHost, Password: "redhat",