175 lines
2.8 KiB
Go
175 lines
2.8 KiB
Go
package lock
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/redis/go-redis/v9"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
var (
	// ErrorsOtherLock is the sentinel error used when the lock key is not held
	// under this instance's token. The renew and release paths return it when
	// their Redis script replies with anything other than the integer 1.
	// Compare against it with errors.Is.
	ErrorsOtherLock = errors.New("not my lock")
)
|
|
|
|
type RedisLock struct {
|
|
client *redis.Client
|
|
token string
|
|
key string
|
|
opts Options
|
|
|
|
isRuning int32
|
|
calFn context.CancelFunc
|
|
}
|
|
|
|
func NewRedisLock(client *redis.Client, key string, opts ...Option) *RedisLock {
|
|
redisLock := &RedisLock{
|
|
client: client,
|
|
key: key,
|
|
token: fmt.Sprintf("%d", time.Now().UnixNano()),
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&redisLock.opts)
|
|
}
|
|
|
|
redisLock.opts.repire()
|
|
|
|
return redisLock
|
|
}
|
|
|
|
func (r *RedisLock) tryLock(ctx context.Context) error {
|
|
res, err := r.client.SetNX(ctx, r.key, r.token, time.Second*time.Duration(r.opts.expireTime)).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !res {
|
|
return errors.New("not my lock")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RedisLock) reNew(ctx context.Context) error {
|
|
ires, err := r.client.Eval(ctx, LuaReNew, []string{r.key}, r.token, r.opts.expireTime).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if res, ok := ires.(int64); ok && res == 1 {
|
|
return nil
|
|
}
|
|
|
|
return ErrorsOtherLock
|
|
}
|
|
|
|
func (r *RedisLock) unLock(ctx context.Context) error {
|
|
ires, err := r.client.Eval(ctx, LuaUnLock, []string{r.key}, r.token).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if res, ok := ires.(int64); ok && res == 1 {
|
|
return nil
|
|
}
|
|
|
|
return ErrorsOtherLock
|
|
}
|
|
|
|
func (r *RedisLock) block(ctx context.Context) error {
|
|
var after <-chan time.Time
|
|
if r.opts.maxWaitTime > 0 {
|
|
after = time.After(time.Duration(r.opts.maxWaitTime))
|
|
}
|
|
|
|
ticker := time.NewTicker(200 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for range ticker.C {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-after:
|
|
return errors.New("time out")
|
|
default:
|
|
}
|
|
|
|
err := r.tryLock(ctx)
|
|
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if !errors.Is(err, ErrorsOtherLock) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RedisLock) watchDog(ctx context.Context) {
|
|
ticker := time.NewTicker(2 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
_ = r.reNew(ctx)
|
|
}
|
|
}
|
|
|
|
func (r *RedisLock) watch(ctx context.Context) {
|
|
if !r.opts.reNew {
|
|
return
|
|
}
|
|
|
|
for !atomic.CompareAndSwapInt32(&r.isRuning, 0, 1) {
|
|
}
|
|
|
|
var ctxn context.Context
|
|
ctxn, r.calFn = context.WithCancel(ctx)
|
|
|
|
atomic.StoreInt32(&r.isRuning, 1)
|
|
|
|
go func() {
|
|
defer func() {
|
|
atomic.StoreInt32(&r.isRuning, 0)
|
|
}()
|
|
r.watchDog(ctxn)
|
|
}()
|
|
}
|
|
|
|
func (r *RedisLock) Lock(ctx context.Context) (err error) {
|
|
defer func() {
|
|
if err == nil {
|
|
r.calFn = nil
|
|
r.watch(ctx)
|
|
}
|
|
}()
|
|
|
|
err = r.tryLock(ctx)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if !r.opts.block || !errors.Is(err, ErrorsOtherLock) {
|
|
return err
|
|
}
|
|
|
|
err = r.block(ctx)
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *RedisLock) UnLock(ctx context.Context) error {
|
|
if r.calFn != nil {
|
|
r.calFn()
|
|
}
|
|
|
|
return r.unLock(ctx)
|
|
}
|