1、增加rest接口进行存储功能。2、增加rpc请求。

This commit is contained in:
redhat 2025-05-23 19:06:43 +08:00
parent 71d19ac426
commit 7a74103aba
14 changed files with 329 additions and 58 deletions

View File

@ -14,7 +14,7 @@ log:
database:
# sqlite只需要指定db_name即可
type: "sqlite"
db_name: "./data/sqlite/avcnet.db" # 如果是sqlite需要指定完整路径和文件名称
db_name: "/tmp/data/sqlite/avcnet.db" # 如果是sqlite需要指定完整路径和文件名称
# 其他远程数据库需要指定下面的内容
user: "root"
password: "password"
@ -22,6 +22,11 @@ database:
port: 3306
max_conns: 200
rpc:
type: "json"
host: ""
port: 5501
# 使用令牌桶限流
rate:
fill_interval: 10 # 填充速率 每(ms)填充一个

View File

@ -3,6 +3,8 @@ package unis
import (
"bytes"
"dashboard/dao/sqldb"
"dashboard/models"
"dashboard/services/unis"
"encoding/json"
"fmt"
"io"
@ -13,48 +15,7 @@ import (
"github.com/gin-gonic/gin"
)
type StationConfigParams struct {
StationConfig UnisConfig `json:"stationConfig,omitempty"`
}
type Screens struct {
ScreenIndex int `json:"screenIndex,omitempty"`
Mode int `json:"mode,omitempty"`
PhysicalX int `json:"physicalX,omitempty"`
PhysicalY int `json:"physicalY,omitempty"`
PhysicalWidth int `json:"physicalWidth,omitempty"`
PhysicalHeight int `json:"physicalHeight,omitempty"`
LogicX int `json:"logicX,omitempty"`
LogicY int `json:"logicY,omitempty"`
LogicWidth int `json:"logicWidth,omitempty"`
LogicHeight int `json:"logicHeight,omitempty"`
LogicFrameRate int `json:"logicFrameRate,omitempty"`
Channel int `json:"channel,omitempty"`
McType int `json:"mcType,omitempty"`
SupportChannel int `json:"supportChannel,omitempty"`
Primary bool `json:"primary,omitempty"`
NodeID string `json:"nodeId,omitempty"`
NodeIP string `json:"nodeIp,omitempty"`
FpgaID string `json:"fpgaId,omitempty"`
SupportHighestTiming string `json:"supportHighestTiming,omitempty"`
UnisCapability string `json:"unisCapability,omitempty"`
}
type UnisConfig struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
RowCount int `json:"rowCount,omitempty"`
ColCount int `json:"colCount,omitempty"`
ScreenNum int `json:"screenNum,omitempty"`
WallWidth int `json:"wallWidth,omitempty"`
WallHeight int `json:"wallHeight,omitempty"`
ReceiveIndex int `json:"receiveIndex,omitempty"`
PrimaryIndex int `json:"primaryIndex,omitempty"`
ConfigType bool `json:"configType,omitempty"`
Screens []Screens `json:"screens,omitempty"`
}
func StationConfig(c *gin.Context) error {
var config StationConfigParams
bodyBytes, err := io.ReadAll(c.Request.Body)
if err != nil {
return err
@ -62,6 +23,7 @@ func StationConfig(c *gin.Context) error {
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
var config models.StationConfigParams
if err := c.ShouldBindJSON(&config); err != nil {
return err
}
@ -73,29 +35,34 @@ func StationConfig(c *gin.Context) error {
return err
}
oldConf, err := sqldb.ResDataGet("stationconfig", channel)
fmt.Println("url", c.Request.URL.String())
oldConf, err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel)
if err != nil {
return err
}
var oldconfig StationConfigParams
var oldconfig models.StationConfigParams
if oldConf != "" {
if err := json.Unmarshal([]byte(oldConf), &oldconfig); err != nil {
return err
}
}
config.StationConfig.ConfigType = false
oldconfig.StationConfig.ConfigType = false
oldconfig.StationConfig.ConfigType = config.StationConfig.ConfigType
if !reflect.DeepEqual(config, oldconfig) {
fmt.Println("config is not equal")
if err := sqldb.ResDataStore("stationconfig", string(bodyBytes), channel); err != nil {
if err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.RawPath, string(bodyBytes), channel); err != nil {
return err
}
}
c.String(http.StatusOK, "ok")
if err:=unis.StationConfig(&config);err!=nil{
return err
}
c.JSON(http.StatusOK, "ok")
return nil
}

View File

@ -5,7 +5,20 @@ import "github.com/jmoiron/sqlx"
const (
createUserSql = `CREATE TABLE IF NOT EXISTS db_unis_aisub (userid TEXT NOT NULL,is_sub INTEGER,CONSTRAINT db_unis_aisub_pk PRIMARY KEY (userid));`
createResStoreSql = `CREATE TABLE IF NOT EXISTS db_unis_res_store (key TEXT NOT NULL,value TEXT,channel INTEGER, CONSTRAINT db_unis_res_pk PRIMARY KEY (key,channel));`
createResStoreSql = `CREATE TABLE IF NOT EXISTS db_unis_res_store (
key TEXT NOT NULL,
value TEXT,
channel INTEGER,
methord TEXT,
created_at TEXT NOT NULL DEFAULT (DATETIME('now', 'localtime')),
updated_at TEXT NOT NULL DEFAULT (DATETIME('now', 'localtime')),
CONSTRAINT db_unis_res_pk PRIMARY KEY (key,channel,methord)
);
CREATE TRIGGER IF NOT EXISTS trigger_db_unis_res_store_updated_at AFTER UPDATE ON db_unis_res_store
BEGIN
UPDATE db_unis_res_store SET updated_at = DATETIME('now', 'localtime') WHERE rowid == NEW.rowid;
END;`
)
type Sqlite3 struct {

View File

@ -5,10 +5,10 @@ import (
"errors"
)
func ResDataStore(key, value string, channel int) error {
sqlStr := `INSERT OR REPLACE INTO db_unis_res_store VALUES(?,?,?)`
func ResFulDataStore(methord, key, value string, channel int) error {
sqlStr := `INSERT OR REPLACE INTO db_unis_res_store(key,value,channel,methord) VALUES(?,?,?,?)`
res, err := db.Exec(sqlStr, key, value, channel)
res, err := db.Exec(sqlStr, key, value, channel,methord)
if err != nil {
return err
}
@ -25,11 +25,11 @@ func ResDataStore(key, value string, channel int) error {
return nil
}
func ResDataGet(key string, channel int) (string, error) {
sqlStr := `SELECT value FROM db_unis_res_store WHERE key=? AND channel=?`
func ResFulDataGet(methord, key string, channel int) (string, error) {
sqlStr := `SELECT value FROM db_unis_res_store WHERE key=? AND channel=? AND methord=?`
var res string
if err := db.Get(&res, sqlStr, key, channel); err != nil {
if err := db.Get(&res, sqlStr, key, channel, methord); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", nil
}

View File

@ -5,6 +5,7 @@ import (
"dashboard/dao/sqldb"
"dashboard/logger"
"dashboard/routes"
"dashboard/services/unis"
"dashboard/settings"
"errors"
"flag"
@ -48,7 +49,12 @@ func main() {
log.Sugar().Panicf("New db error: %v", err)
}
sqldb.SqlDbInit(log, db)
err = sqldb.SqlDbInit(log, db)
if err != nil {
log.Sugar().Panicf("New db error: %v", err)
}
err = unis.RpcListenAndServe(log, sets.RpcConfig)
if err != nil {
log.Sugar().Panicf("New db error: %v", err)
}

40
models/unisparams.go Normal file
View File

@ -0,0 +1,40 @@
package models
type StationConfigParams struct {
StationConfig UnisConfig `json:"stationConfig,omitempty"`
}
type Screens struct {
ScreenIndex int `json:"screenIndex,omitempty"`
Mode int `json:"mode,omitempty"`
PhysicalX int `json:"physicalX,omitempty"`
PhysicalY int `json:"physicalY,omitempty"`
PhysicalWidth int `json:"physicalWidth,omitempty"`
PhysicalHeight int `json:"physicalHeight,omitempty"`
LogicX int `json:"logicX,omitempty"`
LogicY int `json:"logicY,omitempty"`
LogicWidth int `json:"logicWidth,omitempty"`
LogicHeight int `json:"logicHeight,omitempty"`
LogicFrameRate int `json:"logicFrameRate,omitempty"`
Channel int `json:"channel,omitempty"`
McType int `json:"mcType,omitempty"`
SupportChannel int `json:"supportChannel,omitempty"`
Primary bool `json:"primary,omitempty"`
NodeID string `json:"nodeId,omitempty"`
NodeIP string `json:"nodeIp,omitempty"`
FpgaID string `json:"fpgaId,omitempty"`
SupportHighestTiming string `json:"supportHighestTiming,omitempty"`
UnisCapability string `json:"unisCapability,omitempty"`
}
type UnisConfig struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
RowCount int `json:"rowCount,omitempty"`
ColCount int `json:"colCount,omitempty"`
ScreenNum int `json:"screenNum,omitempty"`
WallWidth int `json:"wallWidth,omitempty"`
WallHeight int `json:"wallHeight,omitempty"`
ReceiveIndex int `json:"receiveIndex,omitempty"`
PrimaryIndex int `json:"primaryIndex,omitempty"`
ConfigType bool `json:"configType,omitempty"`
Screens []Screens `json:"screens,omitempty"`
}

23
models/unisrpcparams.go Normal file
View File

@ -0,0 +1,23 @@
package models
import "fmt"
const rpcNamePrefix = "UnisRpcService"
type UnisRpcMethod string
func (m UnisRpcMethod)String()string{
return fmt.Sprintf("%s.%s",rpcNamePrefix,string(m))
}
const(
UnisStationConfig UnisRpcMethod = "Config"
)
type UnisRpcRequest struct{
Id string
}
type UnisRpcResponse struct{
Id string
}

44
pkg/rpcsup/jsonrpc.go Normal file
View File

@ -0,0 +1,44 @@
package rpcsup
import (
"dashboard/logger"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func JsonServer(log *logger.Logger, address string, service interface{}) error {
if err := rpc.Register(service); err != nil {
log.Sugar().Error(err)
return err
}
listen, err := net.Listen("tcp", address)
if err != nil {
log.Sugar().Error(err)
return err
}
log.Sugar().Infof("Rpc server listen on: %s", address)
for {
accept, err := listen.Accept()
if err != nil {
log.Sugar().Error(err)
continue
}
jsonrpc.ServeConn(accept)
}
}
func JsonClient(log *logger.Logger, address string) (*rpc.Client, error) {
dial, err := net.Dial("tcp", address)
if err != nil {
log.Sugar().Error(err)
return nil, err
}
return jsonrpc.NewClient(dial), nil
}

View File

@ -1,16 +1,21 @@
package routes
import (
"bytes"
"dashboard/dao/sqldb"
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/jwt"
"dashboard/pkg/rate"
"dashboard/settings"
"dashboard/utils"
"io"
"net"
"net/http"
"net/http/httputil"
"os"
"runtime/debug"
"strconv"
"strings"
"time"
@ -155,3 +160,62 @@ func GinRateLimit(rateC settings.RateLimitConfig) appHandler {
return nil
}
}
func GinCheckServerId(c *gin.Context) error {
serverId := c.GetHeader("serverId")
if serverId != serverId {
c.Abort()
return &models.BaseError{
Code: http.StatusServiceUnavailable,
Msg: "Server Id error",
}
}
c.Next()
return nil
}
func GinStoreRequest(c *gin.Context) (err error) {
defer func() {
if err != nil {
c.Abort()
return
}
c.Next()
}()
log, _ := utils.GetLogFromContext(c)
channel, _err := strconv.Atoi(c.GetHeader("channel"))
if _err != nil {
err = &models.BaseError{
Code: http.StatusServiceUnavailable,
Msg: "Channel Id error",
}
return
}
oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel)
if _err != nil {
err = _err
}
bodyBytes, _err := io.ReadAll(c.Request.Body)
if _err != nil {
err = _err
}
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
if oldConf != string(bodyBytes) {
log.Sugar().Info("The resful request is not equel local store")
if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil {
err = _err
}
}
return nil
}

View File

@ -37,8 +37,6 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC
r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt)))
r.POST("/api/logout", controller.UserLogOutHandler)
r.POST("/api/unis/config/v1/add",errWapper(unis.StationConfig))
g1 := r.Group("/api")
g1.Use(errWapper(GinJwtAuthor(cjwt)))
{
@ -50,5 +48,11 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC
r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle()))
unisr := r.Group("/api/unis")
unisr.Use(errWapper(GinCheckServerId), errWapper(GinStoreRequest))
{
unisr.POST("/config/v1/add", errWapper(unis.StationConfig))
}
return r
}

46
services/unis/common.go Normal file
View File

@ -0,0 +1,46 @@
package unis
import (
"dashboard/models"
"dashboard/pkg/rpcsup"
"fmt"
"net/rpc"
)
type rpcClients map[string]*rpc.Client
var rpcMapClients rpcClients
func StationConfig(config *models.StationConfigParams) error {
CreateRpcClients(&rpcMapClients, &config.StationConfig.Screens)
fmt.Println(GetConfig(models.UnisRpcRequest{Id: "zhangshuo"}))
return nil
}
func CreateRpcClients(rawClients *rpcClients, screens *[]models.Screens) {
if *rawClients == nil {
*rawClients = make(map[string]*rpc.Client)
}
for _, cli := range *rawClients {
if cli != nil {
cli.Close()
}
}
for _, sc := range *screens {
id := sc.NodeID
address := fmt.Sprintf("%s:%d", sc.NodeIP, 5501)
cli, err := rpcsup.JsonClient(log, address)
if err != nil {
log.Sugar().Errorf("%s create rpc client error: %s", id, address)
continue
}
(*rawClients)[id] = cli
}
return
}

View File

@ -0,0 +1,17 @@
package unis
import (
"dashboard/models"
"fmt"
)
func GetConfig(req models.UnisRpcRequest) (*models.UnisRpcResponse, error) {
out := &models.UnisRpcResponse{}
for key, value := range rpcMapClients {
fmt.Println(key)
value.Call(models.UnisStationConfig.String(), req, out)
}
return out, nil
}

View File

@ -0,0 +1,35 @@
package unis
import (
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/rpcsup"
"dashboard/settings"
"fmt"
)
var log *logger.Logger
type UnisRpcService struct {
}
func NewRpcService() *UnisRpcService {
return &UnisRpcService{}
}
func RpcListenAndServe(_log *logger.Logger, config *settings.RpcConfig) error {
log = _log
addrees := fmt.Sprintf("%s:%d", config.Host, config.Port)
go rpcsup.JsonServer(log, addrees, NewRpcService())
return nil
}
func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error {
fmt.Println("rpc server get mesage",res)
rsp.Id = res.Id
return nil
}

View File

@ -7,6 +7,7 @@ type AppConfig struct {
*RateLimitConfig `mapstructure:"rate"`
*JwtConfig `mapstructure:"jwt"`
*SqlConfig `mapstructure:"database"`
*RpcConfig `mapstructure:"rpc"`
}
type BaseConfig struct {
@ -34,6 +35,12 @@ type SqlConfig struct {
Type string `mapstructure:"type"`
}
type RpcConfig struct {
Port int `mapstructure:"port"`
Host string `mapstructure:"host"`
Type string `mapstructure:"type"`
}
type SnowflakeConfig struct {
StartTime string `mapstructure:"start_time"`
MachineId int64 `mapstructure:"machine_id"`