1、数据对象私有化,完全面向对象。

This commit is contained in:
redhat 2025-06-05 17:15:43 +08:00
parent b28f12a425
commit a64ab4f5f9
10 changed files with 71 additions and 78 deletions

View File

@ -1,7 +1,7 @@
base: base:
name: "avcnet_dash" name: "avcnet_dash"
port: 8080 port: 8080
mode: "dev" mode: "dev" # release or dev
version: "v0.1.0" version: "v0.1.0"
log: log:

View File

@ -21,9 +21,9 @@ func newhttpHandle(httpc []chan *models.UnisHttpRequest) *httpHandle {
return res return res
} }
func (u *httpHandle)httpSendWithExpire(req *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { func (u *httpHandle) httpSendWithExpire(channel int, req *models.UnisHttpRequest) (*models.UnisHttpResponse, error) {
select { select {
case u.httpC[0] <- req: case u.httpC[channel] <- req:
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
return nil, models.ErrorSendTimeOut return nil, models.ErrorSendTimeOut
} }
@ -31,7 +31,7 @@ func (u *httpHandle)httpSendWithExpire(req *models.UnisHttpRequest) (*models.Uni
select { select {
case resp := <-req.ResC: case resp := <-req.ResC:
return resp, nil return resp, nil
case <-time.After(2 * time.Second): case <-time.After(10 * time.Second): // 这里的消息必须要等到,如果等不到业务进程可能会出现空指针操作
return nil, models.ErrorReciveTimeOut return nil, models.ErrorReciveTimeOut
} }
} }
@ -46,26 +46,12 @@ func (u *httpHandle) stationConfig(c *gin.Context) error {
} }
// 不再使用channel的方式而是使用配置文件启动不同进程的方式 // 不再使用channel的方式而是使用配置文件启动不同进程的方式
// channel := c.GetInt(models.GinContextChannel) // 当前先兼容使用channel的方式
// if channel > len(u.httpC)-1 { channel := c.GetInt(models.GinContextChannel)
// log.Sugar().Errorf("stationConfig channel err: %d", channel) if channel > len(u.httpC)-1 {
// return models.ErrorParamsErr log.Sugar().Errorf("stationConfig channel err: %d", channel)
// } return models.ErrorParamsErr
}
// if !config.StationConfig.ConfigType {
// res, ok := unisd.UnisDataGet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(channel))
// if ok {
// oldCfg := res.(*models.StationConfigParams)
// oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType
// if reflect.DeepEqual(config, oldCfg) {
// log.Sugar().Infof("stationConfig is same")
// c.JSON(http.StatusOK, models.UnisHttpResponseOk)
// return nil
// }
// }
// }
req, err := getUnisHttpContextFromContext(c) req, err := getUnisHttpContextFromContext(c)
if err != nil { if err != nil {
@ -80,7 +66,7 @@ func (u *httpHandle) stationConfig(c *gin.Context) error {
// Msg: config, // Msg: config,
// } // }
resp, err := u.httpSendWithExpire(req) resp, err := u.httpSendWithExpire(channel, req)
if err != nil { if err != nil {
log.Sugar().Errorf("stationConfig send or recive err: %v", err) log.Sugar().Errorf("stationConfig send or recive err: %v", err)
return err return err

View File

@ -5,6 +5,7 @@ import (
"time" "time"
) )
// 保留作为共享数据库比如controller和service公共数据或者全局数据
var unisDataCenter = dadis.NewDadis() var unisDataCenter = dadis.NewDadis()
func UnisDataSet(key string, value interface{}, expire time.Duration) { func UnisDataSet(key string, value interface{}, expire time.Duration) {

View File

@ -6,10 +6,12 @@ const codeBase = 1000
const ( const (
CodeSuccess resCode = iota + codeBase CodeSuccess resCode = iota + codeBase
CodeNotChange
) )
var codeMsg = map[resCode]string{ var codeMsg = map[resCode]string{
CodeSuccess:"success", CodeSuccess:"success",
CodeNotChange:"not change",
} }
func (r resCode) String() string { func (r resCode) String() string {

View File

@ -8,7 +8,7 @@ import (
) )
type Dadis struct { type Dadis struct {
mu sync.Mutex mu sync.RWMutex
store map[string]*item store map[string]*item
observe *observe.SyncEventBus observe *observe.SyncEventBus
} }
@ -48,8 +48,8 @@ func (d *Dadis) Set(key string, value interface{}, expire time.Duration) {
} }
func (d *Dadis) Get(key string) (interface{}, bool) { func (d *Dadis) Get(key string) (interface{}, bool) {
d.mu.Lock() d.mu.RLock()
defer d.mu.Unlock() defer d.mu.RUnlock()
if item, ok := d.store[key]; ok { if item, ok := d.store[key]; ok {
if item.expire != 0 && (time.Now().UnixNano() > item.expire) { if item.expire != 0 && (time.Now().UnixNano() > item.expire) {
@ -64,9 +64,6 @@ func (d *Dadis) Get(key string) (interface{}, bool) {
} }
func (d *Dadis) Sub(key string, ob observe.Observer) { func (d *Dadis) Sub(key string, ob observe.Observer) {
d.mu.Lock()
defer d.mu.Unlock()
d.observe.Subscribe(key, ob) d.observe.Subscribe(key, ob)
} }

View File

@ -96,7 +96,7 @@ func GinLog(log *logger.Logger) gin.HandlerFunc {
} }
} }
func GinJwtAuthor(jwtC *jwt.Jwt) appHandler { func GinJwtAuthor(jwtC *jwt.Jwt) AppHandler {
return func(c *gin.Context) error { return func(c *gin.Context) error {
authHeader, err := c.Cookie(models.GinAuthorKey) authHeader, err := c.Cookie(models.GinAuthorKey)
if err != nil || !strings.HasPrefix(authHeader, models.GinAuthorPrefixKey) { if err != nil || !strings.HasPrefix(authHeader, models.GinAuthorPrefixKey) {
@ -131,7 +131,7 @@ func GinJwtAuthor(jwtC *jwt.Jwt) appHandler {
} }
} }
func GinRateLimit(rateC settings.RateLimitConfig) appHandler { func GinRateLimit(rateC settings.RateLimitConfig) AppHandler {
lrate, err := rate.New(rate.WithCapacity(rateC.Capacity), lrate, err := rate.New(rate.WithCapacity(rateC.Capacity),
rate.WithFillInterval(time.Duration(rateC.FillInterval)*time.Millisecond), rate.WithFillInterval(time.Duration(rateC.FillInterval)*time.Millisecond),
) )

View File

@ -44,7 +44,7 @@ func NewRouter(addr string, log *logger.Logger, rate settings.RateLimitConfig, j
MaxAge: 12 * time.Hour, MaxAge: 12 * time.Hour,
})) }))
r.Use(GinLogger(log), GinRecovery(log, true), GinLog(log), errWapper(GinRateLimit(rate))) r.Use(GinLogger(log), GinRecovery(log, true), GinLog(log), ErrWapper(GinRateLimit(rate)))
// 静态文件服务 // 静态文件服务
r.Static("/static", "./static") r.Static("/static", "./static")
@ -52,19 +52,19 @@ func NewRouter(addr string, log *logger.Logger, rate settings.RateLimitConfig, j
r.GET("login", controller.LoginPage) r.GET("login", controller.LoginPage)
r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt))) r.POST("/api/login", ErrWapper(controller.UserSignInHandler(cjwt)))
r.POST("/api/logout", controller.UserLogOutHandler) r.POST("/api/logout", controller.UserLogOutHandler)
g1 := r.Group("/api") g1 := r.Group("/api")
g1.Use(errWapper(GinJwtAuthor(cjwt))) g1.Use(ErrWapper(GinJwtAuthor(cjwt)))
{ {
g1.GET("/system-info", errWapper(controller.SystemInfoHandle)) g1.GET("/system-info", ErrWapper(controller.SystemInfoHandle))
g1.POST("/upload", errWapper(controller.FileUploadHandle)) g1.POST("/upload", ErrWapper(controller.FileUploadHandle))
g1.GET("/files", errWapper(controller.FileListHandle)) g1.GET("/files", ErrWapper(controller.FileListHandle))
g1.GET("/download", errWapper(controller.FileDownloadHandle)) g1.GET("/download", ErrWapper(controller.FileDownloadHandle))
} }
r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle())) r.GET("/ws/terminal", ErrWapper(GinJwtAuthor(cjwt)), ErrWapper(controller.TerminalHandle()))
for _, route := range routes { for _, route := range routes {
route.AddRoute(r) route.AddRoute(r)

View File

@ -10,35 +10,9 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type appHandler func(*gin.Context) error
type AppHandler func(*gin.Context) error type AppHandler func(*gin.Context) error
func errWapper(appH appHandler) gin.HandlerFunc { func ErrWapper(appH AppHandler) gin.HandlerFunc {
return func(c *gin.Context) {
log, _ := utils.GetLogFromContext(c)
err := appH(c)
if err != nil {
var baseErr *models.BaseError
if errors.As(err, &baseErr) {
log.Error("Base error", zap.Any("baseErr", baseErr))
c.JSON(http.StatusBadRequest, baseErr)
return
}
log.Error("Other error", zap.Error(err))
c.String(http.StatusBadGateway, err.Error())
return
}
}
}
func ErrWapper(appH appHandler) gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
log, _ := utils.GetLogFromContext(c) log, _ := utils.GetLogFromContext(c)

View File

@ -5,16 +5,24 @@ import (
"dashboard/models" "dashboard/models"
"fmt" "fmt"
"net/http" "net/http"
"reflect"
) )
type commonService struct { type commonService struct {
comm *communicateService comm *communicateService
dataCenter *dataCenterService
} }
func newcommonService(comm *communicateService) *commonService { type communicateWithDataCenter interface {
getCommunicate() *communicateService
getDataCenter() *dataCenterService
}
func newcommonService(comm communicateWithDataCenter) *commonService {
res := new(commonService) res := new(commonService)
res.comm = comm res.comm = comm.getCommunicate()
res.dataCenter = comm.getDataCenter()
res.comm.httpServerHandler.Put(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) res.comm.httpServerHandler.Put(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig)
@ -22,11 +30,27 @@ func newcommonService(comm *communicateService) *commonService {
} }
func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *models.UnisHttpResponse) error { func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *models.UnisHttpResponse) error {
req := reqest.Msg.(*models.StationConfigParams) config := reqest.Msg.(*models.StationConfigParams)
fmt.Println(req) fmt.Println(config)
if !config.StationConfig.ConfigType {
res, ok := c.dataCenter.dataMap.Get(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0))
if ok {
oldCfg := res.(*models.StationConfigParams)
oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType
if reflect.DeepEqual(config, oldCfg) {
log.Sugar().Warnf("stationConfig is same")
respons.SetResParam(int(models.CodeNotChange), models.CodeNotChange.String(), nil)
return nil
}
}
}
var addres []*rpcIdAddres var addres []*rpcIdAddres
for _, sc := range req.StationConfig.Screens { for _, sc := range config.StationConfig.Screens {
add := new(rpcIdAddres) add := new(rpcIdAddres)
add.host = sc.NodeIP add.host = sc.NodeIP
@ -45,7 +69,7 @@ func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *m
// } // }
respons.SetResParam(int(models.CodeSuccess), "config ok", nil) respons.SetResParam(int(models.CodeSuccess), "config ok", nil)
unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), req, 0) c.dataCenter.dataMap.Set(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), config, 0)
c.comm.httpClientHandler.curl(http.MethodGet, c.comm.httpClientHandler.curl(http.MethodGet,
models.UNIS_HTTP_CLIENT_URL_SOURCE_LIST.URI("192.168.177.7", 8080), models.UNIS_HTTP_CLIENT_URL_SOURCE_LIST.URI("192.168.177.7", 8080),

View File

@ -16,6 +16,7 @@ var log *logger.Logger
type UnisStation struct { type UnisStation struct {
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
commonService *commonService commonService *commonService
communicateService *communicateService communicateService *communicateService
dataCenterService *dataCenterService dataCenterService *dataCenterService
@ -31,7 +32,7 @@ func NewUnis(_log *logger.Logger, conf *settings.UnisConfig) (res *UnisStation,
return return
} }
res.commonService = newcommonService(res.communicateService) res.commonService = newcommonService(res)
return return
} }
@ -46,6 +47,14 @@ func (u *UnisStation) stop() {
} }
} }
func (u *UnisStation) getCommunicate() *communicateService {
return u.communicateService
}
func (u *UnisStation) getDataCenter() *dataCenterService {
return u.dataCenterService
}
// High performance threads do not allow any blocking // High performance threads do not allow any blocking
// If there is blocking work, asynchronous methods need to be used // If there is blocking work, asynchronous methods need to be used
// Ensure the last exit // Ensure the last exit
@ -58,7 +67,7 @@ func (u *UnisStation) mainthread() error {
case <-ctx.Done(): case <-ctx.Done():
log.Error("Unis mesageHandle cancel by ctx") log.Error("Unis mesageHandle cancel by ctx")
return ctx.Err() return ctx.Err()
case httpMsg := <-u.communicateService.httpServerHandler.httpC: case httpMsg := <-u.GetHttpServerChannel():
u.communicateService.httpServerHandler.httpServerHandle(httpMsg) u.communicateService.httpServerHandler.httpServerHandle(httpMsg)
case httpMsg := <-u.communicateService.httpClientHandler.httpC: case httpMsg := <-u.communicateService.httpClientHandler.httpC:
u.communicateService.httpClientHandler.httpClientHandle(httpMsg) u.communicateService.httpClientHandler.httpClientHandle(httpMsg)