1、保证座席主线程最后退出,其他线程先退出会导致管道通信死锁。

This commit is contained in:
redhat 2025-06-05 11:50:48 +08:00
parent 39f477afe5
commit b28f12a425
6 changed files with 43 additions and 24 deletions

View File

@ -67,7 +67,7 @@ func (u *httpHandle) stationConfig(c *gin.Context) error {
// } // }
// } // }
req, err := getUnisHttpReqFromContext(c) req, err := getUnisHttpContextFromContext(c)
if err != nil { if err != nil {
log.Sugar().Errorf("stationConfig get http req from context err: %v", err) log.Sugar().Errorf("stationConfig get http req from context err: %v", err)
return models.ErrorParamsErr return models.ErrorParamsErr

View File

@ -22,7 +22,7 @@ type HttpRoute struct {
} }
type UnisHttpChanneler interface { type UnisHttpChanneler interface {
GetHttpChannel() chan *models.UnisHttpRequest GetHttpServerChannel() chan *models.UnisHttpRequest
} }
func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute { func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute {
@ -30,7 +30,7 @@ func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute {
var httpC []chan *models.UnisHttpRequest var httpC []chan *models.UnisHttpRequest
for _, cha := range httpc { for _, cha := range httpc {
httpC = append(httpC, cha.GetHttpChannel()) httpC = append(httpC, cha.GetHttpServerChannel())
} }
res.httpHandle = newhttpHandle(httpC) res.httpHandle = newhttpHandle(httpC)
@ -40,10 +40,10 @@ func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute {
} }
func (u *HttpRoute) AddRoute(r *gin.Engine) { func (u *HttpRoute) AddRoute(r *gin.Engine) {
unisr := r.Group(models.UnisHttpUrlPrefix) unis := r.Group(models.UnisHttpUrlPrefix)
unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpReq) unis.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpContext)
{ {
unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.URL(), routes.ErrWapper(u.httpHandle.stationConfig)) unis.POST(models.UNIS_HTTP_URL_CONFIG_ADD.URL(), routes.ErrWapper(u.httpHandle.stationConfig))
} }
} }
@ -196,14 +196,14 @@ func (m *middleWare) ginWithExpireTime(expire time.Duration) gin.HandlerFunc {
} }
} }
func (m *middleWare) ginSetUnisHttpReq(c *gin.Context) { func (m *middleWare) ginSetUnisHttpContext(c *gin.Context) {
req := m.pool.Get() req := m.pool.Get()
c.Set(models.GinContextUnisHttpReq, req) c.Set(models.GinContextUnisHttpReq, req)
c.Next() c.Next()
m.pool.Put(req) m.pool.Put(req)
} }
func getUnisHttpReqFromContext(c *gin.Context) (*models.UnisHttpRequest, error) { func getUnisHttpContextFromContext(c *gin.Context) (*models.UnisHttpRequest, error) {
res, ok := c.Get(models.GinContextUnisHttpReq) res, ok := c.Get(models.GinContextUnisHttpReq)
if !ok { if !ok {
return nil, models.ErrorInvalidData return nil, models.ErrorInvalidData

View File

@ -1,6 +1,6 @@
package singlemap package singlemap
// NOTE Non concurrent map // NOTE: Non concurrent map
type SingleMap[K comparable, T any] struct { type SingleMap[K comparable, T any] struct {
mp map[K]T mp map[K]T

View File

@ -10,7 +10,6 @@ import (
"io" "io"
"net/http" "net/http"
"sync" "sync"
"time"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -99,7 +98,7 @@ func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error {
return err return err
} }
c.client.Timeout = time.Second // c.client.Timeout = time.Second
reqs.Header.Add("Content-type", "application/json;charset=utf-8") reqs.Header.Add("Content-type", "application/json;charset=utf-8")
reqs.Header.Add("User-Agent", "avcnetRuntime/0.0.1") reqs.Header.Add("User-Agent", "avcnetRuntime/0.0.1")

View File

@ -7,7 +7,6 @@ import (
type httpServerHandler struct { type httpServerHandler struct {
httpC chan *models.UnisHttpRequest httpC chan *models.UnisHttpRequest
// mu sync.RWMutex // 没有必要使用锁应为完全可以保证在一个协程内访问map
*singlemap.SingleMap[string,httpServerHandlerFunc] *singlemap.SingleMap[string,httpServerHandlerFunc]
} }
@ -22,7 +21,7 @@ func newhttpServerHandler() *httpServerHandler {
return res return res
} }
func (u *httpServerHandler) httpHandle(msg *models.UnisHttpRequest) { func (u *httpServerHandler) httpServerHandle(msg *models.UnisHttpRequest) {
resP := msg.ResP resP := msg.ResP
resC := msg.ResC resC := msg.ResC

View File

@ -7,13 +7,15 @@ import (
"context" "context"
"dashboard/logger" "dashboard/logger"
"dashboard/models" "dashboard/models"
"dashboard/pkg/errgroups"
"dashboard/settings" "dashboard/settings"
"golang.org/x/sync/errgroup"
) )
var log *logger.Logger var log *logger.Logger
type UnisStation struct { type UnisStation struct {
cancelFunc context.CancelFunc
commonService *commonService commonService *commonService
communicateService *communicateService communicateService *communicateService
dataCenterService *dataCenterService dataCenterService *dataCenterService
@ -34,33 +36,52 @@ func NewUnis(_log *logger.Logger, conf *settings.UnisConfig) (res *UnisStation,
return return
} }
func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest { func (u *UnisStation) GetHttpServerChannel() chan *models.UnisHttpRequest {
return u.communicateService.httpServerHandler.httpC return u.communicateService.httpServerHandler.httpC
} }
func (u *UnisStation) stop() {
if u.cancelFunc != nil {
u.cancelFunc()
}
}
// High performance threads do not allow any blocking // High performance threads do not allow any blocking
// If there is blocking work, asynchronous methods need to be used // If there is blocking work, asynchronous methods need to be used
func (u *UnisStation) mainthread(ctx context.Context) error { // Ensure the last exit
func (u *UnisStation) mainthread() error {
var ctx context.Context
ctx, u.cancelFunc = context.WithCancel(context.TODO())
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Error("Unis mesageHandle cancel by ctx") log.Error("Unis mesageHandle cancel by ctx")
return ctx.Err() return ctx.Err()
case httpMsg := <-u.communicateService.httpServerHandler.httpC: case httpMsg := <-u.communicateService.httpServerHandler.httpC:
u.communicateService.httpServerHandler.httpHandle(httpMsg) u.communicateService.httpServerHandler.httpServerHandle(httpMsg)
case httpc := <-u.communicateService.httpClientHandler.httpC: case httpMsg := <-u.communicateService.httpClientHandler.httpC:
u.communicateService.httpClientHandler.httpClientHandle(httpc) u.communicateService.httpClientHandler.httpClientHandle(httpMsg)
// default: // default:
} }
} }
} }
func (u *UnisStation) Run(ctx context.Context) error { func (u *UnisStation) Run(ctx context.Context) error {
ewgs := errgroups.NewErrGroupFunc() ewgs, nctx := errgroup.WithContext(ctx)
ewgs.Add(u.communicateService.Run) ewgs.Go(func() error {
ewgs.Add(u.mainthread) defer u.stop()
return u.communicateService.Run(nctx)
})
return ewgs.Run(ctx) ewgs.Go(func() error {
return u.mainthread()
})
if err := ewgs.Wait(); err != nil {
return err
}
return nil
} }