diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git a/.idea/goRedisDLM.iml b/.idea/goRedisDLM.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/goRedisDLM.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..bac1210 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/watcherTasks.xml b/.idea/watcherTasks.xml new file mode 100644 index 0000000..cb1a783 --- /dev/null +++ b/.idea/watcherTasks.xml @@ -0,0 +1,8 @@ + + + + + + + \ No newline at end of file diff --git a/engine/firstengine.go b/engine/firstengine.go index 09aed02..0803b56 100644 --- a/engine/firstengine.go +++ b/engine/firstengine.go @@ -1,6 +1,7 @@ package engine import ( + "context" "log" "strconv" "time" @@ -19,6 +20,8 @@ type Scheduler interface { WorkReady(chan Request) } +var ctx context.Context + func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) { go func() { for { diff --git a/redisLock/lock.go b/redisLock/lock.go new file mode 100644 index 0000000..8c1fd88 --- /dev/null +++ b/redisLock/lock.go @@ -0,0 +1,195 @@ +package redisLock + +import ( + "context" + "errors" + "fmt" + "goRedisDLM/utils" + "log" + "sync/atomic" + "time" + + "github.com/go-redis/redis/v8" +) + +var ErrorNotMyLock = errors.New("not myLock") + +const REDISLOCKPREFIXKEY = "goRedisLock" + +func isErrorNotMyLock(err error) bool { + return errors.Is(err, ErrorNotMyLock) +} + +type RedisLockNew struct { + RedisLockOptions + client *redis.Client + ctx context.Context + key string + token string + + DogRunning int32 + stopDog context.CancelFunc +} + +func (r *RedisLockNew) IsHeldByCurrent() bool { + return r.client.Exists(r.ctx, r.getLockKey()).Val() == 1 +} + +func (r *RedisLockNew) getLockKey() string { + return REDISLOCKPREFIXKEY + "_" + r.key +} + +func (r *RedisLockNew) blockingLock() error { + // maxWaitSecond为0时认为永久阻塞 + var timeAfter <-chan time.Time + if r.maxWaitSecond > 0 { + timeAfter = time.After(time.Duration(r.maxWaitSecond) * time.Second) + } + + ticker := time.NewTicker(time.Duration(500) * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + select { + case <-r.ctx.Done(): + return r.ctx.Err() + case <-timeAfter: + return errors.New("timeout") + default: + } + + err := r.tryLock() + + if err == nil { + return nil + } + + if !isErrorNotMyLock(err) { + return err + } + } + + return nil +} + +func (r *RedisLockNew) tryLock() error { + result, err := r.client.SetNX(r.ctx, r.getLockKey(), r.token, + time.Duration(r.RedisLockOptions.expireSecond)*time.Second).Result() + + if err != nil { + return err + } + + if !result { + return fmt.Errorf("try lock result: %v, err: %w", result, ErrorNotMyLock) + } + + return nil +} + +func (r *RedisLockNew) expireLock() error { + result, err := r.client.Eval(r.ctx, LuaExpireLock, []string{r.getLockKey()}, + r.token, time.Duration(r.expireSecond)*time.Second).Result() + if err != nil { + return err + } + + if res, ok := result.(int64); ok && res == 1 { + return nil + } else { + return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result) + } +} + +func (r *RedisLockNew) runWatchDog(ctx context.Context) { + ticker := time.NewTicker(time.Duration(10) * time.Second) + defer ticker.Stop() + + for range ticker.C { + select { + case <-ctx.Done(): + return + default: + } + + if err := r.expireLock(); err != nil { + log.Printf("expire lock err: %v", err) + } + } +} + +func (r *RedisLockNew) watchDog() { + if !r.isReNew { + return + } + + for !atomic.CompareAndSwapInt32(&r.DogRunning, 0, 1) { + } + + var ctx context.Context + ctx, r.stopDog = context.WithCancel(r.ctx) + go func() { + defer func() { + atomic.StoreInt32(&r.DogRunning, 0) + }() + + r.runWatchDog(ctx) + }() +} + +func (r *RedisLockNew) Lock() (err error) { + defer func() { + if err == nil { + r.watchDog() + } + }() + + err = r.tryLock() + if err == nil { + return nil + } + + if !isErrorNotMyLock(err) || !r.isBlock { + return err + } + + err = r.blockingLock() + + return err +} + +func (r *RedisLockNew) UnLock() error { + defer func() { + if r.stopDog != nil { + r.stopDog() + } + }() + + result, err := r.client.Eval(r.ctx, LuaUnLock, []string{r.getLockKey()}, r.token).Result() + if err != nil { + return err + } + + if res, ok := result.(int64); ok && res == 1 { + return nil + } else { + return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result) + } +} + +func NewRedisLock(client *redis.Client, ctx context.Context, key string, opts ...RedisLockOption) *RedisLockNew { + lock := &RedisLockNew{ + client: client, + ctx: ctx, + key: key, + token: utils.GetProcessAndGoroutineIDStr(), + } + + for _, o := range opts { + o(&lock.RedisLockOptions) + } + + lock.repairOption() + + return lock +} diff --git a/redisLock/lock_test.go b/redisLock/lock_test.go new file mode 100644 index 0000000..c73d2fa --- /dev/null +++ b/redisLock/lock_test.go @@ -0,0 +1,61 @@ +package redisLock + +import ( + "context" + "sync" + "testing" + + "github.com/go-redis/redis/v8" +) + +func Test_blockingLock(t *testing.T) { + addr := "192.168.8.1:6379" + passwd := "" + + client := redis.NewClient(&redis.Options{ + Addr: addr, + Password: passwd, + }) + + ctx := context.Background() + + lock1 := NewRedisLock(client, ctx, "test_key", WithExpireTime(10)) + lock2 := NewRedisLock(client, ctx, "test_key", + WithBlock(), WithExpireTime(5), WithMaxWaitTime(0)) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + var err error + defer wg.Done() + defer func() { + if err == nil { + if err = lock1.UnLock(); err != nil { + t.Error(err) + } + } + }() + if err = lock1.Lock(); err != nil { + t.Error(err) + return + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + if err := lock2.UnLock(); err != nil { + t.Error(err) + } + }() + if err := lock2.Lock(); err != nil { + t.Error(err) + return + } + }() + + wg.Wait() + + t.Log("success") +} diff --git a/redisLock/lua.go b/redisLock/lua.go new file mode 100644 index 0000000..fb1debd --- /dev/null +++ b/redisLock/lua.go @@ -0,0 +1,13 @@ +package redisLock + +const LuaUnLock = ` +if (redis.call('get',KEYS[1]) == ARGV[1]) +then return redis.call('del',KEYS[1]) +else return 0 +end` + +const LuaExpireLock = ` +if (redis.call('get',KEYS[1]) == ARGV[1]) +then return redis.call('expire',KEYS[1],ARGV[2]) +else return 0 +end` diff --git a/redisLock/option.go b/redisLock/option.go new file mode 100644 index 0000000..90d15be --- /dev/null +++ b/redisLock/option.go @@ -0,0 +1,71 @@ +package redisLock + +const ( + DEFAULTMAXWAITSECOND = 5 + DEFAULTMAXEXPIRESECOND = 30 +) + +type RedisLockOptions struct { + isBlock bool + maxWaitSecond int + expireSecond int + isReNew bool +} + +func (r *RedisLockOptions) repairOption() { + //if r.isBlock && r.maxWaitSecond <= 0 { + // r.maxWaitSecond = DEFAULTMAXWAITSECOND + //} + + if r.expireSecond > 0 { + return + } + + r.expireSecond = DEFAULTMAXEXPIRESECOND + r.isReNew = true +} + +type RedisLockOption func(*RedisLockOptions) + +func WithBlock() RedisLockOption { + return func(o *RedisLockOptions) { + o.isBlock = true + } +} + +func WithMaxWaitTime(maxWaitTime int) RedisLockOption { + return func(o *RedisLockOptions) { + o.maxWaitSecond = maxWaitTime + } +} + +func WithExpireTime(expireTime int) RedisLockOption { + return func(o *RedisLockOptions) { + o.expireSecond = expireTime + } +} + +type RedLockOptions struct { + perHostTimeout int + expireHostTime int +} + +func (r *RedLockOptions) repairOption() { + if r.perHostTimeout <= 0 { + r.perHostTimeout = DEFAULTMAXWAITSECOND + } +} + +type RedLockOption func(*RedLockOptions) + +func WithPerHostTimeout(perHostTimeout int) RedLockOption { + return func(o *RedLockOptions) { + o.perHostTimeout = perHostTimeout + } +} + +func WithExpireHostTime(expireTime int) RedLockOption { + return func(o *RedLockOptions) { + o.expireHostTime = expireTime + } +} diff --git a/redisLock/redis.go b/redisLock/redis.go index 09d2939..1862147 100644 --- a/redisLock/redis.go +++ b/redisLock/redis.go @@ -55,7 +55,7 @@ func (r *RedisLock) Lock() { log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) } -func (r *RedisLock) Unlock() { +func (r *RedisLock) UnLock() { //if r.client.Get(r.ctx, r.kvt.key).Val() == r.kvt.value { // r.client.Del(r.ctx, r.kvt.key) //} @@ -84,7 +84,7 @@ func (r *RedisLock) Unlock() { } } -func (r *RedisLock) IsHeld() bool { +func (r *RedisLock) IsHeldByCurrent() bool { return r.client.HExists(r.ctx, r.kvt.key, r.kvt.value).Val() } diff --git a/redisLock/redlock.go b/redisLock/redlock.go new file mode 100644 index 0000000..d20cbdc --- /dev/null +++ b/redisLock/redlock.go @@ -0,0 +1,81 @@ +package redisLock + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/go-redis/redis/v8" +) + +type RedLock struct { + locks []*RedisLockNew + RedLockOptions +} + +func (r *RedLock) UnLock() (err error) { + for _, lock := range r.locks { + if _error := lock.UnLock(); _error != nil { + if err == nil { + err = _error + } + } + } + + return +} + +func (r *RedLock) Lock() error { + successLock := 0 + for _, lock := range r.locks { + now := time.Now() + err := lock.Lock() + since := time.Since(now) + if err == nil && since < time.Duration(r.perHostTimeout) { + successLock++ + } + } + + if successLock < len(r.locks)>>1+1 { + return errors.New("redis lock timeout") + } + + return nil +} + +type RedisHost struct { + NetWork string + Host string + Pass string +} + +func NewRedLock(key string, hosts []*RedisHost, opts ...RedLockOption) (*RedLock, error) { + if len(hosts) < 3 { + return nil, fmt.Errorf("invalid redis host length %d", len(hosts)) + } + + l := &RedLock{} + + for _, opt := range opts { + opt(&l.RedLockOptions) + } + + l.repairOption() + if len(hosts)*l.perHostTimeout*10 > l.expireHostTime { + return nil, fmt.Errorf("invalid redis host length %d", len(hosts)*l.perHostTimeout*10) + } + + l.locks = make([]*RedisLockNew, 0, len(hosts)) + + for _, host := range hosts { + client := redis.NewClient(&redis.Options{ + Network: host.NetWork, + Addr: host.Host, + Password: host.Pass, + }) + l.locks = append(l.locks, NewRedisLock(client, context.Background(), key, WithExpireTime(5))) + } + + return l, nil +} diff --git a/utils/os.go b/utils/os.go new file mode 100644 index 0000000..50597d3 --- /dev/null +++ b/utils/os.go @@ -0,0 +1,25 @@ +package utils + +import ( + "fmt" + "os" + "runtime" + "strconv" + "strings" +) + +func GetCurrentProcessID() string { + return strconv.Itoa(os.Getpid()) +} + +// GetCurrentGoroutineID 获取当前的协程ID +func GetCurrentGoroutineID() string { + buf := make([]byte, 128) + buf = buf[:runtime.Stack(buf, false)] + stackInfo := string(buf) + return strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1]) +} + +func GetProcessAndGoroutineIDStr() string { + return fmt.Sprintf("%s_%s", GetCurrentProcessID(), GetCurrentGoroutineID()) +} diff --git a/worker/rpc.go b/worker/rpc.go index e3ca986..3a2bd01 100644 --- a/worker/rpc.go +++ b/worker/rpc.go @@ -7,9 +7,7 @@ import ( "goRedisDLM/engine" "goRedisDLM/redisLock" "log" - "math/rand" "strconv" - "time" "github.com/go-redis/redis/v8" ) @@ -21,10 +19,12 @@ type Inventory struct { func (i *Inventory) Work(request engine.Request, response *engine.Response) error { ctx := context.Background() - lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100) + //lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100) + lock := redisLock.NewRedisLock(i.Client, ctx, "test_key", redisLock.WithBlock(), + redisLock.WithMaxWaitTime(20), redisLock.WithExpireTime(5)) lock.Lock() - time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) + //time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond) get := i.Client.Get(ctx, "inventory") log.Printf("%s\n", get) @@ -42,7 +42,7 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro // //val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val() - if lock.IsHeld() { + if lock.IsHeldByCurrent() { setString := fmt.Sprintf("%d", inventory) if i.Client.Set(ctx, "inventory", setString, redis.KeepTTL).Val() != "OK" { log.Println("error set inventory") @@ -60,13 +60,13 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro err = errors.New("inventory err") goto Error } - lock.Unlock() + lock.UnLock() response.Id = request.Id return nil Error: - lock.Unlock() + lock.UnLock() return err } diff --git a/worker/service/main.go b/worker/service/main.go index daffe54..c1d9488 100644 --- a/worker/service/main.go +++ b/worker/service/main.go @@ -39,7 +39,7 @@ func main() { */ redisClient := redis.NewClient(&redis.Options{ Addr: *redisHost, - Password: "redhat", + Password: "", }) log.Fatal(rpcSupport.RpcServer(*port, &worker.Inventory{Client: redisClient}))