1、将redis锁抽离成sdk

This commit is contained in:
redhat 2025-04-18 16:37:26 +08:00
parent 78ef85201a
commit c648588cc5
10 changed files with 22 additions and 550 deletions

8
go.mod
View File

@ -1,10 +1,14 @@
module goRedisDLM module goRedisDLM
go 1.22 go 1.24
toolchain go1.24.2
require ( require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect git.zhangshuocauc.cn/redhat/goredislock v1.0.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/redis/go-redis/v9 v9.7.3 // indirect
) )

6
go.sum
View File

@ -1,8 +1,14 @@
git.zhangshuocauc.cn/redhat/goredislock v1.0.0 h1:Yf8GpOwOPPWfuOmiGypXndHcBtHvOW2rng2KK8sarVc=
git.zhangshuocauc.cn/redhat/goredislock v1.0.0/go.mod h1:IWVypCUU7RRc7x3aUQPPKMP2Z5foCgwLV6D0Rjp4j0E=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM=
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=

View File

@ -1,195 +0,0 @@
package redisLock
import (
"context"
"errors"
"fmt"
"goRedisDLM/utils"
"log"
"sync/atomic"
"time"
"github.com/go-redis/redis/v8"
)
var ErrorNotMyLock = errors.New("not myLock")
const REDISLOCKPREFIXKEY = "goRedisLock"
func isErrorNotMyLock(err error) bool {
return errors.Is(err, ErrorNotMyLock)
}
type RedisLockNew struct {
RedisLockOptions
client *redis.Client
ctx context.Context
key string
token string
DogRunning int32
stopDog context.CancelFunc
}
func (r *RedisLockNew) IsHeldByCurrent() bool {
return r.client.Exists(r.ctx, r.getLockKey()).Val() == 1
}
func (r *RedisLockNew) getLockKey() string {
return REDISLOCKPREFIXKEY + "_" + r.key
}
func (r *RedisLockNew) blockingLock() error {
// maxWaitSecond为0时认为永久阻塞
var timeAfter <-chan time.Time
if r.maxWaitSecond > 0 {
timeAfter = time.After(time.Duration(r.maxWaitSecond) * time.Second)
}
ticker := time.NewTicker(time.Duration(500) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
select {
case <-r.ctx.Done():
return r.ctx.Err()
case <-timeAfter:
return errors.New("timeout")
default:
}
err := r.tryLock()
if err == nil {
return nil
}
if !isErrorNotMyLock(err) {
return err
}
}
return nil
}
func (r *RedisLockNew) tryLock() error {
result, err := r.client.SetNX(r.ctx, r.getLockKey(), r.token,
time.Duration(r.RedisLockOptions.expireSecond)*time.Second).Result()
if err != nil {
return err
}
if !result {
return fmt.Errorf("try lock result: %v, err: %w", result, ErrorNotMyLock)
}
return nil
}
func (r *RedisLockNew) expireLock() error {
result, err := r.client.Eval(r.ctx, LuaExpireLock, []string{r.getLockKey()},
r.token, time.Duration(r.expireSecond)*time.Second).Result()
if err != nil {
return err
}
if res, ok := result.(int64); ok && res == 1 {
return nil
} else {
return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result)
}
}
func (r *RedisLockNew) runWatchDog(ctx context.Context) {
ticker := time.NewTicker(time.Duration(10) * time.Second)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return
default:
}
if err := r.expireLock(); err != nil {
log.Printf("expire lock err: %v", err)
}
}
}
func (r *RedisLockNew) watchDog() {
if !r.isReNew {
return
}
for !atomic.CompareAndSwapInt32(&r.DogRunning, 0, 1) {
}
var ctx context.Context
ctx, r.stopDog = context.WithCancel(r.ctx)
go func() {
defer func() {
atomic.StoreInt32(&r.DogRunning, 0)
}()
r.runWatchDog(ctx)
}()
}
func (r *RedisLockNew) Lock() (err error) {
defer func() {
if err == nil {
r.watchDog()
}
}()
err = r.tryLock()
if err == nil {
return nil
}
if !isErrorNotMyLock(err) || !r.isBlock {
return err
}
err = r.blockingLock()
return err
}
func (r *RedisLockNew) UnLock() error {
defer func() {
if r.stopDog != nil {
r.stopDog()
}
}()
result, err := r.client.Eval(r.ctx, LuaUnLock, []string{r.getLockKey()}, r.token).Result()
if err != nil {
return err
}
if res, ok := result.(int64); ok && res == 1 {
return nil
} else {
return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result)
}
}
func NewRedisLock(client *redis.Client, ctx context.Context, key string, opts ...RedisLockOption) *RedisLockNew {
lock := &RedisLockNew{
client: client,
ctx: ctx,
key: key,
token: utils.GetProcessAndGoroutineIDStr(),
}
for _, o := range opts {
o(&lock.RedisLockOptions)
}
lock.repairOption()
return lock
}

View File

@ -1,61 +0,0 @@
package redisLock
import (
"context"
"sync"
"testing"
"github.com/go-redis/redis/v8"
)
func Test_blockingLock(t *testing.T) {
addr := "192.168.8.1:6379"
passwd := ""
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: passwd,
})
ctx := context.Background()
lock1 := NewRedisLock(client, ctx, "test_key", WithExpireTime(10))
lock2 := NewRedisLock(client, ctx, "test_key",
WithBlock(), WithExpireTime(5), WithMaxWaitTime(0))
var wg sync.WaitGroup
wg.Add(1)
go func() {
var err error
defer wg.Done()
defer func() {
if err == nil {
if err = lock1.UnLock(); err != nil {
t.Error(err)
}
}
}()
if err = lock1.Lock(); err != nil {
t.Error(err)
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := lock2.UnLock(); err != nil {
t.Error(err)
}
}()
if err := lock2.Lock(); err != nil {
t.Error(err)
return
}
}()
wg.Wait()
t.Log("success")
}

View File

@ -1,13 +0,0 @@
package redisLock
const LuaUnLock = `
if (redis.call('get',KEYS[1]) == ARGV[1])
then return redis.call('del',KEYS[1])
else return 0
end`
const LuaExpireLock = `
if (redis.call('get',KEYS[1]) == ARGV[1])
then return redis.call('expire',KEYS[1],ARGV[2])
else return 0
end`

View File

@ -1,71 +0,0 @@
package redisLock
const (
DEFAULTMAXWAITSECOND = 5
DEFAULTMAXEXPIRESECOND = 30
)
type RedisLockOptions struct {
isBlock bool
maxWaitSecond int
expireSecond int
isReNew bool
}
func (r *RedisLockOptions) repairOption() {
//if r.isBlock && r.maxWaitSecond <= 0 {
// r.maxWaitSecond = DEFAULTMAXWAITSECOND
//}
if r.expireSecond > 0 {
return
}
r.expireSecond = DEFAULTMAXEXPIRESECOND
r.isReNew = true
}
type RedisLockOption func(*RedisLockOptions)
func WithBlock() RedisLockOption {
return func(o *RedisLockOptions) {
o.isBlock = true
}
}
func WithMaxWaitTime(maxWaitTime int) RedisLockOption {
return func(o *RedisLockOptions) {
o.maxWaitSecond = maxWaitTime
}
}
func WithExpireTime(expireTime int) RedisLockOption {
return func(o *RedisLockOptions) {
o.expireSecond = expireTime
}
}
type RedLockOptions struct {
perHostTimeout int
expireHostTime int
}
func (r *RedLockOptions) repairOption() {
if r.perHostTimeout <= 0 {
r.perHostTimeout = DEFAULTMAXWAITSECOND
}
}
type RedLockOption func(*RedLockOptions)
func WithPerHostTimeout(perHostTimeout int) RedLockOption {
return func(o *RedLockOptions) {
o.perHostTimeout = perHostTimeout
}
}
func WithExpireHostTime(expireTime int) RedLockOption {
return func(o *RedLockOptions) {
o.expireHostTime = expireTime
}
}

View File

@ -1,118 +0,0 @@
package redisLock
import (
"context"
"log"
"time"
"github.com/google/uuid"
"github.com/go-redis/redis/v8"
)
type redisCommInfo struct {
key string
value string
ttl int
}
type RedisLock struct {
client *redis.Client
ctx context.Context
lockKeepAliveCh chan struct{}
lockKeepDoneCh chan struct{}
kvt redisCommInfo
}
func CreateRedisLock(c *redis.Client, ctx context.Context, k string, ttl int) *RedisLock {
return &RedisLock{c, ctx, make(chan struct{}, 1), make(chan struct{}), redisCommInfo{k, uuid.New().String(), ttl}}
}
func (r *RedisLock) Lock() {
//for r.client.SetNX(r.ctx, r.kvt.key, r.kvt.value, time.Duration(r.kvt.ttl)*time.Second).Val() == false {
// time.Sleep(time.Millisecond * 20)
//}
script := `if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
redis.call('hincrby',KEYS[1],ARGV[1],1)
redis.call('pexpire',KEYS[1],ARGV[2])
return 1
else
return 0
end`
for r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val().(int64) == 0 {
time.Sleep(time.Millisecond * 20)
}
select {
case r.lockKeepAliveCh <- struct{}{}:
go r.autoReNewExpire()
default:
log.Printf("start renew expore err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
}
log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
}
func (r *RedisLock) UnLock() {
//if r.client.Get(r.ctx, r.kvt.key).Val() == r.kvt.value {
// r.client.Del(r.ctx, r.kvt.key)
//}
//script := "if (redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1]) else return 0 end"
//r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value)
script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1])
else return 0 end`
r.lockKeepDoneCh <- struct{}{} //这里一定要放到if外面并且进来就要退掉定时线程开始死锁是因为续期失败
//导致续期协程return掉再往lockKeepDoneCh写数据时就会死锁。而且不能只在判断解锁成功时给该管道发送数据。如果解锁失败不掉用写数据
//会导致续期协程永不退出。ttl时间不能太短太短可能会导致续期还没有完成key过期。30ms会有问题。网络环境差该值还要继续增大。
//for len(r.lockKeepAliveCh) > 0 {
// //<-r.lockKeepAliveCh
//}
val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value).Val()
if i, ok := val.(int64); ok && i == 1 {
log.Printf("unlock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
} else {
log.Printf("unlock err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
}
}
func (r *RedisLock) IsHeldByCurrent() bool {
return r.client.HExists(r.ctx, r.kvt.key, r.kvt.value).Val()
}
func (r *RedisLock) autoReNewExpire() {
defer func() {
<-r.lockKeepAliveCh
}()
//log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5)
defer tc.Stop()
for {
select {
case <-tc.C:
script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('pexpire',KEYS[1],ARGV[2])
else return 0 end`
val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val()
if i, ok := val.(int64); !ok || i == 0 {
log.Printf("auto renew err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
//return
}
case <-r.lockKeepDoneCh:
log.Printf("auto renew down: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
return
}
}
}

View File

@ -1,81 +0,0 @@
package redisLock
import (
"context"
"errors"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type RedLock struct {
locks []*RedisLockNew
RedLockOptions
}
func (r *RedLock) UnLock() (err error) {
for _, lock := range r.locks {
if _error := lock.UnLock(); _error != nil {
if err == nil {
err = _error
}
}
}
return
}
func (r *RedLock) Lock() error {
successLock := 0
for _, lock := range r.locks {
now := time.Now()
err := lock.Lock()
since := time.Since(now)
if err == nil && since < time.Duration(r.perHostTimeout) {
successLock++
}
}
if successLock < len(r.locks)>>1+1 {
return errors.New("redis lock timeout")
}
return nil
}
type RedisHost struct {
NetWork string
Host string
Pass string
}
func NewRedLock(key string, hosts []*RedisHost, opts ...RedLockOption) (*RedLock, error) {
if len(hosts) < 3 {
return nil, fmt.Errorf("invalid redis host length %d", len(hosts))
}
l := &RedLock{}
for _, opt := range opts {
opt(&l.RedLockOptions)
}
l.repairOption()
if len(hosts)*l.perHostTimeout*10 > l.expireHostTime {
return nil, fmt.Errorf("invalid redis host length %d", len(hosts)*l.perHostTimeout*10)
}
l.locks = make([]*RedisLockNew, 0, len(hosts))
for _, host := range hosts {
client := redis.NewClient(&redis.Options{
Network: host.NetWork,
Addr: host.Host,
Password: host.Pass,
})
l.locks = append(l.locks, NewRedisLock(client, context.Background(), key, WithExpireTime(5)))
}
return l, nil
}

View File

@ -5,11 +5,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"goRedisDLM/engine" "goRedisDLM/engine"
"goRedisDLM/redisLock"
"log" "log"
"strconv" "strconv"
"github.com/go-redis/redis/v8" "git.zhangshuocauc.cn/redhat/goredislock"
"github.com/redis/go-redis/v9"
) )
type Inventory struct { type Inventory struct {
@ -20,9 +21,9 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
ctx := context.Background() ctx := context.Background()
//lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100) //lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100)
lock := redisLock.NewRedisLock(i.Client, ctx, "test_key", redisLock.WithBlock(), lock := goredislock.NewRedisLock(i.Client, "test_key", goredislock.WithBlock(),
redisLock.WithMaxWaitTime(20), redisLock.WithExpireTime(5)) goredislock.WithWaitTime(20), goredislock.WithExpireTime(5))
lock.Lock() lock.Lock(ctx)
//time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) //time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
@ -42,7 +43,7 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
// //
//val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() //val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val()
if lock.IsHeldByCurrent() { if true {
setString := fmt.Sprintf("%d", inventory) setString := fmt.Sprintf("%d", inventory)
if i.Client.Set(ctx, "inventory", setString, redis.KeepTTL).Val() != "OK" { if i.Client.Set(ctx, "inventory", setString, redis.KeepTTL).Val() != "OK" {
log.Println("error set inventory") log.Println("error set inventory")
@ -60,13 +61,13 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
err = errors.New("inventory err") err = errors.New("inventory err")
goto Error goto Error
} }
lock.UnLock() lock.Unlock(ctx)
response.Id = request.Id response.Id = request.Id
return nil return nil
Error: Error:
lock.UnLock() lock.Unlock(ctx)
return err return err
} }

View File

@ -7,7 +7,7 @@ import (
"log" "log"
_ "net/http/pprof" _ "net/http/pprof"
"github.com/go-redis/redis/v8" "github.com/redis/go-redis/v9"
) )
var port = flag.String("port", "", "port") var port = flag.String("port", "", "port")