// Listing metadata: 324 lines, 6.3 KiB, Go.

package hashring
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"git.zhangshuocauc.cn/redhat/goconsistencehash/local"
|
|
"github.com/redis/go-redis/v9"
|
|
"strconv"
|
|
)
|
|
|
|
// Redis key prefixes used by RedisHashRing. Each prefix is joined with the
// ring's own key (and, for data keys, a node id) using ":" as the separator.
const (
	// RingPreFix prefixes the sorted-set key that holds the ring positions.
	RingPreFix = "redis:consistence_hash:ring"
	// NodePreFix prefixes the hash key that maps a node id to an integer
	// count (presumably its number of virtual nodes; confirm with callers).
	NodePreFix = "redis:consistence_hash:virtual"
	// DataKeysPreFix prefixes the per-node set keys that hold data keys.
	DataKeysPreFix = "redis:consistence_hash:datakeys"
)
|
|
|
|
type RedisHashRing struct {
|
|
local.Locker
|
|
client *redis.Client
|
|
key string
|
|
}
|
|
|
|
func NewRedisHashRing(client *redis.Client, key string, lock local.Locker) *RedisHashRing {
|
|
return &RedisHashRing{
|
|
Locker: lock,
|
|
client: client,
|
|
key: key,
|
|
}
|
|
}
|
|
|
|
func (h *RedisHashRing) getRingKey() string {
|
|
return fmt.Sprintf("%s:%s", RingPreFix, h.key)
|
|
}
|
|
|
|
func (h *RedisHashRing) getNodeKey() string {
|
|
return fmt.Sprintf("%s:%s", NodePreFix, h.key)
|
|
}
|
|
|
|
func (h *RedisHashRing) getDataKey(nodeId string) string {
|
|
return fmt.Sprintf("%s:%s:%s", DataKeysPreFix, h.key, nodeId)
|
|
}
|
|
|
|
func (h *RedisHashRing) Add(ctx context.Context, score int32, nodeId string) error {
|
|
res, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: strconv.Itoa(int(score)),
|
|
Max: strconv.Itoa(int(score)),
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(res) > 1 {
|
|
return errors.New("score to many element")
|
|
}
|
|
|
|
var nodeIds []string
|
|
if len(res) == 1 {
|
|
if err := json.Unmarshal([]byte(res[0]), &nodeIds); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, id := range nodeIds {
|
|
if id == nodeId {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
if err := h.client.ZRem(ctx, h.getRingKey(), score).Err(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
nodeIds = append(nodeIds, nodeId)
|
|
appendNodeIds, _ := json.Marshal(nodeIds)
|
|
|
|
if err := h.client.ZAdd(ctx, h.getRingKey(), redis.Z{
|
|
Score: float64(score),
|
|
Member: appendNodeIds,
|
|
}).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *RedisHashRing) Del(ctx context.Context, score int32, nodeId string) error {
|
|
res, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: strconv.Itoa(int(score)),
|
|
Max: strconv.Itoa(int(score)),
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(res) != 1 {
|
|
return errors.New("score to error element")
|
|
}
|
|
|
|
var nodeIds []string
|
|
if err := json.Unmarshal([]byte(res[0]), &nodeIds); err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
exists bool
|
|
index int
|
|
)
|
|
|
|
for idx, num := range nodeIds {
|
|
if num == nodeId {
|
|
exists = true
|
|
index = idx
|
|
break
|
|
}
|
|
}
|
|
|
|
if !exists {
|
|
return errors.New("the score have no such nodeid")
|
|
}
|
|
|
|
if err := h.client.ZRemRangeByScore(ctx, h.getRingKey(), strconv.Itoa(int(score)), strconv.Itoa(int(score))).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(nodeIds) <= 1 {
|
|
return nil
|
|
}
|
|
|
|
nodeIds = append(nodeIds[:index], nodeIds[index+1:]...)
|
|
|
|
if err := h.client.ZAdd(ctx, h.getRingKey(), redis.Z{
|
|
Score: float64(score),
|
|
Member: nodeIds,
|
|
}).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *RedisHashRing) Floor(ctx context.Context, score int32) (int32, error) {
|
|
res, err := h.client.ZRevRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: "-inf",
|
|
Max: strconv.Itoa(int(score)),
|
|
Count: 1,
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if len(res) == 1 {
|
|
return int32(res[0].Score), nil
|
|
}
|
|
|
|
res, err = h.client.ZRevRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: "-inf",
|
|
Max: "+inf",
|
|
Count: 1,
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if len(res) == 1 {
|
|
return int32(res[0].Score), nil
|
|
}
|
|
|
|
return -1, nil
|
|
}
|
|
|
|
func (h *RedisHashRing) Ceiling(ctx context.Context, score int32) (int32, error) {
|
|
res, err := h.client.ZRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: strconv.Itoa(int(score)),
|
|
Max: "+inf",
|
|
Count: 1,
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if len(res) == 1 {
|
|
return int32(res[0].Score), nil
|
|
}
|
|
|
|
res, err = h.client.ZRangeByScoreWithScores(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: "-inf",
|
|
Max: "+inf",
|
|
Count: 1,
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if len(res) == 1 {
|
|
return int32(res[0].Score), nil
|
|
}
|
|
|
|
return -1, nil
|
|
}
|
|
|
|
func (h *RedisHashRing) Node(ctx context.Context, score int32) ([]string, error) {
|
|
res, err := h.client.ZRangeByScore(ctx, h.getRingKey(), &redis.ZRangeBy{
|
|
Min: strconv.Itoa(int(score)),
|
|
Max: strconv.Itoa(int(score)),
|
|
}).Result()
|
|
|
|
if err != nil {
|
|
return []string{}, err
|
|
}
|
|
|
|
if len(res) != 1 {
|
|
return []string{}, errors.New("score to error element")
|
|
}
|
|
|
|
var nodeIds []string
|
|
if err := json.Unmarshal([]byte(res[0]), &nodeIds); err != nil {
|
|
return []string{}, err
|
|
}
|
|
|
|
return nodeIds, nil
|
|
}
|
|
|
|
func (h *RedisHashRing) Nodes(ctx context.Context) (map[string]int, error) {
|
|
res, err := h.client.HGetAll(ctx, h.getNodeKey()).Result()
|
|
if err != nil {
|
|
return map[string]int{}, err
|
|
}
|
|
|
|
var keys map[string]int
|
|
for key, value := range res {
|
|
count, err := strconv.Atoi(value)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if keys == nil {
|
|
keys = make(map[string]int)
|
|
}
|
|
|
|
keys[key] = count
|
|
}
|
|
|
|
return keys, nil
|
|
}
|
|
|
|
func (h *RedisHashRing) AddNodes(ctx context.Context, nodeId string, num int) error {
|
|
if err := h.client.HSet(ctx, h.getNodeKey(), map[string]string{
|
|
nodeId: strconv.Itoa(num),
|
|
}).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *RedisHashRing) DelNodes(ctx context.Context, nodeId string) error {
|
|
if err := h.client.HDel(ctx, h.getNodeKey(), nodeId).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *RedisHashRing) Datas(ctx context.Context, nodeId string) (map[string]struct{}, error) {
|
|
res, err := h.client.SMembers(ctx, h.getDataKey(nodeId)).Result()
|
|
if err != nil {
|
|
return map[string]struct{}{}, err
|
|
}
|
|
|
|
var datas map[string]struct{}
|
|
|
|
for _, keys := range res {
|
|
if datas == nil {
|
|
datas = make(map[string]struct{})
|
|
}
|
|
|
|
datas[keys] = struct{}{}
|
|
}
|
|
|
|
return datas, nil
|
|
}
|
|
|
|
func (h *RedisHashRing) AddDatas(ctx context.Context, nodeId string, datas map[string]struct{}) error {
|
|
var addkeys []string
|
|
for key := range datas {
|
|
addkeys = append(addkeys, key)
|
|
}
|
|
|
|
if len(addkeys) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if err := h.client.SAdd(ctx, h.getDataKey(nodeId), addkeys).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *RedisHashRing) DelDatas(ctx context.Context, nodeId string, datas map[string]struct{}) error {
|
|
var addkeys []string
|
|
for key := range datas {
|
|
addkeys = append(addkeys, key)
|
|
}
|
|
|
|
if len(addkeys) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if err := h.client.SRem(ctx, h.getDataKey(nodeId), addkeys).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|