diff --git a/go.mod b/go.mod index 7cd4594..4c7220e 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,9 @@ module dashboard -go 1.23.0 - -toolchain go1.23.9 +go 1.24.3 require ( + git.zhangshuocauc.cn/redhat/timewheel v1.0.1 github.com/bwmarrin/snowflake v0.3.0 github.com/creack/pty v1.1.24 github.com/fsnotify/fsnotify v1.9.0 diff --git a/go.sum b/go.sum index 8a5e3d7..7e5a13d 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +git.zhangshuocauc.cn/redhat/timewheel v1.0.1 h1:llUaze4uk7yo5p/VsjbmYl0daWIvCEonrqCQnTpC6dw= +git.zhangshuocauc.cn/redhat/timewheel v1.0.1/go.mod h1:PNpKCPvSb0ynhx3k2oiigYEcKKq8G/d4P9Fh4VFZiac= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= diff --git a/pkg/myfsm/fsm.go b/pkg/myfsm/fsm.go index 6d980ac..4347ea2 100644 --- a/pkg/myfsm/fsm.go +++ b/pkg/myfsm/fsm.go @@ -4,13 +4,16 @@ import ( "fmt" "log" "time" + + "git.zhangshuocauc.cn/redhat/timewheel" ) type EventName string const ( - EventEntry EventName = "EventEntry" - EventExit EventName = "EventExit" + EventEntry EventName = "EventEntry" + EventExit EventName = "EventExit" + EventTimeOut EventName = "EventTimeOut" EventOK EventName = "EventOK" EventNoProc EventName = "EventNoProc" @@ -32,6 +35,7 @@ type FSM struct { currentState *FSMState eventNames *[]string fsmStats map[string]*FSMState + timer *timewheel.TimeWheel } type Event struct { @@ -64,6 +68,7 @@ func NewFSM(name string, eventNames *[]string, fsmStats ...*FSMState) (*FSM, err currentState: fsmStats[0], eventNames: eventNames, fsmStats: make(map[string]*FSMState), + timer: timewheel.New(1, time.Millisecond*100), } for index := range fsmStats { @@ -81,6 +86,16 @@ func NewFSM(name string, eventNames *[]string, fsmStats ...*FSMState) (*FSM, err return fsm, nil } +func (fsm *FSM) StartTimer(typeT timewheel.TimeMode, cycle time.Duration, arg interface{}) { + fsm.timer.AddTask(fsm.currentState.name, typeT, func() { + fsm.ExecuteEvent(EventTimeOut, arg) + }, time.Now().Add(cycle)) +} + +func (fsm *FSM) StopTimer() { + fsm.timer.RemoveTask(fsm.currentState.name) +} + func (fsm *FSM) ExecuteEvent(event EventName, arg interface{}) error { if fsm == nil { return fmt.Errorf("FSM is nil") @@ -114,6 +129,7 @@ func (fsm *FSM) ChangeState(newState string, arg interface{}) error { if err := fsm.ExecuteEvent(EventExit, nil); err != nil { return err } + fsm.StopTimer() if newState == tempState.name { if err := fsm.ExecuteEvent(EventEntry, arg); err != nil { diff --git a/pkg/myfsm/fsm_test.go b/pkg/myfsm/fsm_test.go index 4ca568e..4d410d9 100644 --- a/pkg/myfsm/fsm_test.go +++ b/pkg/myfsm/fsm_test.go @@ -4,6 +4,8 @@ import ( "log" "testing" "time" + + "git.zhangshuocauc.cn/redhat/timewheel" ) type myFsm struct { @@ -30,6 +32,8 @@ func state2(fsm *FSM, event EventName, arg interface{}) EventName { log.Println("state2", event) case EventExit: log.Println("state2", event) + case EventTimeOut: + log.Println("state2", event) default: log.Println("state2", event) return EventNoProc @@ -46,6 +50,8 @@ func state3(fsm *FSM, event EventName, arg interface{}) EventName { log.Println("state3", event) case "test event": log.Println("state3", event) + case EventTimeOut: + log.Println("state3", event) default: log.Println("state3", event) return EventNoProc @@ -59,7 +65,7 @@ func newMyFsm() *myFsm { state2 := NewFSMState("State2", "State1", "", state2) state3 := NewFSMState("State3", "State1", "", state3) - fsm, err := NewFSM("MyFSM", &[]string{string(EventEntry), string(EventExit)}, state2, state1, state3) + fsm, err := NewFSM("MyFSM", &[]string{string(EventEntry), string(EventExit)}, state1, state2, state3) if err != nil { log.Fatal(err) } @@ -76,12 +82,14 @@ func Test_fsm(t *testing.T) { myfsm.fsm.ExecuteEvent("test event", nil) - // Start a timer (example) - // timerHandle := &ReactorTimerHandle{} - // if err := fsm.StartReactorTimer(timerHandle, 0, 1000, EventEntry, nil); err != nil { - // log.Fatal(err) - // } + myfsm.fsm.StartTimer(timewheel.TimerTypeOnce, time.Millisecond*100, nil) + + <-time.After(200 * time.Millisecond) + + myfsm.fsm.ChangeState("State2", nil) + + myfsm.fsm.StartTimer(timewheel.TimeTypeLoop, time.Second, nil) // Keep the program running to observeinterior - time.Sleep(2 * time.Second) + <-time.After(3 * time.Second) } diff --git a/pkg/timewheel/timewheel.go b/pkg/timewheel/timewheel.go new file mode 100644 index 0000000..1f27d2b --- /dev/null +++ b/pkg/timewheel/timewheel.go @@ -0,0 +1,236 @@ +package timewheel + +import ( + "container/list" + "fmt" + "math" + "sync" + "time" +) + +type taskSlice struct { + id string + pos int + cycle int + rawCycle int + flower time.Duration + mode TimeMode + task func() +} + +type TimeWheel struct { + sync.Once + curtSlot int + interval time.Duration + ticker *time.Ticker + slots []*list.List + taskMap map[string]*list.Element + tasksMap map[string][]int + stopC chan struct{} + addTaskC chan *taskSlice + removeTaskC chan string +} + +type TimeMode int + +const ( + TimerTypeOnce TimeMode = iota + TimeTypeLoop +) + +func New(slots int, interval time.Duration) *TimeWheel { + if slots <= 0 { + slots = 10 + } + + if interval <= 0 { + interval = time.Second + } + + t := &TimeWheel{ + interval: interval, + ticker: time.NewTicker(interval), + slots: make([]*list.List, 0, slots), + taskMap: make(map[string]*list.Element), + stopC: make(chan struct{}), + addTaskC: make(chan *taskSlice), + removeTaskC: make(chan string), + tasksMap: make(map[string][]int), + } + + for i := 0; i < slots; i++ { + t.slots = append(t.slots, list.New()) + } + + go t.run() + + return t +} + +func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.Duration) { + cycle, pos := t.getSlotPosAndCycle(flower) + + t.addTaskC <- &taskSlice{ + id: id, + task: task, + cycle: cycle, + mode: mode, + rawCycle: cycle, + pos: pos, + flower: flower, + } +} + +func (t *TimeWheel) RemoveTask(key string) { + t.removeTaskC <- key +} + +func (t *TimeWheel) Stop() { + t.Do(func() { + t.ticker.Stop() + close(t.stopC) + }) +} + +func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) { + delay := time.Until(time.Now().Add(flower)) + cycle := delay / (t.interval * time.Duration(len(t.slots))) + pos := (t.curtSlot + int(delay/t.interval)) % len(t.slots) + + return int(cycle), pos +} + +func (t *TimeWheel) getCyclePos(task *taskSlice) []int { + if task.cycle == 0 && task.mode == TimeTypeLoop { + var listc []int + dur := int(math.Ceil(float64(float64(task.flower) / float64(t.interval)))) + start := task.pos + for i := 0; i < int(math.Ceil(float64(len(t.slots))/float64(dur))); i++ { + listc = append(listc, start%len(t.slots)) + start += dur + } + + return listc + } + + return nil +} + +func (t *TimeWheel) run() { + for { + select { + case <-t.stopC: + return + case <-t.ticker.C: + t.tick() + case task := <-t.addTaskC: + t.addCycelTask(task) + case key := <-t.removeTaskC: + t.removeCycelTask(key) + } + } +} + +func (t *TimeWheel) tick() { + list := t.slots[t.curtSlot] + defer t.updateCurtSlot() + + t.execute(list) +} + +func (t *TimeWheel) updateCurtSlot() { + t.curtSlot = (t.curtSlot + 1) % len(t.slots) +} + +func (t *TimeWheel) execute(l *list.List) { + for e := l.Front(); e != nil; { + event := e.Value.(*taskSlice) + if event.cycle > 0 { + event.cycle-- + e = e.Next() + continue + } + + go func() { + defer func() { + if err := recover(); err != nil { + + } + }() + + event.task() + }() + + if event.mode == TimerTypeOnce { + next := e.Next() + l.Remove(e) + delete(t.taskMap, event.id) + e = next + } else { + e = e.Next() + event.cycle = event.rawCycle + } + } +} + +func (t *TimeWheel) addCycelTask(task *taskSlice) { + t.removeCycelTask(task.id) + + poss := t.getCyclePos(task) + fmt.Println(poss) + if len(poss) > 0 { + t.tasksMap[task.id] = poss + for _, index := range poss { + internalTask := &taskSlice{ + id: fmt.Sprintf("%s_%d", task.id, index), + pos: index, + cycle: task.cycle, + rawCycle: task.rawCycle, + mode: task.mode, + task: task.task, + } + + t.addTask(internalTask) + } + + return + } + + t.addTask(task) +} + +func (t *TimeWheel) removeCycelTask(key string) { + poss := t.tasksMap[key] + if len(poss) > 0 { + for _, index := range poss { + id := fmt.Sprintf("%s_%d", key, index) + t.removeTask(id) + } + + return + } + + t.removeTask(key) +} + +func (t *TimeWheel) addTask(task *taskSlice) { + if _, ok := t.taskMap[task.id]; ok { + t.removeTask(task.id) + } + + list := t.slots[task.pos] + etask := list.PushBack(task) + t.taskMap[task.id] = etask +} + +func (t *TimeWheel) removeTask(key string) { + etask, ok := t.taskMap[key] + if !ok { + return + } + + delete(t.taskMap, key) + + task := etask.Value.(*taskSlice) + _ = t.slots[task.pos].Remove(etask) +} diff --git a/pkg/timewheel/timewheel_test.go b/pkg/timewheel/timewheel_test.go new file mode 100644 index 0000000..2fe57d6 --- /dev/null +++ b/pkg/timewheel/timewheel_test.go @@ -0,0 +1,26 @@ +package timewheel + +import ( + "testing" + "time" +) + +func Test_timeWheel(t *testing.T) { + timeWheel := New(10, 100*time.Millisecond) + defer timeWheel.Stop() + + <-time.After(10 * time.Millisecond) + + t.Errorf("test2, %v", time.Now()) + timeWheel.AddTask("test1", TimeTypeLoop, func() { + t.Errorf("test1, %v", time.Now()) + }, time.Millisecond*300) + timeWheel.AddTask("test2", TimeTypeLoop, func() { + t.Errorf("test2, %v", time.Now()) + }, time.Second) + timeWheel.AddTask("test2", TimerTypeOnce, func() { + t.Errorf("test2, %v", time.Now()) + }, time.Second) + + <-time.After(6 * time.Second) +}