dashboard/pkg/swarm/swarm.go

118 lines
2.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package swarm
import (
"context"
"dashboard/pkg/errgroups"
"log"
"golang.org/x/sync/errgroup"
)
const logPrefix = "[swarm] "
type Swarm[T any] struct {
scheduler Scheduler[T]
bees []*beeJob[T]
opts options
}
func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] {
res := new(Swarm[T])
for _, o := range opts {
o(&res.opts)
}
res.opts.repire()
res.scheduler = newdefaultScheduler[T](res.opts.log)
for index := range num {
res.bees = append(res.bees, &beeJob[T]{
done: func(c chan T) { res.scheduler.Done(c) },
wc: res.scheduler.GenChannel(),
do: work,
index: index,
log: res.opts.log,
})
}
res.opts.log.Printf("%sStart bees number: %d.", logPrefix, len(res.bees))
return res
}
type beeJob[T any] struct {
done func(chan T)
wc chan T
do func(T) error
index int
log *log.Logger
}
func (b *beeJob[T]) Run(ctx context.Context) error {
for {
b.done(b.wc)
select {
case <-ctx.Done():
b.log.Printf("Bee num: %d,down\n", b.index)
return ctx.Err()
case req := <-b.wc:
if err := b.do(req); err != nil {
b.log.Printf("%sBee num: %d, do the work error: %v", logPrefix, b.index, err)
continue
}
b.log.Printf("%sBee num: %d, do the work success:)", logPrefix, b.index)
}
}
}
type Scheduler[T any] interface {
Run() error
Submit(T)
Done(chan T)
GenChannel() chan T
Stop()
}
func (s *Swarm[T]) Submit(sub T) {
s.scheduler.Submit(sub)
}
// implement FuncWithErr interface
func (s *Swarm[T]) Run(ctx context.Context) error {
wgs, ctx := errgroup.WithContext(ctx)
wgs.Go(func() error {
defer s.scheduler.Stop()
ewgs := errgroups.NewErrGroup()
// // 可以保证bee结束但是不能保证scheduler正常退出
// schectx, cancel := context.WithCancel(context.TODO())
// defer cancel()
// go s.scheduler.Run(schectx)
// ewgs.Add(s.scheduler) // 结束的比bee早导致bee可能会被永久阻塞导致进程不能退出。
for _, bee := range s.bees {
ewgs.Add(bee)
}
return ewgs.Run(ctx)
})
wgs.Go(func() error {
return s.scheduler.Run()
})
if err := wgs.Wait(); err != nil {
return err
}
return nil
}