From a64ab4f5f91ae9a911bced253e46527e55fce4f6 Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Thu, 5 Jun 2025 17:15:43 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=95=B0=E6=8D=AE=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E7=A7=81=E6=9C=89=E5=8C=96=EF=BC=8C=E5=AE=8C=E5=85=A8=E9=9D=A2?= =?UTF-8?q?=E5=90=91=E5=AF=B9=E8=B1=A1=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 2 +- controller/unisc/httphandle.go | 34 +++++++++--------------------- dao/dadis/unisd/datacenter.go | 1 + models/errorcode.go | 2 ++ pkg/dadis/dadis.go | 9 +++----- routes/middleware.go | 4 ++-- routes/routes.go | 16 +++++++------- routes/wrapperr.go | 28 +------------------------ services/uniss/common.go | 38 +++++++++++++++++++++++++++------- services/uniss/main.go | 15 +++++++++++--- 10 files changed, 71 insertions(+), 78 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index ac898b4..bd1b6f4 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,7 +1,7 @@ base: name: "avcnet_dash" port: 8080 - mode: "dev" + mode: "dev" # release or dev version: "v0.1.0" log: diff --git a/controller/unisc/httphandle.go b/controller/unisc/httphandle.go index ff05d80..6697eef 100644 --- a/controller/unisc/httphandle.go +++ b/controller/unisc/httphandle.go @@ -21,9 +21,9 @@ func newhttpHandle(httpc []chan *models.UnisHttpRequest) *httpHandle { return res } -func (u *httpHandle)httpSendWithExpire(req *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { +func (u *httpHandle) httpSendWithExpire(channel int, req *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { select { - case u.httpC[0] <- req: + case u.httpC[channel] <- req: case <-time.After(2 * time.Second): return nil, models.ErrorSendTimeOut } @@ -31,7 +31,7 @@ func (u *httpHandle)httpSendWithExpire(req *models.UnisHttpRequest) (*models.Uni select { case resp := <-req.ResC: return resp, nil - case <-time.After(2 * time.Second): + case <-time.After(10 * time.Second): // 这里的消息必须要等到,如果等不到业务进程可能会出现空指针操作 return nil, models.ErrorReciveTimeOut } } @@ -46,26 +46,12 @@ func (u *httpHandle) stationConfig(c *gin.Context) error { } // 不再使用channel的方式,而是使用配置文件启动不同进程的方式 - // channel := c.GetInt(models.GinContextChannel) - // if channel > len(u.httpC)-1 { - // log.Sugar().Errorf("stationConfig channel err: %d", channel) - // return models.ErrorParamsErr - // } - - // if !config.StationConfig.ConfigType { - // res, ok := unisd.UnisDataGet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(channel)) - // if ok { - // oldCfg := res.(*models.StationConfigParams) - - // oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType - // if reflect.DeepEqual(config, oldCfg) { - // log.Sugar().Infof("stationConfig is same") - // c.JSON(http.StatusOK, models.UnisHttpResponseOk) - - // return nil - // } - // } - // } + // 当前先兼容使用channel的方式 + channel := c.GetInt(models.GinContextChannel) + if channel > len(u.httpC)-1 { + log.Sugar().Errorf("stationConfig channel err: %d", channel) + return models.ErrorParamsErr + } req, err := getUnisHttpContextFromContext(c) if err != nil { @@ -80,7 +66,7 @@ func (u *httpHandle) stationConfig(c *gin.Context) error { // Msg: config, // } - resp, err := u.httpSendWithExpire(req) + resp, err := u.httpSendWithExpire(channel, req) if err != nil { log.Sugar().Errorf("stationConfig send or recive err: %v", err) return err diff --git a/dao/dadis/unisd/datacenter.go b/dao/dadis/unisd/datacenter.go index f66f603..91afb8a 100644 --- a/dao/dadis/unisd/datacenter.go +++ b/dao/dadis/unisd/datacenter.go @@ -5,6 +5,7 @@ import ( "time" ) +// 保留作为共享数据库,比如controller和service公共数据或者全局数据 var unisDataCenter = dadis.NewDadis() func UnisDataSet(key string, value interface{}, expire time.Duration) { diff --git a/models/errorcode.go b/models/errorcode.go index 57bc59c..5c69b86 100644 --- a/models/errorcode.go +++ b/models/errorcode.go @@ -6,10 +6,12 @@ const codeBase = 1000 const ( CodeSuccess resCode = iota + codeBase + CodeNotChange ) var codeMsg = map[resCode]string{ CodeSuccess:"success", + CodeNotChange:"not change", } func (r resCode) String() string { diff --git a/pkg/dadis/dadis.go b/pkg/dadis/dadis.go index 2e7f4f9..732a307 100644 --- a/pkg/dadis/dadis.go +++ b/pkg/dadis/dadis.go @@ -8,7 +8,7 @@ import ( ) type Dadis struct { - mu sync.Mutex + mu sync.RWMutex store map[string]*item observe *observe.SyncEventBus } @@ -48,8 +48,8 @@ func (d *Dadis) Set(key string, value interface{}, expire time.Duration) { } func (d *Dadis) Get(key string) (interface{}, bool) { - d.mu.Lock() - defer d.mu.Unlock() + d.mu.RLock() + defer d.mu.RUnlock() if item, ok := d.store[key]; ok { if item.expire != 0 && (time.Now().UnixNano() > item.expire) { @@ -64,9 +64,6 @@ func (d *Dadis) Get(key string) (interface{}, bool) { } func (d *Dadis) Sub(key string, ob observe.Observer) { - d.mu.Lock() - defer d.mu.Unlock() - d.observe.Subscribe(key, ob) } diff --git a/routes/middleware.go b/routes/middleware.go index 36b36a9..762fb40 100644 --- a/routes/middleware.go +++ b/routes/middleware.go @@ -96,7 +96,7 @@ func GinLog(log *logger.Logger) gin.HandlerFunc { } } -func GinJwtAuthor(jwtC *jwt.Jwt) appHandler { +func GinJwtAuthor(jwtC *jwt.Jwt) AppHandler { return func(c *gin.Context) error { authHeader, err := c.Cookie(models.GinAuthorKey) if err != nil || !strings.HasPrefix(authHeader, models.GinAuthorPrefixKey) { @@ -131,7 +131,7 @@ func GinJwtAuthor(jwtC *jwt.Jwt) appHandler { } } -func GinRateLimit(rateC settings.RateLimitConfig) appHandler { +func GinRateLimit(rateC settings.RateLimitConfig) AppHandler { lrate, err := rate.New(rate.WithCapacity(rateC.Capacity), rate.WithFillInterval(time.Duration(rateC.FillInterval)*time.Millisecond), ) diff --git a/routes/routes.go b/routes/routes.go index bbfb3de..eb75e85 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -44,7 +44,7 @@ func NewRouter(addr string, log *logger.Logger, rate settings.RateLimitConfig, j MaxAge: 12 * time.Hour, })) - r.Use(GinLogger(log), GinRecovery(log, true), GinLog(log), errWapper(GinRateLimit(rate))) + r.Use(GinLogger(log), GinRecovery(log, true), GinLog(log), ErrWapper(GinRateLimit(rate))) // 静态文件服务 r.Static("/static", "./static") @@ -52,19 +52,19 @@ func NewRouter(addr string, log *logger.Logger, rate settings.RateLimitConfig, j r.GET("login", controller.LoginPage) - r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt))) + r.POST("/api/login", ErrWapper(controller.UserSignInHandler(cjwt))) r.POST("/api/logout", controller.UserLogOutHandler) g1 := r.Group("/api") - g1.Use(errWapper(GinJwtAuthor(cjwt))) + g1.Use(ErrWapper(GinJwtAuthor(cjwt))) { - g1.GET("/system-info", errWapper(controller.SystemInfoHandle)) - g1.POST("/upload", errWapper(controller.FileUploadHandle)) - g1.GET("/files", errWapper(controller.FileListHandle)) - g1.GET("/download", errWapper(controller.FileDownloadHandle)) + g1.GET("/system-info", ErrWapper(controller.SystemInfoHandle)) + g1.POST("/upload", ErrWapper(controller.FileUploadHandle)) + g1.GET("/files", ErrWapper(controller.FileListHandle)) + g1.GET("/download", ErrWapper(controller.FileDownloadHandle)) } - r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle())) + r.GET("/ws/terminal", ErrWapper(GinJwtAuthor(cjwt)), ErrWapper(controller.TerminalHandle())) for _, route := range routes { route.AddRoute(r) diff --git a/routes/wrapperr.go b/routes/wrapperr.go index fbb8d0c..fa29364 100644 --- a/routes/wrapperr.go +++ b/routes/wrapperr.go @@ -10,35 +10,9 @@ import ( "go.uber.org/zap" ) -type appHandler func(*gin.Context) error - type AppHandler func(*gin.Context) error -func errWapper(appH appHandler) gin.HandlerFunc { - return func(c *gin.Context) { - log, _ := utils.GetLogFromContext(c) - - err := appH(c) - - if err != nil { - var baseErr *models.BaseError - if errors.As(err, &baseErr) { - log.Error("Base error", zap.Any("baseErr", baseErr)) - - c.JSON(http.StatusBadRequest, baseErr) - - return - } - - log.Error("Other error", zap.Error(err)) - c.String(http.StatusBadGateway, err.Error()) - - return - } - } -} - -func ErrWapper(appH appHandler) gin.HandlerFunc { +func ErrWapper(appH AppHandler) gin.HandlerFunc { return func(c *gin.Context) { log, _ := utils.GetLogFromContext(c) diff --git a/services/uniss/common.go b/services/uniss/common.go index 885cb02..c54325f 100644 --- a/services/uniss/common.go +++ b/services/uniss/common.go @@ -5,16 +5,24 @@ import ( "dashboard/models" "fmt" "net/http" + "reflect" ) type commonService struct { - comm *communicateService + comm *communicateService + dataCenter *dataCenterService } -func newcommonService(comm *communicateService) *commonService { +type communicateWithDataCenter interface { + getCommunicate() *communicateService + getDataCenter() *dataCenterService +} + +func newcommonService(comm communicateWithDataCenter) *commonService { res := new(commonService) - res.comm = comm + res.comm = comm.getCommunicate() + res.dataCenter = comm.getDataCenter() res.comm.httpServerHandler.Put(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) @@ -22,11 +30,27 @@ func newcommonService(comm *communicateService) *commonService { } func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *models.UnisHttpResponse) error { - req := reqest.Msg.(*models.StationConfigParams) - fmt.Println(req) + config := reqest.Msg.(*models.StationConfigParams) + fmt.Println(config) + + if !config.StationConfig.ConfigType { + res, ok := c.dataCenter.dataMap.Get(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0)) + if ok { + oldCfg := res.(*models.StationConfigParams) + + oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType + if reflect.DeepEqual(config, oldCfg) { + log.Sugar().Warnf("stationConfig is same") + + respons.SetResParam(int(models.CodeNotChange), models.CodeNotChange.String(), nil) + + return nil + } + } + } var addres []*rpcIdAddres - for _, sc := range req.StationConfig.Screens { + for _, sc := range config.StationConfig.Screens { add := new(rpcIdAddres) add.host = sc.NodeIP @@ -45,7 +69,7 @@ func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *m // } respons.SetResParam(int(models.CodeSuccess), "config ok", nil) - unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), req, 0) + c.dataCenter.dataMap.Set(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), config, 0) c.comm.httpClientHandler.curl(http.MethodGet, models.UNIS_HTTP_CLIENT_URL_SOURCE_LIST.URI("192.168.177.7", 8080), diff --git a/services/uniss/main.go b/services/uniss/main.go index caa68d2..a57de85 100644 --- a/services/uniss/main.go +++ b/services/uniss/main.go @@ -15,7 +15,8 @@ import ( var log *logger.Logger type UnisStation struct { - cancelFunc context.CancelFunc + cancelFunc context.CancelFunc + commonService *commonService communicateService *communicateService dataCenterService *dataCenterService @@ -31,7 +32,7 @@ func NewUnis(_log *logger.Logger, conf *settings.UnisConfig) (res *UnisStation, return } - res.commonService = newcommonService(res.communicateService) + res.commonService = newcommonService(res) return } @@ -46,6 +47,14 @@ func (u *UnisStation) stop() { } } +func (u *UnisStation) getCommunicate() *communicateService { + return u.communicateService +} + +func (u *UnisStation) getDataCenter() *dataCenterService { + return u.dataCenterService +} + // High performance threads do not allow any blocking // If there is blocking work, asynchronous methods need to be used // Ensure the last exit @@ -58,7 +67,7 @@ func (u *UnisStation) mainthread() error { case <-ctx.Done(): log.Error("Unis mesageHandle cancel by ctx") return ctx.Err() - case httpMsg := <-u.communicateService.httpServerHandler.httpC: + case httpMsg := <-u.GetHttpServerChannel(): u.communicateService.httpServerHandler.httpServerHandle(httpMsg) case httpMsg := <-u.communicateService.httpClientHandler.httpC: u.communicateService.httpClientHandler.httpClientHandle(httpMsg)