From ab0612d9b68d284ac5703d3bbea0be484930f480 Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Fri, 14 Feb 2025 11:22:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A3=E5=87=8F=E5=BA=93=E5=AD=98=E4=BD=BF?= =?UTF-8?q?=E7=94=A8lua=E8=84=9A=E6=9C=AC=EF=BC=8C=E4=BF=9D=E8=AF=81?= =?UTF-8?q?=E9=94=81=E5=AD=98=E5=9C=A8=E6=97=B6=E6=89=8D=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=E3=80=82=E4=B8=BB=E7=A8=8B=E5=BA=8F=E5=AF=B9?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E6=95=B0=E6=8D=AE=E5=81=9A=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=EF=BC=8C=E5=AF=B9=E9=94=99=E8=AF=AF=E4=BB=BB=E5=8A=A1=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E9=87=8D=E8=AF=95=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- engine/firstengine.go | 7 +++++-- main.go | 2 +- redisLock/redis.go | 2 +- worker/rpc.go | 24 +++++++++++++++++++----- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/engine/firstengine.go b/engine/firstengine.go index 664e1dd..09aed02 100644 --- a/engine/firstengine.go +++ b/engine/firstengine.go @@ -26,9 +26,12 @@ func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) { req := <-in work, err := e.Work(req) if err != nil { - log.Printf("work err: %v", err) + log.Printf("work:%v err: %v", req, err) + //出错之后让其他线程重新调度 + e.Scheduler.Submit(req) + } else { + out <- work } - out <- work } }() } diff --git a/main.go b/main.go index b1722bb..2dda547 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,7 @@ func main() { e := engine.FirstEngine{ Scheduler: &scheduler.FirstScheduler{}, - WorkCount: 15, + WorkCount: 5, Work: client.CreateRpcWorker(createRpcPoll(*host)), } diff --git a/redisLock/redis.go b/redisLock/redis.go index 6f5150b..794581d 100644 --- a/redisLock/redis.go +++ b/redisLock/redis.go @@ -87,7 +87,7 @@ func (r *RedisLock) autoReNewExpire() { <-r.lockKeepAliveCh }() - log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) + //log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5) defer tc.Stop() diff --git a/worker/rpc.go b/worker/rpc.go index ac63ba4..f7218ea 100644 --- a/worker/rpc.go +++ b/worker/rpc.go @@ -3,7 +3,6 @@ package worker import ( "context" "errors" - "fmt" "goRedisDLM/engine" "goRedisDLM/redisLock" "log" @@ -22,14 +21,16 @@ type Inventory struct { func (i *Inventory) Work(request engine.Request, response *engine.Response) error { value := uuid.New() - log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) + //log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) - lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 300) + ctx := context.Background() + + lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, value.String(), 100) lock.Lock() time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) - get := i.Client.Get(context.Background(), "inventory") + get := i.Client.Get(ctx, "inventory") log.Printf("%s\n", get) inventory, err := strconv.Atoi(get.Val()) @@ -38,7 +39,20 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro } inventory = inventory - 1 if inventory >= 0 { - i.Client.Set(context.Background(), "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) + //这里应该使用lua脚本,再检测一下当前是否获取到锁,获取到之后再进行扣减。类似双检加锁的思想,因为此时可能会续期失败。 + //i.Client.Set(ctx, "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) + script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('set',KEYS[2],ARGV[2]) + else return 0 end` + + val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() + if res, ok := val.(string); ok && res == "OK" { + + } else { + //任务执行失败需要通知调用者,让调用者自己决策是否需要重新调度还是放弃,这样worker和主线程实现了解偶 + log.Println("error set inventory") + err = errors.New("inventory err") + goto Error + } } else { log.Printf("error DLock error :%d\n", inventory) err = errors.New("inventory err")