163 lines
2.5 KiB
Go
163 lines
2.5 KiB
Go
package observe
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
// Event is the unit of data handed to observers by an event bus.
type Event struct {
	// Key names the topic; the buses in this file use it to look up
	// which observers should receive the event.
	Key string
	// Value is an opaque payload. The buses pass it through without
	// inspecting it.
	Value interface{}
}
|
|
|
|
type Observer interface {
|
|
OnChange(context.Context, *Event) error
|
|
}
|
|
|
|
type EventBus interface {
|
|
Subscribe(string, Observer)
|
|
Unsubscribe(string, Observer)
|
|
Publish(context.Context, *Event) error
|
|
}
|
|
|
|
type baseEventBus struct {
|
|
mu sync.RWMutex
|
|
sub map[string]map[Observer]struct{}
|
|
}
|
|
|
|
func newBaseEventBus() *baseEventBus {
|
|
return &baseEventBus{
|
|
sub: make(map[string]map[Observer]struct{}),
|
|
}
|
|
}
|
|
|
|
func (b *baseEventBus) Subscribe(key string, ob Observer) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
if _, ok := b.sub[key]; !ok {
|
|
b.sub[key] = make(map[Observer]struct{})
|
|
}
|
|
|
|
b.sub[key][ob] = struct{}{}
|
|
}
|
|
|
|
func (b *baseEventBus) Unsubscribe(key string, ob Observer) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
delete(b.sub[key], ob)
|
|
}
|
|
|
|
type SyncEventBus struct {
|
|
*baseEventBus
|
|
}
|
|
|
|
func NewSyncEventBus() *SyncEventBus {
|
|
return &SyncEventBus{
|
|
baseEventBus: newBaseEventBus(),
|
|
}
|
|
}
|
|
|
|
func (s *SyncEventBus) Publish(ctx context.Context, e *Event) error {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
subs, ok := s.baseEventBus.sub[e.Key]
|
|
if !ok {
|
|
return errors.New("no subscriber")
|
|
}
|
|
|
|
errs := make(map[Observer]error)
|
|
for sub := range subs {
|
|
if err := sub.OnChange(ctx, e); err != nil {
|
|
errs[sub] = err
|
|
}
|
|
}
|
|
|
|
s.errHandler(errs)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncEventBus) errHandler(errs map[Observer]error) {
|
|
for ob, err := range errs {
|
|
fmt.Println(ob, err)
|
|
}
|
|
}
|
|
|
|
type observeWithErr struct {
|
|
ob Observer
|
|
err error
|
|
}
|
|
|
|
type AsyncEventBus struct {
|
|
*baseEventBus
|
|
errC chan *observeWithErr
|
|
ctx context.Context
|
|
calFn context.CancelFunc
|
|
}
|
|
|
|
func NewAsyncEventBus() *AsyncEventBus {
|
|
res := &AsyncEventBus{
|
|
baseEventBus: newBaseEventBus(),
|
|
errC: make(chan *observeWithErr),
|
|
}
|
|
|
|
res.ctx, res.calFn = context.WithCancel(context.Background())
|
|
|
|
go res.errHandler()
|
|
|
|
return res
|
|
}
|
|
|
|
func (a *AsyncEventBus) errHandler() {
|
|
for {
|
|
select {
|
|
case <-a.ctx.Done():
|
|
return
|
|
case err := <-a.errC:
|
|
fmt.Println(err.ob, err.err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) error {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
|
|
subs, ok := a.baseEventBus.sub[e.Key]
|
|
if !ok {
|
|
return errors.New("no subscriber")
|
|
}
|
|
|
|
for sub := range subs {
|
|
sub := sub
|
|
|
|
select {
|
|
case <-a.ctx.Done():
|
|
return a.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
go func() {
|
|
if err := sub.OnChange(ctx, e); err != nil {
|
|
a.errC <- &observeWithErr{
|
|
ob: sub,
|
|
err: err,
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *AsyncEventBus) Stop() {
|
|
if a.calFn != nil {
|
|
a.calFn()
|
|
}
|
|
}
|