diff --git a/controller/unis/commonser.go b/controller/unis/commonser.go new file mode 100644 index 0000000..597090e --- /dev/null +++ b/controller/unis/commonser.go @@ -0,0 +1,101 @@ +package unis + +import ( + "bytes" + "dashboard/dao/sqldb" + "encoding/json" + "fmt" + "io" + "net/http" + "reflect" + "strconv" + + "github.com/gin-gonic/gin" +) + +type StationConfigParams struct { + StationConfig UnisConfig `json:"stationConfig,omitempty"` +} +type Screens struct { + ScreenIndex int `json:"screenIndex,omitempty"` + Mode int `json:"mode,omitempty"` + PhysicalX int `json:"physicalX,omitempty"` + PhysicalY int `json:"physicalY,omitempty"` + PhysicalWidth int `json:"physicalWidth,omitempty"` + PhysicalHeight int `json:"physicalHeight,omitempty"` + LogicX int `json:"logicX,omitempty"` + LogicY int `json:"logicY,omitempty"` + LogicWidth int `json:"logicWidth,omitempty"` + LogicHeight int `json:"logicHeight,omitempty"` + LogicFrameRate int `json:"logicFrameRate,omitempty"` + Channel int `json:"channel,omitempty"` + McType int `json:"mcType,omitempty"` + SupportChannel int `json:"supportChannel,omitempty"` + Primary bool `json:"primary,omitempty"` + NodeID string `json:"nodeId,omitempty"` + NodeIP string `json:"nodeIp,omitempty"` + FpgaID string `json:"fpgaId,omitempty"` + SupportHighestTiming string `json:"supportHighestTiming,omitempty"` + UnisCapability string `json:"unisCapability,omitempty"` +} +type UnisConfig struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + RowCount int `json:"rowCount,omitempty"` + ColCount int `json:"colCount,omitempty"` + ScreenNum int `json:"screenNum,omitempty"` + WallWidth int `json:"wallWidth,omitempty"` + WallHeight int `json:"wallHeight,omitempty"` + ReceiveIndex int `json:"receiveIndex,omitempty"` + PrimaryIndex int `json:"primaryIndex,omitempty"` + ConfigType bool `json:"configType,omitempty"` + Screens []Screens `json:"screens,omitempty"` +} + +func StationConfig(c *gin.Context) error { + var config StationConfigParams + + bodyBytes, err := io.ReadAll(c.Request.Body) + if err != nil { + return err + } + + c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + if err := c.ShouldBindJSON(&config); err != nil { + return err + } + + // serverId := c.GetHeader("serverId") + channelStr := c.GetHeader("channel") + channel, err := strconv.Atoi(channelStr) + if err != nil { + return err + } + + oldConf, err := sqldb.ResDataGet("stationconfig", channel) + if err != nil { + return err + } + + var oldconfig StationConfigParams + if oldConf != "" { + if err := json.Unmarshal([]byte(oldConf), &oldconfig); err != nil { + return err + } + } + + config.StationConfig.ConfigType = false + oldconfig.StationConfig.ConfigType = false + + if !reflect.DeepEqual(config, oldconfig) { + fmt.Println("config is not equal") + if err := sqldb.ResDataStore("stationconfig", string(bodyBytes), channel); err != nil { + return err + } + } + + c.String(http.StatusOK, "ok") + + return nil +} diff --git a/controller/unis/datatype.go b/controller/unis/datatype.go new file mode 100644 index 0000000..d8870ba --- /dev/null +++ b/controller/unis/datatype.go @@ -0,0 +1,5 @@ +package unis + +const ( + UNIS_HTTP_URL_CONFIG_ADD = "/api/unis/config/v1/add" +) diff --git a/dao/sqldb/base.go b/dao/sqldb/base.go index 7c1767b..120fd66 100644 --- a/dao/sqldb/base.go +++ b/dao/sqldb/base.go @@ -14,6 +14,7 @@ var ( type BaseDb interface { GetDb() *sqlx.DB CreateUserSql() string + CreateResStoreSql() string } func SqlDbInit(_log *logger.Logger, _db BaseDb) error { @@ -26,5 +27,11 @@ func SqlDbInit(_log *logger.Logger, _db BaseDb) error { return err } + _, err = db.Exec(_db.CreateResStoreSql()) + if err != nil { + log.Sugar().Errorf("Crete user table error: %v", err) + return err + } + return nil } diff --git a/dao/sqldb/mysql/tablecreate.go b/dao/sqldb/mysql/tablecreate.go index 29ae941..9ceabd0 100644 --- a/dao/sqldb/mysql/tablecreate.go +++ b/dao/sqldb/mysql/tablecreate.go @@ -15,3 +15,7 @@ func (s *Mysql) GetDb() *sqlx.DB { func (s *Mysql) CreateUserSql() string { return createUserSql } + +func (s *Mysql) CreateResStoreSql() string { + return "createResStoreSql" +} diff --git a/dao/sqldb/resstore.go b/dao/sqldb/resstore.go new file mode 100644 index 0000000..c638918 --- /dev/null +++ b/dao/sqldb/resstore.go @@ -0,0 +1,40 @@ +package sqldb + +import ( + "database/sql" + "errors" +) + +func ResDataStore(key, value string, channel int) error { + sqlStr := `INSERT OR REPLACE INTO db_unis_res_store VALUES(?,?,?)` + + res, err := db.Exec(sqlStr, key, value, channel) + if err != nil { + return err + } + + count, err := res.RowsAffected() + if err != nil { + return err + } + + if count == 0 { + return errors.New("sql insert res_store rows affected 0") + } + + return nil +} + +func ResDataGet(key string, channel int) (string, error) { + sqlStr := `SELECT value FROM db_unis_res_store WHERE key=? AND channel=?` + + var res string + if err := db.Get(&res, sqlStr, key, channel); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return "", nil + } + return "", err + } + + return res, nil +} diff --git a/dao/sqldb/sqlite/tablecreate.go b/dao/sqldb/sqlite/tablecreate.go index 4851cbe..6531e02 100644 --- a/dao/sqldb/sqlite/tablecreate.go +++ b/dao/sqldb/sqlite/tablecreate.go @@ -2,7 +2,11 @@ package sqlite import "github.com/jmoiron/sqlx" -const createUserSql = `CREATE TABLE IF NOT EXISTS db_unis_aisub (userid TEXT NOT NULL,is_sub INTEGER,CONSTRAINT db_unis_aisub_pk PRIMARY KEY (userid));` +const ( + createUserSql = `CREATE TABLE IF NOT EXISTS db_unis_aisub (userid TEXT NOT NULL,is_sub INTEGER,CONSTRAINT db_unis_aisub_pk PRIMARY KEY (userid));` + + createResStoreSql = `CREATE TABLE IF NOT EXISTS db_unis_res_store (key TEXT NOT NULL,value TEXT,channel INTEGER, CONSTRAINT db_unis_res_pk PRIMARY KEY (key,channel));` +) type Sqlite3 struct { Db *sqlx.DB @@ -15,3 +19,7 @@ func (s *Sqlite3) GetDb() *sqlx.DB { func (s *Sqlite3) CreateUserSql() string { return createUserSql } + +func (s *Sqlite3) CreateResStoreSql() string { + return createResStoreSql +} diff --git a/pkg/dadis/dadis.go b/pkg/dadis/dadis.go new file mode 100644 index 0000000..82a13a2 --- /dev/null +++ b/pkg/dadis/dadis.go @@ -0,0 +1,78 @@ +package dadis + +import ( + "context" + "dashboard/pkg/observe" + "sync" + "time" +) + +type Dadis struct { + mu sync.Mutex + store map[string]*item + observe *observe.AsyncEventBus +} + +type item struct { + value interface{} + expire int64 +} + +func NewDadis() *Dadis { + return &Dadis{ + store: make(map[string]*item), + observe: observe.NewAsyncEventBus(), + } +} + +func (d *Dadis) Set(key string, value interface{}, expire time.Duration) { + d.mu.Lock() + defer d.mu.Unlock() + + var exp int64 + if expire != 0 { + exp = time.Now().Add(expire).UnixNano() + } + + d.store[key] = &item{ + value: value, + expire: exp, + } + + e := &observe.Event{ + Key: key, + Value: value, + } + + _ = d.observe.Publish(context.Background(), e) +} + +func (d *Dadis) Get(key string) (interface{}, bool) { + d.mu.Lock() + defer d.mu.Unlock() + + if item, ok := d.store[key]; ok { + if item.expire != 0 && (time.Now().UnixNano() > item.expire) { + delete(d.store, key) + return nil, false + } + + return item.value, true + } + + return nil, false +} + +func (d *Dadis) Sub(key string, ob observe.Observer) { + d.mu.Lock() + defer d.mu.Unlock() + + d.observe.Subscribe(key, ob) +} + +func (d *Dadis) Del(key string) { + d.mu.Lock() + defer d.mu.Unlock() + + delete(d.store, key) +} diff --git a/pkg/dadis/dadis_test.go b/pkg/dadis/dadis_test.go new file mode 100644 index 0000000..1090f12 --- /dev/null +++ b/pkg/dadis/dadis_test.go @@ -0,0 +1,72 @@ +package dadis + +import ( + "context" + "dashboard/pkg/observe" + "fmt" + "testing" + "time" +) + +type baseObserver struct { + name string + conn chan interface{} +} + +func newBaseObserver(name string) *baseObserver { + res := &baseObserver{ + name: name, + conn: make(chan interface{}), + } + + go res.getMsg() + + return res +} + +func (b *baseObserver) getMsg() { + for res := range b.conn { + fmt.Println(b.name, res) + } +} + +func (b *baseObserver) OnChange(_ context.Context, e *observe.Event) error { + b.conn <- e + + return nil +} + +func Test_Dadis(t *testing.T) { + obA := newBaseObserver("A") + obB := newBaseObserver("B") + obC := newBaseObserver("C") + obD := newBaseObserver("D") + + topic := "async" + dadis := NewDadis() + dadis.Sub(topic, obA) + dadis.Sub(topic, obB) + dadis.Sub(topic, obC) + dadis.Sub(topic, obD) + + go func() { + for i := range 10 { + i := i + go func() { + key := fmt.Sprintf("%s,%d", "nohao", i) + dadis.Set(topic, key, 0) + }() + + } + }() + + go func() { + for range 10 { + go func() { + fmt.Println(dadis.Get(topic)) + }() + } + }() + + <-time.After(time.Second * 1) +} diff --git a/pkg/observe/observe.go b/pkg/observe/observe.go index d423aae..db5f80b 100644 --- a/pkg/observe/observe.go +++ b/pkg/observe/observe.go @@ -2,12 +2,13 @@ package observe import ( "context" + "errors" "fmt" "sync" ) type Event struct { - Topic string + Key string Value interface{} } @@ -18,35 +19,36 @@ type Observer interface { type EventBus interface { Subscribe(string, Observer) Unsubscribe(string, Observer) - Publish(context.Context, *Event) + Publish(context.Context, *Event) error } type baseEventBus struct { - mu sync.RWMutex - observers map[string]map[Observer]struct{} + mu sync.RWMutex + sub map[string]map[Observer]struct{} } func newBaseEventBus() *baseEventBus { return &baseEventBus{ - observers: make(map[string]map[Observer]struct{}), + sub: make(map[string]map[Observer]struct{}), } } func (b *baseEventBus) Subscribe(key string, ob Observer) { b.mu.Lock() defer b.mu.Unlock() - if _, ok := b.observers[key]; !ok { - b.observers[key] = make(map[Observer]struct{}) + + if _, ok := b.sub[key]; !ok { + b.sub[key] = make(map[Observer]struct{}) } - b.observers[key][ob] = struct{}{} + b.sub[key][ob] = struct{}{} } func (b *baseEventBus) Unsubscribe(key string, ob Observer) { b.mu.Lock() defer b.mu.Unlock() - delete(b.observers[key], ob) + delete(b.sub[key], ob) } type SyncEventBus struct { @@ -59,83 +61,102 @@ func NewSyncEventBus() *SyncEventBus { } } -func (s *SyncEventBus) Publish(ctx context.Context, e *Event) { +func (s *SyncEventBus) Publish(ctx context.Context, e *Event) error { s.mu.RLock() defer s.mu.RUnlock() - observers := s.observers[e.Topic] + subs, ok := s.baseEventBus.sub[e.Key] + if !ok { + return errors.New("no subscriber") + } + errs := make(map[Observer]error) - for ob := range observers { - if err := ob.OnChange(ctx, e); err != nil { - errs[ob] = err + for sub := range subs { + if err := sub.OnChange(ctx, e); err != nil { + errs[sub] = err } } - s.handleErr(ctx, errs) + s.errHandler(errs) + + return nil } -func (s *SyncEventBus) handleErr(_ context.Context, errs map[Observer]error) { +func (s *SyncEventBus) errHandler(errs map[Observer]error) { for ob, err := range errs { fmt.Println(ob, err) } } -type AsyncEventBus struct { - *baseEventBus - errC chan *observerWithErr - celFn context.CancelFunc - ctx context.Context -} - -type observerWithErr struct { +type observeWithErr struct { ob Observer err error } +type AsyncEventBus struct { + *baseEventBus + errC chan *observeWithErr + ctx context.Context + calFn context.CancelFunc +} + func NewAsyncEventBus() *AsyncEventBus { res := &AsyncEventBus{ baseEventBus: newBaseEventBus(), + errC: make(chan *observeWithErr), } - res.errC = make(chan *observerWithErr) + res.ctx, res.calFn = context.WithCancel(context.Background()) - res.ctx, res.celFn = context.WithCancel(context.Background()) - - go func() { - for { - select { - case <-res.ctx.Done(): - return - case err := <-res.errC: - fmt.Println(err.ob, err.err) - } - } - }() + go res.errHandler() return res } -func (a *AsyncEventBus) Stop(){ - a.celFn() +func (a *AsyncEventBus) errHandler() { + for { + select { + case <-a.ctx.Done(): + return + case err := <-a.errC: + fmt.Println(err.ob, err.err) + } + } } -func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) { +func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) error { a.mu.RLock() defer a.mu.RUnlock() - observers := a.observers[e.Topic] - for ob := range observers { - ob := ob + subs, ok := a.baseEventBus.sub[e.Key] + if !ok { + return errors.New("no subscriber") + } + + for sub := range subs { + sub := sub + + select { + case <-a.ctx.Done(): + return a.ctx.Err() + default: + } + go func() { - if err := ob.OnChange(ctx, e); err != nil { - select { - case <-a.ctx.Done(): - case a.errC <- &observerWithErr{ - ob: ob, + if err := sub.OnChange(ctx, e); err != nil { + a.errC <- &observeWithErr{ + ob: sub, err: err, - }: } } }() } + + return nil +} + +func (a *AsyncEventBus) Stop() { + if a.calFn != nil { + a.calFn() + } } diff --git a/pkg/observe/observe_test.go b/pkg/observe/observe_test.go index 18fbf20..a71ac04 100644 --- a/pkg/observe/observe_test.go +++ b/pkg/observe/observe_test.go @@ -19,7 +19,7 @@ func newBaseObserver(name string) *baseObserver { } func (b *baseObserver) OnChange(ctx context.Context, e *Event) error { - fmt.Println(b.name, e.Topic, e.Value) + fmt.Println(b.name, e.Key, e.Value) return errors.New("jjjjj") } @@ -38,7 +38,7 @@ func Test_syncEventBus(t *testing.T) { sBus.Subscribe(topic, obD) sBus.Publish(context.Background(), &Event{ - Topic: topic, + Key: topic, Value: "hello redhat", }) } @@ -59,7 +59,7 @@ func Test_asyncEventBus(t *testing.T) { sBus.Subscribe(topic, obD) sBus.Publish(context.Background(), &Event{ - Topic: topic, + Key: topic, Value: "hello redhat", }) diff --git a/routes/routes.go b/routes/routes.go index aad9b93..b58a655 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -2,6 +2,7 @@ package routes import ( "dashboard/controller" + "dashboard/controller/unis" "dashboard/logger" "dashboard/pkg/jwt" "dashboard/settings" @@ -36,6 +37,8 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt))) r.POST("/api/logout", controller.UserLogOutHandler) + r.POST("/api/unis/config/v1/add",errWapper(unis.StationConfig)) + g1 := r.Group("/api") g1.Use(errWapper(GinJwtAuthor(cjwt))) {