1、时间轮使用插入的方式,计算复杂,且不准确。
This commit is contained in:
parent
5cd7328291
commit
ab69108550
5
go.mod
5
go.mod
@ -1,10 +1,9 @@
|
|||||||
module dashboard
|
module dashboard
|
||||||
|
|
||||||
go 1.23.0
|
go 1.24.3
|
||||||
|
|
||||||
toolchain go1.23.9
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
git.zhangshuocauc.cn/redhat/timewheel v1.0.1
|
||||||
github.com/bwmarrin/snowflake v0.3.0
|
github.com/bwmarrin/snowflake v0.3.0
|
||||||
github.com/creack/pty v1.1.24
|
github.com/creack/pty v1.1.24
|
||||||
github.com/fsnotify/fsnotify v1.9.0
|
github.com/fsnotify/fsnotify v1.9.0
|
||||||
|
2
go.sum
2
go.sum
@ -1,5 +1,7 @@
|
|||||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||||
|
git.zhangshuocauc.cn/redhat/timewheel v1.0.1 h1:llUaze4uk7yo5p/VsjbmYl0daWIvCEonrqCQnTpC6dw=
|
||||||
|
git.zhangshuocauc.cn/redhat/timewheel v1.0.1/go.mod h1:PNpKCPvSb0ynhx3k2oiigYEcKKq8G/d4P9Fh4VFZiac=
|
||||||
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
|
github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg=
|
||||||
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
|
||||||
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
|
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.zhangshuocauc.cn/redhat/timewheel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventName string
|
type EventName string
|
||||||
@ -11,6 +13,7 @@ type EventName string
|
|||||||
const (
|
const (
|
||||||
EventEntry EventName = "EventEntry"
|
EventEntry EventName = "EventEntry"
|
||||||
EventExit EventName = "EventExit"
|
EventExit EventName = "EventExit"
|
||||||
|
EventTimeOut EventName = "EventTimeOut"
|
||||||
|
|
||||||
EventOK EventName = "EventOK"
|
EventOK EventName = "EventOK"
|
||||||
EventNoProc EventName = "EventNoProc"
|
EventNoProc EventName = "EventNoProc"
|
||||||
@ -32,6 +35,7 @@ type FSM struct {
|
|||||||
currentState *FSMState
|
currentState *FSMState
|
||||||
eventNames *[]string
|
eventNames *[]string
|
||||||
fsmStats map[string]*FSMState
|
fsmStats map[string]*FSMState
|
||||||
|
timer *timewheel.TimeWheel
|
||||||
}
|
}
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
@ -64,6 +68,7 @@ func NewFSM(name string, eventNames *[]string, fsmStats ...*FSMState) (*FSM, err
|
|||||||
currentState: fsmStats[0],
|
currentState: fsmStats[0],
|
||||||
eventNames: eventNames,
|
eventNames: eventNames,
|
||||||
fsmStats: make(map[string]*FSMState),
|
fsmStats: make(map[string]*FSMState),
|
||||||
|
timer: timewheel.New(1, time.Millisecond*100),
|
||||||
}
|
}
|
||||||
|
|
||||||
for index := range fsmStats {
|
for index := range fsmStats {
|
||||||
@ -81,6 +86,16 @@ func NewFSM(name string, eventNames *[]string, fsmStats ...*FSMState) (*FSM, err
|
|||||||
return fsm, nil
|
return fsm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fsm *FSM) StartTimer(typeT timewheel.TimeMode, cycle time.Duration, arg interface{}) {
|
||||||
|
fsm.timer.AddTask(fsm.currentState.name, typeT, func() {
|
||||||
|
fsm.ExecuteEvent(EventTimeOut, arg)
|
||||||
|
}, time.Now().Add(cycle))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fsm *FSM) StopTimer() {
|
||||||
|
fsm.timer.RemoveTask(fsm.currentState.name)
|
||||||
|
}
|
||||||
|
|
||||||
func (fsm *FSM) ExecuteEvent(event EventName, arg interface{}) error {
|
func (fsm *FSM) ExecuteEvent(event EventName, arg interface{}) error {
|
||||||
if fsm == nil {
|
if fsm == nil {
|
||||||
return fmt.Errorf("FSM is nil")
|
return fmt.Errorf("FSM is nil")
|
||||||
@ -114,6 +129,7 @@ func (fsm *FSM) ChangeState(newState string, arg interface{}) error {
|
|||||||
if err := fsm.ExecuteEvent(EventExit, nil); err != nil {
|
if err := fsm.ExecuteEvent(EventExit, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
fsm.StopTimer()
|
||||||
|
|
||||||
if newState == tempState.name {
|
if newState == tempState.name {
|
||||||
if err := fsm.ExecuteEvent(EventEntry, arg); err != nil {
|
if err := fsm.ExecuteEvent(EventEntry, arg); err != nil {
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.zhangshuocauc.cn/redhat/timewheel"
|
||||||
)
|
)
|
||||||
|
|
||||||
type myFsm struct {
|
type myFsm struct {
|
||||||
@ -30,6 +32,8 @@ func state2(fsm *FSM, event EventName, arg interface{}) EventName {
|
|||||||
log.Println("state2", event)
|
log.Println("state2", event)
|
||||||
case EventExit:
|
case EventExit:
|
||||||
log.Println("state2", event)
|
log.Println("state2", event)
|
||||||
|
case EventTimeOut:
|
||||||
|
log.Println("state2", event)
|
||||||
default:
|
default:
|
||||||
log.Println("state2", event)
|
log.Println("state2", event)
|
||||||
return EventNoProc
|
return EventNoProc
|
||||||
@ -46,6 +50,8 @@ func state3(fsm *FSM, event EventName, arg interface{}) EventName {
|
|||||||
log.Println("state3", event)
|
log.Println("state3", event)
|
||||||
case "test event":
|
case "test event":
|
||||||
log.Println("state3", event)
|
log.Println("state3", event)
|
||||||
|
case EventTimeOut:
|
||||||
|
log.Println("state3", event)
|
||||||
default:
|
default:
|
||||||
log.Println("state3", event)
|
log.Println("state3", event)
|
||||||
return EventNoProc
|
return EventNoProc
|
||||||
@ -59,7 +65,7 @@ func newMyFsm() *myFsm {
|
|||||||
state2 := NewFSMState("State2", "State1", "", state2)
|
state2 := NewFSMState("State2", "State1", "", state2)
|
||||||
state3 := NewFSMState("State3", "State1", "", state3)
|
state3 := NewFSMState("State3", "State1", "", state3)
|
||||||
|
|
||||||
fsm, err := NewFSM("MyFSM", &[]string{string(EventEntry), string(EventExit)}, state2, state1, state3)
|
fsm, err := NewFSM("MyFSM", &[]string{string(EventEntry), string(EventExit)}, state1, state2, state3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -76,12 +82,14 @@ func Test_fsm(t *testing.T) {
|
|||||||
|
|
||||||
myfsm.fsm.ExecuteEvent("test event", nil)
|
myfsm.fsm.ExecuteEvent("test event", nil)
|
||||||
|
|
||||||
// Start a timer (example)
|
myfsm.fsm.StartTimer(timewheel.TimerTypeOnce, time.Millisecond*100, nil)
|
||||||
// timerHandle := &ReactorTimerHandle{}
|
|
||||||
// if err := fsm.StartReactorTimer(timerHandle, 0, 1000, EventEntry, nil); err != nil {
|
<-time.After(200 * time.Millisecond)
|
||||||
// log.Fatal(err)
|
|
||||||
// }
|
myfsm.fsm.ChangeState("State2", nil)
|
||||||
|
|
||||||
|
myfsm.fsm.StartTimer(timewheel.TimeTypeLoop, time.Second, nil)
|
||||||
|
|
||||||
// Keep the program running to observeinterior
|
// Keep the program running to observeinterior
|
||||||
time.Sleep(2 * time.Second)
|
<-time.After(3 * time.Second)
|
||||||
}
|
}
|
||||||
|
236
pkg/timewheel/timewheel.go
Normal file
236
pkg/timewheel/timewheel.go
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
package timewheel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type taskSlice struct {
|
||||||
|
id string
|
||||||
|
pos int
|
||||||
|
cycle int
|
||||||
|
rawCycle int
|
||||||
|
flower time.Duration
|
||||||
|
mode TimeMode
|
||||||
|
task func()
|
||||||
|
}
|
||||||
|
|
||||||
|
type TimeWheel struct {
|
||||||
|
sync.Once
|
||||||
|
curtSlot int
|
||||||
|
interval time.Duration
|
||||||
|
ticker *time.Ticker
|
||||||
|
slots []*list.List
|
||||||
|
taskMap map[string]*list.Element
|
||||||
|
tasksMap map[string][]int
|
||||||
|
stopC chan struct{}
|
||||||
|
addTaskC chan *taskSlice
|
||||||
|
removeTaskC chan string
|
||||||
|
}
|
||||||
|
|
||||||
|
type TimeMode int
|
||||||
|
|
||||||
|
const (
|
||||||
|
TimerTypeOnce TimeMode = iota
|
||||||
|
TimeTypeLoop
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(slots int, interval time.Duration) *TimeWheel {
|
||||||
|
if slots <= 0 {
|
||||||
|
slots = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
if interval <= 0 {
|
||||||
|
interval = time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
t := &TimeWheel{
|
||||||
|
interval: interval,
|
||||||
|
ticker: time.NewTicker(interval),
|
||||||
|
slots: make([]*list.List, 0, slots),
|
||||||
|
taskMap: make(map[string]*list.Element),
|
||||||
|
stopC: make(chan struct{}),
|
||||||
|
addTaskC: make(chan *taskSlice),
|
||||||
|
removeTaskC: make(chan string),
|
||||||
|
tasksMap: make(map[string][]int),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < slots; i++ {
|
||||||
|
t.slots = append(t.slots, list.New())
|
||||||
|
}
|
||||||
|
|
||||||
|
go t.run()
|
||||||
|
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) AddTask(id string, mode TimeMode, task func(), flower time.Duration) {
|
||||||
|
cycle, pos := t.getSlotPosAndCycle(flower)
|
||||||
|
|
||||||
|
t.addTaskC <- &taskSlice{
|
||||||
|
id: id,
|
||||||
|
task: task,
|
||||||
|
cycle: cycle,
|
||||||
|
mode: mode,
|
||||||
|
rawCycle: cycle,
|
||||||
|
pos: pos,
|
||||||
|
flower: flower,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) RemoveTask(key string) {
|
||||||
|
t.removeTaskC <- key
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) Stop() {
|
||||||
|
t.Do(func() {
|
||||||
|
t.ticker.Stop()
|
||||||
|
close(t.stopC)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) getSlotPosAndCycle(flower time.Duration) (int, int) {
|
||||||
|
delay := time.Until(time.Now().Add(flower))
|
||||||
|
cycle := delay / (t.interval * time.Duration(len(t.slots)))
|
||||||
|
pos := (t.curtSlot + int(delay/t.interval)) % len(t.slots)
|
||||||
|
|
||||||
|
return int(cycle), pos
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) getCyclePos(task *taskSlice) []int {
|
||||||
|
if task.cycle == 0 && task.mode == TimeTypeLoop {
|
||||||
|
var listc []int
|
||||||
|
dur := int(math.Ceil(float64(float64(task.flower) / float64(t.interval))))
|
||||||
|
start := task.pos
|
||||||
|
for i := 0; i < int(math.Ceil(float64(len(t.slots))/float64(dur))); i++ {
|
||||||
|
listc = append(listc, start%len(t.slots))
|
||||||
|
start += dur
|
||||||
|
}
|
||||||
|
|
||||||
|
return listc
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.stopC:
|
||||||
|
return
|
||||||
|
case <-t.ticker.C:
|
||||||
|
t.tick()
|
||||||
|
case task := <-t.addTaskC:
|
||||||
|
t.addCycelTask(task)
|
||||||
|
case key := <-t.removeTaskC:
|
||||||
|
t.removeCycelTask(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) tick() {
|
||||||
|
list := t.slots[t.curtSlot]
|
||||||
|
defer t.updateCurtSlot()
|
||||||
|
|
||||||
|
t.execute(list)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) updateCurtSlot() {
|
||||||
|
t.curtSlot = (t.curtSlot + 1) % len(t.slots)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) execute(l *list.List) {
|
||||||
|
for e := l.Front(); e != nil; {
|
||||||
|
event := e.Value.(*taskSlice)
|
||||||
|
if event.cycle > 0 {
|
||||||
|
event.cycle--
|
||||||
|
e = e.Next()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
event.task()
|
||||||
|
}()
|
||||||
|
|
||||||
|
if event.mode == TimerTypeOnce {
|
||||||
|
next := e.Next()
|
||||||
|
l.Remove(e)
|
||||||
|
delete(t.taskMap, event.id)
|
||||||
|
e = next
|
||||||
|
} else {
|
||||||
|
e = e.Next()
|
||||||
|
event.cycle = event.rawCycle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) addCycelTask(task *taskSlice) {
|
||||||
|
t.removeCycelTask(task.id)
|
||||||
|
|
||||||
|
poss := t.getCyclePos(task)
|
||||||
|
fmt.Println(poss)
|
||||||
|
if len(poss) > 0 {
|
||||||
|
t.tasksMap[task.id] = poss
|
||||||
|
for _, index := range poss {
|
||||||
|
internalTask := &taskSlice{
|
||||||
|
id: fmt.Sprintf("%s_%d", task.id, index),
|
||||||
|
pos: index,
|
||||||
|
cycle: task.cycle,
|
||||||
|
rawCycle: task.rawCycle,
|
||||||
|
mode: task.mode,
|
||||||
|
task: task.task,
|
||||||
|
}
|
||||||
|
|
||||||
|
t.addTask(internalTask)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.addTask(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) removeCycelTask(key string) {
|
||||||
|
poss := t.tasksMap[key]
|
||||||
|
if len(poss) > 0 {
|
||||||
|
for _, index := range poss {
|
||||||
|
id := fmt.Sprintf("%s_%d", key, index)
|
||||||
|
t.removeTask(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.removeTask(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) addTask(task *taskSlice) {
|
||||||
|
if _, ok := t.taskMap[task.id]; ok {
|
||||||
|
t.removeTask(task.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
list := t.slots[task.pos]
|
||||||
|
etask := list.PushBack(task)
|
||||||
|
t.taskMap[task.id] = etask
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TimeWheel) removeTask(key string) {
|
||||||
|
etask, ok := t.taskMap[key]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(t.taskMap, key)
|
||||||
|
|
||||||
|
task := etask.Value.(*taskSlice)
|
||||||
|
_ = t.slots[task.pos].Remove(etask)
|
||||||
|
}
|
26
pkg/timewheel/timewheel_test.go
Normal file
26
pkg/timewheel/timewheel_test.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package timewheel
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_timeWheel(t *testing.T) {
|
||||||
|
timeWheel := New(10, 100*time.Millisecond)
|
||||||
|
defer timeWheel.Stop()
|
||||||
|
|
||||||
|
<-time.After(10 * time.Millisecond)
|
||||||
|
|
||||||
|
t.Errorf("test2, %v", time.Now())
|
||||||
|
timeWheel.AddTask("test1", TimeTypeLoop, func() {
|
||||||
|
t.Errorf("test1, %v", time.Now())
|
||||||
|
}, time.Millisecond*300)
|
||||||
|
timeWheel.AddTask("test2", TimeTypeLoop, func() {
|
||||||
|
t.Errorf("test2, %v", time.Now())
|
||||||
|
}, time.Second)
|
||||||
|
timeWheel.AddTask("test2", TimerTypeOnce, func() {
|
||||||
|
t.Errorf("test2, %v", time.Now())
|
||||||
|
}, time.Second)
|
||||||
|
|
||||||
|
<-time.After(6 * time.Second)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user