1、完善unishttpclient

This commit is contained in:
redhat 2025-06-04 17:07:17 +08:00
parent 813076f168
commit 39f477afe5
8 changed files with 154 additions and 76 deletions

View File

@ -43,7 +43,7 @@ func (u *HttpRoute) AddRoute(r *gin.Engine) {
unisr := r.Group(models.UnisHttpUrlPrefix) unisr := r.Group(models.UnisHttpUrlPrefix)
unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpReq) unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpReq)
{ {
unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.Url(), routes.ErrWapper(u.httpHandle.stationConfig)) unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.URL(), routes.ErrWapper(u.httpHandle.stationConfig))
} }
} }
@ -53,7 +53,7 @@ type middleWare struct {
func newmiddleWare() *middleWare { func newmiddleWare() *middleWare {
res := new(middleWare) res := new(middleWare)
res.pool.New = func() any { return createUnisHttpRequest() } res.pool.New = func() any { return createUnisHttpRequest() }
return res return res
@ -203,7 +203,7 @@ func (m *middleWare) ginSetUnisHttpReq(c *gin.Context) {
m.pool.Put(req) m.pool.Put(req)
} }
func getUnisHttpReqFromContext(c *gin.Context)(*models.UnisHttpRequest,error){ func getUnisHttpReqFromContext(c *gin.Context) (*models.UnisHttpRequest, error) {
res, ok := c.Get(models.GinContextUnisHttpReq) res, ok := c.Get(models.GinContextUnisHttpReq)
if !ok { if !ok {
return nil, models.ErrorInvalidData return nil, models.ErrorInvalidData

View File

@ -1,7 +1,11 @@
package models package models
import "strings" import (
"fmt"
"strings"
)
// http server
type UnisHttpRequest struct { type UnisHttpRequest struct {
ResC chan *UnisHttpResponse ResC chan *UnisHttpResponse
ResP *UnisHttpResponse ResP *UnisHttpResponse
@ -26,25 +30,11 @@ func (u *UnisHttpResponse) SetResParam(code int, msg, data interface{}) {
u.Data = data u.Data = data
} }
type UnisHttpClientRequest struct {
Url string
Methord string
Id string
Msg interface{}
}
type UnisHttpClientResponse struct {
Url string
Methord string
Id string
Msg interface{}
}
type UnisHttpUrl string type UnisHttpUrl string
const UnisHttpUrlPrefix = "/api/unis" const UnisHttpUrlPrefix = "/api/unis"
func (u UnisHttpUrl) Url() string { func (u UnisHttpUrl) URL() string {
return strings.TrimPrefix(string(u), UnisHttpUrlPrefix) return strings.TrimPrefix(string(u), UnisHttpUrlPrefix)
} }
@ -67,3 +57,32 @@ var mapHttpUrlId = map[UnisHttpUrl]string{
} }
var UnisHttpResponseOk = &UnisHttpResponse{Code: int(CodeSuccess), Msg: CodeSuccess.String()} var UnisHttpResponseOk = &UnisHttpResponse{Code: int(CodeSuccess), Msg: CodeSuccess.String()}
// http client
type UnisHttpClientHandlerFunc func(*UnisHttpClientRequest) error
type UnisHttpClientRequest struct {
Url string
Methord string
Msg interface{}
Sn uint64
Handle UnisHttpClientHandlerFunc
}
func (u *UnisHttpClientRequest) SetResParam(methord string, url string, body interface{}, sn uint64, handle UnisHttpClientHandlerFunc) {
u.Methord = methord
u.Url = url
u.Msg = body
u.Handle = handle
u.Sn = sn
}
type UnisHttpClientUrl string
func (u UnisHttpClientUrl) URI(host string, port int) string {
return fmt.Sprintf("http://%s:%d%s", host, port, string(u))
}
const (
UNIS_HTTP_CLIENT_URL_SOURCE_LIST UnisHttpClientUrl = "/api/unis/source-list/v1/update"
)

View File

@ -0,0 +1,28 @@
package singlemap
// NOTE Non concurrent map
type SingleMap[K comparable, T any] struct {
mp map[K]T
}
func NewSingleMap[K comparable, T any]() *SingleMap[K, T] {
res := new(SingleMap[K, T])
res.mp = make(map[K]T)
return res
}
func (s *SingleMap[K, T]) Put(key K, value T) {
s.mp[key] = value
}
func (s *SingleMap[K, T]) Get(key K) (T, bool) {
res, ok := s.mp[key]
return res, ok
}
func (s *SingleMap[K, T]) Del(key K) {
delete(s.mp, key)
}

View File

@ -16,6 +16,7 @@ type Swarm[T any] struct {
opts options opts options
} }
// TODO: 需要完善错误处理机制,重试(什么时候重试,重试几次?),投递死信队列
func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] { func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] {
res := new(Swarm[T]) res := new(Swarm[T])

View File

@ -16,7 +16,7 @@ func newcommonService(comm *communicateService) *commonService {
res.comm = comm res.comm = comm
res.comm.httpServerHandler.pushHandle(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) res.comm.httpServerHandler.Put(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig)
return res return res
} }
@ -47,10 +47,17 @@ func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *m
unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), req, 0) unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), req, 0)
c.comm.httpClientHandler.curl(&models.UnisHttpClientRequest{ c.comm.httpClientHandler.curl(http.MethodGet,
Url: "http://192.168.177.7:8080/api/unis/source-list/v1/update", models.UNIS_HTTP_CLIENT_URL_SOURCE_LIST.URI("192.168.177.7", 8080),
Methord: http.MethodGet, nil,
}) func(msg *models.UnisHttpClientRequest) error {
fmt.Println(msg.Url)
if err, ok := msg.Msg.(error); ok {
fmt.Println(err)
}
return nil
})
return nil return nil
} }

View File

@ -4,49 +4,98 @@ import (
"bytes" "bytes"
"dashboard/dao/sqldb" "dashboard/dao/sqldb"
"dashboard/models" "dashboard/models"
"dashboard/pkg/singlemap"
"dashboard/pkg/swarm" "dashboard/pkg/swarm"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"net/http" "net/http"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
) )
type httpClientHandler struct { type httpClientHandler struct {
swarm *swarm.Swarm[*models.UnisHttpClientRequest] swarm *swarm.Swarm[*models.UnisHttpClientRequest]
httpC chan *models.UnisHttpClientResponse httpC chan *models.UnisHttpClientRequest
client *http.Client client *http.Client
callMap *singlemap.SingleMap[uint64, models.UnisHttpClientHandlerFunc]
pool sync.Pool
sn uint64
} }
func newhttpClientHandle(clients int) *httpClientHandler { func newhttpClientHandle(clients int) *httpClientHandler {
res := new(httpClientHandler) res := new(httpClientHandler)
res.swarm = swarm.NewSwarm(clients, res.doRequest, swarm.WithLog(zap.NewStdLog(log.Logger))) res.swarm = swarm.NewSwarm(clients, res.doRequest, swarm.WithLog(zap.NewStdLog(log.Logger)))
res.httpC = make(chan *models.UnisHttpClientResponse) res.httpC = make(chan *models.UnisHttpClientRequest)
res.client = http.DefaultClient res.client = http.DefaultClient
res.callMap = singlemap.NewSingleMap[uint64, models.UnisHttpClientHandlerFunc]()
res.pool.New = func() any { return new(models.UnisHttpClientRequest) }
return res return res
} }
// cald by mainthred func (c *httpClientHandler) getSn() uint64 {
func (c *httpClientHandler) curl(msg *models.UnisHttpClientRequest) { c.sn++
c.swarm.Submit(msg) return c.sn
} }
// cald by mainthred // cald by mainthred
func (c *httpClientHandler) httpClientHandle(msg *models.UnisHttpClientResponse) { func (c *httpClientHandler) curl(method string, url string, body interface{}, handle models.UnisHttpClientHandlerFunc) {
fmt.Println(string(msg.Msg.([]byte))) req := c.pool.Get().(*models.UnisHttpClientRequest)
req.SetResParam(method, url, body, c.getSn(), handle)
if req.Handle != nil {
c.callMap.Put(req.Sn, req.Handle)
}
c.swarm.Submit(req)
}
// cald by mainthred
func (c *httpClientHandler) httpClientHandle(msg *models.UnisHttpClientRequest) {
defer func() {
c.pool.Put(msg)
c.callMap.Del(msg.Sn)
}()
if call, ok := c.callMap.Get(msg.Sn); ok && call != nil {
if err := call(msg); err != nil {
log.Sugar().Error(err)
return
}
log.Sugar().Infof("sn: %d, url: %s, handle ok", msg.Sn, msg.Url)
return
}
if err, ok := msg.Msg.(error); ok {
log.Sugar().Errorf("sn: %d, url: %s, no handle find and get err: %v", msg.Sn, msg.Url, err)
return
}
log.Sugar().Warnf("sn: %d, url: %s, no handle find body: %s", msg.Sn, msg.Url, string(msg.Msg.([]byte)))
} }
// Note cald by swarm goroutines // Note cald by swarm goroutines
func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error {
defer func() {
c.httpC <- req
}()
body, err := json.Marshal(req.Msg) body, err := json.Marshal(req.Msg)
fmt.Println(string(body), err) if err != nil {
req.Msg = err
log.Sugar().Error(err)
return err
}
reqs, err := http.NewRequest(req.Methord, req.Url, bytes.NewReader(body)) reqs, err := http.NewRequest(req.Methord, req.Url, bytes.NewReader(body))
if err != nil { if err != nil {
fmt.Println(err) req.Msg = err
log.Sugar().Error(err)
return err return err
} }
@ -57,28 +106,26 @@ func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error {
resp, err := c.client.Do(reqs) resp, err := c.client.Do(reqs)
if err != nil { if err != nil {
fmt.Println(err) req.Msg = err
log.Sugar().Error(err)
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
msg, err := io.ReadAll(resp.Body) msg, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
req.Msg = err
log.Sugar().Error(err)
return err return err
} }
if err := sqldb.ResFulDataStore(req.Methord, req.Url, string(body), string(msg), 0); err != nil { if err := sqldb.ResFulDataStore(req.Methord, req.Url, string(body), string(msg), 0); err != nil {
fmt.Println(err) req.Msg = err
log.Sugar().Error(err)
return err return err
} }
response := &models.UnisHttpClientResponse{ req.Msg = msg
Id: req.Id,
Methord: req.Methord,
Msg: msg,
}
c.httpC <- response
return nil return nil
} }

View File

@ -2,21 +2,22 @@ package uniss
import ( import (
"dashboard/models" "dashboard/models"
"dashboard/pkg/singlemap"
) )
type httpServerHandler struct { type httpServerHandler struct {
httpC chan *models.UnisHttpRequest httpC chan *models.UnisHttpRequest
// mu sync.RWMutex // 没有必要使用锁应为完全可以保证在一个协程内访问map // mu sync.RWMutex // 没有必要使用锁应为完全可以保证在一个协程内访问map
mapFn map[string]HttpServerHandlerFunc *singlemap.SingleMap[string,httpServerHandlerFunc]
} }
type HttpServerHandlerFunc func(*models.UnisHttpRequest, *models.UnisHttpResponse) error type httpServerHandlerFunc func(*models.UnisHttpRequest, *models.UnisHttpResponse) error
func newhttpServerHandler() *httpServerHandler { func newhttpServerHandler() *httpServerHandler {
res := new(httpServerHandler) res := new(httpServerHandler)
res.httpC = make(chan *models.UnisHttpRequest) res.httpC = make(chan *models.UnisHttpRequest)
res.mapFn = make(map[string]HttpServerHandlerFunc) res.SingleMap = singlemap.NewSingleMap[string,httpServerHandlerFunc]()
return res return res
} }
@ -31,35 +32,8 @@ func (u *httpServerHandler) httpHandle(msg *models.UnisHttpRequest) {
} }
}() }()
fn, ok := u.getHandle(msg.Id) if fn, ok := u.Get(msg.Id); ok && fn != nil {
if ok {
if err := fn(msg, resP); err == nil { if err := fn(msg, resP); err == nil {
} }
} }
} }
func (u *httpServerHandler) pushHandle(key string, handle HttpServerHandlerFunc) {
// u.mu.Lock()
// defer u.mu.Unlock()
u.mapFn[key] = handle
}
func (u *httpServerHandler) delHandle(key string) {
// u.mu.Lock()
// defer u.mu.Unlock()
delete(u.mapFn, key)
}
func (u *httpServerHandler) getHandle(key string) (HttpServerHandlerFunc, bool) {
// u.mu.RLock()
// defer u.mu.RUnlock()
res, ok := u.mapFn[key]
if !ok {
return nil, false
}
return res, ok
}

View File

@ -38,6 +38,8 @@ func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest {
return u.communicateService.httpServerHandler.httpC return u.communicateService.httpServerHandler.httpC
} }
// High performance threads do not allow any blocking
// If there is blocking work, asynchronous methods need to be used
func (u *UnisStation) mainthread(ctx context.Context) error { func (u *UnisStation) mainthread(ctx context.Context) error {
for { for {
select { select {