142 lines
2.4 KiB
Go
142 lines
2.4 KiB
Go
package observe
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
type Event struct {
|
|
Topic string
|
|
Value interface{}
|
|
}
|
|
|
|
type Observer interface {
|
|
OnChange(context.Context, *Event) error
|
|
}
|
|
|
|
type EventBus interface {
|
|
Subscribe(string, Observer)
|
|
Unsubscribe(string, Observer)
|
|
Publish(context.Context, *Event)
|
|
}
|
|
|
|
type baseEventBus struct {
|
|
mu sync.RWMutex
|
|
observers map[string]map[Observer]struct{}
|
|
}
|
|
|
|
func newBaseEventBus() *baseEventBus {
|
|
return &baseEventBus{
|
|
observers: make(map[string]map[Observer]struct{}),
|
|
}
|
|
}
|
|
|
|
func (b *baseEventBus) Subscribe(key string, ob Observer) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if _, ok := b.observers[key]; !ok {
|
|
b.observers[key] = make(map[Observer]struct{})
|
|
}
|
|
|
|
b.observers[key][ob] = struct{}{}
|
|
}
|
|
|
|
func (b *baseEventBus) Unsubscribe(key string, ob Observer) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
delete(b.observers[key], ob)
|
|
}
|
|
|
|
type SyncEventBus struct {
|
|
*baseEventBus
|
|
}
|
|
|
|
func NewSyncEventBus() *SyncEventBus {
|
|
return &SyncEventBus{
|
|
baseEventBus: newBaseEventBus(),
|
|
}
|
|
}
|
|
|
|
func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
observers := s.observers[e.Topic]
|
|
errs := make(map[Observer]error)
|
|
for ob := range observers {
|
|
if err := ob.OnChange(ctx, e); err != nil {
|
|
errs[ob] = err
|
|
}
|
|
}
|
|
|
|
s.handleErr(ctx, errs)
|
|
}
|
|
|
|
func (s *SyncEventBus) handleErr(_ context.Context, errs map[Observer]error) {
|
|
for ob, err := range errs {
|
|
fmt.Println(ob, err)
|
|
}
|
|
}
|
|
|
|
type AsyncEventBus struct {
|
|
*baseEventBus
|
|
errC chan *observerWithErr
|
|
celFn context.CancelFunc
|
|
ctx context.Context
|
|
}
|
|
|
|
type observerWithErr struct {
|
|
ob Observer
|
|
err error
|
|
}
|
|
|
|
func NewAsyncEventBus() *AsyncEventBus {
|
|
res := &AsyncEventBus{
|
|
baseEventBus: newBaseEventBus(),
|
|
}
|
|
|
|
res.errC = make(chan *observerWithErr)
|
|
|
|
res.ctx, res.celFn = context.WithCancel(context.Background())
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-res.ctx.Done():
|
|
return
|
|
case err := <-res.errC:
|
|
fmt.Println(err.ob, err.err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return res
|
|
}
|
|
|
|
func (a *AsyncEventBus) Stop(){
|
|
a.celFn()
|
|
}
|
|
|
|
func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
|
|
observers := a.observers[e.Topic]
|
|
for ob := range observers {
|
|
ob := ob
|
|
go func() {
|
|
if err := ob.OnChange(ctx, e); err != nil {
|
|
select {
|
|
case <-a.ctx.Done():
|
|
case a.errC <- &observerWithErr{
|
|
ob: ob,
|
|
err: err,
|
|
}:
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|