package hashring import ( "context" "encoding/json" "errors" "fmt" "git.zhangshuocauc.cn/redhat/goconsistencehash/local" "github.com/redis/go-redis/v9" "strconv" ) const ( RingPreFix = "redis:consistence_hash:ring" NodePreFix = "redis:consistence_hash:virtual" DataKeysPreFix = "redis:consistence_hash:datakeys" ) type RedisHashRing struct { local.Locker client *redis.Client key string } func NewRedisHashRing(client *redis.Client, key string, lock local.Locker) *RedisHashRing { return &RedisHashRing{ Locker: lock, client: client, key: key, } } func (h *RedisHashRing) getRingKey() string { return fmt.Sprintf("%s:%s", RingPreFix, h.key) } func (h *RedisHashRing) getNodeKey() string { return fmt.Sprintf("%s:%s", NodePreFix, h.key) } func (h *RedisHashRing) getDataKey(nodeId string) string { return fmt.Sprintf("%s:%s:%s", DataKeysPreFix, h.key, nodeId) } func (h *RedisHashRing) Add(ctx context.Context, score int32, nodeId string) error { res, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: strconv.Itoa(int(score)), Max: strconv.Itoa(int(score)), }).Result() if err != nil { return err } if len(res) > 1 { return errors.New("score to many element") } var nodeIds []string if len(res) == 1 { if err := json.Unmarshal([]byte(res[0]), &nodeIds); err != nil { return err } for _, id := range nodeIds { if id == nodeId { return nil } } if err := h.client.ZRem(ctx, h.getRingKey(), score).Err(); err != nil { return err } } nodeIds = append(nodeIds, nodeId) appendNodeIds, _ := json.Marshal(nodeIds) if err := h.client.ZAdd(ctx, h.getRingKey(), redis.Z{ Score: float64(score), Member: appendNodeIds, }).Err(); err != nil { return err } return nil } func (h *RedisHashRing) Del(ctx context.Context, score int32, nodeId string) error { res, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: strconv.Itoa(int(score)), Max: strconv.Itoa(int(score)), }).Result() if err != nil { return err } if len(res) != 1 { return errors.New("score to error element") } var nodeIds []string if err := json.Unmarshal([]byte(res[0]), &nodeIds); err != nil { return err } var ( exists bool index int ) for idx, num := range nodeIds { if num == nodeId { exists = true index = idx break } } if !exists { return errors.New("the score have no such nodeid") } if err := h.client.ZRemRangeByScore(ctx, h.getRingKey(), strconv.Itoa(int(score)), strconv.Itoa(int(score))).Err(); err != nil { return err } if len(nodeIds) <= 1 { return nil } nodeIds = append(nodeIds[:index], nodeIds[index+1:]...) if err := h.client.ZAdd(ctx, h.getRingKey(), redis.Z{ Score: float64(score), Member: nodeIds, }).Err(); err != nil { return err } return nil } func (h *RedisHashRing) Floor(ctx context.Context, score int32) (int32, error) { res, err := h.client.ZRevRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: "-inf", Max: strconv.Itoa(int(score)), Count: 1, }).Result() if err != nil { return -1, err } if len(res) == 1 { return int32(res[0].Score), nil } res, err = h.client.ZRevRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: "-inf", Max: "+inf", Count: 1, }).Result() if err != nil { return -1, err } if len(res) == 1 { return int32(res[0].Score), nil } return -1, nil } func (h *RedisHashRing) Ceiling(ctx context.Context, score int32) (int32, error) { res, err := h.client.ZRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: strconv.Itoa(int(score)), Max: "+inf", Count: 1, }).Result() if err != nil { return -1, err } if len(res) == 1 { return int32(res[0].Score), nil } res, err = h.client.ZRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: "-inf", Max: "+inf", Count: 1, }).Result() if err != nil { return -1, err } if len(res) == 1 { return int32(res[0].Score), nil } return -1, nil } func (h *RedisHashRing) Node(ctx context.Context, score int32) ([]string, error) { res, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{ Min: strconv.Itoa(int(score)), Max: strconv.Itoa(int(score)), }).Result() if err != nil { return []string{}, err } if len(res) != 1 { return []string{}, errors.New("score to error element") } var nodeIds []string if err := json.Unmarshal([]byte(res[0]), &nodeIds); err != nil { return []string{}, err } return nodeIds, nil } func (h *RedisHashRing) Nodes(ctx context.Context) (map[string]int, error) { res, err := h.client.HGetAll(ctx, h.getNodeKey()).Result() if err != nil { return map[string]int{}, err } var keys map[string]int for key, value := range res { count, err := strconv.Atoi(value) if err != nil { continue } if keys == nil { keys = make(map[string]int) } keys[key] = count } return keys, nil } func (h *RedisHashRing) AddNodes(ctx context.Context, nodeId string, num int) error { if err := h.client.HSet(ctx, h.getNodeKey(), map[string]string{ nodeId: strconv.Itoa(num), }).Err(); err != nil { return err } return nil } func (h *RedisHashRing) DelNodes(ctx context.Context, nodeId string) error { if err := h.client.HDel(ctx, h.getNodeKey(), nodeId).Err(); err != nil { return err } return nil } func (h *RedisHashRing) Datas(ctx context.Context, nodeId string) (map[string]struct{}, error) { res, err := h.client.SMembers(ctx, h.getDataKey(nodeId)).Result() if err != nil { return map[string]struct{}{}, err } var datas map[string]struct{} for _, keys := range res { if datas == nil { datas = make(map[string]struct{}) } datas[keys] = struct{}{} } return datas, nil } func (h *RedisHashRing) AddDatas(ctx context.Context, nodeId string, datas map[string]struct{}) error { var addkeys []string for key := range datas { addkeys = append(addkeys, key) } if len(addkeys) == 0 { return nil } if err := h.client.SAdd(ctx, h.getDataKey(nodeId), addkeys).Err(); err != nil { return err } return nil } func (h *RedisHashRing) DelDatas(ctx context.Context, nodeId string, datas map[string]struct{}) error { var addkeys []string for key := range datas { addkeys = append(addkeys, key) } if len(addkeys) == 0 { return nil } if err := h.client.SRem(ctx, h.getDataKey(nodeId), addkeys).Err(); err != nil { return err } return nil }