diff --git a/controller/unisc/httphandle.go b/controller/unisc/httphandle.go index e6293f8..ff05d80 100644 --- a/controller/unisc/httphandle.go +++ b/controller/unisc/httphandle.go @@ -67,7 +67,7 @@ func (u *httpHandle) stationConfig(c *gin.Context) error { // } // } - req, err := getUnisHttpReqFromContext(c) + req, err := getUnisHttpContextFromContext(c) if err != nil { log.Sugar().Errorf("stationConfig get http req from context err: %v", err) return models.ErrorParamsErr diff --git a/controller/unisc/httproute.go b/controller/unisc/httproute.go index fa1d32d..ef7afbd 100644 --- a/controller/unisc/httproute.go +++ b/controller/unisc/httproute.go @@ -22,7 +22,7 @@ type HttpRoute struct { } type UnisHttpChanneler interface { - GetHttpChannel() chan *models.UnisHttpRequest + GetHttpServerChannel() chan *models.UnisHttpRequest } func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute { @@ -30,7 +30,7 @@ func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute { var httpC []chan *models.UnisHttpRequest for _, cha := range httpc { - httpC = append(httpC, cha.GetHttpChannel()) + httpC = append(httpC, cha.GetHttpServerChannel()) } res.httpHandle = newhttpHandle(httpC) @@ -40,10 +40,10 @@ func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute { } func (u *HttpRoute) AddRoute(r *gin.Engine) { - unisr := r.Group(models.UnisHttpUrlPrefix) - unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpReq) + unis := r.Group(models.UnisHttpUrlPrefix) + unis.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpContext) { - unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.URL(), routes.ErrWapper(u.httpHandle.stationConfig)) + unis.POST(models.UNIS_HTTP_URL_CONFIG_ADD.URL(), routes.ErrWapper(u.httpHandle.stationConfig)) } } @@ -196,14 +196,14 @@ func (m *middleWare) ginWithExpireTime(expire time.Duration) gin.HandlerFunc { } } -func (m *middleWare) ginSetUnisHttpReq(c *gin.Context) { +func (m *middleWare) ginSetUnisHttpContext(c *gin.Context) { req := m.pool.Get() c.Set(models.GinContextUnisHttpReq, req) c.Next() m.pool.Put(req) } -func getUnisHttpReqFromContext(c *gin.Context) (*models.UnisHttpRequest, error) { +func getUnisHttpContextFromContext(c *gin.Context) (*models.UnisHttpRequest, error) { res, ok := c.Get(models.GinContextUnisHttpReq) if !ok { return nil, models.ErrorInvalidData diff --git a/pkg/singlemap/singlemap.go b/pkg/singlemap/singlemap.go index d261942..60c990a 100644 --- a/pkg/singlemap/singlemap.go +++ b/pkg/singlemap/singlemap.go @@ -1,6 +1,6 @@ package singlemap -// NOTE Non concurrent map +// NOTE: Non concurrent map type SingleMap[K comparable, T any] struct { mp map[K]T diff --git a/services/uniss/httpclienthandle.go b/services/uniss/httpclienthandle.go index 771f385..21c11ac 100644 --- a/services/uniss/httpclienthandle.go +++ b/services/uniss/httpclienthandle.go @@ -10,7 +10,6 @@ import ( "io" "net/http" "sync" - "time" "go.uber.org/zap" ) @@ -99,7 +98,7 @@ func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { return err } - c.client.Timeout = time.Second + // c.client.Timeout = time.Second reqs.Header.Add("Content-type", "application/json;charset=utf-8") reqs.Header.Add("User-Agent", "avcnetRuntime/0.0.1") diff --git a/services/uniss/httpserverhandle.go b/services/uniss/httpserverhandle.go index b119cd9..6fd3181 100644 --- a/services/uniss/httpserverhandle.go +++ b/services/uniss/httpserverhandle.go @@ -7,7 +7,6 @@ import ( type httpServerHandler struct { httpC chan *models.UnisHttpRequest - // mu sync.RWMutex // 没有必要使用锁,应为完全可以保证在一个协程内访问map *singlemap.SingleMap[string,httpServerHandlerFunc] } @@ -22,7 +21,7 @@ func newhttpServerHandler() *httpServerHandler { return res } -func (u *httpServerHandler) httpHandle(msg *models.UnisHttpRequest) { +func (u *httpServerHandler) httpServerHandle(msg *models.UnisHttpRequest) { resP := msg.ResP resC := msg.ResC diff --git a/services/uniss/main.go b/services/uniss/main.go index 34a4b49..caa68d2 100644 --- a/services/uniss/main.go +++ b/services/uniss/main.go @@ -7,13 +7,15 @@ import ( "context" "dashboard/logger" "dashboard/models" - "dashboard/pkg/errgroups" "dashboard/settings" + + "golang.org/x/sync/errgroup" ) var log *logger.Logger type UnisStation struct { + cancelFunc context.CancelFunc commonService *commonService communicateService *communicateService dataCenterService *dataCenterService @@ -34,33 +36,52 @@ func NewUnis(_log *logger.Logger, conf *settings.UnisConfig) (res *UnisStation, return } -func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest { +func (u *UnisStation) GetHttpServerChannel() chan *models.UnisHttpRequest { return u.communicateService.httpServerHandler.httpC } +func (u *UnisStation) stop() { + if u.cancelFunc != nil { + u.cancelFunc() + } +} + // High performance threads do not allow any blocking // If there is blocking work, asynchronous methods need to be used -func (u *UnisStation) mainthread(ctx context.Context) error { +// Ensure the last exit +func (u *UnisStation) mainthread() error { + var ctx context.Context + ctx, u.cancelFunc = context.WithCancel(context.TODO()) + for { select { case <-ctx.Done(): log.Error("Unis mesageHandle cancel by ctx") return ctx.Err() case httpMsg := <-u.communicateService.httpServerHandler.httpC: - u.communicateService.httpServerHandler.httpHandle(httpMsg) - case httpc := <-u.communicateService.httpClientHandler.httpC: - u.communicateService.httpClientHandler.httpClientHandle(httpc) + u.communicateService.httpServerHandler.httpServerHandle(httpMsg) + case httpMsg := <-u.communicateService.httpClientHandler.httpC: + u.communicateService.httpClientHandler.httpClientHandle(httpMsg) // default: } - } } func (u *UnisStation) Run(ctx context.Context) error { - ewgs := errgroups.NewErrGroupFunc() + ewgs, nctx := errgroup.WithContext(ctx) - ewgs.Add(u.communicateService.Run) - ewgs.Add(u.mainthread) + ewgs.Go(func() error { + defer u.stop() + return u.communicateService.Run(nctx) + }) - return ewgs.Run(ctx) + ewgs.Go(func() error { + return u.mainthread() + }) + + if err := ewgs.Wait(); err != nil { + return err + } + + return nil }