package observe import ( "context" "fmt" "sync" ) type Event struct { Topic string Value interface{} } type Observer interface { OnChange(context.Context, *Event) error } type EventBus interface { Subscribe(string, Observer) Unsubscribe(string, Observer) Publish(context.Context, *Event) } type baseEventBus struct { mu sync.RWMutex observers map[string]map[Observer]struct{} } func newBaseEventBus() *baseEventBus { return &baseEventBus{ observers: make(map[string]map[Observer]struct{}), } } func (b *baseEventBus) Subscribe(key string, ob Observer) { b.mu.Lock() defer b.mu.Unlock() if _, ok := b.observers[key]; !ok { b.observers[key] = make(map[Observer]struct{}) } b.observers[key][ob] = struct{}{} } func (b *baseEventBus) Unsubscribe(key string, ob Observer) { b.mu.Lock() defer b.mu.Unlock() delete(b.observers[key], ob) } type SyncEventBus struct { baseEventBus } func NewSyncEventBus() *SyncEventBus { return &SyncEventBus{ baseEventBus: *newBaseEventBus(), } } func (s *SyncEventBus) Publish(ctx context.Context, e *Event) { s.mu.RLock() defer s.mu.RUnlock() observers := s.observers[e.Topic] errs := make(map[Observer]error) for ob := range observers { if err := ob.OnChange(ctx, e); err != nil { errs[ob] = err } } s.handleErr(ctx, errs) } func (s *SyncEventBus) handleErr(_ context.Context, errs map[Observer]error) { for ob, err := range errs { fmt.Println(ob, err) } } type AsyncEventBus struct { baseEventBus errC chan *observerWithErr celFn context.CancelFunc ctx context.Context } type observerWithErr struct { ob Observer err error } func NewAsyncEventBus() *AsyncEventBus { res := &AsyncEventBus{ baseEventBus: *newBaseEventBus(), } res.errC = make(chan *observerWithErr) res.ctx, res.celFn = context.WithCancel(context.Background()) go func() { for { select { case <-res.ctx.Done(): return case err := <-res.errC: fmt.Println(err.ob, err.err) } } }() return res } func (a *AsyncEventBus) Stop(){ a.celFn() } func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) { a.mu.RLock() defer a.mu.RUnlock() observers := a.observers[e.Topic] for ob := range observers { ob := ob go func() { if err := ob.OnChange(ctx, e); err != nil { select { case <-a.ctx.Done(): case a.errC <- &observerWithErr{ ob: ob, err: err, }: } } }() } }