goconsistencehash/redis/lock/lock.go
2025-05-07 10:34:10 +08:00

175 lines
2.8 KiB
Go

package lock
import (
"context"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"sync/atomic"
"time"
)
var ErrorsOtherLock = errors.New("not my lock")
type RedisLock struct {
client *redis.Client
token string
key string
opts Options
isRuning int32
calFn context.CancelFunc
}
func NewRedisLock(client *redis.Client, key string, opts ...Option) *RedisLock {
redisLock := &RedisLock{
client: client,
key: key,
token: fmt.Sprintf("%d", time.Now().UnixNano()),
}
for _, o := range opts {
o(&redisLock.opts)
}
redisLock.opts.repire()
return redisLock
}
func (r *RedisLock) tryLock(ctx context.Context) error {
res, err := r.client.SetNX(ctx, r.key, r.token, time.Second*time.Duration(r.opts.expireTime)).Result()
if err != nil {
return err
}
if !res {
return errors.New("not my lock")
}
return nil
}
func (r *RedisLock) reNew(ctx context.Context) error {
ires, err := r.client.Eval(ctx, LuaReNew, []string{r.key}, r.token, r.opts.expireTime).Result()
if err != nil {
return err
}
if res, ok := ires.(int64); ok && res == 1 {
return nil
}
return ErrorsOtherLock
}
func (r *RedisLock) unLock(ctx context.Context) error {
ires, err := r.client.Eval(ctx, LuaUnLock, []string{r.key}, r.token).Result()
if err != nil {
return err
}
if res, ok := ires.(int64); ok && res == 1 {
return nil
}
return ErrorsOtherLock
}
func (r *RedisLock) block(ctx context.Context) error {
var after <-chan time.Time
if r.opts.maxWaitTime > 0 {
after = time.After(time.Duration(r.opts.maxWaitTime))
}
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return ctx.Err()
case <-after:
return errors.New("time out")
default:
}
err := r.tryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrorsOtherLock) {
return err
}
}
return nil
}
func (r *RedisLock) watchDog(ctx context.Context) {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return
default:
}
_ = r.reNew(ctx)
}
}
func (r *RedisLock) watch(ctx context.Context) {
if !r.opts.reNew {
return
}
for !atomic.CompareAndSwapInt32(&r.isRuning, 0, 1) {
}
var ctxn context.Context
ctxn, r.calFn = context.WithCancel(ctx)
atomic.StoreInt32(&r.isRuning, 1)
go func() {
defer func() {
atomic.StoreInt32(&r.isRuning, 0)
}()
r.watchDog(ctxn)
}()
}
func (r *RedisLock) Lock(ctx context.Context) (err error) {
defer func() {
if err == nil {
r.calFn = nil
r.watch(ctx)
}
}()
err = r.tryLock(ctx)
if err == nil {
return nil
}
if !r.opts.block || !errors.Is(err, ErrorsOtherLock) {
return err
}
err = r.block(ctx)
return err
}
func (r *RedisLock) UnLock(ctx context.Context) error {
if r.calFn != nil {
r.calFn()
}
return r.unLock(ctx)
}