goconsistencehash/redis/hashring/hashring.go
2025-05-07 10:34:10 +08:00

324 lines
6.3 KiB
Go

package hashring
import (
"context"
"encoding/json"
"errors"
"fmt"
"git.zhangshuocauc.cn/redhat/goconsistencehash/local"
"github.com/redis/go-redis/v9"
"strconv"
)
// Redis key prefixes used by RedisHashRing. Each prefix is joined with the
// ring's own key (and, for data keys, a node ID) using ":" to build the final
// Redis key.
const (
	// keyNamespace is the leading segment shared by every key prefix below.
	keyNamespace = "redis:consistence_hash"

	// RingPreFix prefixes the sorted set that maps ring positions (scores)
	// to JSON-encoded lists of node IDs.
	RingPreFix = keyNamespace + ":ring"
	// NodePreFix prefixes the hash that maps a node ID to an integer count.
	NodePreFix = keyNamespace + ":virtual"
	// DataKeysPreFix prefixes the per-node sets of data keys.
	DataKeysPreFix = keyNamespace + ":datakeys"
)
// RedisHashRing keeps the state of a consistent-hash ring in Redis: a sorted
// set of ring positions, a hash of per-node counts and one set of data keys
// per node. All Redis keys are derived from the ring's key field.
type RedisHashRing struct {
// Locker is embedded, so its methods are promoted onto RedisHashRing.
// NOTE(review): none of the methods in this file call it; presumably callers
// hold the lock around multi-step operations such as Add and Del - confirm.
local.Locker
// client is the Redis connection every method issues its commands on.
client *redis.Client
// key identifies this ring and is appended to each key prefix.
key string
}
// NewRedisHashRing builds a RedisHashRing that issues its commands on client,
// derives its Redis keys from key and embeds lock as its Locker.
func NewRedisHashRing(client *redis.Client, key string, lock local.Locker) *RedisHashRing {
	ring := new(RedisHashRing)
	ring.Locker = lock
	ring.client = client
	ring.key = key
	return ring
}
// getRingKey returns the Redis key of the sorted set holding the ring
// positions: "<RingPreFix>:<key>".
func (h *RedisHashRing) getRingKey() string {
	// All operands are strings, so Sprint concatenates them without spaces.
	ringKey := fmt.Sprint(RingPreFix, ":", h.key)
	return ringKey
}
// getNodeKey returns the Redis key of the hash holding the per-node counts:
// "<NodePreFix>:<key>".
func (h *RedisHashRing) getNodeKey() string {
	// All operands are strings, so Sprint concatenates them without spaces.
	nodeKey := fmt.Sprint(NodePreFix, ":", h.key)
	return nodeKey
}
// getDataKey returns the Redis key of the set holding the data keys that
// belong to nodeId: "<DataKeysPreFix>:<key>:<nodeId>".
func (h *RedisHashRing) getDataKey(nodeId string) string {
	// All operands are strings, so Sprint concatenates them without spaces.
	dataKey := fmt.Sprint(DataKeysPreFix, ":", h.key, ":", nodeId)
	return dataKey
}
// Add registers nodeId at ring position score.
//
// Each score in the ring sorted set carries at most one member: a JSON array
// of the node IDs that share that position. Add reads that member, appends
// nodeId if it is not already listed, and writes the updated array back. It
// is a no-op when nodeId is already present.
//
// It returns an error when Redis fails, when the stored member is not valid
// JSON, or when more than one member is found at score.
//
// The read-modify-write sequence is not atomic on its own.
// NOTE(review): presumably callers serialize access through the embedded
// Locker - confirm.
func (h *RedisHashRing) Add(ctx context.Context, score int32, nodeId string) error {
	ringKey := h.getRingKey()
	bound := strconv.Itoa(int(score))

	members, err := h.client.ZRangeByScore(ctx, ringKey, &redis.ZRangeBy{
		Min: bound,
		Max: bound,
	}).Result()
	if err != nil {
		return err
	}
	if len(members) > 1 {
		return errors.New("score to many element")
	}

	var nodeIds []string
	if len(members) == 1 {
		if err := json.Unmarshal([]byte(members[0]), &nodeIds); err != nil {
			return err
		}
		for _, id := range nodeIds {
			if id == nodeId {
				return nil
			}
		}
	}

	// Encode before touching Redis so a marshal failure cannot leave the
	// ring half-updated.
	nodeIds = append(nodeIds, nodeId)
	encoded, err := json.Marshal(nodeIds)
	if err != nil {
		return err
	}

	if len(members) == 1 {
		// ZREM removes by member value, not by score, so the stale JSON
		// member itself must be passed. Leaving it behind would put two
		// members on the same score and break every later lookup.
		if err := h.client.ZRem(ctx, ringKey, members[0]).Err(); err != nil {
			return err
		}
	}

	return h.client.ZAdd(ctx, ringKey, redis.Z{
		Score:  float64(score),
		Member: string(encoded),
	}).Err()
}
// Del removes nodeId from ring position score.
//
// The single member stored at score is a JSON array of node IDs. Del drops
// nodeId from that array and writes the remainder back; when nodeId was the
// only entry, the score is removed from the ring altogether.
//
// It returns an error when Redis fails, when score does not hold exactly one
// member, when that member is not valid JSON, or when nodeId is not listed
// at score.
//
// The read-modify-write sequence is not atomic on its own.
// NOTE(review): presumably callers serialize access through the embedded
// Locker - confirm.
func (h *RedisHashRing) Del(ctx context.Context, score int32, nodeId string) error {
	ringKey := h.getRingKey()
	bound := strconv.Itoa(int(score))

	members, err := h.client.ZRangeByScore(ctx, ringKey, &redis.ZRangeBy{
		Min: bound,
		Max: bound,
	}).Result()
	if err != nil {
		return err
	}
	if len(members) != 1 {
		return errors.New("score to error element")
	}

	var nodeIds []string
	if err := json.Unmarshal([]byte(members[0]), &nodeIds); err != nil {
		return err
	}

	index := -1
	for i, id := range nodeIds {
		if id == nodeId {
			index = i
			break
		}
	}
	if index < 0 {
		return errors.New("the score have no such nodeid")
	}

	remaining := append(nodeIds[:index], nodeIds[index+1:]...)

	// Encode the survivors before deleting anything: the member must be
	// stored as JSON (the format Add and Node use), and a raw []string is
	// not a value go-redis can send as a sorted-set member.
	var encoded []byte
	if len(remaining) > 0 {
		encoded, err = json.Marshal(remaining)
		if err != nil {
			return err
		}
	}

	if err := h.client.ZRemRangeByScore(ctx, ringKey, bound, bound).Err(); err != nil {
		return err
	}
	if len(remaining) == 0 {
		return nil
	}

	return h.client.ZAdd(ctx, ringKey, redis.Z{
		Score:  float64(score),
		Member: string(encoded),
	}).Err()
}
// Floor returns the largest ring position that is less than or equal to
// score. When no such position exists it wraps around and returns the largest
// position on the ring. It returns -1 with a nil error when the ring is
// empty, and -1 with the error when a Redis query fails.
func (h *RedisHashRing) Floor(ctx context.Context, score int32) (int32, error) {
	ringKey := h.getRingKey()

	// First look at or below score; if nothing is there, retry over the whole
	// ring, which yields the highest position (the wrap-around).
	upperBounds := [2]string{strconv.Itoa(int(score)), "+inf"}
	for _, upper := range upperBounds {
		hits, err := h.client.ZRevRangeByScoreWithScores(ctx, ringKey, &redis.ZRangeBy{
			Min:   "-inf",
			Max:   upper,
			Count: 1,
		}).Result()
		if err != nil {
			return -1, err
		}
		if len(hits) == 1 {
			return int32(hits[0].Score), nil
		}
	}

	return -1, nil
}
// Ceiling returns the smallest ring position that is greater than or equal to
// score. When no such position exists it wraps around and returns the
// smallest position on the ring. It returns -1 with a nil error when the ring
// is empty, and -1 with the error when a Redis query fails.
func (h *RedisHashRing) Ceiling(ctx context.Context, score int32) (int32, error) {
	ringKey := h.getRingKey()

	// First look at or above score; if nothing is there, retry over the whole
	// ring, which yields the lowest position (the wrap-around).
	lowerBounds := [2]string{strconv.Itoa(int(score)), "-inf"}
	for _, lower := range lowerBounds {
		hits, err := h.client.ZRangeByScoreWithScores(ctx, ringKey, &redis.ZRangeBy{
			Min:   lower,
			Max:   "+inf",
			Count: 1,
		}).Result()
		if err != nil {
			return -1, err
		}
		if len(hits) == 1 {
			return int32(hits[0].Score), nil
		}
	}

	return -1, nil
}
// Node returns the node IDs stored at ring position score, decoded from the
// JSON array held by the single sorted-set member at that score. On any
// failure (Redis error, zero or several members at score, invalid JSON) it
// returns an empty slice and the error.
func (h *RedisHashRing) Node(ctx context.Context, score int32) ([]string, error) {
	bound := strconv.Itoa(int(score))
	members, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{
		Min: bound,
		Max: bound,
	}).Result()

	switch {
	case err != nil:
		return []string{}, err
	case len(members) != 1:
		return []string{}, errors.New("score to error element")
	}

	var nodeIds []string
	if decodeErr := json.Unmarshal([]byte(members[0]), &nodeIds); decodeErr != nil {
		return []string{}, decodeErr
	}
	return nodeIds, nil
}
// Nodes returns every entry of the node hash as a node ID to count map.
// Entries whose value does not parse as an integer are skipped. The result is
// a nil map when no entry qualifies; on a Redis error an empty map is
// returned together with the error.
func (h *RedisHashRing) Nodes(ctx context.Context) (map[string]int, error) {
	fields, err := h.client.HGetAll(ctx, h.getNodeKey()).Result()
	if err != nil {
		return map[string]int{}, err
	}

	var counts map[string]int
	for nodeID, raw := range fields {
		if n, convErr := strconv.Atoi(raw); convErr == nil {
			// Allocate lazily so an empty result stays a nil map.
			if counts == nil {
				counts = make(map[string]int)
			}
			counts[nodeID] = n
		}
	}
	return counts, nil
}
// AddNodes stores num, formatted as a decimal string, under field nodeId of
// the node hash, overwriting any previous value. It returns the Redis error,
// if any.
func (h *RedisHashRing) AddNodes(ctx context.Context, nodeId string, num int) error {
	count := strconv.Itoa(num)
	// A field/value pair sends the same HSET command as a one-entry map.
	return h.client.HSet(ctx, h.getNodeKey(), nodeId, count).Err()
}
// DelNodes removes field nodeId from the node hash and returns the Redis
// error, if any.
func (h *RedisHashRing) DelNodes(ctx context.Context, nodeId string) error {
	nodeKey := h.getNodeKey()
	return h.client.HDel(ctx, nodeKey, nodeId).Err()
}
// Datas returns the members of nodeId's data-key set as a Go set. The result
// is a nil map when the Redis set is empty or missing; on a Redis error an
// empty map is returned together with the error.
func (h *RedisHashRing) Datas(ctx context.Context, nodeId string) (map[string]struct{}, error) {
	members, err := h.client.SMembers(ctx, h.getDataKey(nodeId)).Result()
	if err != nil {
		return map[string]struct{}{}, err
	}
	if len(members) == 0 {
		return nil, nil
	}

	set := make(map[string]struct{}, len(members))
	for _, member := range members {
		set[member] = struct{}{}
	}
	return set, nil
}
// AddDatas adds every key of datas to nodeId's data-key set. It issues no
// Redis command when datas is empty, and otherwise returns the Redis error,
// if any.
func (h *RedisHashRing) AddDatas(ctx context.Context, nodeId string, datas map[string]struct{}) error {
	if len(datas) == 0 {
		return nil
	}

	members := make([]string, 0, len(datas))
	for member := range datas {
		members = append(members, member)
	}
	return h.client.SAdd(ctx, h.getDataKey(nodeId), members).Err()
}
// DelDatas removes every key of datas from nodeId's data-key set. It issues
// no Redis command when datas is empty, and otherwise returns the Redis
// error, if any.
func (h *RedisHashRing) DelDatas(ctx context.Context, nodeId string, datas map[string]struct{}) error {
	if len(datas) == 0 {
		return nil
	}

	members := make([]string, 0, len(datas))
	for member := range datas {
		members = append(members, member)
	}
	return h.client.SRem(ctx, h.getDataKey(nodeId), members).Err()
}