From 5afb8b1aa75c66a7a52b707416821ffb841449ee Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Mon, 17 Feb 2025 11:42:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96=E5=BD=93?= =?UTF-8?q?=E5=89=8D=E9=94=81=E7=8A=B6=E6=80=81=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- redisLock/redis.go | 10 ++++++++-- worker/rpc.go | 28 +++++++++++++++------------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/redisLock/redis.go b/redisLock/redis.go index 794581d..09d2939 100644 --- a/redisLock/redis.go +++ b/redisLock/redis.go @@ -5,6 +5,8 @@ import ( "log" "time" + "github.com/google/uuid" + "github.com/go-redis/redis/v8" ) @@ -22,8 +24,8 @@ type RedisLock struct { kvt redisCommInfo } -func CreateRedisLock(c *redis.Client, ctx context.Context, k, v string, ttl int) *RedisLock { - return &RedisLock{c, ctx, make(chan struct{}, 1), make(chan struct{}), redisCommInfo{k, v, ttl}} +func CreateRedisLock(c *redis.Client, ctx context.Context, k string, ttl int) *RedisLock { + return &RedisLock{c, ctx, make(chan struct{}, 1), make(chan struct{}), redisCommInfo{k, uuid.New().String(), ttl}} } func (r *RedisLock) Lock() { @@ -82,6 +84,10 @@ func (r *RedisLock) Unlock() { } } +func (r *RedisLock) IsHeld() bool { + return r.client.HExists(r.ctx, r.kvt.key, r.kvt.value).Val() +} + func (r *RedisLock) autoReNewExpire() { defer func() { <-r.lockKeepAliveCh diff --git a/worker/rpc.go b/worker/rpc.go index f7218ea..e3ca986 100644 --- a/worker/rpc.go +++ b/worker/rpc.go @@ -3,6 +3,7 @@ package worker import ( "context" "errors" + "fmt" "goRedisDLM/engine" "goRedisDLM/redisLock" "log" @@ -11,7 +12,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/google/uuid" ) type Inventory struct { @@ -19,13 +19,9 @@ type Inventory struct { } func (i *Inventory) Work(request engine.Request, response *engine.Response) error { - value := uuid.New() - - //log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) - ctx := context.Background() - lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, value.String(), 100) + lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100) lock.Lock() time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) @@ -41,16 +37,22 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro if inventory >= 0 { //这里应该使用lua脚本,再检测一下当前是否获取到锁,获取到之后再进行扣减。类似双检加锁的思想,因为此时可能会续期失败。 //i.Client.Set(ctx, "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) - script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('set',KEYS[2],ARGV[2]) - else return 0 end` - - val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() - if res, ok := val.(string); ok && res == "OK" { + //script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('set',KEYS[2],ARGV[2]) + // else return 0 end` + // + //val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() + if lock.IsHeld() { + setString := fmt.Sprintf("%d", inventory) + if i.Client.Set(ctx, "inventory", setString, redis.KeepTTL).Val() != "OK" { + log.Println("error set inventory") + err = errors.New("inventory err") + goto Error + } } else { //任务执行失败需要通知调用者,让调用者自己决策是否需要重新调度还是放弃,这样worker和主线程实现了解偶 - log.Println("error set inventory") - err = errors.New("inventory err") + log.Println("error lock is not held") + err = errors.New("error lock is not held") goto Error } } else {