279 lines
4.7 KiB
Go
279 lines
4.7 KiB
Go
package consistencehash
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"math"
|
||
)
|
||
|
||
// Migrator is the signature of a data-migration callback. It receives a set of
// keys (datas) together with a source node id (from) and a destination node id
// (to), and reports failure through the returned error.
//
// NOTE(review): presumably this is the type of ConsistenceHash.migrator and is
// invoked with the (datas, from, to) results of migratorIn / migratorOut; the
// call site is not in this file, so confirm against the caller.
type Migrator func(ctx context.Context, datas map[string]struct{}, from, to string) error
|
||
|
||
func (c *ConsistenceHash) decrScore(score int32) int32 {
|
||
if score == 0 {
|
||
return (math.MaxInt32 - 1)
|
||
}
|
||
|
||
return (score - 1)
|
||
}
|
||
|
||
func (c *ConsistenceHash) incrScore(score int32) int32 {
|
||
if score == (math.MaxInt32 - 1) {
|
||
return 0
|
||
}
|
||
|
||
return (score + 1)
|
||
}
|
||
|
||
func (c *ConsistenceHash) migratorIn(ctx context.Context, score int32, nodeId string) (datas map[string]struct{}, from, to string, _err error) {
|
||
if c.migrator == nil {
|
||
return
|
||
}
|
||
|
||
nodeIds, err := c.hashRing.Node(ctx, score)
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if len(nodeIds) > 1 {
|
||
return
|
||
}
|
||
|
||
lastScore, err := c.hashRing.Floor(ctx, c.decrScore(score))
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if lastScore == -1 || lastScore == score {
|
||
return
|
||
}
|
||
|
||
nextScore, err := c.hashRing.Ceiling(ctx, c.incrScore(score))
|
||
if err != nil {
|
||
_err = err
|
||
}
|
||
|
||
if nextScore == -1 || nextScore == score {
|
||
return
|
||
}
|
||
|
||
patternOne := lastScore > score
|
||
|
||
if patternOne {
|
||
lastScore -= math.MaxInt32
|
||
}
|
||
|
||
nextNode, err := c.hashRing.Node(ctx, nextScore)
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if len(nextNode) == 0 {
|
||
return
|
||
}
|
||
|
||
if c.getRawNodeId(nextNode[0]) == c.getRawNodeId(nodeId) {
|
||
return
|
||
}
|
||
|
||
_datas, err := c.hashRing.Datas(ctx, nextNode[0])
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if len(_datas) == 0 {
|
||
return
|
||
}
|
||
|
||
for key := range _datas {
|
||
sum := c.encrytor.Hash(key)
|
||
if patternOne && (sum > (lastScore + math.MaxInt32)) {
|
||
sum -= math.MaxInt32
|
||
}
|
||
|
||
if sum < lastScore || sum >= score {
|
||
continue
|
||
}
|
||
|
||
if datas == nil {
|
||
datas = make(map[string]struct{})
|
||
}
|
||
|
||
datas[key] = struct{}{}
|
||
}
|
||
|
||
to = c.getRawNodeId(nodeId)
|
||
from = c.getRawNodeId(nextNode[0])
|
||
|
||
if err := c.hashRing.DelDatas(ctx, nextNode[0], datas); err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if err := c.hashRing.AddDatas(ctx, nodeId, datas); err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func (c *ConsistenceHash) migratorOut(ctx context.Context, score int32, nodeId string,
|
||
allDatas *map[string]struct{}) (datas map[string]struct{}, from, to string, _err error) {
|
||
|
||
var rawTo string
|
||
|
||
defer func () {
|
||
if _err !=nil{
|
||
return
|
||
}
|
||
|
||
if to == "" || len(datas)==0{
|
||
return
|
||
}
|
||
|
||
if _err = c.hashRing.DelDatas(ctx, nodeId, datas); _err != nil {
|
||
return
|
||
}
|
||
|
||
_err = c.hashRing.AddDatas(ctx, rawTo, datas)
|
||
}()
|
||
|
||
if c.migrator == nil {
|
||
return
|
||
}
|
||
|
||
nodes, err := c.hashRing.Node(ctx, score)
|
||
fmt.Println(nodes,score)
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if len(nodes) == 0 {
|
||
return
|
||
}
|
||
|
||
if nodes[0] != nodeId {
|
||
return
|
||
}
|
||
|
||
if len(*allDatas) == 0 {
|
||
return
|
||
}
|
||
|
||
lastScore, err := c.hashRing.Floor(ctx, c.decrScore(score))
|
||
fmt.Println(lastScore,err)
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
if lastScore == -1 {
|
||
return
|
||
}
|
||
|
||
fmt.Printf("score: %d, lastscore: %d",score,lastScore)
|
||
|
||
if lastScore == score {
|
||
for _, nodeIds := range nodes {
|
||
if c.getRawNodeId(nodeIds) != c.getRawNodeId(nodeId) {
|
||
rawTo = nodeIds
|
||
}
|
||
}
|
||
|
||
if rawTo == "" {
|
||
return
|
||
}
|
||
}
|
||
|
||
patternOne := lastScore > score //这里必须定义模式,如果直接让sum > (lastScore + math.MaxInt32)判断,后面括号可能会溢出
|
||
|
||
if patternOne {
|
||
lastScore -= math.MaxInt32
|
||
}
|
||
|
||
fmt.Println("del lastscore", lastScore, "score", score)
|
||
|
||
for key := range *allDatas {
|
||
sum := c.encrytor.Hash(key)
|
||
fmt.Println("del sum", sum, "getsum", lastScore+math.MaxInt32)
|
||
if patternOne && (sum > (lastScore + math.MaxInt32)) {
|
||
sum -= math.MaxInt32
|
||
}
|
||
|
||
if sum < lastScore || sum >= score {
|
||
continue
|
||
}
|
||
|
||
if datas == nil {
|
||
datas = make(map[string]struct{})
|
||
}
|
||
|
||
datas[key] = struct{}{}
|
||
delete(*allDatas, key)
|
||
}
|
||
|
||
from = c.getRawNodeId(nodeId)
|
||
if rawTo != "" {
|
||
to = c.getRawNodeId(rawTo)
|
||
return
|
||
}
|
||
|
||
rawTo, err = c.getNextNode(ctx, score, nodeId, nil)
|
||
if err != nil {
|
||
_err = err
|
||
return
|
||
}
|
||
|
||
to = c.getRawNodeId(rawTo)
|
||
|
||
if to == "" {
|
||
_err = errors.New("unavailable to node")
|
||
return
|
||
}
|
||
|
||
return
|
||
}
|
||
|
||
func (c *ConsistenceHash) getNextNode(ctx context.Context, score int32, nodeId string, dup map[int32]struct{}) (string, error) {
|
||
nextScore, err := c.hashRing.Ceiling(ctx, c.incrScore(score))
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if nextScore == -1 {
|
||
return "", nil
|
||
}
|
||
|
||
if _, ok := dup[nextScore]; ok {
|
||
return "", nil
|
||
}
|
||
|
||
nodes, err := c.hashRing.Node(ctx, nextScore)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if len(nodes) == 0 {
|
||
return "", errors.New("un available next node")
|
||
}
|
||
|
||
for _, nodeIds := range nodes {
|
||
if c.getRawNodeId(nodeIds) != c.getRawNodeId(nodeId) {
|
||
return nodeIds, nil
|
||
}
|
||
}
|
||
|
||
if dup == nil {
|
||
dup = make(map[int32]struct{})
|
||
}
|
||
dup[nextScore] = struct{}{}
|
||
|
||
return c.getNextNode(ctx, nextScore, nodeId, dup)
|
||
}
|