1、增加新的redis锁实现方式

This commit is contained in:
redhat 2025-04-18 16:16:40 +08:00
parent 5afb8b1aa7
commit 78ef85201a
15 changed files with 497 additions and 10 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

5
.idea/codeStyles/codeStyleConfig.xml generated Normal file
View File

@ -0,0 +1,5 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

9
.idea/goRedisDLM.iml generated Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/goRedisDLM.iml" filepath="$PROJECT_DIR$/.idea/goRedisDLM.iml" />
</modules>
</component>
</project>

8
.idea/watcherTasks.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectTasksOptions">
<enabled-global>
<option value="goimports" />
</enabled-global>
</component>
</project>

View File

@ -1,6 +1,7 @@
package engine
import (
"context"
"log"
"strconv"
"time"
@ -19,6 +20,8 @@ type Scheduler interface {
WorkReady(chan Request)
}
var ctx context.Context
func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) {
go func() {
for {

195
redisLock/lock.go Normal file
View File

@ -0,0 +1,195 @@
package redisLock
import (
"context"
"errors"
"fmt"
"goRedisDLM/utils"
"log"
"sync/atomic"
"time"
"github.com/go-redis/redis/v8"
)
var ErrorNotMyLock = errors.New("not myLock")
const REDISLOCKPREFIXKEY = "goRedisLock"
func isErrorNotMyLock(err error) bool {
return errors.Is(err, ErrorNotMyLock)
}
type RedisLockNew struct {
RedisLockOptions
client *redis.Client
ctx context.Context
key string
token string
DogRunning int32
stopDog context.CancelFunc
}
func (r *RedisLockNew) IsHeldByCurrent() bool {
return r.client.Exists(r.ctx, r.getLockKey()).Val() == 1
}
func (r *RedisLockNew) getLockKey() string {
return REDISLOCKPREFIXKEY + "_" + r.key
}
func (r *RedisLockNew) blockingLock() error {
// maxWaitSecond为0时认为永久阻塞
var timeAfter <-chan time.Time
if r.maxWaitSecond > 0 {
timeAfter = time.After(time.Duration(r.maxWaitSecond) * time.Second)
}
ticker := time.NewTicker(time.Duration(500) * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
select {
case <-r.ctx.Done():
return r.ctx.Err()
case <-timeAfter:
return errors.New("timeout")
default:
}
err := r.tryLock()
if err == nil {
return nil
}
if !isErrorNotMyLock(err) {
return err
}
}
return nil
}
func (r *RedisLockNew) tryLock() error {
result, err := r.client.SetNX(r.ctx, r.getLockKey(), r.token,
time.Duration(r.RedisLockOptions.expireSecond)*time.Second).Result()
if err != nil {
return err
}
if !result {
return fmt.Errorf("try lock result: %v, err: %w", result, ErrorNotMyLock)
}
return nil
}
func (r *RedisLockNew) expireLock() error {
result, err := r.client.Eval(r.ctx, LuaExpireLock, []string{r.getLockKey()},
r.token, time.Duration(r.expireSecond)*time.Second).Result()
if err != nil {
return err
}
if res, ok := result.(int64); ok && res == 1 {
return nil
} else {
return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result)
}
}
func (r *RedisLockNew) runWatchDog(ctx context.Context) {
ticker := time.NewTicker(time.Duration(10) * time.Second)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return
default:
}
if err := r.expireLock(); err != nil {
log.Printf("expire lock err: %v", err)
}
}
}
func (r *RedisLockNew) watchDog() {
if !r.isReNew {
return
}
for !atomic.CompareAndSwapInt32(&r.DogRunning, 0, 1) {
}
var ctx context.Context
ctx, r.stopDog = context.WithCancel(r.ctx)
go func() {
defer func() {
atomic.StoreInt32(&r.DogRunning, 0)
}()
r.runWatchDog(ctx)
}()
}
func (r *RedisLockNew) Lock() (err error) {
defer func() {
if err == nil {
r.watchDog()
}
}()
err = r.tryLock()
if err == nil {
return nil
}
if !isErrorNotMyLock(err) || !r.isBlock {
return err
}
err = r.blockingLock()
return err
}
func (r *RedisLockNew) UnLock() error {
defer func() {
if r.stopDog != nil {
r.stopDog()
}
}()
result, err := r.client.Eval(r.ctx, LuaUnLock, []string{r.getLockKey()}, r.token).Result()
if err != nil {
return err
}
if res, ok := result.(int64); ok && res == 1 {
return nil
} else {
return fmt.Errorf("unlock err: ok: %v, res:%v, raw:%v", ok, err, result)
}
}
func NewRedisLock(client *redis.Client, ctx context.Context, key string, opts ...RedisLockOption) *RedisLockNew {
lock := &RedisLockNew{
client: client,
ctx: ctx,
key: key,
token: utils.GetProcessAndGoroutineIDStr(),
}
for _, o := range opts {
o(&lock.RedisLockOptions)
}
lock.repairOption()
return lock
}

61
redisLock/lock_test.go Normal file
View File

@ -0,0 +1,61 @@
package redisLock
import (
"context"
"sync"
"testing"
"github.com/go-redis/redis/v8"
)
func Test_blockingLock(t *testing.T) {
addr := "192.168.8.1:6379"
passwd := ""
client := redis.NewClient(&redis.Options{
Addr: addr,
Password: passwd,
})
ctx := context.Background()
lock1 := NewRedisLock(client, ctx, "test_key", WithExpireTime(10))
lock2 := NewRedisLock(client, ctx, "test_key",
WithBlock(), WithExpireTime(5), WithMaxWaitTime(0))
var wg sync.WaitGroup
wg.Add(1)
go func() {
var err error
defer wg.Done()
defer func() {
if err == nil {
if err = lock1.UnLock(); err != nil {
t.Error(err)
}
}
}()
if err = lock1.Lock(); err != nil {
t.Error(err)
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := lock2.UnLock(); err != nil {
t.Error(err)
}
}()
if err := lock2.Lock(); err != nil {
t.Error(err)
return
}
}()
wg.Wait()
t.Log("success")
}

13
redisLock/lua.go Normal file
View File

@ -0,0 +1,13 @@
package redisLock
const LuaUnLock = `
if (redis.call('get',KEYS[1]) == ARGV[1])
then return redis.call('del',KEYS[1])
else return 0
end`
const LuaExpireLock = `
if (redis.call('get',KEYS[1]) == ARGV[1])
then return redis.call('expire',KEYS[1],ARGV[2])
else return 0
end`

71
redisLock/option.go Normal file
View File

@ -0,0 +1,71 @@
package redisLock
const (
DEFAULTMAXWAITSECOND = 5
DEFAULTMAXEXPIRESECOND = 30
)
type RedisLockOptions struct {
isBlock bool
maxWaitSecond int
expireSecond int
isReNew bool
}
func (r *RedisLockOptions) repairOption() {
//if r.isBlock && r.maxWaitSecond <= 0 {
// r.maxWaitSecond = DEFAULTMAXWAITSECOND
//}
if r.expireSecond > 0 {
return
}
r.expireSecond = DEFAULTMAXEXPIRESECOND
r.isReNew = true
}
type RedisLockOption func(*RedisLockOptions)
func WithBlock() RedisLockOption {
return func(o *RedisLockOptions) {
o.isBlock = true
}
}
func WithMaxWaitTime(maxWaitTime int) RedisLockOption {
return func(o *RedisLockOptions) {
o.maxWaitSecond = maxWaitTime
}
}
func WithExpireTime(expireTime int) RedisLockOption {
return func(o *RedisLockOptions) {
o.expireSecond = expireTime
}
}
type RedLockOptions struct {
perHostTimeout int
expireHostTime int
}
func (r *RedLockOptions) repairOption() {
if r.perHostTimeout <= 0 {
r.perHostTimeout = DEFAULTMAXWAITSECOND
}
}
type RedLockOption func(*RedLockOptions)
func WithPerHostTimeout(perHostTimeout int) RedLockOption {
return func(o *RedLockOptions) {
o.perHostTimeout = perHostTimeout
}
}
func WithExpireHostTime(expireTime int) RedLockOption {
return func(o *RedLockOptions) {
o.expireHostTime = expireTime
}
}

View File

@ -55,7 +55,7 @@ func (r *RedisLock) Lock() {
log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
}
func (r *RedisLock) Unlock() {
func (r *RedisLock) UnLock() {
//if r.client.Get(r.ctx, r.kvt.key).Val() == r.kvt.value {
// r.client.Del(r.ctx, r.kvt.key)
//}
@ -84,7 +84,7 @@ func (r *RedisLock) Unlock() {
}
}
func (r *RedisLock) IsHeld() bool {
func (r *RedisLock) IsHeldByCurrent() bool {
return r.client.HExists(r.ctx, r.kvt.key, r.kvt.value).Val()
}

81
redisLock/redlock.go Normal file
View File

@ -0,0 +1,81 @@
package redisLock
import (
"context"
"errors"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
type RedLock struct {
locks []*RedisLockNew
RedLockOptions
}
func (r *RedLock) UnLock() (err error) {
for _, lock := range r.locks {
if _error := lock.UnLock(); _error != nil {
if err == nil {
err = _error
}
}
}
return
}
func (r *RedLock) Lock() error {
successLock := 0
for _, lock := range r.locks {
now := time.Now()
err := lock.Lock()
since := time.Since(now)
if err == nil && since < time.Duration(r.perHostTimeout) {
successLock++
}
}
if successLock < len(r.locks)>>1+1 {
return errors.New("redis lock timeout")
}
return nil
}
type RedisHost struct {
NetWork string
Host string
Pass string
}
func NewRedLock(key string, hosts []*RedisHost, opts ...RedLockOption) (*RedLock, error) {
if len(hosts) < 3 {
return nil, fmt.Errorf("invalid redis host length %d", len(hosts))
}
l := &RedLock{}
for _, opt := range opts {
opt(&l.RedLockOptions)
}
l.repairOption()
if len(hosts)*l.perHostTimeout*10 > l.expireHostTime {
return nil, fmt.Errorf("invalid redis host length %d", len(hosts)*l.perHostTimeout*10)
}
l.locks = make([]*RedisLockNew, 0, len(hosts))
for _, host := range hosts {
client := redis.NewClient(&redis.Options{
Network: host.NetWork,
Addr: host.Host,
Password: host.Pass,
})
l.locks = append(l.locks, NewRedisLock(client, context.Background(), key, WithExpireTime(5)))
}
return l, nil
}

25
utils/os.go Normal file
View File

@ -0,0 +1,25 @@
package utils
import (
"fmt"
"os"
"runtime"
"strconv"
"strings"
)
func GetCurrentProcessID() string {
return strconv.Itoa(os.Getpid())
}
// GetCurrentGoroutineID 获取当前的协程ID
func GetCurrentGoroutineID() string {
buf := make([]byte, 128)
buf = buf[:runtime.Stack(buf, false)]
stackInfo := string(buf)
return strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1])
}
func GetProcessAndGoroutineIDStr() string {
return fmt.Sprintf("%s_%s", GetCurrentProcessID(), GetCurrentGoroutineID())
}

View File

@ -7,9 +7,7 @@ import (
"goRedisDLM/engine"
"goRedisDLM/redisLock"
"log"
"math/rand"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
@ -21,10 +19,12 @@ type Inventory struct {
func (i *Inventory) Work(request engine.Request, response *engine.Response) error {
ctx := context.Background()
lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100)
//lock := redisLock.CreateRedisLock(i.Client, ctx, request.Dlock, 100)
lock := redisLock.NewRedisLock(i.Client, ctx, "test_key", redisLock.WithBlock(),
redisLock.WithMaxWaitTime(20), redisLock.WithExpireTime(5))
lock.Lock()
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
//time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
get := i.Client.Get(ctx, "inventory")
log.Printf("%s\n", get)
@ -42,7 +42,7 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
//
//val := i.Client.Eval(ctx, script, []string{request.Dlock, "inventory"}, value.String(), inventory).Val()
if lock.IsHeld() {
if lock.IsHeldByCurrent() {
setString := fmt.Sprintf("%d", inventory)
if i.Client.Set(ctx, "inventory", setString, redis.KeepTTL).Val() != "OK" {
log.Println("error set inventory")
@ -60,13 +60,13 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
err = errors.New("inventory err")
goto Error
}
lock.Unlock()
lock.UnLock()
response.Id = request.Id
return nil
Error:
lock.Unlock()
lock.UnLock()
return err
}

View File

@ -39,7 +39,7 @@ func main() {
*/
redisClient := redis.NewClient(&redis.Options{
Addr: *redisHost,
Password: "redhat",
Password: "",
})
log.Fatal(rpcSupport.RpcServer(*port, &worker.Inventory{Client: redisClient}))