dashboard/pkg/errgroups/errgroups.go

126 lines
2.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package errgroups
import (
"context"
"dashboard/utils"
"sync"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type ErrGroupFunc struct {
wg *errgroup.Group
mu sync.RWMutex
funs map[string]FuncWithErrFunc
}
type FuncWithErrFunc func(context.Context) error
// 注意使用func时如果想要对数组对象执行多个函数不可以使用此种方法因为函数名都是一样的map里面会被替换。此种场景需要使用接口的方式。
func NewErrGroupFunc() *ErrGroupFunc {
return &ErrGroupFunc{
funs: make(map[string]FuncWithErrFunc),
}
}
func (e *ErrGroupFunc) Run(ctx context.Context) error {
e.mu.RLock()
defer e.mu.RUnlock()
e.wg, ctx = errgroup.WithContext(ctx)
for name, fn := range e.funs {
fn := fn
name := name
e.wg.Go(func() error {
defer zap.L().Sugar().Warnf("func: %s stop", name)
zap.L().Sugar().Infof("func: %s runing...", name)
return fn(ctx)
})
}
if err := e.wg.Wait(); err != nil {
return err
}
return nil
}
func (e *ErrGroupFunc) Add(fn ...FuncWithErrFunc) {
e.mu.Lock()
defer e.mu.Unlock()
for _, f := range fn {
fnStr := utils.GetFuncName(f)
e.funs[fnStr] = f
}
}
func (e *ErrGroupFunc) Del(fn ...FuncWithErrFunc) {
e.mu.Lock()
defer e.mu.Unlock()
for _, f := range fn {
fnStr := utils.GetFuncName(f)
delete(e.funs, fnStr)
}
}
type ErrGroup struct {
wg *errgroup.Group
mu sync.RWMutex
funs map[FuncWithErr]struct{}
}
type FuncWithErr interface {
Run(context.Context) error
}
func NewErrGroup() *ErrGroup {
return &ErrGroup{
funs: make(map[FuncWithErr]struct{}),
}
}
func (e *ErrGroup) Run(ctx context.Context) error {
e.mu.RLock()
defer e.mu.RUnlock()
e.wg, ctx = errgroup.WithContext(ctx)
for fn := range e.funs {
fn := fn // shadow
name := utils.GetInterfaceName(fn)
e.wg.Go(func() error {
defer zap.L().Sugar().Warnf("Object: %s stop", name)
zap.L().Sugar().Infof("Object: %s runing...", name)
return fn.Run(ctx)
})
}
if err := e.wg.Wait(); err != nil {
return err
}
return nil
}
func (e *ErrGroup) Add(fn ...FuncWithErr) {
e.mu.Lock()
defer e.mu.Unlock()
for _, f := range fn {
e.funs[f] = struct{}{}
}
}
func (e *ErrGroup) Del(fn ...FuncWithErr) {
e.mu.Lock()
defer e.mu.Unlock()
for _, f := range fn {
delete(e.funs, f)
}
}