diff --git a/controller/unisc/httproute.go b/controller/unisc/httproute.go index e5e1597..fa1d32d 100644 --- a/controller/unisc/httproute.go +++ b/controller/unisc/httproute.go @@ -43,7 +43,7 @@ func (u *HttpRoute) AddRoute(r *gin.Engine) { unisr := r.Group(models.UnisHttpUrlPrefix) unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpReq) { - unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.Url(), routes.ErrWapper(u.httpHandle.stationConfig)) + unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.URL(), routes.ErrWapper(u.httpHandle.stationConfig)) } } @@ -53,7 +53,7 @@ type middleWare struct { func newmiddleWare() *middleWare { res := new(middleWare) - + res.pool.New = func() any { return createUnisHttpRequest() } return res @@ -203,7 +203,7 @@ func (m *middleWare) ginSetUnisHttpReq(c *gin.Context) { m.pool.Put(req) } -func getUnisHttpReqFromContext(c *gin.Context)(*models.UnisHttpRequest,error){ +func getUnisHttpReqFromContext(c *gin.Context) (*models.UnisHttpRequest, error) { res, ok := c.Get(models.GinContextUnisHttpReq) if !ok { return nil, models.ErrorInvalidData diff --git a/models/unishttpparams.go b/models/unishttpparams.go index 2f54aa4..423c6cf 100644 --- a/models/unishttpparams.go +++ b/models/unishttpparams.go @@ -1,7 +1,11 @@ package models -import "strings" +import ( + "fmt" + "strings" +) +// http server type UnisHttpRequest struct { ResC chan *UnisHttpResponse ResP *UnisHttpResponse @@ -26,25 +30,11 @@ func (u *UnisHttpResponse) SetResParam(code int, msg, data interface{}) { u.Data = data } -type UnisHttpClientRequest struct { - Url string - Methord string - Id string - Msg interface{} -} - -type UnisHttpClientResponse struct { - Url string - Methord string - Id string - Msg interface{} -} - type UnisHttpUrl string const UnisHttpUrlPrefix = "/api/unis" -func (u UnisHttpUrl) Url() string { +func (u UnisHttpUrl) URL() string { return strings.TrimPrefix(string(u), UnisHttpUrlPrefix) } @@ -67,3 +57,32 @@ var mapHttpUrlId = map[UnisHttpUrl]string{ } var UnisHttpResponseOk = &UnisHttpResponse{Code: int(CodeSuccess), Msg: CodeSuccess.String()} + +// http client +type UnisHttpClientHandlerFunc func(*UnisHttpClientRequest) error + +type UnisHttpClientRequest struct { + Url string + Methord string + Msg interface{} + Sn uint64 + Handle UnisHttpClientHandlerFunc +} + +func (u *UnisHttpClientRequest) SetResParam(methord string, url string, body interface{}, sn uint64, handle UnisHttpClientHandlerFunc) { + u.Methord = methord + u.Url = url + u.Msg = body + u.Handle = handle + u.Sn = sn +} + +type UnisHttpClientUrl string + +func (u UnisHttpClientUrl) URI(host string, port int) string { + return fmt.Sprintf("http://%s:%d%s", host, port, string(u)) +} + +const ( + UNIS_HTTP_CLIENT_URL_SOURCE_LIST UnisHttpClientUrl = "/api/unis/source-list/v1/update" +) diff --git a/pkg/singlemap/singlemap.go b/pkg/singlemap/singlemap.go new file mode 100644 index 0000000..d261942 --- /dev/null +++ b/pkg/singlemap/singlemap.go @@ -0,0 +1,28 @@ +package singlemap + +// NOTE Non concurrent map + +type SingleMap[K comparable, T any] struct { + mp map[K]T +} + +func NewSingleMap[K comparable, T any]() *SingleMap[K, T] { + res := new(SingleMap[K, T]) + + res.mp = make(map[K]T) + + return res +} + +func (s *SingleMap[K, T]) Put(key K, value T) { + s.mp[key] = value +} + +func (s *SingleMap[K, T]) Get(key K) (T, bool) { + res, ok := s.mp[key] + return res, ok +} + +func (s *SingleMap[K, T]) Del(key K) { + delete(s.mp, key) +} diff --git a/pkg/swarm/swarm.go b/pkg/swarm/swarm.go index 021a288..3f6c5d5 100644 --- a/pkg/swarm/swarm.go +++ b/pkg/swarm/swarm.go @@ -16,6 +16,7 @@ type Swarm[T any] struct { opts options } +// TODO: 需要完善错误处理机制,重试(什么时候重试,重试几次?),投递死信队列 func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] { res := new(Swarm[T]) diff --git a/services/uniss/common.go b/services/uniss/common.go index ff7c07c..885cb02 100644 --- a/services/uniss/common.go +++ b/services/uniss/common.go @@ -16,7 +16,7 @@ func newcommonService(comm *communicateService) *commonService { res.comm = comm - res.comm.httpServerHandler.pushHandle(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) + res.comm.httpServerHandler.Put(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) return res } @@ -47,10 +47,17 @@ func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *m unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), req, 0) - c.comm.httpClientHandler.curl(&models.UnisHttpClientRequest{ - Url: "http://192.168.177.7:8080/api/unis/source-list/v1/update", - Methord: http.MethodGet, - }) + c.comm.httpClientHandler.curl(http.MethodGet, + models.UNIS_HTTP_CLIENT_URL_SOURCE_LIST.URI("192.168.177.7", 8080), + nil, + func(msg *models.UnisHttpClientRequest) error { + fmt.Println(msg.Url) + + if err, ok := msg.Msg.(error); ok { + fmt.Println(err) + } + return nil + }) return nil } diff --git a/services/uniss/httpclienthandle.go b/services/uniss/httpclienthandle.go index 571ea16..771f385 100644 --- a/services/uniss/httpclienthandle.go +++ b/services/uniss/httpclienthandle.go @@ -4,49 +4,98 @@ import ( "bytes" "dashboard/dao/sqldb" "dashboard/models" + "dashboard/pkg/singlemap" "dashboard/pkg/swarm" "encoding/json" - "fmt" "io" "net/http" + "sync" "time" "go.uber.org/zap" ) type httpClientHandler struct { - swarm *swarm.Swarm[*models.UnisHttpClientRequest] - httpC chan *models.UnisHttpClientResponse - client *http.Client + swarm *swarm.Swarm[*models.UnisHttpClientRequest] + httpC chan *models.UnisHttpClientRequest + client *http.Client + callMap *singlemap.SingleMap[uint64, models.UnisHttpClientHandlerFunc] + pool sync.Pool + sn uint64 } func newhttpClientHandle(clients int) *httpClientHandler { res := new(httpClientHandler) + res.swarm = swarm.NewSwarm(clients, res.doRequest, swarm.WithLog(zap.NewStdLog(log.Logger))) - res.httpC = make(chan *models.UnisHttpClientResponse) + res.httpC = make(chan *models.UnisHttpClientRequest) res.client = http.DefaultClient + res.callMap = singlemap.NewSingleMap[uint64, models.UnisHttpClientHandlerFunc]() + res.pool.New = func() any { return new(models.UnisHttpClientRequest) } return res } -// cald by mainthred -func (c *httpClientHandler) curl(msg *models.UnisHttpClientRequest) { - c.swarm.Submit(msg) +func (c *httpClientHandler) getSn() uint64 { + c.sn++ + return c.sn } // cald by mainthred -func (c *httpClientHandler) httpClientHandle(msg *models.UnisHttpClientResponse) { - fmt.Println(string(msg.Msg.([]byte))) +func (c *httpClientHandler) curl(method string, url string, body interface{}, handle models.UnisHttpClientHandlerFunc) { + req := c.pool.Get().(*models.UnisHttpClientRequest) + + req.SetResParam(method, url, body, c.getSn(), handle) + + if req.Handle != nil { + c.callMap.Put(req.Sn, req.Handle) + } + c.swarm.Submit(req) +} + +// cald by mainthred +func (c *httpClientHandler) httpClientHandle(msg *models.UnisHttpClientRequest) { + defer func() { + c.pool.Put(msg) + c.callMap.Del(msg.Sn) + }() + + if call, ok := c.callMap.Get(msg.Sn); ok && call != nil { + if err := call(msg); err != nil { + log.Sugar().Error(err) + return + } + + log.Sugar().Infof("sn: %d, url: %s, handle ok", msg.Sn, msg.Url) + + return + } + + if err, ok := msg.Msg.(error); ok { + log.Sugar().Errorf("sn: %d, url: %s, no handle find and get err: %v", msg.Sn, msg.Url, err) + return + } + + log.Sugar().Warnf("sn: %d, url: %s, no handle find body: %s", msg.Sn, msg.Url, string(msg.Msg.([]byte))) } // Note cald by swarm goroutines func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { + defer func() { + c.httpC <- req + }() + body, err := json.Marshal(req.Msg) - fmt.Println(string(body), err) + if err != nil { + req.Msg = err + log.Sugar().Error(err) + return err + } reqs, err := http.NewRequest(req.Methord, req.Url, bytes.NewReader(body)) if err != nil { - fmt.Println(err) + req.Msg = err + log.Sugar().Error(err) return err } @@ -57,28 +106,26 @@ func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { resp, err := c.client.Do(reqs) if err != nil { - fmt.Println(err) + req.Msg = err + log.Sugar().Error(err) return err } defer resp.Body.Close() msg, err := io.ReadAll(resp.Body) if err != nil { + req.Msg = err + log.Sugar().Error(err) return err } if err := sqldb.ResFulDataStore(req.Methord, req.Url, string(body), string(msg), 0); err != nil { - fmt.Println(err) + req.Msg = err + log.Sugar().Error(err) return err } - response := &models.UnisHttpClientResponse{ - Id: req.Id, - Methord: req.Methord, - Msg: msg, - } - - c.httpC <- response + req.Msg = msg return nil } diff --git a/services/uniss/httpserverhandle.go b/services/uniss/httpserverhandle.go index 1e7e0a7..b119cd9 100644 --- a/services/uniss/httpserverhandle.go +++ b/services/uniss/httpserverhandle.go @@ -2,21 +2,22 @@ package uniss import ( "dashboard/models" + "dashboard/pkg/singlemap" ) type httpServerHandler struct { httpC chan *models.UnisHttpRequest // mu sync.RWMutex // 没有必要使用锁,应为完全可以保证在一个协程内访问map - mapFn map[string]HttpServerHandlerFunc + *singlemap.SingleMap[string,httpServerHandlerFunc] } -type HttpServerHandlerFunc func(*models.UnisHttpRequest, *models.UnisHttpResponse) error +type httpServerHandlerFunc func(*models.UnisHttpRequest, *models.UnisHttpResponse) error func newhttpServerHandler() *httpServerHandler { res := new(httpServerHandler) res.httpC = make(chan *models.UnisHttpRequest) - res.mapFn = make(map[string]HttpServerHandlerFunc) + res.SingleMap = singlemap.NewSingleMap[string,httpServerHandlerFunc]() return res } @@ -31,35 +32,8 @@ func (u *httpServerHandler) httpHandle(msg *models.UnisHttpRequest) { } }() - fn, ok := u.getHandle(msg.Id) - if ok { + if fn, ok := u.Get(msg.Id); ok && fn != nil { if err := fn(msg, resP); err == nil { } } } - -func (u *httpServerHandler) pushHandle(key string, handle HttpServerHandlerFunc) { - // u.mu.Lock() - // defer u.mu.Unlock() - - u.mapFn[key] = handle -} - -func (u *httpServerHandler) delHandle(key string) { - // u.mu.Lock() - // defer u.mu.Unlock() - - delete(u.mapFn, key) -} - -func (u *httpServerHandler) getHandle(key string) (HttpServerHandlerFunc, bool) { - // u.mu.RLock() - // defer u.mu.RUnlock() - - res, ok := u.mapFn[key] - if !ok { - return nil, false - } - - return res, ok -} diff --git a/services/uniss/main.go b/services/uniss/main.go index f09f8e2..34a4b49 100644 --- a/services/uniss/main.go +++ b/services/uniss/main.go @@ -38,6 +38,8 @@ func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest { return u.communicateService.httpServerHandler.httpC } +// High performance threads do not allow any blocking +// If there is blocking work, asynchronous methods need to be used func (u *UnisStation) mainthread(ctx context.Context) error { for { select {