diff --git a/go.mod b/go.mod index 44687d3..7e0b11f 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,14 @@ module goRedisDLM -go 1.22 +go 1.24 + +toolchain go1.24.2 require ( - github.com/cespare/xxhash/v2 v2.1.2 // indirect + git.zhangshuocauc.cn/redhat/goredislock v1.0.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/redis/go-redis/v9 v9.7.3 // indirect ) diff --git a/go.sum b/go.sum index 0e8a0f8..0e98b9c 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,14 @@ +git.zhangshuocauc.cn/redhat/goredislock v1.0.0 h1:Yf8GpOwOPPWfuOmiGypXndHcBtHvOW2rng2KK8sarVc= +git.zhangshuocauc.cn/redhat/goredislock v1.0.0/go.mod h1:IWVypCUU7RRc7x3aUQPPKMP2Z5foCgwLV6D0Rjp4j0E= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= +github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= diff --git a/redisLock/lock.go b/redisLock/lock.go deleted file mode 100644 index 8c1fd88..0000000 --- a/redisLock/lock.go +++ /dev/null @@ -1,195 +0,0 @@ -package redisLock - -import ( - "context" - "errors" - "fmt" - "goRedisDLM/utils" - "log" - "sync/atomic" - "time" - - "github.com/go-redis/redis/v8" -) - -var ErrorNotMyLock = errors.New("not myLock") - -const REDISLOCKPREFIXKEY = "goRedisLock" - -func isErrorNotMyLock(err error) bool { - return errors.Is(err, ErrorNotMyLock) -} - -type RedisLockNew struct { - RedisLockOptions - client *redis.Client - ctx context.Context - key string - token string - - DogRunning int32 - stopDog context.CancelFunc -} - -func (r *RedisLockNew) IsHeldByCurrent() bool { - return r.client.Exists(r.ctx, r.getLockKey()).Val() == 1 -} - -func (r *RedisLockNew) getLockKey() string { - return REDISLOCKPREFIXKEY + "_" + r.key -} - -func (r *RedisLockNew) blockingLock() error { - // maxWaitSecond为0时认为永久阻塞 - var timeAfter <-chan time.Time - if r.maxWaitSecond > 0 { - timeAfter = time.After(time.Duration(r.maxWaitSecond) * time.Second) - } - - ticker := time.NewTicker(time.Duration(500) * time.Millisecond) - defer ticker.Stop() - - for range ticker.C { - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case <-timeAfter: - return errors.New("timeout") - default: - } - - err := r.tryLock() - - if err == nil { - return nil - } - - if !isErrorNotMyLock(err) { - return err - } - } - - return nil -} - -func (r *RedisLockNew) tryLock() error { - result, err := r.client.SetNX(r.ctx, r.getLockKey(), r.token, - time.Duration(r.RedisLockOptions.expireSecond)*time.Second).Result() - - if err != nil { - return err - } - - if !result { - return fmt.Errorf("try lock result: %v, err: %w", result, ErrorNotMyLock) - } - - return nil -} - -func (r *RedisLockNew) expireLock() error { - result, err := r.client.Eval(r.ctx, LuaExpireLock, []string{r.getLockKey()}, - r.token, time.Duration(r.expireSecond)*time.Second).Result() - if err != nil { - return err - } - - if res, ok := result.(int64); ok && res == 1 { - return nil - } else { - return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result) - } -} - -func (r *RedisLockNew) runWatchDog(ctx context.Context) { - ticker := time.NewTicker(time.Duration(10) * time.Second) - defer ticker.Stop() - - for range ticker.C { - select { - case <-ctx.Done(): - return - default: - } - - if err := r.expireLock(); err != nil { - log.Printf("expire lock err: %v", err) - } - } -} - -func (r *RedisLockNew) watchDog() { - if !r.isReNew { - return - } - - for !atomic.CompareAndSwapInt32(&r.DogRunning, 0, 1) { - } - - var ctx context.Context - ctx, r.stopDog = context.WithCancel(r.ctx) - go func() { - defer func() { - atomic.StoreInt32(&r.DogRunning, 0) - }() - - r.runWatchDog(ctx) - }() -} - -func (r *RedisLockNew) Lock() (err error) { - defer func() { - if err == nil { - r.watchDog() - } - }() - - err = r.tryLock() - if err == nil { - return nil - } - - if !isErrorNotMyLock(err) || !r.isBlock { - return err - } - - err = r.blockingLock() - - return err -} - -func (r *RedisLockNew) UnLock() error { - defer func() { - if r.stopDog != nil { - r.stopDog() - } - }() - - result, err := r.client.Eval(r.ctx, LuaUnLock, []string{r.getLockKey()}, r.token).Result() - if err != nil { - return err - } - - if res, ok := result.(int64); ok && res == 1 { - return nil - } else { - return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result) - } -} - -func NewRedisLock(client *redis.Client, ctx context.Context, key string, opts ...RedisLockOption) *RedisLockNew { - lock := &RedisLockNew{ - client: client, - ctx: ctx, - key: key, - token: utils.GetProcessAndGoroutineIDStr(), - } - - for _, o := range opts { - o(&lock.RedisLockOptions) - } - - lock.repairOption() - - return lock -} diff --git a/redisLock/lock_test.go b/redisLock/lock_test.go deleted file mode 100644 index c73d2fa..0000000 --- a/redisLock/lock_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package redisLock - -import ( - "context" - "sync" - "testing" - - "github.com/go-redis/redis/v8" -) - -func Test_blockingLock(t *testing.T) { - addr := "192.168.8.1:6379" - passwd := "" - - client := redis.NewClient(&redis.Options{ - Addr: addr, - Password: passwd, - }) - - ctx := context.Background() - - lock1 := NewRedisLock(client, ctx, "test_key", WithExpireTime(10)) - lock2 := NewRedisLock(client, ctx, "test_key", - WithBlock(), WithExpireTime(5), WithMaxWaitTime(0)) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - var err error - defer wg.Done() - defer func() { - if err == nil { - if err = lock1.UnLock(); err != nil { - t.Error(err) - } - } - }() - if err = lock1.Lock(); err != nil { - t.Error(err) - return - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer func() { - if err := lock2.UnLock(); err != nil { - t.Error(err) - } - }() - if err := lock2.Lock(); err != nil { - t.Error(err) - return - } - }() - - wg.Wait() - - t.Log("success") -} diff --git a/redisLock/lua.go b/redisLock/lua.go deleted file mode 100644 index fb1debd..0000000 --- a/redisLock/lua.go +++ /dev/null @@ -1,13 +0,0 @@ -package redisLock - -const LuaUnLock = ` -if (redis.call('get',KEYS[1]) == ARGV[1]) -then return redis.call('del',KEYS[1]) -else return 0 -end` - -const LuaExpireLock = ` -if (redis.call('get',KEYS[1]) == ARGV[1]) -then return redis.call('expire',KEYS[1],ARGV[2]) -else return 0 -end` diff --git a/redisLock/option.go b/redisLock/option.go deleted file mode 100644 index 90d15be..0000000 --- a/redisLock/option.go +++ /dev/null @@ -1,71 +0,0 @@ -package redisLock - -const ( - DEFAULTMAXWAITSECOND = 5 - DEFAULTMAXEXPIRESECOND = 30 -) - -type RedisLockOptions struct { - isBlock bool - maxWaitSecond int - expireSecond int - isReNew bool -} - -func (r *RedisLockOptions) repairOption() { - //if r.isBlock && r.maxWaitSecond <= 0 { - // r.maxWaitSecond = DEFAULTMAXWAITSECOND - //} - - if r.expireSecond > 0 { - return - } - - r.expireSecond = DEFAULTMAXEXPIRESECOND - r.isReNew = true -} - -type RedisLockOption func(*RedisLockOptions) - -func WithBlock() RedisLockOption { - return func(o *RedisLockOptions) { - o.isBlock = true - } -} - -func WithMaxWaitTime(maxWaitTime int) RedisLockOption { - return func(o *RedisLockOptions) { - o.maxWaitSecond = maxWaitTime - } -} - -func WithExpireTime(expireTime int) RedisLockOption { - return func(o *RedisLockOptions) { - o.expireSecond = expireTime - } -} - -type RedLockOptions struct { - perHostTimeout int - expireHostTime int -} - -func (r *RedLockOptions) repairOption() { - if r.perHostTimeout <= 0 { - r.perHostTimeout = DEFAULTMAXWAITSECOND - } -} - -type RedLockOption func(*RedLockOptions) - -func WithPerHostTimeout(perHostTimeout int) RedLockOption { - return func(o *RedLockOptions) { - o.perHostTimeout = perHostTimeout - } -} - -func WithExpireHostTime(expireTime int) RedLockOption { - return func(o *RedLockOptions) { - o.expireHostTime = expireTime - } -} diff --git a/redisLock/redis.go b/redisLock/redis.go deleted file mode 100644 index 1862147..0000000 --- a/redisLock/redis.go +++ /dev/null @@ -1,118 +0,0 @@ -package redisLock - -import ( - "context" - "log" - "time" - - "github.com/google/uuid" - - "github.com/go-redis/redis/v8" -) - -type redisCommInfo struct { - key string - value string - ttl int -} - -type RedisLock struct { - client *redis.Client - ctx context.Context - lockKeepAliveCh chan struct{} - lockKeepDoneCh chan struct{} - kvt redisCommInfo -} - -func CreateRedisLock(c *redis.Client, ctx context.Context, k string, ttl int) *RedisLock { - return &RedisLock{c, ctx, make(chan struct{}, 1), make(chan struct{}), redisCommInfo{k, uuid.New().String(), ttl}} -} - -func (r *RedisLock) Lock() { - //for r.client.SetNX(r.ctx, r.kvt.key, r.kvt.value, time.Duration(r.kvt.ttl)*time.Second).Val() == false { - // time.Sleep(time.Millisecond * 20) - //} - - script := `if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then - redis.call('hincrby',KEYS[1],ARGV[1],1) - redis.call('pexpire',KEYS[1],ARGV[2]) - return 1 - else - return 0 - end` - - for r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val().(int64) == 0 { - time.Sleep(time.Millisecond * 20) - } - - select { - case r.lockKeepAliveCh <- struct{}{}: - go r.autoReNewExpire() - default: - log.Printf("start renew expore err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - } - - log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) -} - -func (r *RedisLock) UnLock() { - //if r.client.Get(r.ctx, r.kvt.key).Val() == r.kvt.value { - // r.client.Del(r.ctx, r.kvt.key) - //} - - //script := "if (redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1]) else return 0 end" - //r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value) - - script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil - elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) - else return 0 end` - - r.lockKeepDoneCh <- struct{}{} //这里一定要放到if外面,并且进来就要退掉定时线程,开始死锁是因为续期失败 - //导致续期协程return掉,再往lockKeepDoneCh写数据时就会死锁。而且不能只在判断解锁成功时给该管道发送数据。如果解锁失败不掉用写数据 - //会导致续期协程永不退出。ttl时间不能太短,太短可能会导致续期还没有完成key过期。30ms会有问题。网络环境差该值还要继续增大。 - - //for len(r.lockKeepAliveCh) > 0 { - // //<-r.lockKeepAliveCh - //} - - val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value).Val() - - if i, ok := val.(int64); ok && i == 1 { - log.Printf("unlock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - } else { - log.Printf("unlock err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - } -} - -func (r *RedisLock) IsHeldByCurrent() bool { - return r.client.HExists(r.ctx, r.kvt.key, r.kvt.value).Val() -} - -func (r *RedisLock) autoReNewExpire() { - defer func() { - <-r.lockKeepAliveCh - }() - - //log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - - tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5) - defer tc.Stop() - - for { - select { - case <-tc.C: - script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('pexpire',KEYS[1],ARGV[2]) - else return 0 end` - - val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val() - - if i, ok := val.(int64); !ok || i == 0 { - log.Printf("auto renew err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - //return - } - case <-r.lockKeepDoneCh: - log.Printf("auto renew down: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) - return - } - } -} diff --git a/redisLock/redlock.go b/redisLock/redlock.go deleted file mode 100644 index d20cbdc..0000000 --- a/redisLock/redlock.go +++ /dev/null @@ -1,81 +0,0 @@ -package redisLock - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/go-redis/redis/v8" -) - -type RedLock struct { - locks []*RedisLockNew - RedLockOptions -} - -func (r *RedLock) UnLock() (err error) { - for _, lock := range r.locks { - if _error := lock.UnLock(); _error != nil { - if err == nil { - err = _error - } - } - } - - return -} - -func (r *RedLock) Lock() error { - successLock := 0 - for _, lock := range r.locks { - now := time.Now() - err := lock.Lock() - since := time.Since(now) - if err == nil && since < time.Duration(r.perHostTimeout) { - successLock++ - } - } - - if successLock < len(r.locks)>>1+1 { - return errors.New("redis lock timeout") - } - - return nil -} - -type RedisHost struct { - NetWork string - Host string - Pass string -} - -func NewRedLock(key string, hosts []*RedisHost, opts ...RedLockOption) (*RedLock, error) { - if len(hosts) < 3 { - return nil, fmt.Errorf("invalid redis host length %d", len(hosts)) - } - - l := &RedLock{} - - for _, opt := range opts { - opt(&l.RedLockOptions) - } - - l.repairOption() - if len(hosts)*l.perHostTimeout*10 > l.expireHostTime { - return nil, fmt.Errorf("invalid redis host length %d", len(hosts)*l.perHostTimeout*10) - } - - l.locks = make([]*RedisLockNew, 0, len(hosts)) - - for _, host := range hosts { - client := redis.NewClient(&redis.Options{ - Network: host.NetWork, - Addr: host.Host, - Password: host.Pass, - }) - l.locks = append(l.locks, NewRedisLock(client, context.Background(), key, WithExpireTime(5))) - } - - return l, nil -} diff --git a/worker/rpc.go b/worker/rpc.go index 3a2bd01..2e0d4c6 100644 --- a/worker/rpc.go +++ b/worker/rpc.go @@ -5,11 +5,12 @@ import ( "errors" "fmt" "goRedisDLM/engine" - "goRedisDLM/redisLock" "log" "strconv" - "github.com/go-redis/redis/v8" + "git.zhangshuocauc.cn/redhat/goredislock" + + "github.com/redis/go-redis/v9" ) type Inventory struct { @@ -20,9 +21,9 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro ctx := context.Background() //lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100) - lock := redisLock.NewRedisLock(i.Client, ctx, "test_key", redisLock.WithBlock(), - redisLock.WithMaxWaitTime(20), redisLock.WithExpireTime(5)) - lock.Lock() + lock := goredislock.NewRedisLock(i.Client, "test_key", goredislock.WithBlock(), + goredislock.WithWaitTime(20), goredislock.WithExpireTime(5)) + lock.Lock(ctx) //time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) @@ -42,7 +43,7 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro // //val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() - if lock.IsHeldByCurrent() { + if true { setString := fmt.Sprintf("%d", inventory) if i.Client.Set(ctx, "inventory", setString, redis.KeepTTL).Val() != "OK" { log.Println("error set inventory") @@ -60,13 +61,13 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro err = errors.New("inventory err") goto Error } - lock.UnLock() + lock.Unlock(ctx) response.Id = request.Id return nil Error: - lock.UnLock() + lock.Unlock(ctx) return err } diff --git a/worker/service/main.go b/worker/service/main.go index c1d9488..76baab0 100644 --- a/worker/service/main.go +++ b/worker/service/main.go @@ -7,7 +7,7 @@ import ( "log" _ "net/http/pprof" - "github.com/go-redis/redis/v8" + "github.com/redis/go-redis/v9" ) var port = flag.String("port", "", "port")