This commit is contained in:
redhat 2025-02-06 16:48:15 +08:00
parent 511ffa1109
commit 868d533c8f
11 changed files with 413 additions and 0 deletions

62
engine/firstengine.go Normal file
View File

@ -0,0 +1,62 @@
package engine
import (
"log"
"strconv"
"time"
)
type FirstEngine struct {
Scheduler Scheduler
WorkCount int
Work func(request Request) (Response, error)
}
type Scheduler interface {
Submit(request Request)
ConfigChan() chan Request
Run()
WorkReady(chan Request)
}
func (e *FirstEngine) CreateWorker(in chan Request, out chan Response) {
go func() {
for {
e.Scheduler.WorkReady(in)
req := <-in
work, err := e.Work(req)
if err != nil {
log.Printf("work err: %v", err)
}
out <- work
}
}()
}
func (e *FirstEngine) Run(num int) {
out := make(chan Response)
e.Scheduler.Run()
start := time.Now()
for i := 0; i < e.WorkCount; i++ {
e.CreateWorker(e.Scheduler.ConfigChan(), out)
}
log.Printf("time slice 1: %d", time.Since(start)/time.Millisecond)
for i := 0; i < num; i++ {
e.Scheduler.Submit(Request{Id: strconv.Itoa(i), Dlock: "redis_lock"})
}
log.Printf("time slice 2: %d", time.Since(start)/time.Millisecond)
i := 0
for id := range out {
i++
log.Println(id.Id)
if i >= num {
close(out)
}
}
log.Printf("time slice 3: %d", time.Since(start)/time.Millisecond)
}

10
engine/type.go Normal file
View File

@ -0,0 +1,10 @@
package engine
type Request struct {
Id string
Dlock string
}
type Response struct {
Id string
}

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module goRedisDLM
go 1.22
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/google/uuid v1.6.0 // indirect
)

8
go.sum Normal file
View File

@ -0,0 +1,8 @@
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=

60
main.go Normal file
View File

@ -0,0 +1,60 @@
package main
import (
"flag"
"goRedisDLM/engine"
"goRedisDLM/rpcSupport"
"goRedisDLM/scheduler"
"goRedisDLM/worker/client"
"log"
"net/rpc"
"strings"
)
var host = flag.String("host", "", "host")
func main() {
flag.Parse()
if *host == "" {
flag.Usage()
return
}
e := engine.FirstEngine{
Scheduler: &scheduler.FirstScheduler{},
WorkCount: 10,
Work: client.CreateRpcWorker(createRpcPoll(*host)),
}
e.Run(5000)
}
func createRpcPoll(port string) chan *rpc.Client {
out := make(chan *rpc.Client)
var clients []*rpc.Client
split := strings.Split(port, ",")
for i, ele := range split {
cli, err := rpcSupport.RpcClient(ele)
if err != nil {
log.Printf("host: %s,%d create err: %v", ele, i, err)
continue
}
log.Printf("rpc cli create ok: %s", ele)
clients = append(clients, cli)
}
go func() {
for {
for _, ele := range clients {
out <- ele
}
}
}()
return out
}

61
redisLock/redis.go Normal file
View File

@ -0,0 +1,61 @@
package redisLock
import (
"context"
"log"
"time"
"github.com/go-redis/redis/v8"
)
type redisCommInfo struct {
key string
value string
ttl int
}
type RedisLock struct {
client *redis.Client
ctx context.Context
kvt redisCommInfo
}
func CreateRedisLock(c *redis.Client, ctx context.Context, k, v string, ttl int) *RedisLock {
return &RedisLock{c, ctx, redisCommInfo{k, v, ttl}}
}
func (r *RedisLock) Lock() {
//for r.client.SetNX(r.ctx, r.kvt.key, r.kvt.value, time.Duration(r.kvt.ttl)*time.Second).Val() == false {
// time.Sleep(time.Millisecond * 20)
//}
script := `if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
redis.call('hincrby',KEYS[1],ARGV[1],1)
redis.call('expire',KEYS[1],ARGV[2])
return 1
else
return 0
end`
for r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value, r.kvt.ttl).Val().(int64) == 0 {
time.Sleep(time.Millisecond * 20)
}
log.Printf("lock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
}
func (r *RedisLock) Unlock() {
//if r.client.Get(r.ctx, r.kvt.key).Val() == r.kvt.value {
// r.client.Del(r.ctx, r.kvt.key)
//}
//script := "if (redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1]) else return 0 end"
//r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value)
script := `if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1])
else return 0 end`
r.client.Eval(r.ctx, script, []string{r.kvt.key}, r.kvt.value)
log.Printf("unlock ok: %s,%s,%d\n", r.kvt.key, r.kvt.value, r.kvt.ttl)
}

39
rpcSupport/jsonrpc.go Normal file
View File

@ -0,0 +1,39 @@
package rpcSupport
import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func RpcServer(host string, service interface{}) error {
err := rpc.Register(service)
if err != nil {
log.Println("rpc register error")
return err
}
listen, err := net.Listen("tcp", host)
if err != nil {
return err
}
log.Printf("rpc server listen ok: %s", host)
for {
accept, err := listen.Accept()
if err != nil {
log.Println("accept err: ", err)
}
jsonrpc.ServeConn(accept)
}
}
func RpcClient(host string) (*rpc.Client, error) {
dial, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
return jsonrpc.NewClient(dial), nil
}

View File

@ -0,0 +1,56 @@
package scheduler
import (
"goRedisDLM/engine"
"time"
)
type FirstScheduler struct {
Request chan engine.Request
Worker chan chan engine.Request
}
func (f *FirstScheduler) Submit(request engine.Request) {
f.Request <- request
}
func (f *FirstScheduler) ConfigChan() chan engine.Request {
return make(chan engine.Request)
}
func (f *FirstScheduler) Run() {
f.Request = make(chan engine.Request)
f.Worker = make(chan chan engine.Request)
go func() {
var requests []engine.Request
var workers []chan engine.Request
for {
var req engine.Request
var work chan engine.Request
if len(requests) > 0 && len(workers) > 0 {
req = requests[0]
work = workers[0]
}
select {
case r := <-f.Request:
requests = append(requests, r)
case w := <-f.Worker:
workers = append(workers, w)
case work <- req:
requests = requests[1:]
workers = workers[1:]
case <-time.After(time.Millisecond * 100):
}
}
}()
}
func (f *FirstScheduler) WorkReady(requests chan engine.Request) {
f.Worker <- requests
}

View File

@ -0,0 +1,24 @@
package client
import (
"goRedisDLM/engine"
"net/rpc"
)
func CreateRpcWorker(clients chan *rpc.Client) func(request engine.Request) (engine.Response, error) {
return func(request engine.Request) (engine.Response, error) {
client := <-clients
return RpcWorker(client, request)
}
}
func RpcWorker(client *rpc.Client, request engine.Request) (engine.Response, error) {
out := engine.Response{}
err := client.Call("Inventory.Work", request, &out)
if err != nil {
return engine.Response{Id: "err"}, err
}
return out, nil
}

53
worker/rpc.go Normal file
View File

@ -0,0 +1,53 @@
package worker
import (
"context"
"errors"
"fmt"
"goRedisDLM/engine"
"goRedisDLM/redisLock"
"log"
"strconv"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
type Inventory struct {
Client *redis.Client
}
func (i *Inventory) Work(request engine.Request, response *engine.Response) error {
value := uuid.New()
log.Printf("uuid: %s, lock key: %s\n", value.String(), request.Dlock)
lock := redisLock.CreateRedisLock(i.Client, context.Background(), request.Dlock, value.String(), 30)
lock.Lock()
log.Printf("uuid: %s get dlock\n", value.String())
get := i.Client.Get(context.Background(), "inventory")
log.Printf("%s\n", get)
inventory, err := strconv.Atoi(get.Val())
if err != nil {
goto Error
}
inventory = inventory - 1
if inventory >= 0 {
i.Client.Set(context.Background(), "inventory", fmt.Sprintf("%d", inventory), redis.KeepTTL)
} else {
log.Printf("error DLock error :%d\n", inventory)
err = errors.New("inventory err")
goto Error
}
lock.Unlock()
response.Id = request.Id
return nil
Error:
lock.Unlock()
return err
}

30
worker/service/main.go Normal file
View File

@ -0,0 +1,30 @@
package main
import (
"flag"
"goRedisDLM/rpcSupport"
"goRedisDLM/worker"
"log"
"github.com/go-redis/redis/v8"
)
var port = flag.String("port", "", "port")
var redisHost = flag.String("redis", "", "redis host")
func main() {
flag.Parse()
if *port == "" || *redisHost == "" {
log.Println("port err")
flag.Usage()
return
}
redisClient := redis.NewClient(&redis.Options{
Addr: *redisHost,
Password: "redhat",
})
log.Fatal(rpcSupport.RpcServer(*port, &worker.Inventory{Client: redisClient}))
}