diff --git a/dash b/dash deleted file mode 100755 index 69964a7..0000000 Binary files a/dash and /dev/null differ diff --git a/go.mod b/go.mod index 9fa4f0c..2c9d3c2 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.2 github.com/gorilla/websocket v1.5.3 github.com/juju/ratelimit v1.0.2 + github.com/looplab/fsm v1.0.3 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/shirou/gopsutil/v4 v4.25.4 github.com/spf13/viper v1.20.1 diff --git a/go.sum b/go.sum index 65918ad..d0cedb4 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/looplab/fsm v1.0.3 h1:qtxBsa2onOs0qFOtkqwf5zE0uP0+Te+wlIvXctPKpcw= +github.com/looplab/fsm v1.0.3/go.mod h1:PmD3fFvQEIsjMEfvZdrCDZ6y8VwKTwWNjlpEr6IKPO4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= diff --git a/pkg/fsm/event.go b/pkg/fsm/event.go new file mode 100644 index 0000000..c63067e --- /dev/null +++ b/pkg/fsm/event.go @@ -0,0 +1,24 @@ +package fsm + +// 定义任务状态 +const ( + StatePending = "pending" // 待派发 + StateAssigned = "assigned" // 已派发 + StateAccepted = "accepted" // 已接单 + StateSubmitted = "submitted" // 已提交 + StateApproved = "approved" // 已审核 + StateRejected = "rejected" // 已驳回 + StateSettled = "settled" // 已结算 + StateCanceled = "canceled" // 已取消 +) + +// 定义任务事件 +const ( + EventAssign = "assign" // 派发任务 + EventAccept = "accept" // 接受任务 + EventSubmit = "submit" // 提交任务 + EventApprove = "approve" // 审核通过 + EventReject = "reject" // 审核驳回 + EventSettle = "settle" // 结算任务 + EventCancel = "cancel" // 取消任务 +) diff --git a/pkg/fsm/fsm.go b/pkg/fsm/fsm.go new file mode 100644 index 0000000..6cfdb70 --- /dev/null +++ b/pkg/fsm/fsm.go @@ -0,0 +1,88 @@ +package fsm + +import ( + "context" + "fmt" + + "github.com/looplab/fsm" +) + +// Task 任务结构体 +type Task struct { + ID string + FSM *fsm.FSM + UserID string // 示例:记录操作者 +} + +// NewTask 创建新任务 +func NewTask(id string) *Task { + task := &Task{ + ID: id, + } + + // 创建状态机 + task.FSM = fsm.NewFSM( + StatePending, // 初始状态 + fsm.Events{ // 定义状态转换规则 + {Name: EventAssign, Src: []string{StatePending}, Dst: StateAssigned}, + {Name: EventAccept, Src: []string{StateAssigned}, Dst: StateAccepted}, + {Name: EventSubmit, Src: []string{StateAccepted}, Dst: StateSubmitted}, + {Name: EventApprove, Src: []string{StateSubmitted}, Dst: StateApproved}, + {Name: EventReject, Src: []string{StateSubmitted}, Dst: StateRejected}, + {Name: EventSettle, Src: []string{StateApproved}, Dst: StateSettled}, + // Cancel 事件可以从 Pending 或 Assigned 状态触发 + {Name: EventCancel, Src: []string{StatePending, StateAssigned}, Dst: StateCanceled}, + }, + fsm.Callbacks{ // 定义回调函数 + // 通用回调:每次触发事件前执行(通过 e.Cancel() 可以组织内存中实际状态变化) + "before_event": func(_ context.Context, e *fsm.Event) { + if e.Event == EventCancel { + e.Cancel(fmt.Errorf("failed to handle event %s", e.Event)) + return + } + fmt.Printf("[Event]Task %s event %s triggered\n", task.ID, e.Event) + }, + // 通用回调:每次进入新状态时触发 + "enter_state": func(_ context.Context, e *fsm.Event) { + fmt.Printf("[State] Task %s state changed from %s to %s\n", task.ID, e.Src, e.Dst) + // 在这里可以添加通用逻辑,如更新数据库状态 + }, + // 特定状态回调:进入 Assigned 状态后执行 + StateAssigned: func(_ context.Context, e *fsm.Event) { + fmt.Printf("[State] Task %s assigned to user %s\n", task.ID, task.UserID) + // 在这里可以发送通知给被分配者 + }, + }, + ) + + return task +} + +// 任务状态流转方法 +func (t *Task) Assign(ctx context.Context, userID string) error { + t.UserID = userID // 可以在事件触发前设置相关信息 + return t.FSM.Event(ctx, EventAssign) // 触发 Assign 事件 +} + +func (t *Task) Accept(ctx context.Context) error { + return t.FSM.Event(ctx, EventAccept) +} + +func (t *Task) Submit(ctx context.Context) error { + return t.FSM.Event(ctx, EventSubmit) +} + +func (t *Task) Approve(ctx context.Context) error { + return t.FSM.Event(ctx, EventApprove) +} + +func (t *Task) Settle(ctx context.Context) error { + return t.FSM.Event(ctx, EventSettle) +} + +func (t *Task) Cancel(ctx context.Context) error { + return t.FSM.Event(ctx, EventCancel) +} + +// ... 其他事件触发方法 ... + diff --git a/pkg/fsm/fsm_test.go b/pkg/fsm/fsm_test.go new file mode 100644 index 0000000..684e1ad --- /dev/null +++ b/pkg/fsm/fsm_test.go @@ -0,0 +1,67 @@ +package fsm + +import ( + "context" + "fmt" + "testing" + + "github.com/looplab/fsm" +) + +func Test_fsm(t *testing.T) { + ctx := context.Background() + + // 创建新任务 + fmt.Println("----------------task1----------------") + fmt.Println("-------------验证状态流转--------------") + task := NewTask("task-001") + + // 任务状态流转 + fmt.Println("Current state:", task.FSM.Current()) // pending + + _ = task.Assign(ctx, "user-001") + fmt.Println("Current state:", task.FSM.Current()) // assigned + + _ = task.Accept(ctx) + fmt.Println("Current state:", task.FSM.Current()) // accepted + + _ = task.Submit(ctx) + fmt.Println("Current state:", task.FSM.Current()) // submitted + + _ = task.Approve(ctx) + fmt.Println("Current state:", task.FSM.Current()) // approved + + _ = task.Settle(ctx) + fmt.Println("Current state:", task.FSM.Current()) // settled + + // 回调报错情况 + fmt.Println("----------------task2----------------") + fmt.Println("-------------验证回调报错情况-------------") + task2 := NewTask("task-002") + fmt.Println("Current state:", task2.FSM.Current()) // pending + + _ = task2.Assign(ctx, "user-002") + fmt.Println("Current state:", task2.FSM.Current()) // assigned + + e := task2.Cancel(ctx) + fmt.Println("Error:", e) + fmt.Println("Current state:", task2.FSM.Current()) // assigned(cancel failed) +} + +func TestMermaidOutput(t *testing.T) { + task := NewTask("task-001") + + gotDiagram, err := fsm.VisualizeForMermaidWithGraphType(task.FSM, fsm.StateDiagram) + if err != nil { + t.Errorf("got error for visualizing with type MERMAID: %s", err) + } + + gotFlowchart, err := fsm.VisualizeForMermaidWithGraphType(task.FSM, fsm.FlowChart) + if err != nil { + t.Errorf("got error for visualizing with type MERMAID: %s", err) + } + + fmt.Println(gotDiagram) + fmt.Println("--------------------------------") + fmt.Println(gotFlowchart) +} diff --git a/pkg/observe/observe.go b/pkg/observe/observe.go new file mode 100644 index 0000000..3030f13 --- /dev/null +++ b/pkg/observe/observe.go @@ -0,0 +1,141 @@ +package observe + +import ( + "context" + "fmt" + "sync" +) + +type Event struct { + Topic string + Value interface{} +} + +type Observer interface { + OnChange(context.Context, *Event) error +} + +type EventBus interface { + Subscribe(string, Observer) + Unsubscribe(string, Observer) + Publish(context.Context, *Event) +} + +type baseEventBus struct { + mu sync.RWMutex + observers map[string]map[Observer]struct{} +} + +func newBaseEventBus() *baseEventBus { + return &baseEventBus{ + observers: make(map[string]map[Observer]struct{}), + } +} + +func (b *baseEventBus) Subscribe(key string, ob Observer) { + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.observers[key]; !ok { + b.observers[key] = make(map[Observer]struct{}) + } + + b.observers[key][ob] = struct{}{} +} + +func (b *baseEventBus) Unsubscribe(key string, ob Observer) { + b.mu.Lock() + defer b.mu.Unlock() + + delete(b.observers[key], ob) +} + +type SyncEventBus struct { + baseEventBus +} + +func NewSyncEventBus() *SyncEventBus { + return &SyncEventBus{ + baseEventBus: *newBaseEventBus(), + } +} + +func (s *SyncEventBus) Publish(ctx context.Context, e *Event) { + s.mu.RLock() + defer s.mu.RUnlock() + + observers := s.observers[e.Topic] + errs := make(map[Observer]error) + for ob := range observers { + if err := ob.OnChange(ctx, e); err != nil { + errs[ob] = err + } + } + + s.handleErr(ctx, errs) +} + +func (s *SyncEventBus) handleErr(_ context.Context, errs map[Observer]error) { + for ob, err := range errs { + fmt.Println(ob, err) + } +} + +type AsyncEventBus struct { + baseEventBus + errC chan *observerWithErr + celFn context.CancelFunc + ctx context.Context +} + +type observerWithErr struct { + ob Observer + err error +} + +func NewAsyncEventBus() *AsyncEventBus { + res := &AsyncEventBus{ + baseEventBus: *newBaseEventBus(), + } + + res.errC = make(chan *observerWithErr) + + res.ctx, res.celFn = context.WithCancel(context.Background()) + + go func() { + for { + select { + case <-res.ctx.Done(): + return + case err := <-res.errC: + fmt.Println(err.ob, err.err) + } + } + }() + + return res +} + +func (a *AsyncEventBus) Stop(){ + a.celFn() +} + +func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) { + a.mu.RLock() + defer a.mu.RUnlock() + + observers := a.observers[e.Topic] + for ob := range observers { + ob := ob + go func() { + if err := ob.OnChange(ctx, e); err != nil { + select { + case <-a.ctx.Done(): + case a.errC <- &observerWithErr{ + ob: ob, + err: err, + }: + } + } + }() + } +} diff --git a/pkg/observe/observe_test.go b/pkg/observe/observe_test.go new file mode 100644 index 0000000..18fbf20 --- /dev/null +++ b/pkg/observe/observe_test.go @@ -0,0 +1,67 @@ +package observe + +import ( + "context" + "errors" + "fmt" + "testing" + "time" +) + +type baseObserver struct { + name string +} + +func newBaseObserver(name string) *baseObserver { + return &baseObserver{ + name: name, + } +} + +func (b *baseObserver) OnChange(ctx context.Context, e *Event) error { + fmt.Println(b.name, e.Topic, e.Value) + + return errors.New("jjjjj") +} + +func Test_syncEventBus(t *testing.T) { + obA := newBaseObserver("A") + obB := newBaseObserver("B") + obC := newBaseObserver("C") + obD := newBaseObserver("D") + + sBus := NewSyncEventBus() + topic := "sync" + sBus.Subscribe(topic, obA) + sBus.Subscribe(topic, obB) + sBus.Subscribe(topic, obC) + sBus.Subscribe(topic, obD) + + sBus.Publish(context.Background(), &Event{ + Topic: topic, + Value: "hello redhat", + }) +} + +func Test_asyncEventBus(t *testing.T) { + obA := newBaseObserver("A") + obB := newBaseObserver("B") + obC := newBaseObserver("C") + obD := newBaseObserver("D") + + sBus := NewAsyncEventBus() + defer sBus.Stop() + + topic := "async" + sBus.Subscribe(topic, obA) + sBus.Subscribe(topic, obB) + sBus.Subscribe(topic, obC) + sBus.Subscribe(topic, obD) + + sBus.Publish(context.Background(), &Event{ + Topic: topic, + Value: "hello redhat", + }) + + <-time.After(time.Second) +}