扣减库存使用lua脚本,保证锁存在时才可以操作。主程序对返回数据做校验,对错误任务进行重试。

This commit is contained in:
redhat 2025-02-14 11:22:35 +08:00
parent a031809960
commit ab0612d9b6
4 changed files with 26 additions and 9 deletions

View File

@ -26,9 +26,12 @@ func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) {
req := <-in req := <-in
work, err := e.Work(req) work, err := e.Work(req)
if err != nil { if err != nil {
log.Printf("work err: %v", err) log.Printf("work:%v err: %v", req, err)
//出错之后让其他线程重新调度
e.Scheduler.Submit(req)
} else {
out <- work
} }
out <- work
} }
}() }()
} }

View File

@ -23,7 +23,7 @@ func main() {
e := engine.FirstEngine{ e := engine.FirstEngine{
Scheduler: &scheduler.FirstScheduler{}, Scheduler: &scheduler.FirstScheduler{},
WorkCount: 15, WorkCount: 5,
Work: client.CreateRpcWorker(createRpcPoll(*host)), Work: client.CreateRpcWorker(createRpcPoll(*host)),
} }

View File

@ -87,7 +87,7 @@ func (r *RedisLock) autoReNewExpire() {
<-r.lockKeepAliveCh <-r.lockKeepAliveCh
}() }()
log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) //log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5) tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5)
defer tc.Stop() defer tc.Stop()

View File

@ -3,7 +3,6 @@ package worker
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"goRedisDLM/engine" "goRedisDLM/engine"
"goRedisDLM/redisLock" "goRedisDLM/redisLock"
"log" "log"
@ -22,14 +21,16 @@ type Inventory struct {
func (i *Inventory) Work(request engine.Request, response *engine.Response) error { func (i *Inventory) Work(request engine.Request, response *engine.Response) error {
value := uuid.New() value := uuid.New()
log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) //log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock)
lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 300) ctx := context.Background()
lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, value.String(), 100)
lock.Lock() lock.Lock()
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
get := i.Client.Get(context.Background(), "inventory") get := i.Client.Get(ctx, "inventory")
log.Printf("%s\n", get) log.Printf("%s\n", get)
inventory, err := strconv.Atoi(get.Val()) inventory, err := strconv.Atoi(get.Val())
@ -38,7 +39,20 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
} }
inventory = inventory - 1 inventory = inventory - 1
if inventory >= 0 { if inventory >= 0 {
i.Client.Set(context.Background(), "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) //这里应该使用lua脚本再检测一下当前是否获取到锁获取到之后再进行扣减。类似双检加锁的思想因为此时可能会续期失败。
//i.Client.Set(ctx, "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL)
script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('set',KEYS[2],ARGV[2])
else return 0 end`
val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val()
if res, ok := val.(string); ok && res == "OK" {
} else {
//任务执行失败需要通知调用者让调用者自己决策是否需要重新调度还是放弃这样worker和主线程实现了解偶
log.Println("error set inventory")
err = errors.New("inventory err")
goto Error
}
} else { } else {
log.Printf("error DLock error :%d\n", inventory) log.Printf("error DLock error :%d\n", inventory)
err = errors.New("inventory err") err = errors.New("inventory err")