118 lines
2.1 KiB
Go
118 lines
2.1 KiB
Go
package swarm
|
||
|
||
import (
|
||
"context"
|
||
"dashboard/pkg/errgroups"
|
||
"log"
|
||
|
||
"golang.org/x/sync/errgroup"
|
||
)
|
||
|
||
// logPrefix is the tag passed as the leading %s argument of this package's
// log format strings so that swarm messages can be told apart in shared logs.
const logPrefix = "[swarm] "
|
||
|
||
type Swarm[T any] struct {
|
||
scheduler Scheduler[T]
|
||
bees []*beeJob[T]
|
||
opts options
|
||
}
|
||
|
||
func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] {
|
||
res := new(Swarm[T])
|
||
|
||
for _, o := range opts {
|
||
o(&res.opts)
|
||
}
|
||
|
||
res.opts.repire()
|
||
|
||
res.scheduler = newdefaultScheduler[T](res.opts.log)
|
||
|
||
for index := range num {
|
||
res.bees = append(res.bees, &beeJob[T]{
|
||
done: func(c chan T) { res.scheduler.Done(c) },
|
||
wc: res.scheduler.GenChannel(),
|
||
do: work,
|
||
index: index,
|
||
log: res.opts.log,
|
||
})
|
||
}
|
||
|
||
res.opts.log.Printf("%sStart bees number: %d.", logPrefix, len(res.bees))
|
||
|
||
return res
|
||
}
|
||
|
||
// beeJob is a single worker ("bee") of a Swarm. Its Run method repeatedly
// announces its channel through done and then processes whatever arrives on
// that channel with do.
type beeJob[T any] struct {
	// done is called with wc at the top of every Run iteration.
	done func(chan T)

	// wc is the channel this bee receives work items from.
	wc chan T

	// do is the work function applied to each received item.
	do func(T) error

	// index identifies the bee in log messages.
	index int

	// log is the logger the bee writes its status messages to.
	log *log.Logger
}
|
||
|
||
func (b *beeJob[T]) Run(ctx context.Context) error {
|
||
for {
|
||
b.done(b.wc)
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
b.log.Printf("Bee num: %d,down\n", b.index)
|
||
return ctx.Err()
|
||
case req := <-b.wc:
|
||
if err := b.do(req); err != nil {
|
||
b.log.Printf("%sBee num: %d, do the work error: %v", logPrefix, b.index, err)
|
||
continue
|
||
}
|
||
|
||
b.log.Printf("%sBee num: %d, do the work success:)", logPrefix, b.index)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Scheduler is the contract a Swarm relies on to get submitted items of type
// T to its bees. The notes below describe how this file uses each method;
// the precise semantics are up to the implementation.
type Scheduler[T any] interface {
	// GenChannel returns a channel; NewSwarm calls it once per bee and the
	// bee receives its work items from the result.
	GenChannel() chan T

	// Submit accepts one item; Swarm.Submit forwards to it.
	Submit(T)

	// Done is called by a bee with its own channel at the top of every
	// iteration of its work loop.
	Done(chan T)

	// Run is started by Swarm.Run in its own goroutine and its error, if
	// any, is reported through Swarm.Run.
	Run() error

	// Stop is invoked by Swarm.Run after all bees have returned.
	Stop()
}
|
||
|
||
func (s *Swarm[T]) Submit(sub T) {
|
||
s.scheduler.Submit(sub)
|
||
}
|
||
|
||
// implement FuncWithErr interface
|
||
func (s *Swarm[T]) Run(ctx context.Context) error {
|
||
wgs, ctx := errgroup.WithContext(ctx)
|
||
|
||
wgs.Go(func() error {
|
||
defer s.scheduler.Stop()
|
||
|
||
ewgs := errgroups.NewErrGroup()
|
||
|
||
// // 可以保证bee结束,但是不能保证scheduler正常退出
|
||
// schectx, cancel := context.WithCancel(context.TODO())
|
||
// defer cancel()
|
||
|
||
// go s.scheduler.Run(schectx)
|
||
|
||
// ewgs.Add(s.scheduler) // 结束的比bee早,导致bee可能会被永久阻塞,导致进程不能退出。
|
||
|
||
for _, bee := range s.bees {
|
||
ewgs.Add(bee)
|
||
}
|
||
|
||
return ewgs.Run(ctx)
|
||
})
|
||
|
||
wgs.Go(func() error {
|
||
return s.scheduler.Run()
|
||
})
|
||
|
||
if err := wgs.Wait(); err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|