完善自动续期,解决死锁和续期报错问题
This commit is contained in:
parent
3b2da94e92
commit
a031809960
2
main.go
2
main.go
@ -27,7 +27,7 @@ func main() {
|
||||
Work: client.CreateRpcWorker(createRpcPoll(*host)),
|
||||
}
|
||||
|
||||
e.Run(10)
|
||||
e.Run(5000)
|
||||
}
|
||||
|
||||
func createRpcPoll(port string) chan *rpc.Client {
|
||||
|
@ -33,7 +33,7 @@ func (r *RedisLock) Lock() {
|
||||
|
||||
script := `if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
|
||||
redis.call('hincrby',KEYS[1],ARGV[1],1)
|
||||
redis.call('expire',KEYS[1],ARGV[2])
|
||||
redis.call('pexpire',KEYS[1],ARGV[2])
|
||||
return 1
|
||||
else
|
||||
return 0
|
||||
@ -47,7 +47,7 @@ func (r *RedisLock) Lock() {
|
||||
case r.lockKeepAliveCh <- struct{}{}:
|
||||
go r.autoReNewExpire()
|
||||
default:
|
||||
|
||||
log.Printf("start renew expore err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
}
|
||||
|
||||
log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
@ -65,8 +65,17 @@ func (r *RedisLock) Unlock() {
|
||||
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1])
|
||||
else return 0 end`
|
||||
|
||||
if r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value).Val().(int64) == 1 {
|
||||
r.lockKeepDoneCh <- struct{}{}
|
||||
r.lockKeepDoneCh <- struct{}{} //这里一定要放到if外面,并且进来就要退掉定时线程,开始死锁是因为续期失败
|
||||
//导致续期协程return掉,再往lockKeepDoneCh写数据时就会死锁。而且不能只在判断解锁成功时给该管道发送数据。如果解锁失败不掉用写数据
|
||||
//会导致续期协程永不退出。ttl时间不能太短,太短可能会导致续期还没有完成key过期。30ms会有问题。网络环境差该值还要继续增大。
|
||||
|
||||
//for len(r.lockKeepAliveCh) > 0 {
|
||||
// //<-r.lockKeepAliveCh
|
||||
//}
|
||||
|
||||
val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value).Val()
|
||||
|
||||
if i, ok := val.(int64); ok && i == 1 {
|
||||
log.Printf("unlock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
} else {
|
||||
log.Printf("unlock err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
@ -78,19 +87,22 @@ func (r *RedisLock) autoReNewExpire() {
|
||||
<-r.lockKeepAliveCh
|
||||
}()
|
||||
|
||||
tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Second / 3)
|
||||
log.Printf("auto renew start: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
|
||||
tc := time.NewTicker(time.Duration(r.kvt.ttl) * time.Millisecond / 5)
|
||||
defer tc.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tc.C:
|
||||
script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2])
|
||||
script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('pexpire',KEYS[1],ARGV[2])
|
||||
else return 0 end`
|
||||
|
||||
i, ok := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val().(int64)
|
||||
if !ok || i == 0 {
|
||||
val := r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val()
|
||||
|
||||
if i, ok := val.(int64); !ok || i == 0 {
|
||||
log.Printf("auto renew err: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
return
|
||||
//return
|
||||
}
|
||||
case <-r.lockKeepDoneCh:
|
||||
log.Printf("auto renew down: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"goRedisDLM/engine"
|
||||
"goRedisDLM/redisLock"
|
||||
"log"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -23,12 +24,11 @@ func (i *Inventory) Work(request engine.Request, response *engine.Response) erro
|
||||
|
||||
log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock)
|
||||
|
||||
lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 3)
|
||||
lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 300)
|
||||
lock.Lock()
|
||||
|
||||
time.Sleep(time.Second * 5)
|
||||
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
|
||||
|
||||
log.Printf("uuid: %s get dlock\n", value.String())
|
||||
get := i.Client.Get(context.Background(), "inventory")
|
||||
log.Printf("%s\n", get)
|
||||
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"goRedisDLM/rpcSupport"
|
||||
"goRedisDLM/worker"
|
||||
"log"
|
||||
_ "net/http/pprof"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
@ -21,6 +22,21 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
pprof, err := strconv.Atoi(strings.Trim(*port, ":"))
|
||||
if err != nil {
|
||||
log.Println("pprof port error")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
pprofPort := fmt.Sprintf(":%d", pprof+100)
|
||||
|
||||
go func() {
|
||||
log.Println("pprof listen on: ", pprofPort)
|
||||
log.Fatal(http.ListenAndServe(pprofPort, nil))
|
||||
}()
|
||||
*/
|
||||
redisClient := redis.NewClient(&redis.Options{
|
||||
Addr: *redisHost,
|
||||
Password: "redhat",
|
||||
|
Loading…
Reference in New Issue
Block a user