package swarm import ( "context" "dashboard/pkg/errgroups" "log" "golang.org/x/sync/errgroup" ) const logPrefix = "[swarm] " type Swarm[T any] struct { scheduler Scheduler[T] bees []*beeJob[T] opts options } // TODO: 需要完善错误处理机制,重试(什么时候重试,重试几次?),投递死信队列 func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] { res := new(Swarm[T]) for _, o := range opts { o(&res.opts) } res.opts.repire() res.scheduler = newdefaultScheduler[T](res.opts.log) for index := range num { res.bees = append(res.bees, &beeJob[T]{ done: func(c chan T) { res.scheduler.Done(c) }, wc: res.scheduler.GenChannel(), do: work, index: index, log: res.opts.log, }) } res.opts.log.Printf("%sStart bees number: %d.", logPrefix, len(res.bees)) return res } type beeJob[T any] struct { done func(chan T) wc chan T do func(T) error index int log *log.Logger } func (b *beeJob[T]) Run(ctx context.Context) error { for { b.done(b.wc) select { case <-ctx.Done(): b.log.Printf("Bee num: %d,down\n", b.index) return ctx.Err() case req := <-b.wc: if err := b.do(req); err != nil { b.log.Printf("%sBee num: %d, do the work error: %v", logPrefix, b.index, err) continue } b.log.Printf("%sBee num: %d, do the work success:)", logPrefix, b.index) } } } type Scheduler[T any] interface { Run() error Submit(T) Done(chan T) GenChannel() chan T Stop() } func (s *Swarm[T]) Submit(sub T) { s.scheduler.Submit(sub) } // implement FuncWithErr interface func (s *Swarm[T]) Run(ctx context.Context) error { wgs, ctx := errgroup.WithContext(ctx) wgs.Go(func() error { defer s.scheduler.Stop() ewgs := errgroups.NewErrGroup() // // 可以保证bee结束,但是不能保证scheduler正常退出 // schectx, cancel := context.WithCancel(context.TODO()) // defer cancel() // go s.scheduler.Run(schectx) // ewgs.Add(s.scheduler) // 结束的比bee早,导致bee可能会被永久阻塞,导致进程不能退出。 for _, bee := range s.bees { ewgs.Add(bee) } return ewgs.Run(ctx) }) wgs.Go(func() error { return s.scheduler.Run() }) if err := wgs.Wait(); err != nil { return err } return nil }