From f4459ebec40021ef397db58439b564ab73484492 Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Tue, 10 Jun 2025 16:01:36 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=97=B6=E9=97=B4=E8=BD=AE=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=BE=AE=E5=AF=B9=E8=B1=A1=E5=88=97=E8=A1=A8=EF=BC=8C?= =?UTF-8?q?=E7=AE=80=E5=8C=96=E5=91=A8=E6=9C=9F=E8=AE=A1=E7=AE=97=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- timewheel.go | 37 +++++++++++++++++++++++++++++-------- timewheel_test.go | 6 +++--- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/timewheel.go b/timewheel.go index bd12644..3aaebe4 100644 --- a/timewheel.go +++ b/timewheel.go @@ -13,14 +13,17 @@ type taskSlice struct { rawCycle int mode TimeMode task func() + tiny bool } type TimeWheel struct { sync.Once curtSlot int + slotLen int interval time.Duration ticker *time.Ticker slots []*list.List + tinyWheel *list.List taskMap map[string]*list.Element stopC chan struct{} addTaskC chan *taskSlice @@ -51,6 +54,8 @@ func New(slots int, interval time.Duration) *TimeWheel { stopC: make(chan struct{}), addTaskC: make(chan *taskSlice), removeTaskC: make(chan string), + tinyWheel: list.New(), + slotLen: slots, } for i := 0; i < slots; i++ { @@ -62,7 +67,7 @@ func New(slots int, interval time.Duration) *TimeWheel { return t } -func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.Time) { +func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.Duration) { cycle, pos := t.getSlotPosAndCycle(flower) t.addTaskC <- &taskSlice{ @@ -72,6 +77,7 @@ func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.T mode: mode, rawCycle: cycle, pos: pos, + tiny: int(flower) < t.slotLen*int(t.interval), } } @@ -86,10 +92,10 @@ func (t *TimeWheel) Stop() { }) } -func (t *TimeWheel) getSlotPosAndCycle(flower time.Time) (int, int) { - delay := time.Until(flower) - cycle := delay / (t.interval * time.Duration(len(t.slots))) - pos := (t.curtSlot + int(delay/t.interval)) % len(t.slots) +func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) { + delay := time.Until(time.Now().Add(flower)) + cycle := delay / (t.interval * time.Duration(t.slotLen)) + pos := (t.curtSlot + int(delay/t.interval)) % t.slotLen return int(cycle), pos } @@ -114,10 +120,11 @@ func (t *TimeWheel) tick() { defer t.updateCurtSlot() t.execute(list) + t.execute(t.tinyWheel) } func (t *TimeWheel) updateCurtSlot() { - t.curtSlot = (t.curtSlot + 1) % len(t.slots) + t.curtSlot = (t.curtSlot + 1) % t.slotLen } func (t *TimeWheel) execute(l *list.List) { @@ -156,7 +163,16 @@ func (t *TimeWheel) addTask(task *taskSlice) { t.removeTask(task.id) } - list := t.slots[task.pos] + var list *list.List + if task.tiny { + task.cycle = (task.pos - t.curtSlot + t.slotLen) % t.slotLen + task.rawCycle = task.cycle + + list = t.tinyWheel + } else { + list = t.slots[task.pos] + } + etask := list.PushBack(task) t.taskMap[task.id] = etask } @@ -170,5 +186,10 @@ func (t *TimeWheel) removeTask(key string) { delete(t.taskMap, key) task := etask.Value.(*taskSlice) - _ = t.slots[task.pos].Remove(etask) + + if task.tiny { + _ = t.tinyWheel.Remove(etask) + } else { + _ = t.slots[task.pos].Remove(etask) + } } diff --git a/timewheel_test.go b/timewheel_test.go index fbe4a32..2fe57d6 100644 --- a/timewheel_test.go +++ b/timewheel_test.go @@ -14,13 +14,13 @@ func Test_timeWheel(t *testing.T) { t.Errorf("test2, %v", time.Now()) timeWheel.AddTask("test1", TimeTypeLoop, func() { t.Errorf("test1, %v", time.Now()) - }, time.Now().Add(time.Second)) + }, time.Millisecond*300) timeWheel.AddTask("test2", TimeTypeLoop, func() { t.Errorf("test2, %v", time.Now()) - }, time.Now().Add(5*time.Second)) + }, time.Second) timeWheel.AddTask("test2", TimerTypeOnce, func() { t.Errorf("test2, %v", time.Now()) - }, time.Now().Add(3*time.Second)) + }, time.Second) <-time.After(6 * time.Second) }