1、增加状态机模块。
This commit is contained in:
parent
ab69108550
commit
e467ba3028
3
go.mod
3
go.mod
@ -3,7 +3,7 @@ module dashboard
|
|||||||
go 1.24.3
|
go 1.24.3
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.zhangshuocauc.cn/redhat/timewheel v1.0.1
|
git.zhangshuocauc.cn/redhat/timewheel v1.0.3
|
||||||
github.com/bwmarrin/snowflake v0.3.0
|
github.com/bwmarrin/snowflake v0.3.0
|
||||||
github.com/creack/pty v1.1.24
|
github.com/creack/pty v1.1.24
|
||||||
github.com/fsnotify/fsnotify v1.9.0
|
github.com/fsnotify/fsnotify v1.9.0
|
||||||
@ -15,7 +15,6 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.3
|
github.com/gorilla/websocket v1.5.3
|
||||||
github.com/jmoiron/sqlx v1.4.0
|
github.com/jmoiron/sqlx v1.4.0
|
||||||
github.com/juju/ratelimit v1.0.2
|
github.com/juju/ratelimit v1.0.2
|
||||||
github.com/looplab/fsm v1.0.3
|
|
||||||
github.com/mattn/go-sqlite3 v1.14.28
|
github.com/mattn/go-sqlite3 v1.14.28
|
||||||
github.com/natefinch/lumberjack v2.0.0+incompatible
|
github.com/natefinch/lumberjack v2.0.0+incompatible
|
||||||
github.com/shirou/gopsutil/v4 v4.25.4
|
github.com/shirou/gopsutil/v4 v4.25.4
|
||||||
|
6
go.sum
6
go.sum
@ -1,7 +1,7 @@
|
|||||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||||
git.zhangshuocauc.cn/redhat/timewheel v1.0.1 h1:llUaze4uk7yo5p/VsjbmYl0daWIvCEonrqCQnTpC6dw=
|
git.zhangshuocauc.cn/redhat/timewheel v1.0.3 h1:FOs+pm+80ZK6WmDV5ekzrGMOQf+kih8XkqZCqwlpRsI=
|
||||||
git.zhangshuocauc.cn/redhat/timewheel v1.0.1/go.mod h1:PNpKCPvSb0ynhx3k2oiigYEcKKq8G/d4P9Fh4VFZiac=
|
git.zhangshuocauc.cn/redhat/timewheel v1.0.3/go.mod h1:PNpKCPvSb0ynhx3k2oiigYEcKKq8G/d4P9Fh4VFZiac=
|
||||||
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
|
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
|
||||||
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
||||||
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
|
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
|
||||||
@ -77,8 +77,6 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
|||||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
github.com/looplab/fsm v1.0.3 h1:qtxBsa2onOs0qFOtkqwf5zE0uP0+Te+wlIvXctPKpcw=
|
|
||||||
github.com/looplab/fsm v1.0.3/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4=
|
|
||||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
||||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
||||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||||
|
@ -1,24 +0,0 @@
|
|||||||
package fsm
|
|
||||||
|
|
||||||
// 定义任务状态
|
|
||||||
const (
|
|
||||||
StatePending = "pending" // 待派发
|
|
||||||
StateAssigned = "assigned" // 已派发
|
|
||||||
StateAccepted = "accepted" // 已接单
|
|
||||||
StateSubmitted = "submitted" // 已提交
|
|
||||||
StateApproved = "approved" // 已审核
|
|
||||||
StateRejected = "rejected" // 已驳回
|
|
||||||
StateSettled = "settled" // 已结算
|
|
||||||
StateCanceled = "canceled" // 已取消
|
|
||||||
)
|
|
||||||
|
|
||||||
// 定义任务事件
|
|
||||||
const (
|
|
||||||
EventAssign = "assign" // 派发任务
|
|
||||||
EventAccept = "accept" // 接受任务
|
|
||||||
EventSubmit = "submit" // 提交任务
|
|
||||||
EventApprove = "approve" // 审核通过
|
|
||||||
EventReject = "reject" // 审核驳回
|
|
||||||
EventSettle = "settle" // 结算任务
|
|
||||||
EventCancel = "cancel" // 取消任务
|
|
||||||
)
|
|
318
pkg/fsm/fsm.go
318
pkg/fsm/fsm.go
@ -2,87 +2,283 @@ package fsm
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/looplab/fsm"
|
"git.zhangshuocauc.cn/redhat/timewheel"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Task 任务结构体
|
type EventName string
|
||||||
type Task struct {
|
|
||||||
ID string
|
func (e EventName) Error() string {
|
||||||
FSM *fsm.FSM
|
return string(e)
|
||||||
UserID string // 示例:记录操作者
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTask 创建新任务
|
const (
|
||||||
func NewTask(id string) *Task {
|
EventEntry EventName = "EventEntry"
|
||||||
task := &Task{
|
EventExit EventName = "EventExit"
|
||||||
ID: id,
|
EventTimeOut EventName = "EventTimeOut"
|
||||||
|
|
||||||
|
EventOK EventName = "EventOK"
|
||||||
|
EventNoProc EventName = "EventNoProc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var internalEvents = Events{
|
||||||
|
EventEntry,
|
||||||
|
EventExit,
|
||||||
|
EventTimeOut,
|
||||||
|
EventOK,
|
||||||
|
EventNoProc,
|
||||||
|
}
|
||||||
|
|
||||||
|
type StateFunc func(context.Context, *Event) error
|
||||||
|
|
||||||
|
type StateRule struct {
|
||||||
|
Name string
|
||||||
|
Parent string
|
||||||
|
InitState string
|
||||||
|
Processor StateFunc
|
||||||
|
Dst []string // Define rules that allow conversion, allowing null values
|
||||||
|
}
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
FSM *FSM
|
||||||
|
Event EventName
|
||||||
|
Src string
|
||||||
|
Dst string
|
||||||
|
Args []interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type FSM struct {
|
||||||
|
eventMu sync.Mutex
|
||||||
|
log *log.Logger
|
||||||
|
name string
|
||||||
|
currentState *StateRule
|
||||||
|
eventNames Events
|
||||||
|
fsmStates StateRules
|
||||||
|
timer *timewheel.TimeWheel // Use a time wheel timer with a default slot count of 10 and a cycle time of 100ms
|
||||||
|
}
|
||||||
|
|
||||||
|
type StateRules []*StateRule
|
||||||
|
|
||||||
|
func (s StateRules) findByName(name string) *StateRule {
|
||||||
|
if name == "" {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建状态机
|
for _, sub := range s {
|
||||||
task.FSM = fsm.NewFSM(
|
if sub.Name == name {
|
||||||
StatePending, // 初始状态
|
return sub
|
||||||
fsm.Events{ // 定义状态转换规则
|
}
|
||||||
{Name: EventAssign, Src: []string{StatePending}, Dst: StateAssigned},
|
}
|
||||||
{Name: EventAccept, Src: []string{StateAssigned}, Dst: StateAccepted},
|
|
||||||
{Name: EventSubmit, Src: []string{StateAccepted}, Dst: StateSubmitted},
|
return nil
|
||||||
{Name: EventApprove, Src: []string{StateSubmitted}, Dst: StateApproved},
|
}
|
||||||
{Name: EventReject, Src: []string{StateSubmitted}, Dst: StateRejected},
|
|
||||||
{Name: EventSettle, Src: []string{StateApproved}, Dst: StateSettled},
|
func (s StateRules) checkRule() bool {
|
||||||
// Cancel 事件可以从 Pending 或 Assigned 状态触发
|
for _, state := range s {
|
||||||
{Name: EventCancel, Src: []string{StatePending, StateAssigned}, Dst: StateCanceled},
|
for _, des := range state.Dst {
|
||||||
},
|
if des != "" && s.findByName(des) == nil {
|
||||||
fsm.Callbacks{ // 定义回调函数
|
return false
|
||||||
// 通用回调:每次触发事件前执行(通过 e.Cancel() 可以组织内存中实际状态变化)
|
}
|
||||||
"before_event": func(_ context.Context, e *fsm.Event) {
|
}
|
||||||
if e.Event == EventCancel {
|
|
||||||
e.Cancel(fmt.Errorf("failed to handle event %s", e.Event))
|
if state.Parent != "" && s.findByName(state.Parent) == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if state.InitState != "" && s.findByName(state.InitState) == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
type Events []EventName
|
||||||
|
|
||||||
|
func (e Events) findByName(name EventName) EventName {
|
||||||
|
for _, sub := range e {
|
||||||
|
if sub == name {
|
||||||
|
return sub
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFsm(name string, initState string, events Events, states StateRules, logs ...*log.Logger) (*FSM, error) {
|
||||||
|
if len(events) == 0 || len(states) == 0 {
|
||||||
|
return nil, fmt.Errorf("initial state cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify if all conversion rules are correct
|
||||||
|
if !states.checkRule() {
|
||||||
|
return nil, fmt.Errorf("state rules error")
|
||||||
|
}
|
||||||
|
|
||||||
|
fsm := &FSM{
|
||||||
|
name: name,
|
||||||
|
currentState: states.findByName(initState),
|
||||||
|
eventNames: events,
|
||||||
|
fsmStates: states,
|
||||||
|
timer: timewheel.New(10, time.Millisecond*100),
|
||||||
|
}
|
||||||
|
|
||||||
|
fsm.eventNames = append(fsm.eventNames, internalEvents...)
|
||||||
|
if len(logs) == 1 {
|
||||||
|
fsm.log = logs[0]
|
||||||
|
}
|
||||||
|
if fsm.log == nil {
|
||||||
|
fsm.log = log.Default()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute ENTRY event for initial state and its sub-states
|
||||||
|
for tempState := fsm.currentState; tempState != nil; tempState = fsm.fsmStates.findByName(tempState.InitState) {
|
||||||
|
fsm.currentState = tempState
|
||||||
|
if err := fsm.ExecuteEvent(context.TODO(), EventEntry, nil); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fsm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsm *FSM) StartEventTimer(ctx context.Context, typeT timewheel.TimeMode, cycle time.Duration, arg interface{}) {
|
||||||
|
oldState := fsm.currentState.Name
|
||||||
|
fsm.log.Printf("Start event timer state: %s\n", oldState)
|
||||||
|
fsm.timer.AddTask(fsm.currentState.Name, typeT, func() {
|
||||||
|
// Verify whether the current state has changed
|
||||||
|
// If there is a change, timeout events cannot be executed anymore and the timer for the status needs to be deleted
|
||||||
|
if fsm.currentState.Name != oldState {
|
||||||
|
fsm.log.Printf("Event timer but state alrady change (%s -> %s)\n", oldState, fsm.currentState.Name)
|
||||||
|
fsm.timer.RemoveTask(oldState)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Printf("[Event]Task %s event %s triggered\n", task.ID, e.Event)
|
fsm.ExecuteEvent(ctx, EventTimeOut, arg)
|
||||||
},
|
}, cycle)
|
||||||
// 通用回调:每次进入新状态时触发
|
|
||||||
"enter_state": func(_ context.Context, e *fsm.Event) {
|
|
||||||
fmt.Printf("[State] Task %s state changed from %s to %s\n", task.ID, e.Src, e.Dst)
|
|
||||||
// 在这里可以添加通用逻辑,如更新数据库状态
|
|
||||||
},
|
|
||||||
// 特定状态回调:进入 Assigned 状态后执行
|
|
||||||
StateAssigned: func(_ context.Context, e *fsm.Event) {
|
|
||||||
fmt.Printf("[State] Task %s assigned to user %s\n", task.ID, task.UserID)
|
|
||||||
// 在这里可以发送通知给被分配者
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
return task
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 任务状态流转方法
|
func (fsm *FSM) StopEventTimer() {
|
||||||
func (t *Task) Assign(ctx context.Context, userID string) error {
|
fsm.timer.RemoveTask(fsm.currentState.Name)
|
||||||
t.UserID = userID // 可以在事件触发前设置相关信息
|
|
||||||
return t.FSM.Event(ctx, EventAssign) // 触发 Assign 事件
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Accept(ctx context.Context) error {
|
func (fsm *FSM) StopTimer() {
|
||||||
return t.FSM.Event(ctx, EventAccept)
|
fsm.timer.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Submit(ctx context.Context) error {
|
// Because the timer timeout event is triggered by other goroutines, the execution of the event requires a lock
|
||||||
return t.FSM.Event(ctx, EventSubmit)
|
func (fsm *FSM) ExecuteEvent(ctx context.Context, event EventName, arg ...interface{}) error {
|
||||||
|
fsm.eventMu.Lock()
|
||||||
|
defer fsm.eventMu.Unlock()
|
||||||
|
|
||||||
|
if fsm.eventNames.findByName(event) == "" {
|
||||||
|
return fmt.Errorf("new state cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
fsm.log.Printf("%s %-23s STATE:%-23s EVENT:%s\n", time.Now().Format(".999"), fsm.name, fsm.currentState.Name, event)
|
||||||
|
|
||||||
|
tmpState := fsm.currentState
|
||||||
|
if tmpState == nil {
|
||||||
|
return fmt.Errorf("current state is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if tmpState.Processor != nil {
|
||||||
|
e := &Event{FSM: fsm, Event: event, Src: tmpState.Name, Dst: tmpState.Name, Args: arg}
|
||||||
|
result := tmpState.Processor(ctx, e)
|
||||||
|
|
||||||
|
for errors.Is(result, EventNoProc) && fsm.fsmStates.findByName(tmpState.Parent) != nil {
|
||||||
|
tmpState = fsm.fsmStates.findByName(tmpState.Parent)
|
||||||
|
result = tmpState.Processor(ctx, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
if result != nil && !errors.Is(result, EventNoProc) && !errors.Is(result, EventOK) {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Approve(ctx context.Context) error {
|
func (fsm *FSM) StateChange(ctx context.Context, newState string, arg interface{}) error {
|
||||||
return t.FSM.Event(ctx, EventApprove)
|
if newState == "" || fsm.fsmStates.findByName(newState) == nil {
|
||||||
|
return fmt.Errorf("new state cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there are defined rules, they need to be checked. If they are empty, it means that any transition between states is allowed
|
||||||
|
if !fsm.checkDstRule(newState) {
|
||||||
|
return fmt.Errorf("the new state is not in dst rule")
|
||||||
|
}
|
||||||
|
|
||||||
|
for tempState := fsm.currentState; tempState != nil; tempState = fsm.fsmStates.findByName(tempState.Parent) {
|
||||||
|
fsm.currentState = tempState
|
||||||
|
if err := fsm.ExecuteEvent(ctx, EventExit, arg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fsm.StopEventTimer()
|
||||||
|
|
||||||
|
if newState == tempState.Name {
|
||||||
|
if err := fsm.ExecuteEvent(ctx, EventEntry, arg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if found, ok := fsm.findState(fsm.fsmStates.findByName(newState), tempState); ok {
|
||||||
|
for end := len(found) - 1; end >= 0; end-- {
|
||||||
|
fsm.currentState = found[end]
|
||||||
|
if err := fsm.ExecuteEvent(ctx, EventEntry, arg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enter sub-states of the new state
|
||||||
|
for tempState := fsm.fsmStates.findByName(newState).InitState; fsm.fsmStates.findByName(tempState) != nil; tempState = fsm.fsmStates.findByName(tempState).InitState {
|
||||||
|
fsm.currentState = fsm.fsmStates.findByName(tempState)
|
||||||
|
if err := fsm.ExecuteEvent(ctx, EventEntry, arg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Settle(ctx context.Context) error {
|
func (fsm *FSM) findState(targetState, findState *StateRule) ([]*StateRule, bool) {
|
||||||
return t.FSM.Event(ctx, EventSettle)
|
if targetState == nil || findState == nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
var fsmStateList []*StateRule
|
||||||
|
|
||||||
|
for tempState := targetState; tempState != nil; tempState = fsm.fsmStates.findByName(tempState.Parent) {
|
||||||
|
fsmStateList = append(fsmStateList, tempState)
|
||||||
|
if findState.Parent == tempState.Parent {
|
||||||
|
return fsmStateList, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fsmStateList, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) Cancel(ctx context.Context) error {
|
func (fsm *FSM) checkDstRule(newState string) bool {
|
||||||
return t.FSM.Event(ctx, EventCancel)
|
curState := fsm.currentState
|
||||||
|
|
||||||
|
if len(curState.Dst) > 0 {
|
||||||
|
for _, dst := range curState.Dst {
|
||||||
|
if dst == "" || dst == newState {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// ... 其他事件触发方法 ...
|
|
||||||
|
|
||||||
|
@ -3,65 +3,93 @@ package fsm
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/looplab/fsm"
|
"git.zhangshuocauc.cn/redhat/timewheel"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func state1(ctx context.Context, e *Event) error {
|
||||||
|
switch e.Event {
|
||||||
|
case EventEntry:
|
||||||
|
log.Println("state1", *e)
|
||||||
|
case EventExit:
|
||||||
|
log.Println("state1", *e)
|
||||||
|
default:
|
||||||
|
log.Println("state1", *e)
|
||||||
|
return EventNoProc
|
||||||
|
}
|
||||||
|
|
||||||
|
return EventOK
|
||||||
|
}
|
||||||
|
|
||||||
|
func state2(ctx context.Context, e *Event) error {
|
||||||
|
switch e.Event {
|
||||||
|
case EventEntry:
|
||||||
|
log.Println("state2", *e)
|
||||||
|
case EventExit:
|
||||||
|
log.Println("state2", *e)
|
||||||
|
default:
|
||||||
|
log.Println("state2", *e)
|
||||||
|
return EventNoProc
|
||||||
|
}
|
||||||
|
|
||||||
|
return EventOK
|
||||||
|
}
|
||||||
|
|
||||||
|
func state3(ctx context.Context, e *Event) error {
|
||||||
|
switch e.Event {
|
||||||
|
case EventEntry:
|
||||||
|
log.Println("state3", *e)
|
||||||
|
case EventExit:
|
||||||
|
log.Println("state3", *e)
|
||||||
|
case "assign":
|
||||||
|
log.Println("state3", *e)
|
||||||
|
default:
|
||||||
|
log.Println("state3", *e)
|
||||||
|
return EventNoProc
|
||||||
|
}
|
||||||
|
|
||||||
|
return EventOK
|
||||||
|
}
|
||||||
|
|
||||||
|
var events = Events{
|
||||||
|
"assign",
|
||||||
|
"accept",
|
||||||
|
}
|
||||||
|
|
||||||
func Test_fsm(t *testing.T) {
|
func Test_fsm(t *testing.T) {
|
||||||
|
fsm, err := NewFsm("MyFsm", "state1", events, []*StateRule{
|
||||||
|
{Name: "state1", Parent: "", InitState: "state2", Processor: state1, Dst: []string{}},
|
||||||
|
{Name: "state2", Parent: "state1", InitState: "", Processor: state2, Dst: []string{}},
|
||||||
|
{Name: "state3", Parent: "state1", InitState: "", Processor: state3, Dst: []string{}},
|
||||||
|
{},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer fsm.StopTimer()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// 创建新任务
|
fmt.Println(fsm)
|
||||||
fmt.Println("----------------task1----------------")
|
|
||||||
fmt.Println("-------------验证状态流转--------------")
|
|
||||||
task := NewTask("task-001")
|
|
||||||
|
|
||||||
// 任务状态流转
|
fsm.ExecuteEvent(ctx, EventName("assign"), nil)
|
||||||
fmt.Println("Current state:", task.FSM.Current()) // pending
|
|
||||||
|
|
||||||
_ = task.Assign(ctx, "user-001")
|
fmt.Println(fsm.StateChange(ctx, "state3", nil))
|
||||||
fmt.Println("Current state:", task.FSM.Current()) // assigned
|
|
||||||
|
|
||||||
_ = task.Accept(ctx)
|
fsm.ExecuteEvent(ctx, EventName("accept"), nil)
|
||||||
fmt.Println("Current state:", task.FSM.Current()) // accepted
|
|
||||||
|
|
||||||
_ = task.Submit(ctx)
|
fsm.ExecuteEvent(ctx, EventName("accept1"), nil)
|
||||||
fmt.Println("Current state:", task.FSM.Current()) // submitted
|
|
||||||
|
|
||||||
_ = task.Approve(ctx)
|
fsm.StartEventTimer(ctx, timewheel.TimeTypeLoop, time.Millisecond*100, nil)
|
||||||
fmt.Println("Current state:", task.FSM.Current()) // approved
|
|
||||||
|
|
||||||
_ = task.Settle(ctx)
|
<-time.After(time.Millisecond*100)
|
||||||
fmt.Println("Current state:", task.FSM.Current()) // settled
|
|
||||||
|
|
||||||
// 回调报错情况
|
fmt.Println(fsm.StateChange(ctx, "state2", nil))
|
||||||
fmt.Println("----------------task2----------------")
|
|
||||||
fmt.Println("-------------验证回调报错情况-------------")
|
|
||||||
task2 := NewTask("task-002")
|
|
||||||
fmt.Println("Current state:", task2.FSM.Current()) // pending
|
|
||||||
|
|
||||||
_ = task2.Assign(ctx, "user-002")
|
<-time.After(3 * time.Second)
|
||||||
fmt.Println("Current state:", task2.FSM.Current()) // assigned
|
|
||||||
|
|
||||||
e := task2.Cancel(ctx)
|
|
||||||
fmt.Println("Error:", e)
|
|
||||||
fmt.Println("Current state:", task2.FSM.Current()) // assigned(cancel failed)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMermaidOutput(t *testing.T) {
|
|
||||||
task := NewTask("task-001")
|
|
||||||
|
|
||||||
gotDiagram, err := fsm.VisualizeForMermaidWithGraphType(task.FSM, fsm.StateDiagram)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("got error for visualizing with type MERMAID: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
gotFlowchart, err := fsm.VisualizeForMermaidWithGraphType(task.FSM, fsm.FlowChart)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("got error for visualizing with type MERMAID: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println(gotDiagram)
|
|
||||||
fmt.Println("--------------------------------")
|
|
||||||
fmt.Println(gotFlowchart)
|
|
||||||
}
|
}
|
||||||
|
176
pkg/myfsm/fsm.go
176
pkg/myfsm/fsm.go
@ -1,176 +0,0 @@
|
|||||||
package fsm
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.zhangshuocauc.cn/redhat/timewheel"
|
|
||||||
)
|
|
||||||
|
|
||||||
type EventName string
|
|
||||||
|
|
||||||
const (
|
|
||||||
EventEntry EventName = "EventEntry"
|
|
||||||
EventExit EventName = "EventExit"
|
|
||||||
EventTimeOut EventName = "EventTimeOut"
|
|
||||||
|
|
||||||
EventOK EventName = "EventOK"
|
|
||||||
EventNoProc EventName = "EventNoProc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Callback func(fsm *FSM, event EventName, arg interface{}) EventName
|
|
||||||
|
|
||||||
type Callbacks map[string]Callback
|
|
||||||
|
|
||||||
type FSMState struct {
|
|
||||||
name string
|
|
||||||
parent string
|
|
||||||
initState string
|
|
||||||
processor Callback
|
|
||||||
}
|
|
||||||
|
|
||||||
type FSM struct {
|
|
||||||
name string
|
|
||||||
currentState *FSMState
|
|
||||||
eventNames *[]string
|
|
||||||
fsmStats map[string]*FSMState
|
|
||||||
timer *timewheel.TimeWheel
|
|
||||||
}
|
|
||||||
|
|
||||||
type Event struct {
|
|
||||||
FSM *FSM
|
|
||||||
Event string
|
|
||||||
Src string
|
|
||||||
Dst string
|
|
||||||
Err error
|
|
||||||
Args []interface{}
|
|
||||||
canceled bool
|
|
||||||
async bool
|
|
||||||
cancelFunc func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFSMState(name string, parent, initState string, processor Callback) *FSMState {
|
|
||||||
return &FSMState{
|
|
||||||
name: name,
|
|
||||||
parent: parent,
|
|
||||||
initState: initState,
|
|
||||||
processor: processor,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFSM(name string, eventNames *[]string, fsmStats ...*FSMState) (*FSM, error) {
|
|
||||||
if len(fsmStats) == 0 {
|
|
||||||
return nil, fmt.Errorf("initial state cannot be nil")
|
|
||||||
}
|
|
||||||
fsm := &FSM{
|
|
||||||
name: name,
|
|
||||||
currentState: fsmStats[0],
|
|
||||||
eventNames: eventNames,
|
|
||||||
fsmStats: make(map[string]*FSMState),
|
|
||||||
timer: timewheel.New(1, time.Millisecond*100),
|
|
||||||
}
|
|
||||||
|
|
||||||
for index := range fsmStats {
|
|
||||||
fsm.fsmStats[fsmStats[index].name] = fsmStats[index]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute ENTRY event for initial state and its sub-states
|
|
||||||
for tempState := fsm.currentState; tempState != nil; tempState = fsm.fsmStats[tempState.initState] {
|
|
||||||
fsm.currentState = tempState
|
|
||||||
if err := fsm.ExecuteEvent(EventEntry, nil); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return fsm, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fsm *FSM) StartTimer(typeT timewheel.TimeMode, cycle time.Duration, arg interface{}) {
|
|
||||||
fsm.timer.AddTask(fsm.currentState.name, typeT, func() {
|
|
||||||
fsm.ExecuteEvent(EventTimeOut, arg)
|
|
||||||
}, time.Now().Add(cycle))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fsm *FSM) StopTimer() {
|
|
||||||
fsm.timer.RemoveTask(fsm.currentState.name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fsm *FSM) ExecuteEvent(event EventName, arg interface{}) error {
|
|
||||||
if fsm == nil {
|
|
||||||
return fmt.Errorf("FSM is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("%d %-23s STATE:%-23s EVENT:%s\n", time.Now().UnixMilli(), fsm.name, fsm.currentState.name, event)
|
|
||||||
|
|
||||||
tmpState := fsm.currentState
|
|
||||||
if tmpState == nil {
|
|
||||||
return fmt.Errorf("current state is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
result := tmpState.processor(fsm, event, arg)
|
|
||||||
for result == EventNoProc && fsm.fsmStats[tmpState.parent] != nil {
|
|
||||||
tmpState = fsm.fsmStats[tmpState.parent]
|
|
||||||
result = tmpState.processor(fsm, event, arg)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fsm *FSM) ChangeState(newState string, arg interface{}) error {
|
|
||||||
if newState == "" {
|
|
||||||
return fmt.Errorf("new state cannot be nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
curState := fsm.currentState
|
|
||||||
|
|
||||||
for tempState := curState; tempState != nil; tempState = fsm.fsmStats[tempState.parent] {
|
|
||||||
fsm.currentState = tempState
|
|
||||||
if err := fsm.ExecuteEvent(EventExit, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
fsm.StopTimer()
|
|
||||||
|
|
||||||
if newState == tempState.name {
|
|
||||||
if err := fsm.ExecuteEvent(EventEntry, arg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if found, ok := fsm.findState(fsm.fsmStats[newState], tempState); ok {
|
|
||||||
for end := len(found) - 1; end >= 0; end-- {
|
|
||||||
fsm.currentState = found[end]
|
|
||||||
if err := fsm.ExecuteEvent(EventEntry, arg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enter sub-states of the new state
|
|
||||||
for tempState := fsm.fsmStats[newState].initState; fsm.fsmStats[tempState] != nil; tempState = fsm.fsmStats[tempState].initState {
|
|
||||||
fsm.currentState = fsm.fsmStats[tempState]
|
|
||||||
if err := fsm.ExecuteEvent(EventEntry, arg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fsm *FSM) findState(targetState, findState *FSMState) ([]*FSMState, bool) {
|
|
||||||
var fsmStateList []*FSMState
|
|
||||||
|
|
||||||
for tempState := targetState; tempState != nil; tempState = fsm.fsmStats[tempState.parent] {
|
|
||||||
|
|
||||||
fsmStateList = append(fsmStateList, tempState)
|
|
||||||
|
|
||||||
if findState.parent == tempState.parent {
|
|
||||||
return fsmStateList, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return fsmStateList, false
|
|
||||||
}
|
|
@ -1,95 +0,0 @@
|
|||||||
package fsm
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.zhangshuocauc.cn/redhat/timewheel"
|
|
||||||
)
|
|
||||||
|
|
||||||
type myFsm struct {
|
|
||||||
fsm *FSM
|
|
||||||
}
|
|
||||||
|
|
||||||
func state1(fsm *FSM, event EventName, arg interface{}) EventName {
|
|
||||||
switch event {
|
|
||||||
case EventEntry:
|
|
||||||
log.Println("state1", event)
|
|
||||||
case EventExit:
|
|
||||||
log.Println("state1", event)
|
|
||||||
default:
|
|
||||||
log.Println("state1", event)
|
|
||||||
return EventNoProc
|
|
||||||
}
|
|
||||||
|
|
||||||
return EventOK
|
|
||||||
}
|
|
||||||
|
|
||||||
func state2(fsm *FSM, event EventName, arg interface{}) EventName {
|
|
||||||
switch event {
|
|
||||||
case EventEntry:
|
|
||||||
log.Println("state2", event)
|
|
||||||
case EventExit:
|
|
||||||
log.Println("state2", event)
|
|
||||||
case EventTimeOut:
|
|
||||||
log.Println("state2", event)
|
|
||||||
default:
|
|
||||||
log.Println("state2", event)
|
|
||||||
return EventNoProc
|
|
||||||
}
|
|
||||||
|
|
||||||
return EventOK
|
|
||||||
}
|
|
||||||
|
|
||||||
func state3(fsm *FSM, event EventName, arg interface{}) EventName {
|
|
||||||
switch event {
|
|
||||||
case EventEntry:
|
|
||||||
log.Println("state3", event)
|
|
||||||
case EventExit:
|
|
||||||
log.Println("state3", event)
|
|
||||||
case "test event":
|
|
||||||
log.Println("state3", event)
|
|
||||||
case EventTimeOut:
|
|
||||||
log.Println("state3", event)
|
|
||||||
default:
|
|
||||||
log.Println("state3", event)
|
|
||||||
return EventNoProc
|
|
||||||
}
|
|
||||||
|
|
||||||
return EventOK
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMyFsm() *myFsm {
|
|
||||||
state1 := NewFSMState("State1", "", "State2", state1)
|
|
||||||
state2 := NewFSMState("State2", "State1", "", state2)
|
|
||||||
state3 := NewFSMState("State3", "State1", "", state3)
|
|
||||||
|
|
||||||
fsm, err := NewFSM("MyFSM", &[]string{string(EventEntry), string(EventExit)}, state1, state2, state3)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &myFsm{fsm: fsm}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_fsm(t *testing.T) {
|
|
||||||
myfsm := newMyFsm()
|
|
||||||
|
|
||||||
if err := myfsm.fsm.ChangeState("State3", nil); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
myfsm.fsm.ExecuteEvent("test event", nil)
|
|
||||||
|
|
||||||
myfsm.fsm.StartTimer(timewheel.TimerTypeOnce, time.Millisecond*100, nil)
|
|
||||||
|
|
||||||
<-time.After(200 * time.Millisecond)
|
|
||||||
|
|
||||||
myfsm.fsm.ChangeState("State2", nil)
|
|
||||||
|
|
||||||
myfsm.fsm.StartTimer(timewheel.TimeTypeLoop, time.Second, nil)
|
|
||||||
|
|
||||||
// Keep the program running to observeinterior
|
|
||||||
<-time.After(3 * time.Second)
|
|
||||||
}
|
|
@ -2,8 +2,6 @@ package timewheel
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"fmt"
|
|
||||||
"math"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -13,7 +11,6 @@ type taskSlice struct {
|
|||||||
pos int
|
pos int
|
||||||
cycle int
|
cycle int
|
||||||
rawCycle int
|
rawCycle int
|
||||||
flower time.Duration
|
|
||||||
mode TimeMode
|
mode TimeMode
|
||||||
task func()
|
task func()
|
||||||
}
|
}
|
||||||
@ -21,11 +18,12 @@ type taskSlice struct {
|
|||||||
type TimeWheel struct {
|
type TimeWheel struct {
|
||||||
sync.Once
|
sync.Once
|
||||||
curtSlot int
|
curtSlot int
|
||||||
|
slotLen int
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
slots []*list.List
|
slots []*list.List
|
||||||
|
tinyWheel *list.List
|
||||||
taskMap map[string]*list.Element
|
taskMap map[string]*list.Element
|
||||||
tasksMap map[string][]int
|
|
||||||
stopC chan struct{}
|
stopC chan struct{}
|
||||||
addTaskC chan *taskSlice
|
addTaskC chan *taskSlice
|
||||||
removeTaskC chan string
|
removeTaskC chan string
|
||||||
@ -55,7 +53,8 @@ func New(slots int, interval time.Duration) *TimeWheel {
|
|||||||
stopC: make(chan struct{}),
|
stopC: make(chan struct{}),
|
||||||
addTaskC: make(chan *taskSlice),
|
addTaskC: make(chan *taskSlice),
|
||||||
removeTaskC: make(chan string),
|
removeTaskC: make(chan string),
|
||||||
tasksMap: make(map[string][]int),
|
tinyWheel: list.New(),
|
||||||
|
slotLen: slots,
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < slots; i++ {
|
for i := 0; i < slots; i++ {
|
||||||
@ -77,7 +76,6 @@ func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.D
|
|||||||
mode: mode,
|
mode: mode,
|
||||||
rawCycle: cycle,
|
rawCycle: cycle,
|
||||||
pos: pos,
|
pos: pos,
|
||||||
flower: flower,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -94,28 +92,12 @@ func (t *TimeWheel) Stop() {
|
|||||||
|
|
||||||
func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) {
|
func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) {
|
||||||
delay := time.Until(time.Now().Add(flower))
|
delay := time.Until(time.Now().Add(flower))
|
||||||
cycle := delay / (t.interval * time.Duration(len(t.slots)))
|
cycle := delay / (t.interval * time.Duration(t.slotLen))
|
||||||
pos := (t.curtSlot + int(delay/t.interval)) % len(t.slots)
|
pos := (t.curtSlot + int(delay/t.interval)) % t.slotLen
|
||||||
|
|
||||||
return int(cycle), pos
|
return int(cycle), pos
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) getCyclePos(task *taskSlice) []int {
|
|
||||||
if task.cycle == 0 && task.mode == TimeTypeLoop {
|
|
||||||
var listc []int
|
|
||||||
dur := int(math.Ceil(float64(float64(task.flower) / float64(t.interval))))
|
|
||||||
start := task.pos
|
|
||||||
for i := 0; i < int(math.Ceil(float64(len(t.slots))/float64(dur))); i++ {
|
|
||||||
listc = append(listc, start%len(t.slots))
|
|
||||||
start += dur
|
|
||||||
}
|
|
||||||
|
|
||||||
return listc
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TimeWheel) run() {
|
func (t *TimeWheel) run() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -124,9 +106,9 @@ func (t *TimeWheel) run() {
|
|||||||
case <-t.ticker.C:
|
case <-t.ticker.C:
|
||||||
t.tick()
|
t.tick()
|
||||||
case task := <-t.addTaskC:
|
case task := <-t.addTaskC:
|
||||||
t.addCycelTask(task)
|
t.addTask(task)
|
||||||
case key := <-t.removeTaskC:
|
case key := <-t.removeTaskC:
|
||||||
t.removeCycelTask(key)
|
t.removeTask(key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -136,10 +118,11 @@ func (t *TimeWheel) tick() {
|
|||||||
defer t.updateCurtSlot()
|
defer t.updateCurtSlot()
|
||||||
|
|
||||||
t.execute(list)
|
t.execute(list)
|
||||||
|
t.execute(t.tinyWheel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) updateCurtSlot() {
|
func (t *TimeWheel) updateCurtSlot() {
|
||||||
t.curtSlot = (t.curtSlot + 1) % len(t.slots)
|
t.curtSlot = (t.curtSlot + 1) % t.slotLen
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) execute(l *list.List) {
|
func (t *TimeWheel) execute(l *list.List) {
|
||||||
@ -173,52 +156,21 @@ func (t *TimeWheel) execute(l *list.List) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeWheel) addCycelTask(task *taskSlice) {
|
|
||||||
t.removeCycelTask(task.id)
|
|
||||||
|
|
||||||
poss := t.getCyclePos(task)
|
|
||||||
fmt.Println(poss)
|
|
||||||
if len(poss) > 0 {
|
|
||||||
t.tasksMap[task.id] = poss
|
|
||||||
for _, index := range poss {
|
|
||||||
internalTask := &taskSlice{
|
|
||||||
id: fmt.Sprintf("%s_%d", task.id, index),
|
|
||||||
pos: index,
|
|
||||||
cycle: task.cycle,
|
|
||||||
rawCycle: task.rawCycle,
|
|
||||||
mode: task.mode,
|
|
||||||
task: task.task,
|
|
||||||
}
|
|
||||||
|
|
||||||
t.addTask(internalTask)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.addTask(task)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TimeWheel) removeCycelTask(key string) {
|
|
||||||
poss := t.tasksMap[key]
|
|
||||||
if len(poss) > 0 {
|
|
||||||
for _, index := range poss {
|
|
||||||
id := fmt.Sprintf("%s_%d", key, index)
|
|
||||||
t.removeTask(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.removeTask(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TimeWheel) addTask(task *taskSlice) {
|
func (t *TimeWheel) addTask(task *taskSlice) {
|
||||||
if _, ok := t.taskMap[task.id]; ok {
|
if _, ok := t.taskMap[task.id]; ok {
|
||||||
t.removeTask(task.id)
|
t.removeTask(task.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
list := t.slots[task.pos]
|
var list *list.List
|
||||||
|
if task.mode == TimeTypeLoop {
|
||||||
|
task.cycle = task.cycle*t.slotLen + (task.pos-t.curtSlot+t.slotLen)%t.slotLen
|
||||||
|
task.rawCycle = task.cycle
|
||||||
|
|
||||||
|
list = t.tinyWheel
|
||||||
|
} else {
|
||||||
|
list = t.slots[task.pos]
|
||||||
|
}
|
||||||
|
|
||||||
etask := list.PushBack(task)
|
etask := list.PushBack(task)
|
||||||
t.taskMap[task.id] = etask
|
t.taskMap[task.id] = etask
|
||||||
}
|
}
|
||||||
@ -232,5 +184,10 @@ func (t *TimeWheel) removeTask(key string) {
|
|||||||
delete(t.taskMap, key)
|
delete(t.taskMap, key)
|
||||||
|
|
||||||
task := etask.Value.(*taskSlice)
|
task := etask.Value.(*taskSlice)
|
||||||
|
|
||||||
|
if task.mode == TimeTypeLoop {
|
||||||
|
_ = t.tinyWheel.Remove(etask)
|
||||||
|
} else {
|
||||||
_ = t.slots[task.pos].Remove(etask)
|
_ = t.slots[task.pos].Remove(etask)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,8 @@ func Test_timeWheel(t *testing.T) {
|
|||||||
|
|
||||||
<-time.After(10 * time.Millisecond)
|
<-time.After(10 * time.Millisecond)
|
||||||
|
|
||||||
t.Errorf("test2, %v", time.Now())
|
t.Errorf("start time, %v", time.Now())
|
||||||
|
|
||||||
timeWheel.AddTask("test1", TimeTypeLoop, func() {
|
timeWheel.AddTask("test1", TimeTypeLoop, func() {
|
||||||
t.Errorf("test1, %v", time.Now())
|
t.Errorf("test1, %v", time.Now())
|
||||||
}, time.Millisecond*300)
|
}, time.Millisecond*300)
|
||||||
@ -22,5 +23,5 @@ func Test_timeWheel(t *testing.T) {
|
|||||||
t.Errorf("test2, %v", time.Now())
|
t.Errorf("test2, %v", time.Now())
|
||||||
}, time.Second)
|
}, time.Second)
|
||||||
|
|
||||||
<-time.After(6 * time.Second)
|
<-time.After(8 * time.Second)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user