1、时间轮增加微对象列表,简化周期计算。
This commit is contained in:
parent
2f36f2338b
commit
f4459ebec4
37
timewheel.go
37
timewheel.go
@ -13,14 +13,17 @@ type taskSlice struct {
|
|||||||
rawCycle int
|
rawCycle int
|
||||||
mode TimeMode
|
mode TimeMode
|
||||||
task func()
|
task func()
|
||||||
|
tiny bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type TimeWheel struct {
|
type TimeWheel struct {
|
||||||
sync.Once
|
sync.Once
|
||||||
curtSlot int
|
curtSlot int
|
||||||
|
slotLen int
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
slots []*list.List
|
slots []*list.List
|
||||||
|
tinyWheel *list.List
|
||||||
taskMap map[string]*list.Element
|
taskMap map[string]*list.Element
|
||||||
stopC chan struct{}
|
stopC chan struct{}
|
||||||
addTaskC chan *taskSlice
|
addTaskC chan *taskSlice
|
||||||
@ -51,6 +54,8 @@ func New(slots int, interval time.Duration) *TimeWheel {
|
|||||||
stopC: make(chan struct{}),
|
stopC: make(chan struct{}),
|
||||||
addTaskC: make(chan *taskSlice),
|
addTaskC: make(chan *taskSlice),
|
||||||
removeTaskC: make(chan string),
|
removeTaskC: make(chan string),
|
||||||
|
tinyWheel: list.New(),
|
||||||
|
slotLen: slots,
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < slots; i++ {
|
for i := 0; i < slots; i++ {
|
||||||
@ -62,7 +67,7 @@ func New(slots int, interval time.Duration) *TimeWheel {
|
|||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.Time) {
|
func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.Duration) {
|
||||||
cycle, pos := t.getSlotPosAndCycle(flower)
|
cycle, pos := t.getSlotPosAndCycle(flower)
|
||||||
|
|
||||||
t.addTaskC <- &taskSlice{
|
t.addTaskC <- &taskSlice{
|
||||||
@ -72,6 +77,7 @@ func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.T
|
|||||||
mode: mode,
|
mode: mode,
|
||||||
rawCycle: cycle,
|
rawCycle: cycle,
|
||||||
pos: pos,
|
pos: pos,
|
||||||
|
tiny: int(flower) < t.slotLen*int(t.interval),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,10 +92,10 @@ func (t *TimeWheel) Stop() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) getSlotPosAndCycle(flower time.Time) (int, int) {
|
func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) {
|
||||||
delay := time.Until(flower)
|
delay := time.Until(time.Now().Add(flower))
|
||||||
cycle := delay / (t.interval * time.Duration(len(t.slots)))
|
cycle := delay / (t.interval * time.Duration(t.slotLen))
|
||||||
pos := (t.curtSlot + int(delay/t.interval)) % len(t.slots)
|
pos := (t.curtSlot + int(delay/t.interval)) % t.slotLen
|
||||||
|
|
||||||
return int(cycle), pos
|
return int(cycle), pos
|
||||||
}
|
}
|
||||||
@ -114,10 +120,11 @@ func (t *TimeWheel) tick() {
|
|||||||
defer t.updateCurtSlot()
|
defer t.updateCurtSlot()
|
||||||
|
|
||||||
t.execute(list)
|
t.execute(list)
|
||||||
|
t.execute(t.tinyWheel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) updateCurtSlot() {
|
func (t *TimeWheel) updateCurtSlot() {
|
||||||
t.curtSlot = (t.curtSlot + 1) % len(t.slots)
|
t.curtSlot = (t.curtSlot + 1) % t.slotLen
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) execute(l *list.List) {
|
func (t *TimeWheel) execute(l *list.List) {
|
||||||
@ -156,7 +163,16 @@ func (t *TimeWheel) addTask(task *taskSlice) {
|
|||||||
t.removeTask(task.id)
|
t.removeTask(task.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
list := t.slots[task.pos]
|
var list *list.List
|
||||||
|
if task.tiny {
|
||||||
|
task.cycle = (task.pos - t.curtSlot + t.slotLen) % t.slotLen
|
||||||
|
task.rawCycle = task.cycle
|
||||||
|
|
||||||
|
list = t.tinyWheel
|
||||||
|
} else {
|
||||||
|
list = t.slots[task.pos]
|
||||||
|
}
|
||||||
|
|
||||||
etask := list.PushBack(task)
|
etask := list.PushBack(task)
|
||||||
t.taskMap[task.id] = etask
|
t.taskMap[task.id] = etask
|
||||||
}
|
}
|
||||||
@ -170,5 +186,10 @@ func (t *TimeWheel) removeTask(key string) {
|
|||||||
delete(t.taskMap, key)
|
delete(t.taskMap, key)
|
||||||
|
|
||||||
task := etask.Value.(*taskSlice)
|
task := etask.Value.(*taskSlice)
|
||||||
_ = t.slots[task.pos].Remove(etask)
|
|
||||||
|
if task.tiny {
|
||||||
|
_ = t.tinyWheel.Remove(etask)
|
||||||
|
} else {
|
||||||
|
_ = t.slots[task.pos].Remove(etask)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,13 +14,13 @@ func Test_timeWheel(t *testing.T) {
|
|||||||
t.Errorf("test2, %v", time.Now())
|
t.Errorf("test2, %v", time.Now())
|
||||||
timeWheel.AddTask("test1", TimeTypeLoop, func() {
|
timeWheel.AddTask("test1", TimeTypeLoop, func() {
|
||||||
t.Errorf("test1, %v", time.Now())
|
t.Errorf("test1, %v", time.Now())
|
||||||
}, time.Now().Add(time.Second))
|
}, time.Millisecond*300)
|
||||||
timeWheel.AddTask("test2", TimeTypeLoop, func() {
|
timeWheel.AddTask("test2", TimeTypeLoop, func() {
|
||||||
t.Errorf("test2, %v", time.Now())
|
t.Errorf("test2, %v", time.Now())
|
||||||
}, time.Now().Add(5*time.Second))
|
}, time.Second)
|
||||||
timeWheel.AddTask("test2", TimerTypeOnce, func() {
|
timeWheel.AddTask("test2", TimerTypeOnce, func() {
|
||||||
t.Errorf("test2, %v", time.Now())
|
t.Errorf("test2, %v", time.Now())
|
||||||
}, time.Now().Add(3*time.Second))
|
}, time.Second)
|
||||||
|
|
||||||
<-time.After(6 * time.Second)
|
<-time.After(6 * time.Second)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user