diff --git a/engine/firstengine.go b/engine/firstengine.go index 664e1dd..09aed02 100644 --- a/engine/firstengine.go +++ b/engine/firstengine.go @@ -26,9 +26,12 @@ func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) { req := <-in work, err := e.Work(req) if err != nil { - log.Printf("work err: %v", err) + log.Printf("work:%v err: %v", req, err) + //出错之后让其他线程重新调度 + e.Scheduler.Submit(req) + } else { + out <- work } - out <- work } }() } diff --git a/main.go b/main.go index b1722bb..2dda547 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,7 @@ func main() { e := engine.FirstEngine{ Scheduler: &scheduler.FirstScheduler{}, - WorkCount: 15, + WorkCount: 5, Work: client.CreateRpcWorker(createRpcPoll(*host)), } diff --git a/redisLock/redis.go b/redisLock/redis.go index 6f5150b..794581d 100644 --- a/redisLock/redis.go +++ b/redisLock/redis.go @@ -87,7 +87,7 @@ func (r *RedisLock) autoReNewExpire() { <-r.lockKeepAliveCh }() - log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) + //log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5) defer tc.Stop() diff --git a/worker/rpc.go b/worker/rpc.go index ac63ba4..f7218ea 100644 --- a/worker/rpc.go +++ b/worker/rpc.go @@ -3,7 +3,6 @@ package worker import ( "context" "errors" - "fmt" "goRedisDLM/engine" "goRedisDLM/redisLock" "log" @@ -22,14 +21,16 @@ type Inventory struct { func (i *Inventory) Work(request engine.Request, response *engine.Response) error { value := uuid.New() - log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) + //log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) - lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 300) + ctx := context.Background() + + lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, value.String(), 100) lock.Lock() time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) - get := i.Client.Get(context.Background(), "inventory") + get := i.Client.Get(ctx, "inventory") log.Printf("%s\n", get) inventory, err := strconv.Atoi(get.Val()) @@ -38,7 +39,20 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro } inventory = inventory - 1 if inventory >= 0 { - i.Client.Set(context.Background(), "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) + //这里应该使用lua脚本,再检测一下当前是否获取到锁,获取到之后再进行扣减。类似双检加锁的思想,因为此时可能会续期失败。 + //i.Client.Set(ctx, "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) + script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('set',KEYS[2],ARGV[2]) + else return 0 end` + + val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() + if res, ok := val.(string); ok && res == "OK" { + + } else { + //任务执行失败需要通知调用者,让调用者自己决策是否需要重新调度还是放弃,这样worker和主线程实现了解偶 + log.Println("error set inventory") + err = errors.New("inventory err") + goto Error + } } else { log.Printf("error DLock error :%d\n", inventory) err = errors.New("inventory err")