diff --git a/engine/firstengine.go b/engine/firstengine.go new file mode 100644 index 0000000..664e1dd --- /dev/null +++ b/engine/firstengine.go @@ -0,0 +1,62 @@ +package engine + +import ( + "log" + "strconv" + "time" +) + +type FirstEngine struct { + Scheduler Scheduler + WorkCount int + Work func(request Request) (Response, error) +} + +type Scheduler interface { + Submit(request Request) + ConfigChan() chan Request + Run() + WorkReady(chan Request) +} + +func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) { + go func() { + for { + e.Scheduler.WorkReady(in) + req := <-in + work, err := e.Work(req) + if err != nil { + log.Printf("work err: %v", err) + } + out <- work + } + }() +} + +func (e *FirstEngine) Run(num int) { + out := make(chan Response) + + e.Scheduler.Run() + + start := time.Now() + for i := 0; i < e.WorkCount; i++ { + e.CreateWorker(e.Scheduler.ConfigChan(), out) + } + + log.Printf("time slice 1: %d", time.Since(start)/time.Millisecond) + for i := 0; i < num; i++ { + e.Scheduler.Submit(Request{Id: strconv.Itoa(i), Dlock: "redis_lock"}) + } + + log.Printf("time slice 2: %d", time.Since(start)/time.Millisecond) + i := 0 + for id := range out { + i++ + log.Println(id.Id) + if i >= num { + close(out) + } + } + + log.Printf("time slice 3: %d", time.Since(start)/time.Millisecond) +} diff --git a/engine/type.go b/engine/type.go new file mode 100644 index 0000000..a445506 --- /dev/null +++ b/engine/type.go @@ -0,0 +1,10 @@ +package engine + +type Request struct { + Id string + Dlock string +} + +type Response struct { + Id string +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..44687d3 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module goRedisDLM + +go 1.22 + +require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-redis/redis/v8 v8.11.5 // indirect + github.com/google/uuid v1.6.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0e8a0f8 --- /dev/null +++ b/go.sum @@ -0,0 +1,8 @@ +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/main.go b/main.go new file mode 100644 index 0000000..75ca166 --- /dev/null +++ b/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "flag" + "goRedisDLM/engine" + "goRedisDLM/rpcSupport" + "goRedisDLM/scheduler" + "goRedisDLM/worker/client" + "log" + "net/rpc" + "strings" +) + +var host = flag.String("host", "", "host") + +func main() { + flag.Parse() + if *host == "" { + flag.Usage() + + return + } + + e := engine.FirstEngine{ + Scheduler: &scheduler.FirstScheduler{}, + WorkCount: 10, + Work: client.CreateRpcWorker(createRpcPoll(*host)), + } + + e.Run(5000) +} + +func createRpcPoll(port string) chan *rpc.Client { + out := make(chan *rpc.Client) + + var clients []*rpc.Client + + split := strings.Split(port, ",") + for i, ele := range split { + cli, err := rpcSupport.RpcClient(ele) + if err != nil { + log.Printf("host: %s,%d create err: %v", ele, i, err) + continue + } + + log.Printf("rpc cli create ok: %s", ele) + + clients = append(clients, cli) + } + + go func() { + for { + for _, ele := range clients { + out <- ele + } + } + }() + + return out +} diff --git a/redisLock/redis.go b/redisLock/redis.go new file mode 100644 index 0000000..092bd90 --- /dev/null +++ b/redisLock/redis.go @@ -0,0 +1,61 @@ +package redisLock + +import ( + "context" + "log" + "time" + + "github.com/go-redis/redis/v8" +) + +type redisCommInfo struct { + key string + value string + ttl int +} + +type RedisLock struct { + client *redis.Client + ctx context.Context + kvt redisCommInfo +} + +func CreateRedisLock(c *redis.Client, ctx context.Context, k, v string, ttl int) *RedisLock { + return &RedisLock{c, ctx, redisCommInfo{k, v, ttl}} +} + +func (r *RedisLock) Lock() { + //for r.client.SetNX(r.ctx, r.kvt.key, r.kvt.value, time.Duration(r.kvt.ttl)*time.Second).Val() == false { + // time.Sleep(time.Millisecond * 20) + //} + + script := `if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then + redis.call('hincrby',KEYS[1],ARGV[1],1) + redis.call('expire',KEYS[1],ARGV[2]) + return 1 + else + return 0 + end` + + for r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val().(int64) == 0 { + time.Sleep(time.Millisecond * 20) + } + + log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) +} + +func (r *RedisLock) Unlock() { + //if r.client.Get(r.ctx, r.kvt.key).Val() == r.kvt.value { + // r.client.Del(r.ctx, r.kvt.key) + //} + + //script := "if (redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1]) else return 0 end" + //r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value) + + script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil + elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) + else return 0 end` + r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value) + + log.Printf("unlock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl) +} diff --git a/rpcSupport/jsonrpc.go b/rpcSupport/jsonrpc.go new file mode 100644 index 0000000..32c79d9 --- /dev/null +++ b/rpcSupport/jsonrpc.go @@ -0,0 +1,39 @@ +package rpcSupport + +import ( + "log" + "net" + "net/rpc" + "net/rpc/jsonrpc" +) + +func RpcServer(host string, service interface{}) error { + err := rpc.Register(service) + if err != nil { + log.Println("rpc register error") + return err + } + listen, err := net.Listen("tcp", host) + if err != nil { + return err + } + + log.Printf("rpc server listen ok: %s", host) + + for { + accept, err := listen.Accept() + if err != nil { + log.Println("accept err: ", err) + } + jsonrpc.ServeConn(accept) + } +} + +func RpcClient(host string) (*rpc.Client, error) { + dial, err := net.Dial("tcp", host) + if err != nil { + return nil, err + } + + return jsonrpc.NewClient(dial), nil +} diff --git a/scheduler/firstscheduler.go b/scheduler/firstscheduler.go new file mode 100644 index 0000000..14090b1 --- /dev/null +++ b/scheduler/firstscheduler.go @@ -0,0 +1,56 @@ +package scheduler + +import ( + "goRedisDLM/engine" + "time" +) + +type FirstScheduler struct { + Request chan engine.Request + Worker chan chan engine.Request +} + +func (f *FirstScheduler) Submit(request engine.Request) { + f.Request <- request +} + +func (f *FirstScheduler) ConfigChan() chan engine.Request { + return make(chan engine.Request) +} + +func (f *FirstScheduler) Run() { + f.Request = make(chan engine.Request) + f.Worker = make(chan chan engine.Request) + + go func() { + var requests []engine.Request + var workers []chan engine.Request + + for { + var req engine.Request + var work chan engine.Request + + if len(requests) > 0 && len(workers) > 0 { + req = requests[0] + work = workers[0] + } + + select { + case r := <-f.Request: + requests = append(requests, r) + case w := <-f.Worker: + workers = append(workers, w) + case work <- req: + requests = requests[1:] + workers = workers[1:] + + case <-time.After(time.Millisecond * 100): + + } + } + }() +} + +func (f *FirstScheduler) WorkReady(requests chan engine.Request) { + f.Worker <- requests +} diff --git a/worker/client/rpcworker.go b/worker/client/rpcworker.go new file mode 100644 index 0000000..61c47a4 --- /dev/null +++ b/worker/client/rpcworker.go @@ -0,0 +1,24 @@ +package client + +import ( + "goRedisDLM/engine" + "net/rpc" +) + +func CreateRpcWorker(clients chan *rpc.Client) func(request engine.Request) (engine.Response, error) { + return func(request engine.Request) (engine.Response, error) { + client := <-clients + return RpcWorker(client, request) + } +} + +func RpcWorker(client *rpc.Client, request engine.Request) (engine.Response, error) { + out := engine.Response{} + + err := client.Call("Inventory.Work", request, &out) + if err != nil { + return engine.Response{Id: "err"}, err + } + + return out, nil +} diff --git a/worker/rpc.go b/worker/rpc.go new file mode 100644 index 0000000..9ada7a0 --- /dev/null +++ b/worker/rpc.go @@ -0,0 +1,53 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "goRedisDLM/engine" + "goRedisDLM/redisLock" + "log" + "strconv" + + "github.com/go-redis/redis/v8" + "github.com/google/uuid" +) + +type Inventory struct { + Client *redis.Client +} + +func (i *Inventory) Work(request engine.Request, response *engine.Response) error { + value := uuid.New() + + log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock) + + lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 30) + lock.Lock() + + log.Printf("uuid: %s get dlock\n", value.String()) + get := i.Client.Get(context.Background(), "inventory") + log.Printf("%s\n", get) + + inventory, err := strconv.Atoi(get.Val()) + if err != nil { + goto Error + } + inventory = inventory - 1 + if inventory >= 0 { + i.Client.Set(context.Background(), "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL) + } else { + log.Printf("error DLock error :%d\n", inventory) + err = errors.New("inventory err") + goto Error + } + lock.Unlock() + + response.Id = request.Id + + return nil + +Error: + lock.Unlock() + return err +} diff --git a/worker/service/main.go b/worker/service/main.go new file mode 100644 index 0000000..cf4d649 --- /dev/null +++ b/worker/service/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "flag" + "goRedisDLM/rpcSupport" + "goRedisDLM/worker" + "log" + + "github.com/go-redis/redis/v8" +) + +var port = flag.String("port", "", "port") +var redisHost = flag.String("redis", "", "redis host") + +func main() { + flag.Parse() + if *port == "" || *redisHost == "" { + log.Println("port err") + flag.Usage() + + return + } + + redisClient := redis.NewClient(&redis.Options{ + Addr: *redisHost, + Password: "redhat", + }) + + log.Fatal(rpcSupport.RpcServer(*port, &worker.Inventory{Client: redisClient})) +}