goconsistencehash/migrator.go
2025-05-07 10:34:10 +08:00

279 lines
4.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package consistencehash
import (
"context"
"errors"
"fmt"
"math"
)
type Migrator func(ctx context.Context, datas map[string]struct{}, from, to string) error
func (c *ConsistenceHash) decrScore(score int32) int32 {
if score == 0 {
return (math.MaxInt32 - 1)
}
return (score - 1)
}
func (c *ConsistenceHash) incrScore(score int32) int32 {
if score == (math.MaxInt32 - 1) {
return 0
}
return (score + 1)
}
func (c *ConsistenceHash) migratorIn(ctx context.Context, score int32, nodeId string) (datas map[string]struct{}, from, to string, _err error) {
if c.migrator == nil {
return
}
nodeIds, err := c.hashRing.Node(ctx, score)
if err != nil {
_err = err
return
}
if len(nodeIds) > 1 {
return
}
lastScore, err := c.hashRing.Floor(ctx, c.decrScore(score))
if err != nil {
_err = err
return
}
if lastScore == -1 || lastScore == score {
return
}
nextScore, err := c.hashRing.Ceiling(ctx, c.incrScore(score))
if err != nil {
_err = err
}
if nextScore == -1 || nextScore == score {
return
}
patternOne := lastScore > score
if patternOne {
lastScore -= math.MaxInt32
}
nextNode, err := c.hashRing.Node(ctx, nextScore)
if err != nil {
_err = err
return
}
if len(nextNode) == 0 {
return
}
if c.getRawNodeId(nextNode[0]) == c.getRawNodeId(nodeId) {
return
}
_datas, err := c.hashRing.Datas(ctx, nextNode[0])
if err != nil {
_err = err
return
}
if len(_datas) == 0 {
return
}
for key := range _datas {
sum := c.encrytor.Hash(key)
if patternOne && (sum > (lastScore + math.MaxInt32)) {
sum -= math.MaxInt32
}
if sum < lastScore || sum >= score {
continue
}
if datas == nil {
datas = make(map[string]struct{})
}
datas[key] = struct{}{}
}
to = c.getRawNodeId(nodeId)
from = c.getRawNodeId(nextNode[0])
if err := c.hashRing.DelDatas(ctx, nextNode[0], datas); err != nil {
_err = err
return
}
if err := c.hashRing.AddDatas(ctx, nodeId, datas); err != nil {
_err = err
return
}
return
}
func (c *ConsistenceHash) migratorOut(ctx context.Context, score int32, nodeId string,
allDatas *map[string]struct{}) (datas map[string]struct{}, from, to string, _err error) {
var rawTo string
defer func () {
if _err !=nil{
return
}
if to == "" || len(datas)==0{
return
}
if _err = c.hashRing.DelDatas(ctx, nodeId, datas); _err != nil {
return
}
_err = c.hashRing.AddDatas(ctx, rawTo, datas)
}()
if c.migrator == nil {
return
}
nodes, err := c.hashRing.Node(ctx, score)
fmt.Println(nodes,score)
if err != nil {
_err = err
return
}
if len(nodes) == 0 {
return
}
if nodes[0] != nodeId {
return
}
if len(*allDatas) == 0 {
return
}
lastScore, err := c.hashRing.Floor(ctx, c.decrScore(score))
fmt.Println(lastScore,err)
if err != nil {
_err = err
return
}
if lastScore == -1 {
return
}
fmt.Printf("score: %d, lastscore: %d",score,lastScore)
if lastScore == score {
for _, nodeIds := range nodes {
if c.getRawNodeId(nodeIds) != c.getRawNodeId(nodeId) {
rawTo = nodeIds
}
}
if rawTo == "" {
return
}
}
patternOne := lastScore > score //这里必须定义模式如果直接让sum > (lastScore + math.MaxInt32)判断,后面括号可能会溢出
if patternOne {
lastScore -= math.MaxInt32
}
fmt.Println("del lastscore", lastScore, "score", score)
for key := range *allDatas {
sum := c.encrytor.Hash(key)
fmt.Println("del sum", sum, "getsum", lastScore+math.MaxInt32)
if patternOne && (sum > (lastScore + math.MaxInt32)) {
sum -= math.MaxInt32
}
if sum < lastScore || sum >= score {
continue
}
if datas == nil {
datas = make(map[string]struct{})
}
datas[key] = struct{}{}
delete(*allDatas, key)
}
from = c.getRawNodeId(nodeId)
if rawTo != "" {
to = c.getRawNodeId(rawTo)
return
}
rawTo, err = c.getNextNode(ctx, score, nodeId, nil)
if err != nil {
_err = err
return
}
to = c.getRawNodeId(rawTo)
if to == "" {
_err = errors.New("unavailable to node")
return
}
return
}
func (c *ConsistenceHash) getNextNode(ctx context.Context, score int32, nodeId string, dup map[int32]struct{}) (string, error) {
nextScore, err := c.hashRing.Ceiling(ctx, c.incrScore(score))
if err != nil {
return "", err
}
if nextScore == -1 {
return "", nil
}
if _, ok := dup[nextScore]; ok {
return "", nil
}
nodes, err := c.hashRing.Node(ctx, nextScore)
if err != nil {
return "", err
}
if len(nodes) == 0 {
return "", errors.New("un available next node")
}
for _, nodeIds := range nodes {
if c.getRawNodeId(nodeIds) != c.getRawNodeId(nodeId) {
return nodeIds, nil
}
}
if dup == nil {
dup = make(map[int32]struct{})
}
dup[nextScore] = struct{}{}
return c.getNextNode(ctx, nextScore, nodeId, dup)
}