diff --git a/go.mod b/go.mod index 4c7220e..914946e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module dashboard go 1.24.3 require ( - git.zhangshuocauc.cn/redhat/timewheel v1.0.1 + git.zhangshuocauc.cn/redhat/timewheel v1.0.3 github.com/bwmarrin/snowflake v0.3.0 github.com/creack/pty v1.1.24 github.com/fsnotify/fsnotify v1.9.0 @@ -15,7 +15,6 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/jmoiron/sqlx v1.4.0 github.com/juju/ratelimit v1.0.2 - github.com/looplab/fsm v1.0.3 github.com/mattn/go-sqlite3 v1.14.28 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/shirou/gopsutil/v4 v4.25.4 diff --git a/go.sum b/go.sum index 7e5a13d..8c3776c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -git.zhangshuocauc.cn/redhat/timewheel v1.0.1 h1:llUaze4uk7yo5p/VsjbmYl0daWIvCEonrqCQnTpC6dw= -git.zhangshuocauc.cn/redhat/timewheel v1.0.1/go.mod h1:PNpKCPvSb0ynhx3k2oiigYEcKKq8G/d4P9Fh4VFZiac= +git.zhangshuocauc.cn/redhat/timewheel v1.0.3 h1:FOs+pm+80ZK6WmDV5ekzrGMOQf+kih8XkqZCqwlpRsI= +git.zhangshuocauc.cn/redhat/timewheel v1.0.3/go.mod h1:PNpKCPvSb0ynhx3k2oiigYEcKKq8G/d4P9Fh4VFZiac= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= @@ -77,8 +77,6 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/looplab/fsm v1.0.3 h1:qtxBsa2onOs0qFOtkqwf5zE0uP0+Te+wlIvXctPKpcw= -github.com/looplab/fsm v1.0.3/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= diff --git a/pkg/fsm/event.go b/pkg/fsm/event.go deleted file mode 100644 index c63067e..0000000 --- a/pkg/fsm/event.go +++ /dev/null @@ -1,24 +0,0 @@ -package fsm - -// 定义任务状态 -const ( - StatePending = "pending" // 待派发 - StateAssigned = "assigned" // 已派发 - StateAccepted = "accepted" // 已接单 - StateSubmitted = "submitted" // 已提交 - StateApproved = "approved" // 已审核 - StateRejected = "rejected" // 已驳回 - StateSettled = "settled" // 已结算 - StateCanceled = "canceled" // 已取消 -) - -// 定义任务事件 -const ( - EventAssign = "assign" // 派发任务 - EventAccept = "accept" // 接受任务 - EventSubmit = "submit" // 提交任务 - EventApprove = "approve" // 审核通过 - EventReject = "reject" // 审核驳回 - EventSettle = "settle" // 结算任务 - EventCancel = "cancel" // 取消任务 -) diff --git a/pkg/fsm/fsm.go b/pkg/fsm/fsm.go index 6cfdb70..59832e0 100644 --- a/pkg/fsm/fsm.go +++ b/pkg/fsm/fsm.go @@ -2,87 +2,283 @@ package fsm import ( "context" + "errors" "fmt" + "log" + "sync" + "time" - "github.com/looplab/fsm" + "git.zhangshuocauc.cn/redhat/timewheel" ) -// Task 任务结构体 -type Task struct { - ID string - FSM *fsm.FSM - UserID string // 示例:记录操作者 +type EventName string + +func (e EventName) Error() string { + return string(e) } -// NewTask 创建新任务 -func NewTask(id string) *Task { - task := &Task{ - ID: id, +const ( + EventEntry EventName = "EventEntry" + EventExit EventName = "EventExit" + EventTimeOut EventName = "EventTimeOut" + + EventOK EventName = "EventOK" + EventNoProc EventName = "EventNoProc" +) + +var internalEvents = Events{ + EventEntry, + EventExit, + EventTimeOut, + EventOK, + EventNoProc, +} + +type StateFunc func(context.Context, *Event) error + +type StateRule struct { + Name string + Parent string + InitState string + Processor StateFunc + Dst []string // Define rules that allow conversion, allowing null values +} + +type Event struct { + FSM *FSM + Event EventName + Src string + Dst string + Args []interface{} +} + +type FSM struct { + eventMu sync.Mutex + log *log.Logger + name string + currentState *StateRule + eventNames Events + fsmStates StateRules + timer *timewheel.TimeWheel // Use a time wheel timer with a default slot count of 10 and a cycle time of 100ms +} + +type StateRules []*StateRule + +func (s StateRules) findByName(name string) *StateRule { + if name == "" { + return nil } - // 创建状态机 - task.FSM = fsm.NewFSM( - StatePending, // 初始状态 - fsm.Events{ // 定义状态转换规则 - {Name: EventAssign, Src: []string{StatePending}, Dst: StateAssigned}, - {Name: EventAccept, Src: []string{StateAssigned}, Dst: StateAccepted}, - {Name: EventSubmit, Src: []string{StateAccepted}, Dst: StateSubmitted}, - {Name: EventApprove, Src: []string{StateSubmitted}, Dst: StateApproved}, - {Name: EventReject, Src: []string{StateSubmitted}, Dst: StateRejected}, - {Name: EventSettle, Src: []string{StateApproved}, Dst: StateSettled}, - // Cancel 事件可以从 Pending 或 Assigned 状态触发 - {Name: EventCancel, Src: []string{StatePending, StateAssigned}, Dst: StateCanceled}, - }, - fsm.Callbacks{ // 定义回调函数 - // 通用回调:每次触发事件前执行(通过 e.Cancel() 可以组织内存中实际状态变化) - "before_event": func(_ context.Context, e *fsm.Event) { - if e.Event == EventCancel { - e.Cancel(fmt.Errorf("failed to handle event %s", e.Event)) - return + for _, sub := range s { + if sub.Name == name { + return sub + } + } + + return nil +} + +func (s StateRules) checkRule() bool { + for _, state := range s { + for _, des := range state.Dst { + if des != "" && s.findByName(des) == nil { + return false + } + } + + if state.Parent != "" && s.findByName(state.Parent) == nil { + return false + } + + if state.InitState != "" && s.findByName(state.InitState) == nil { + return false + } + } + + return true +} + +type Events []EventName + +func (e Events) findByName(name EventName) EventName { + for _, sub := range e { + if sub == name { + return sub + } + } + + return "" +} + +func NewFsm(name string, initState string, events Events, states StateRules, logs ...*log.Logger) (*FSM, error) { + if len(events) == 0 || len(states) == 0 { + return nil, fmt.Errorf("initial state cannot be nil") + } + + // Verify if all conversion rules are correct + if !states.checkRule() { + return nil, fmt.Errorf("state rules error") + } + + fsm := &FSM{ + name: name, + currentState: states.findByName(initState), + eventNames: events, + fsmStates: states, + timer: timewheel.New(10, time.Millisecond*100), + } + + fsm.eventNames = append(fsm.eventNames, internalEvents...) + if len(logs) == 1 { + fsm.log = logs[0] + } + if fsm.log == nil { + fsm.log = log.Default() + } + + // Execute ENTRY event for initial state and its sub-states + for tempState := fsm.currentState; tempState != nil; tempState = fsm.fsmStates.findByName(tempState.InitState) { + fsm.currentState = tempState + if err := fsm.ExecuteEvent(context.TODO(), EventEntry, nil); err != nil { + return nil, err + } + } + + return fsm, nil +} + +func (fsm *FSM) StartEventTimer(ctx context.Context, typeT timewheel.TimeMode, cycle time.Duration, arg interface{}) { + oldState := fsm.currentState.Name + fsm.log.Printf("Start event timer state: %s\n", oldState) + fsm.timer.AddTask(fsm.currentState.Name, typeT, func() { + // Verify whether the current state has changed + // If there is a change, timeout events cannot be executed anymore and the timer for the status needs to be deleted + if fsm.currentState.Name != oldState { + fsm.log.Printf("Event timer but state alrady change (%s -> %s)\n", oldState, fsm.currentState.Name) + fsm.timer.RemoveTask(oldState) + + return + } + fsm.ExecuteEvent(ctx, EventTimeOut, arg) + }, cycle) +} + +func (fsm *FSM) StopEventTimer() { + fsm.timer.RemoveTask(fsm.currentState.Name) +} + +func (fsm *FSM) StopTimer() { + fsm.timer.Stop() +} + +// Because the timer timeout event is triggered by other goroutines, the execution of the event requires a lock +func (fsm *FSM) ExecuteEvent(ctx context.Context, event EventName, arg ...interface{}) error { + fsm.eventMu.Lock() + defer fsm.eventMu.Unlock() + + if fsm.eventNames.findByName(event) == "" { + return fmt.Errorf("new state cannot be nil") + } + + fsm.log.Printf("%s %-23s STATE:%-23s EVENT:%s\n", time.Now().Format(".999"), fsm.name, fsm.currentState.Name, event) + + tmpState := fsm.currentState + if tmpState == nil { + return fmt.Errorf("current state is nil") + } + + if tmpState.Processor != nil { + e := &Event{FSM: fsm, Event: event, Src: tmpState.Name, Dst: tmpState.Name, Args: arg} + result := tmpState.Processor(ctx, e) + + for errors.Is(result, EventNoProc) && fsm.fsmStates.findByName(tmpState.Parent) != nil { + tmpState = fsm.fsmStates.findByName(tmpState.Parent) + result = tmpState.Processor(ctx, e) + } + + if result != nil && !errors.Is(result, EventNoProc) && !errors.Is(result, EventOK) { + return result + } + } + + return nil +} + +func (fsm *FSM) StateChange(ctx context.Context, newState string, arg interface{}) error { + if newState == "" || fsm.fsmStates.findByName(newState) == nil { + return fmt.Errorf("new state cannot be nil") + } + + // If there are defined rules, they need to be checked. If they are empty, it means that any transition between states is allowed + if !fsm.checkDstRule(newState) { + return fmt.Errorf("the new state is not in dst rule") + } + + for tempState := fsm.currentState; tempState != nil; tempState = fsm.fsmStates.findByName(tempState.Parent) { + fsm.currentState = tempState + if err := fsm.ExecuteEvent(ctx, EventExit, arg); err != nil { + return err + } + fsm.StopEventTimer() + + if newState == tempState.Name { + if err := fsm.ExecuteEvent(ctx, EventEntry, arg); err != nil { + return err + } + break + } + + if found, ok := fsm.findState(fsm.fsmStates.findByName(newState), tempState); ok { + for end := len(found) - 1; end >= 0; end-- { + fsm.currentState = found[end] + if err := fsm.ExecuteEvent(ctx, EventEntry, arg); err != nil { + return err } - fmt.Printf("[Event]Task %s event %s triggered\n", task.ID, e.Event) - }, - // 通用回调:每次进入新状态时触发 - "enter_state": func(_ context.Context, e *fsm.Event) { - fmt.Printf("[State] Task %s state changed from %s to %s\n", task.ID, e.Src, e.Dst) - // 在这里可以添加通用逻辑,如更新数据库状态 - }, - // 特定状态回调:进入 Assigned 状态后执行 - StateAssigned: func(_ context.Context, e *fsm.Event) { - fmt.Printf("[State] Task %s assigned to user %s\n", task.ID, task.UserID) - // 在这里可以发送通知给被分配者 - }, - }, - ) + } + break + } + } - return task + // Enter sub-states of the new state + for tempState := fsm.fsmStates.findByName(newState).InitState; fsm.fsmStates.findByName(tempState) != nil; tempState = fsm.fsmStates.findByName(tempState).InitState { + fsm.currentState = fsm.fsmStates.findByName(tempState) + if err := fsm.ExecuteEvent(ctx, EventEntry, arg); err != nil { + return err + } + } + + return nil } -// 任务状态流转方法 -func (t *Task) Assign(ctx context.Context, userID string) error { - t.UserID = userID // 可以在事件触发前设置相关信息 - return t.FSM.Event(ctx, EventAssign) // 触发 Assign 事件 +func (fsm *FSM) findState(targetState, findState *StateRule) ([]*StateRule, bool) { + if targetState == nil || findState == nil { + return nil, false + } + + var fsmStateList []*StateRule + + for tempState := targetState; tempState != nil; tempState = fsm.fsmStates.findByName(tempState.Parent) { + fsmStateList = append(fsmStateList, tempState) + if findState.Parent == tempState.Parent { + return fsmStateList, true + } + } + + return fsmStateList, false } -func (t *Task) Accept(ctx context.Context) error { - return t.FSM.Event(ctx, EventAccept) +func (fsm *FSM) checkDstRule(newState string) bool { + curState := fsm.currentState + + if len(curState.Dst) > 0 { + for _, dst := range curState.Dst { + if dst == "" || dst == newState { + return true + } + } + + return false + } + + return true } - -func (t *Task) Submit(ctx context.Context) error { - return t.FSM.Event(ctx, EventSubmit) -} - -func (t *Task) Approve(ctx context.Context) error { - return t.FSM.Event(ctx, EventApprove) -} - -func (t *Task) Settle(ctx context.Context) error { - return t.FSM.Event(ctx, EventSettle) -} - -func (t *Task) Cancel(ctx context.Context) error { - return t.FSM.Event(ctx, EventCancel) -} - -// ... 其他事件触发方法 ... - diff --git a/pkg/fsm/fsm_test.go b/pkg/fsm/fsm_test.go index 684e1ad..2c3e293 100644 --- a/pkg/fsm/fsm_test.go +++ b/pkg/fsm/fsm_test.go @@ -3,65 +3,93 @@ package fsm import ( "context" "fmt" + "log" "testing" + "time" - "github.com/looplab/fsm" + "git.zhangshuocauc.cn/redhat/timewheel" ) +func state1(ctx context.Context, e *Event) error { + switch e.Event { + case EventEntry: + log.Println("state1", *e) + case EventExit: + log.Println("state1", *e) + default: + log.Println("state1", *e) + return EventNoProc + } + + return EventOK +} + +func state2(ctx context.Context, e *Event) error { + switch e.Event { + case EventEntry: + log.Println("state2", *e) + case EventExit: + log.Println("state2", *e) + default: + log.Println("state2", *e) + return EventNoProc + } + + return EventOK +} + +func state3(ctx context.Context, e *Event) error { + switch e.Event { + case EventEntry: + log.Println("state3", *e) + case EventExit: + log.Println("state3", *e) + case "assign": + log.Println("state3", *e) + default: + log.Println("state3", *e) + return EventNoProc + } + + return EventOK +} + +var events = Events{ + "assign", + "accept", +} + func Test_fsm(t *testing.T) { + fsm, err := NewFsm("MyFsm", "state1", events, []*StateRule{ + {Name: "state1", Parent: "", InitState: "state2", Processor: state1, Dst: []string{}}, + {Name: "state2", Parent: "state1", InitState: "", Processor: state2, Dst: []string{}}, + {Name: "state3", Parent: "state1", InitState: "", Processor: state3, Dst: []string{}}, + {}, + }) + + if err != nil { + fmt.Println(err) + } + + defer fsm.StopTimer() + ctx := context.Background() - // 创建新任务 - fmt.Println("----------------task1----------------") - fmt.Println("-------------验证状态流转--------------") - task := NewTask("task-001") + fmt.Println(fsm) - // 任务状态流转 - fmt.Println("Current state:", task.FSM.Current()) // pending + fsm.ExecuteEvent(ctx, EventName("assign"), nil) - _ = task.Assign(ctx, "user-001") - fmt.Println("Current state:", task.FSM.Current()) // assigned + fmt.Println(fsm.StateChange(ctx, "state3", nil)) - _ = task.Accept(ctx) - fmt.Println("Current state:", task.FSM.Current()) // accepted + fsm.ExecuteEvent(ctx, EventName("accept"), nil) - _ = task.Submit(ctx) - fmt.Println("Current state:", task.FSM.Current()) // submitted + fsm.ExecuteEvent(ctx, EventName("accept1"), nil) - _ = task.Approve(ctx) - fmt.Println("Current state:", task.FSM.Current()) // approved + fsm.StartEventTimer(ctx, timewheel.TimeTypeLoop, time.Millisecond*100, nil) - _ = task.Settle(ctx) - fmt.Println("Current state:", task.FSM.Current()) // settled + <-time.After(time.Millisecond*100) - // 回调报错情况 - fmt.Println("----------------task2----------------") - fmt.Println("-------------验证回调报错情况-------------") - task2 := NewTask("task-002") - fmt.Println("Current state:", task2.FSM.Current()) // pending + fmt.Println(fsm.StateChange(ctx, "state2", nil)) - _ = task2.Assign(ctx, "user-002") - fmt.Println("Current state:", task2.FSM.Current()) // assigned - - e := task2.Cancel(ctx) - fmt.Println("Error:", e) - fmt.Println("Current state:", task2.FSM.Current()) // assigned(cancel failed) -} - -func TestMermaidOutput(t *testing.T) { - task := NewTask("task-001") - - gotDiagram, err := fsm.VisualizeForMermaidWithGraphType(task.FSM, fsm.StateDiagram) - if err != nil { - t.Errorf("got error for visualizing with type MERMAID: %s", err) - } - - gotFlowchart, err := fsm.VisualizeForMermaidWithGraphType(task.FSM, fsm.FlowChart) - if err != nil { - t.Errorf("got error for visualizing with type MERMAID: %s", err) - } - - fmt.Println(gotDiagram) - fmt.Println("--------------------------------") - fmt.Println(gotFlowchart) + <-time.After(3 * time.Second) } diff --git a/pkg/myfsm/fsm.go b/pkg/myfsm/fsm.go deleted file mode 100644 index 4347ea2..0000000 --- a/pkg/myfsm/fsm.go +++ /dev/null @@ -1,176 +0,0 @@ -package fsm - -import ( - "fmt" - "log" - "time" - - "git.zhangshuocauc.cn/redhat/timewheel" -) - -type EventName string - -const ( - EventEntry EventName = "EventEntry" - EventExit EventName = "EventExit" - EventTimeOut EventName = "EventTimeOut" - - EventOK EventName = "EventOK" - EventNoProc EventName = "EventNoProc" -) - -type Callback func(fsm *FSM, event EventName, arg interface{}) EventName - -type Callbacks map[string]Callback - -type FSMState struct { - name string - parent string - initState string - processor Callback -} - -type FSM struct { - name string - currentState *FSMState - eventNames *[]string - fsmStats map[string]*FSMState - timer *timewheel.TimeWheel -} - -type Event struct { - FSM *FSM - Event string - Src string - Dst string - Err error - Args []interface{} - canceled bool - async bool - cancelFunc func() -} - -func NewFSMState(name string, parent, initState string, processor Callback) *FSMState { - return &FSMState{ - name: name, - parent: parent, - initState: initState, - processor: processor, - } -} - -func NewFSM(name string, eventNames *[]string, fsmStats ...*FSMState) (*FSM, error) { - if len(fsmStats) == 0 { - return nil, fmt.Errorf("initial state cannot be nil") - } - fsm := &FSM{ - name: name, - currentState: fsmStats[0], - eventNames: eventNames, - fsmStats: make(map[string]*FSMState), - timer: timewheel.New(1, time.Millisecond*100), - } - - for index := range fsmStats { - fsm.fsmStats[fsmStats[index].name] = fsmStats[index] - } - - // Execute ENTRY event for initial state and its sub-states - for tempState := fsm.currentState; tempState != nil; tempState = fsm.fsmStats[tempState.initState] { - fsm.currentState = tempState - if err := fsm.ExecuteEvent(EventEntry, nil); err != nil { - return nil, err - } - } - - return fsm, nil -} - -func (fsm *FSM) StartTimer(typeT timewheel.TimeMode, cycle time.Duration, arg interface{}) { - fsm.timer.AddTask(fsm.currentState.name, typeT, func() { - fsm.ExecuteEvent(EventTimeOut, arg) - }, time.Now().Add(cycle)) -} - -func (fsm *FSM) StopTimer() { - fsm.timer.RemoveTask(fsm.currentState.name) -} - -func (fsm *FSM) ExecuteEvent(event EventName, arg interface{}) error { - if fsm == nil { - return fmt.Errorf("FSM is nil") - } - - log.Printf("%d %-23s STATE:%-23s EVENT:%s\n", time.Now().UnixMilli(), fsm.name, fsm.currentState.name, event) - - tmpState := fsm.currentState - if tmpState == nil { - return fmt.Errorf("current state is nil") - } - - result := tmpState.processor(fsm, event, arg) - for result == EventNoProc && fsm.fsmStats[tmpState.parent] != nil { - tmpState = fsm.fsmStats[tmpState.parent] - result = tmpState.processor(fsm, event, arg) - } - - return nil -} - -func (fsm *FSM) ChangeState(newState string, arg interface{}) error { - if newState == "" { - return fmt.Errorf("new state cannot be nil") - } - - curState := fsm.currentState - - for tempState := curState; tempState != nil; tempState = fsm.fsmStats[tempState.parent] { - fsm.currentState = tempState - if err := fsm.ExecuteEvent(EventExit, nil); err != nil { - return err - } - fsm.StopTimer() - - if newState == tempState.name { - if err := fsm.ExecuteEvent(EventEntry, arg); err != nil { - return err - } - break - } - - if found, ok := fsm.findState(fsm.fsmStats[newState], tempState); ok { - for end := len(found) - 1; end >= 0; end-- { - fsm.currentState = found[end] - if err := fsm.ExecuteEvent(EventEntry, arg); err != nil { - return err - } - } - break - } - } - - // Enter sub-states of the new state - for tempState := fsm.fsmStats[newState].initState; fsm.fsmStats[tempState] != nil; tempState = fsm.fsmStats[tempState].initState { - fsm.currentState = fsm.fsmStats[tempState] - if err := fsm.ExecuteEvent(EventEntry, arg); err != nil { - return err - } - } - - return nil -} - -func (fsm *FSM) findState(targetState, findState *FSMState) ([]*FSMState, bool) { - var fsmStateList []*FSMState - - for tempState := targetState; tempState != nil; tempState = fsm.fsmStats[tempState.parent] { - - fsmStateList = append(fsmStateList, tempState) - - if findState.parent == tempState.parent { - return fsmStateList, true - } - } - - return fsmStateList, false -} diff --git a/pkg/myfsm/fsm_test.go b/pkg/myfsm/fsm_test.go deleted file mode 100644 index 4d410d9..0000000 --- a/pkg/myfsm/fsm_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package fsm - -import ( - "log" - "testing" - "time" - - "git.zhangshuocauc.cn/redhat/timewheel" -) - -type myFsm struct { - fsm *FSM -} - -func state1(fsm *FSM, event EventName, arg interface{}) EventName { - switch event { - case EventEntry: - log.Println("state1", event) - case EventExit: - log.Println("state1", event) - default: - log.Println("state1", event) - return EventNoProc - } - - return EventOK -} - -func state2(fsm *FSM, event EventName, arg interface{}) EventName { - switch event { - case EventEntry: - log.Println("state2", event) - case EventExit: - log.Println("state2", event) - case EventTimeOut: - log.Println("state2", event) - default: - log.Println("state2", event) - return EventNoProc - } - - return EventOK -} - -func state3(fsm *FSM, event EventName, arg interface{}) EventName { - switch event { - case EventEntry: - log.Println("state3", event) - case EventExit: - log.Println("state3", event) - case "test event": - log.Println("state3", event) - case EventTimeOut: - log.Println("state3", event) - default: - log.Println("state3", event) - return EventNoProc - } - - return EventOK -} - -func newMyFsm() *myFsm { - state1 := NewFSMState("State1", "", "State2", state1) - state2 := NewFSMState("State2", "State1", "", state2) - state3 := NewFSMState("State3", "State1", "", state3) - - fsm, err := NewFSM("MyFSM", &[]string{string(EventEntry), string(EventExit)}, state1, state2, state3) - if err != nil { - log.Fatal(err) - } - - return &myFsm{fsm: fsm} -} - -func Test_fsm(t *testing.T) { - myfsm := newMyFsm() - - if err := myfsm.fsm.ChangeState("State3", nil); err != nil { - log.Fatal(err) - } - - myfsm.fsm.ExecuteEvent("test event", nil) - - myfsm.fsm.StartTimer(timewheel.TimerTypeOnce, time.Millisecond*100, nil) - - <-time.After(200 * time.Millisecond) - - myfsm.fsm.ChangeState("State2", nil) - - myfsm.fsm.StartTimer(timewheel.TimeTypeLoop, time.Second, nil) - - // Keep the program running to observeinterior - <-time.After(3 * time.Second) -} diff --git a/pkg/timewheel/timewheel.go b/pkg/timewheel/timewheel.go index 1f27d2b..b548269 100644 --- a/pkg/timewheel/timewheel.go +++ b/pkg/timewheel/timewheel.go @@ -2,8 +2,6 @@ package timewheel import ( "container/list" - "fmt" - "math" "sync" "time" ) @@ -13,7 +11,6 @@ type taskSlice struct { pos int cycle int rawCycle int - flower time.Duration mode TimeMode task func() } @@ -21,11 +18,12 @@ type taskSlice struct { type TimeWheel struct { sync.Once curtSlot int + slotLen int interval time.Duration ticker *time.Ticker slots []*list.List + tinyWheel *list.List taskMap map[string]*list.Element - tasksMap map[string][]int stopC chan struct{} addTaskC chan *taskSlice removeTaskC chan string @@ -55,7 +53,8 @@ func New(slots int, interval time.Duration) *TimeWheel { stopC: make(chan struct{}), addTaskC: make(chan *taskSlice), removeTaskC: make(chan string), - tasksMap: make(map[string][]int), + tinyWheel: list.New(), + slotLen: slots, } for i := 0; i < slots; i++ { @@ -77,7 +76,6 @@ func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.D mode: mode, rawCycle: cycle, pos: pos, - flower: flower, } } @@ -94,28 +92,12 @@ func (t *TimeWheel) Stop() { func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) { delay := time.Until(time.Now().Add(flower)) - cycle := delay / (t.interval * time.Duration(len(t.slots))) - pos := (t.curtSlot + int(delay/t.interval)) % len(t.slots) + cycle := delay / (t.interval * time.Duration(t.slotLen)) + pos := (t.curtSlot + int(delay/t.interval)) % t.slotLen return int(cycle), pos } -func (t *TimeWheel) getCyclePos(task *taskSlice) []int { - if task.cycle == 0 && task.mode == TimeTypeLoop { - var listc []int - dur := int(math.Ceil(float64(float64(task.flower) / float64(t.interval)))) - start := task.pos - for i := 0; i < int(math.Ceil(float64(len(t.slots))/float64(dur))); i++ { - listc = append(listc, start%len(t.slots)) - start += dur - } - - return listc - } - - return nil -} - func (t *TimeWheel) run() { for { select { @@ -124,9 +106,9 @@ func (t *TimeWheel) run() { case <-t.ticker.C: t.tick() case task := <-t.addTaskC: - t.addCycelTask(task) + t.addTask(task) case key := <-t.removeTaskC: - t.removeCycelTask(key) + t.removeTask(key) } } } @@ -136,10 +118,11 @@ func (t *TimeWheel) tick() { defer t.updateCurtSlot() t.execute(list) + t.execute(t.tinyWheel) } func (t *TimeWheel) updateCurtSlot() { - t.curtSlot = (t.curtSlot + 1) % len(t.slots) + t.curtSlot = (t.curtSlot + 1) % t.slotLen } func (t *TimeWheel) execute(l *list.List) { @@ -173,52 +156,21 @@ func (t *TimeWheel) execute(l *list.List) { } } -func (t *TimeWheel) addCycelTask(task *taskSlice) { - t.removeCycelTask(task.id) - - poss := t.getCyclePos(task) - fmt.Println(poss) - if len(poss) > 0 { - t.tasksMap[task.id] = poss - for _, index := range poss { - internalTask := &taskSlice{ - id: fmt.Sprintf("%s_%d", task.id, index), - pos: index, - cycle: task.cycle, - rawCycle: task.rawCycle, - mode: task.mode, - task: task.task, - } - - t.addTask(internalTask) - } - - return - } - - t.addTask(task) -} - -func (t *TimeWheel) removeCycelTask(key string) { - poss := t.tasksMap[key] - if len(poss) > 0 { - for _, index := range poss { - id := fmt.Sprintf("%s_%d", key, index) - t.removeTask(id) - } - - return - } - - t.removeTask(key) -} - func (t *TimeWheel) addTask(task *taskSlice) { if _, ok := t.taskMap[task.id]; ok { t.removeTask(task.id) } - list := t.slots[task.pos] + var list *list.List + if task.mode == TimeTypeLoop { + task.cycle = task.cycle*t.slotLen + (task.pos-t.curtSlot+t.slotLen)%t.slotLen + task.rawCycle = task.cycle + + list = t.tinyWheel + } else { + list = t.slots[task.pos] + } + etask := list.PushBack(task) t.taskMap[task.id] = etask } @@ -232,5 +184,10 @@ func (t *TimeWheel) removeTask(key string) { delete(t.taskMap, key) task := etask.Value.(*taskSlice) - _ = t.slots[task.pos].Remove(etask) + + if task.mode == TimeTypeLoop { + _ = t.tinyWheel.Remove(etask) + } else { + _ = t.slots[task.pos].Remove(etask) + } } diff --git a/pkg/timewheel/timewheel_test.go b/pkg/timewheel/timewheel_test.go index 2fe57d6..6aba960 100644 --- a/pkg/timewheel/timewheel_test.go +++ b/pkg/timewheel/timewheel_test.go @@ -11,7 +11,8 @@ func Test_timeWheel(t *testing.T) { <-time.After(10 * time.Millisecond) - t.Errorf("test2, %v", time.Now()) + t.Errorf("start time, %v", time.Now()) + timeWheel.AddTask("test1", TimeTypeLoop, func() { t.Errorf("test1, %v", time.Now()) }, time.Millisecond*300) @@ -22,5 +23,5 @@ func Test_timeWheel(t *testing.T) { t.Errorf("test2, %v", time.Now()) }, time.Second) - <-time.After(6 * time.Second) + <-time.After(8 * time.Second) }