From 7a74103aba809712440e242f76c84e2d63e1a776 Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Fri, 23 May 2025 19:06:43 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=A2=9E=E5=8A=A0rest=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E8=BF=9B=E8=A1=8C=E5=AD=98=E5=82=A8=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E3=80=822=E3=80=81=E5=A2=9E=E5=8A=A0rpc=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 7 +- controller/unis/commonser.go | 61 ++++-------------- dao/sqldb/sqlite/tablecreate.go | 15 ++++- dao/sqldb/{resstore.go => unisresfulstore.go} | 12 ++-- main.go | 8 ++- models/unisparams.go | 40 ++++++++++++ models/unisrpcparams.go | 23 +++++++ pkg/rpcsup/jsonrpc.go | 44 +++++++++++++ routes/middleware.go | 64 +++++++++++++++++++ routes/routes.go | 8 ++- services/unis/common.go | 46 +++++++++++++ services/unis/rpcclient.go | 17 +++++ services/unis/rpcservice.go | 35 ++++++++++ settings/type.go | 7 ++ 14 files changed, 329 insertions(+), 58 deletions(-) rename dao/sqldb/{resstore.go => unisresfulstore.go} (52%) create mode 100644 models/unisparams.go create mode 100644 models/unisrpcparams.go create mode 100644 pkg/rpcsup/jsonrpc.go create mode 100644 services/unis/common.go create mode 100644 services/unis/rpcclient.go create mode 100644 services/unis/rpcservice.go diff --git a/config/config.yaml b/config/config.yaml index aea681e..3d6ce70 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -14,7 +14,7 @@ log: database: # sqlite只需要指定db_name即可 type: "sqlite" - db_name: "./data/sqlite/avcnet.db" # 如果是sqlite需要指定完整路径和文件名称 + db_name: "/tmp/data/sqlite/avcnet.db" # 如果是sqlite需要指定完整路径和文件名称 # 其他远程数据库需要指定下面的内容 user: "root" password: "password" @@ -22,6 +22,11 @@ database: port: 3306 max_conns: 200 +rpc: + type: "json" + host: "" + port: 5501 + # 使用令牌桶限流 rate: fill_interval: 10 # 填充速率 每(ms)填充一个 diff --git a/controller/unis/commonser.go b/controller/unis/commonser.go index 597090e..33d2cd5 100644 --- a/controller/unis/commonser.go +++ b/controller/unis/commonser.go @@ -3,6 +3,8 @@ package unis import ( "bytes" "dashboard/dao/sqldb" + "dashboard/models" + "dashboard/services/unis" "encoding/json" "fmt" "io" @@ -13,48 +15,7 @@ import ( "github.com/gin-gonic/gin" ) -type StationConfigParams struct { - StationConfig UnisConfig `json:"stationConfig,omitempty"` -} -type Screens struct { - ScreenIndex int `json:"screenIndex,omitempty"` - Mode int `json:"mode,omitempty"` - PhysicalX int `json:"physicalX,omitempty"` - PhysicalY int `json:"physicalY,omitempty"` - PhysicalWidth int `json:"physicalWidth,omitempty"` - PhysicalHeight int `json:"physicalHeight,omitempty"` - LogicX int `json:"logicX,omitempty"` - LogicY int `json:"logicY,omitempty"` - LogicWidth int `json:"logicWidth,omitempty"` - LogicHeight int `json:"logicHeight,omitempty"` - LogicFrameRate int `json:"logicFrameRate,omitempty"` - Channel int `json:"channel,omitempty"` - McType int `json:"mcType,omitempty"` - SupportChannel int `json:"supportChannel,omitempty"` - Primary bool `json:"primary,omitempty"` - NodeID string `json:"nodeId,omitempty"` - NodeIP string `json:"nodeIp,omitempty"` - FpgaID string `json:"fpgaId,omitempty"` - SupportHighestTiming string `json:"supportHighestTiming,omitempty"` - UnisCapability string `json:"unisCapability,omitempty"` -} -type UnisConfig struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - RowCount int `json:"rowCount,omitempty"` - ColCount int `json:"colCount,omitempty"` - ScreenNum int `json:"screenNum,omitempty"` - WallWidth int `json:"wallWidth,omitempty"` - WallHeight int `json:"wallHeight,omitempty"` - ReceiveIndex int `json:"receiveIndex,omitempty"` - PrimaryIndex int `json:"primaryIndex,omitempty"` - ConfigType bool `json:"configType,omitempty"` - Screens []Screens `json:"screens,omitempty"` -} - func StationConfig(c *gin.Context) error { - var config StationConfigParams - bodyBytes, err := io.ReadAll(c.Request.Body) if err != nil { return err @@ -62,6 +23,7 @@ func StationConfig(c *gin.Context) error { c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + var config models.StationConfigParams if err := c.ShouldBindJSON(&config); err != nil { return err } @@ -73,29 +35,34 @@ func StationConfig(c *gin.Context) error { return err } - oldConf, err := sqldb.ResDataGet("stationconfig", channel) + fmt.Println("url", c.Request.URL.String()) + + oldConf, err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel) if err != nil { return err } - var oldconfig StationConfigParams + var oldconfig models.StationConfigParams if oldConf != "" { if err := json.Unmarshal([]byte(oldConf), &oldconfig); err != nil { return err } } - config.StationConfig.ConfigType = false - oldconfig.StationConfig.ConfigType = false + oldconfig.StationConfig.ConfigType = config.StationConfig.ConfigType if !reflect.DeepEqual(config, oldconfig) { fmt.Println("config is not equal") - if err := sqldb.ResDataStore("stationconfig", string(bodyBytes), channel); err != nil { + if err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.RawPath, string(bodyBytes), channel); err != nil { return err } } - c.String(http.StatusOK, "ok") + if err:=unis.StationConfig(&config);err!=nil{ + return err + } + + c.JSON(http.StatusOK, "ok") return nil } diff --git a/dao/sqldb/sqlite/tablecreate.go b/dao/sqldb/sqlite/tablecreate.go index 6531e02..d44b859 100644 --- a/dao/sqldb/sqlite/tablecreate.go +++ b/dao/sqldb/sqlite/tablecreate.go @@ -5,7 +5,20 @@ import "github.com/jmoiron/sqlx" const ( createUserSql = `CREATE TABLE IF NOT EXISTS db_unis_aisub (userid TEXT NOT NULL,is_sub INTEGER,CONSTRAINT db_unis_aisub_pk PRIMARY KEY (userid));` - createResStoreSql = `CREATE TABLE IF NOT EXISTS db_unis_res_store (key TEXT NOT NULL,value TEXT,channel INTEGER, CONSTRAINT db_unis_res_pk PRIMARY KEY (key,channel));` + createResStoreSql = `CREATE TABLE IF NOT EXISTS db_unis_res_store ( + key TEXT NOT NULL, + value TEXT, + channel INTEGER, + methord TEXT, + created_at TEXT NOT NULL DEFAULT (DATETIME('now', 'localtime')), + updated_at TEXT NOT NULL DEFAULT (DATETIME('now', 'localtime')), + CONSTRAINT db_unis_res_pk PRIMARY KEY (key,channel,methord) + ); + + CREATE TRIGGER IF NOT EXISTS trigger_db_unis_res_store_updated_at AFTER UPDATE ON db_unis_res_store + BEGIN + UPDATE db_unis_res_store SET updated_at = DATETIME('now', 'localtime') WHERE rowid == NEW.rowid; + END;` ) type Sqlite3 struct { diff --git a/dao/sqldb/resstore.go b/dao/sqldb/unisresfulstore.go similarity index 52% rename from dao/sqldb/resstore.go rename to dao/sqldb/unisresfulstore.go index c638918..de38d35 100644 --- a/dao/sqldb/resstore.go +++ b/dao/sqldb/unisresfulstore.go @@ -5,10 +5,10 @@ import ( "errors" ) -func ResDataStore(key, value string, channel int) error { - sqlStr := `INSERT OR REPLACE INTO db_unis_res_store VALUES(?,?,?)` +func ResFulDataStore(methord, key, value string, channel int) error { + sqlStr := `INSERT OR REPLACE INTO db_unis_res_store(key,value,channel,methord) VALUES(?,?,?,?)` - res, err := db.Exec(sqlStr, key, value, channel) + res, err := db.Exec(sqlStr, key, value, channel,methord) if err != nil { return err } @@ -25,11 +25,11 @@ func ResDataStore(key, value string, channel int) error { return nil } -func ResDataGet(key string, channel int) (string, error) { - sqlStr := `SELECT value FROM db_unis_res_store WHERE key=? AND channel=?` +func ResFulDataGet(methord, key string, channel int) (string, error) { + sqlStr := `SELECT value FROM db_unis_res_store WHERE key=? AND channel=? AND methord=?` var res string - if err := db.Get(&res, sqlStr, key, channel); err != nil { + if err := db.Get(&res, sqlStr, key, channel, methord); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", nil } diff --git a/main.go b/main.go index 36da11a..b5a087f 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "dashboard/dao/sqldb" "dashboard/logger" "dashboard/routes" + "dashboard/services/unis" "dashboard/settings" "errors" "flag" @@ -48,7 +49,12 @@ func main() { log.Sugar().Panicf("New db error: %v", err) } - sqldb.SqlDbInit(log, db) + err = sqldb.SqlDbInit(log, db) + if err != nil { + log.Sugar().Panicf("New db error: %v", err) + } + + err = unis.RpcListenAndServe(log, sets.RpcConfig) if err != nil { log.Sugar().Panicf("New db error: %v", err) } diff --git a/models/unisparams.go b/models/unisparams.go new file mode 100644 index 0000000..8d11445 --- /dev/null +++ b/models/unisparams.go @@ -0,0 +1,40 @@ +package models + +type StationConfigParams struct { + StationConfig UnisConfig `json:"stationConfig,omitempty"` +} +type Screens struct { + ScreenIndex int `json:"screenIndex,omitempty"` + Mode int `json:"mode,omitempty"` + PhysicalX int `json:"physicalX,omitempty"` + PhysicalY int `json:"physicalY,omitempty"` + PhysicalWidth int `json:"physicalWidth,omitempty"` + PhysicalHeight int `json:"physicalHeight,omitempty"` + LogicX int `json:"logicX,omitempty"` + LogicY int `json:"logicY,omitempty"` + LogicWidth int `json:"logicWidth,omitempty"` + LogicHeight int `json:"logicHeight,omitempty"` + LogicFrameRate int `json:"logicFrameRate,omitempty"` + Channel int `json:"channel,omitempty"` + McType int `json:"mcType,omitempty"` + SupportChannel int `json:"supportChannel,omitempty"` + Primary bool `json:"primary,omitempty"` + NodeID string `json:"nodeId,omitempty"` + NodeIP string `json:"nodeIp,omitempty"` + FpgaID string `json:"fpgaId,omitempty"` + SupportHighestTiming string `json:"supportHighestTiming,omitempty"` + UnisCapability string `json:"unisCapability,omitempty"` +} +type UnisConfig struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + RowCount int `json:"rowCount,omitempty"` + ColCount int `json:"colCount,omitempty"` + ScreenNum int `json:"screenNum,omitempty"` + WallWidth int `json:"wallWidth,omitempty"` + WallHeight int `json:"wallHeight,omitempty"` + ReceiveIndex int `json:"receiveIndex,omitempty"` + PrimaryIndex int `json:"primaryIndex,omitempty"` + ConfigType bool `json:"configType,omitempty"` + Screens []Screens `json:"screens,omitempty"` +} \ No newline at end of file diff --git a/models/unisrpcparams.go b/models/unisrpcparams.go new file mode 100644 index 0000000..283e34f --- /dev/null +++ b/models/unisrpcparams.go @@ -0,0 +1,23 @@ +package models + +import "fmt" + +const rpcNamePrefix = "UnisRpcService" + +type UnisRpcMethod string + +func (m UnisRpcMethod)String()string{ + return fmt.Sprintf("%s.%s",rpcNamePrefix,string(m)) +} + +const( + UnisStationConfig UnisRpcMethod = "Config" +) + +type UnisRpcRequest struct{ + Id string +} + +type UnisRpcResponse struct{ + Id string +} \ No newline at end of file diff --git a/pkg/rpcsup/jsonrpc.go b/pkg/rpcsup/jsonrpc.go new file mode 100644 index 0000000..3764f08 --- /dev/null +++ b/pkg/rpcsup/jsonrpc.go @@ -0,0 +1,44 @@ +package rpcsup + +import ( + "dashboard/logger" + "net" + "net/rpc" + "net/rpc/jsonrpc" +) + +func JsonServer(log *logger.Logger, address string, service interface{}) error { + if err := rpc.Register(service); err != nil { + log.Sugar().Error(err) + return err + } + + listen, err := net.Listen("tcp", address) + if err != nil { + log.Sugar().Error(err) + return err + } + + log.Sugar().Infof("Rpc server listen on: %s", address) + + for { + accept, err := listen.Accept() + if err != nil { + log.Sugar().Error(err) + continue + } + + jsonrpc.ServeConn(accept) + } +} + +func JsonClient(log *logger.Logger, address string) (*rpc.Client, error) { + dial, err := net.Dial("tcp", address) + if err != nil { + log.Sugar().Error(err) + + return nil, err + } + + return jsonrpc.NewClient(dial), nil +} diff --git a/routes/middleware.go b/routes/middleware.go index c0f76fe..53a4299 100644 --- a/routes/middleware.go +++ b/routes/middleware.go @@ -1,16 +1,21 @@ package routes import ( + "bytes" + "dashboard/dao/sqldb" "dashboard/logger" "dashboard/models" "dashboard/pkg/jwt" "dashboard/pkg/rate" "dashboard/settings" + "dashboard/utils" + "io" "net" "net/http" "net/http/httputil" "os" "runtime/debug" + "strconv" "strings" "time" @@ -155,3 +160,62 @@ func GinRateLimit(rateC settings.RateLimitConfig) appHandler { return nil } } + +func GinCheckServerId(c *gin.Context) error { + serverId := c.GetHeader("serverId") + + if serverId != serverId { + c.Abort() + return &models.BaseError{ + Code: http.StatusServiceUnavailable, + Msg: "Server Id error", + } + } + + c.Next() + + return nil +} + +func GinStoreRequest(c *gin.Context) (err error) { + defer func() { + if err != nil { + c.Abort() + return + } + + c.Next() + }() + + log, _ := utils.GetLogFromContext(c) + + channel, _err := strconv.Atoi(c.GetHeader("channel")) + if _err != nil { + err = &models.BaseError{ + Code: http.StatusServiceUnavailable, + Msg: "Channel Id error", + } + return + } + + oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel) + if _err != nil { + err = _err + } + + bodyBytes, _err := io.ReadAll(c.Request.Body) + if _err != nil { + err = _err + } + + c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + if oldConf != string(bodyBytes) { + log.Sugar().Info("The resful request is not equel local store") + if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil { + err = _err + } + } + + return nil +} diff --git a/routes/routes.go b/routes/routes.go index b58a655..4db3632 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -37,8 +37,6 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt))) r.POST("/api/logout", controller.UserLogOutHandler) - r.POST("/api/unis/config/v1/add",errWapper(unis.StationConfig)) - g1 := r.Group("/api") g1.Use(errWapper(GinJwtAuthor(cjwt))) { @@ -50,5 +48,11 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle())) + unisr := r.Group("/api/unis") + unisr.Use(errWapper(GinCheckServerId), errWapper(GinStoreRequest)) + { + unisr.POST("/config/v1/add", errWapper(unis.StationConfig)) + } + return r } diff --git a/services/unis/common.go b/services/unis/common.go new file mode 100644 index 0000000..dc4be18 --- /dev/null +++ b/services/unis/common.go @@ -0,0 +1,46 @@ +package unis + +import ( + "dashboard/models" + "dashboard/pkg/rpcsup" + "fmt" + "net/rpc" +) + +type rpcClients map[string]*rpc.Client + +var rpcMapClients rpcClients + +func StationConfig(config *models.StationConfigParams) error { + CreateRpcClients(&rpcMapClients, &config.StationConfig.Screens) + + fmt.Println(GetConfig(models.UnisRpcRequest{Id: "zhangshuo"})) + + return nil +} + +func CreateRpcClients(rawClients *rpcClients, screens *[]models.Screens) { + if *rawClients == nil { + *rawClients = make(map[string]*rpc.Client) + } + + for _, cli := range *rawClients { + if cli != nil { + cli.Close() + } + } + + for _, sc := range *screens { + id := sc.NodeID + address := fmt.Sprintf("%s:%d", sc.NodeIP, 5501) + + cli, err := rpcsup.JsonClient(log, address) + if err != nil { + log.Sugar().Errorf("%s create rpc client error: %s", id, address) + continue + } + (*rawClients)[id] = cli + } + + return +} diff --git a/services/unis/rpcclient.go b/services/unis/rpcclient.go new file mode 100644 index 0000000..7166ed5 --- /dev/null +++ b/services/unis/rpcclient.go @@ -0,0 +1,17 @@ +package unis + +import ( + "dashboard/models" + "fmt" +) + +func GetConfig(req models.UnisRpcRequest) (*models.UnisRpcResponse, error) { + out := &models.UnisRpcResponse{} + + for key, value := range rpcMapClients { + fmt.Println(key) + value.Call(models.UnisStationConfig.String(), req, out) + } + + return out, nil +} diff --git a/services/unis/rpcservice.go b/services/unis/rpcservice.go new file mode 100644 index 0000000..df2a3f0 --- /dev/null +++ b/services/unis/rpcservice.go @@ -0,0 +1,35 @@ +package unis + +import ( + "dashboard/logger" + "dashboard/models" + "dashboard/pkg/rpcsup" + "dashboard/settings" + "fmt" +) + +var log *logger.Logger + +type UnisRpcService struct { +} + +func NewRpcService() *UnisRpcService { + return &UnisRpcService{} +} + +func RpcListenAndServe(_log *logger.Logger, config *settings.RpcConfig) error { + log = _log + addrees := fmt.Sprintf("%s:%d", config.Host, config.Port) + + go rpcsup.JsonServer(log, addrees, NewRpcService()) + + return nil +} + +func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error { + fmt.Println("rpc server get mesage",res) + + rsp.Id = res.Id + + return nil +} diff --git a/settings/type.go b/settings/type.go index 739056f..f95f466 100644 --- a/settings/type.go +++ b/settings/type.go @@ -7,6 +7,7 @@ type AppConfig struct { *RateLimitConfig `mapstructure:"rate"` *JwtConfig `mapstructure:"jwt"` *SqlConfig `mapstructure:"database"` + *RpcConfig `mapstructure:"rpc"` } type BaseConfig struct { @@ -34,6 +35,12 @@ type SqlConfig struct { Type string `mapstructure:"type"` } +type RpcConfig struct { + Port int `mapstructure:"port"` + Host string `mapstructure:"host"` + Type string `mapstructure:"type"` +} + type SnowflakeConfig struct { StartTime string `mapstructure:"start_time"` MachineId int64 `mapstructure:"machine_id"`