diff --git a/config/config.yaml b/config/config.yaml index 2ecf004..ac898b4 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -27,10 +27,10 @@ unis: type: "json" network: "tcp" host: "" - base_port: 5500 + port: 5500 httpc: clients: 5 - instances: 2 + instances: 1 # 使用令牌桶限流 rate: diff --git a/controller/unisc/httphandle.go b/controller/unisc/httphandle.go index f2e0b5c..e6293f8 100644 --- a/controller/unisc/httphandle.go +++ b/controller/unisc/httphandle.go @@ -4,7 +4,6 @@ import ( "dashboard/models" "dashboard/utils" "net/http" - "sync" "time" "github.com/gin-gonic/gin" @@ -12,30 +11,25 @@ import ( type httpHandle struct { httpC []chan *models.UnisHttpRequest - pool sync.Pool } func newhttpHandle(httpc []chan *models.UnisHttpRequest) *httpHandle { res := new(httpHandle) res.httpC = httpc - res.pool.New = func() any { return new(models.UnisHttpRequest) } return res } -func httpSendWithExpire(httpc chan *models.UnisHttpRequest, data *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { - resc := make(chan *models.UnisHttpResponse) - data.ResC = resc - +func (u *httpHandle)httpSendWithExpire(req *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { select { - case httpc <- data: + case u.httpC[0] <- req: case <-time.After(2 * time.Second): return nil, models.ErrorSendTimeOut } select { - case resp := <-resc: + case resp := <-req.ResC: return resp, nil case <-time.After(2 * time.Second): return nil, models.ErrorReciveTimeOut @@ -51,11 +45,12 @@ func (u *httpHandle) stationConfig(c *gin.Context) error { return err } - channel := c.GetInt(models.GinContextChannel) - if channel > len(u.httpC)-1 { - log.Sugar().Errorf("stationConfig channel err: %d", channel) - return models.ErrorParamsErr - } + // 不再使用channel的方式,而是使用配置文件启动不同进程的方式 + // channel := c.GetInt(models.GinContextChannel) + // if channel > len(u.httpC)-1 { + // log.Sugar().Errorf("stationConfig channel err: %d", channel) + // return models.ErrorParamsErr + // } // if !config.StationConfig.ConfigType { // res, ok := unisd.UnisDataGet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(channel)) @@ -72,16 +67,20 @@ func (u *httpHandle) stationConfig(c *gin.Context) error { // } // } - req := u.pool.Get().(*models.UnisHttpRequest) - req.Id = models.UnisHttpUrl(c.Request.URL.String()).GetMsgId() - req.Msg = config + req, err := getUnisHttpReqFromContext(c) + if err != nil { + log.Sugar().Errorf("stationConfig get http req from context err: %v", err) + return models.ErrorParamsErr + } + + req.SetReqParam(models.UnisHttpUrl(c.Request.URL.String()).GetMsgId(), config) // req := &models.UnisHttpRequest{ // Id: models.UnisHttpUrl(c.Request.URL.String()).GetMsgId(), // Msg: config, // } - resp, err := httpSendWithExpire(u.httpC[channel], req) + resp, err := u.httpSendWithExpire(req) if err != nil { log.Sugar().Errorf("stationConfig send or recive err: %v", err) return err diff --git a/controller/unisc/httproute.go b/controller/unisc/httproute.go index 0ec8129..e5e1597 100644 --- a/controller/unisc/httproute.go +++ b/controller/unisc/httproute.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "strconv" + "sync" "time" "github.com/gin-gonic/gin" @@ -40,17 +41,31 @@ func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute { func (u *HttpRoute) AddRoute(r *gin.Engine) { unisr := r.Group(models.UnisHttpUrlPrefix) - unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest)) + unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest), u.middleWare.ginSetUnisHttpReq) { unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.Url(), routes.ErrWapper(u.httpHandle.stationConfig)) } } type middleWare struct { + pool sync.Pool } func newmiddleWare() *middleWare { - return new(middleWare) + res := new(middleWare) + + res.pool.New = func() any { return createUnisHttpRequest() } + + return res +} + +func createUnisHttpRequest() *models.UnisHttpRequest { + res := new(models.UnisHttpRequest) + + res.ResC = make(chan *models.UnisHttpResponse) + res.ResP = new(models.UnisHttpResponse) + + return res } func (m *middleWare) ginCheckServerId(c *gin.Context) error { @@ -160,7 +175,7 @@ func (m *middleWare) ginWithUnisObject(unis *uniss.UnisStation) gin.HandlerFunc } } -func (m *middleWare) getUnisObjectFromContext(c *gin.Context) (*uniss.UnisStation, error) { +func getUnisObjectFromContext(c *gin.Context) (*uniss.UnisStation, error) { res, ok := c.Get(models.GinContextUnis) if !ok { return nil, models.ErrorInvalidData @@ -180,3 +195,24 @@ func (m *middleWare) ginWithExpireTime(expire time.Duration) gin.HandlerFunc { c.Next() } } + +func (m *middleWare) ginSetUnisHttpReq(c *gin.Context) { + req := m.pool.Get() + c.Set(models.GinContextUnisHttpReq, req) + c.Next() + m.pool.Put(req) +} + +func getUnisHttpReqFromContext(c *gin.Context)(*models.UnisHttpRequest,error){ + res, ok := c.Get(models.GinContextUnisHttpReq) + if !ok { + return nil, models.ErrorInvalidData + } + + unis, ok := res.(*models.UnisHttpRequest) + if !ok { + return nil, models.ErrorInvalidData + } + + return unis, nil +} diff --git a/models/type.go b/models/type.go index 9850fdc..a4183a7 100644 --- a/models/type.go +++ b/models/type.go @@ -6,11 +6,12 @@ import ( ) const ( - GinContextLog = "log" - GinContextChannel = "channel" - GinContextServerId = "serverid" - GinContextUnis = "unis" - GinContextExpire = "expire" + GinContextLog = "log" + GinContextChannel = "channel" + GinContextServerId = "serverid" + GinContextUnis = "unis" + GinContextExpire = "expire" + GinContextUnisHttpReq = "unis_http_req" ) const ( diff --git a/models/unishttpparams.go b/models/unishttpparams.go index 0c4e41f..2f54aa4 100644 --- a/models/unishttpparams.go +++ b/models/unishttpparams.go @@ -3,10 +3,15 @@ package models import "strings" type UnisHttpRequest struct { - ResC chan *UnisHttpResponse - Channel int - Id string - Msg interface{} + ResC chan *UnisHttpResponse + ResP *UnisHttpResponse + Id string + Msg interface{} +} + +func (u *UnisHttpRequest) SetReqParam(id string, msg interface{}) { + u.Id = id + u.Msg = msg } type UnisHttpResponse struct { @@ -15,6 +20,12 @@ type UnisHttpResponse struct { Data interface{} } +func (u *UnisHttpResponse) SetResParam(code int, msg, data interface{}) { + u.Code = code + u.Msg = msg + u.Data = data +} + type UnisHttpClientRequest struct { Url string Methord string diff --git a/services/uniss/common.go b/services/uniss/common.go index b4dd28f..ff7c07c 100644 --- a/services/uniss/common.go +++ b/services/uniss/common.go @@ -43,10 +43,9 @@ func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *m // Code: int(models.CodeSuccess), // Msg: "config ok", // } - respons.Code = int(models.CodeSuccess) - respons.Msg = "config ok" + respons.SetResParam(int(models.CodeSuccess), "config ok", nil) - unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(reqest.Channel), req, 0) + unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(0), req, 0) c.comm.httpClientHandler.curl(&models.UnisHttpClientRequest{ Url: "http://192.168.177.7:8080/api/unis/source-list/v1/update", diff --git a/services/uniss/communicate.go b/services/uniss/communicate.go index 4a0a91d..5c346d8 100644 --- a/services/uniss/communicate.go +++ b/services/uniss/communicate.go @@ -21,7 +21,7 @@ func newcommunicateService(conf *settings.UnisConfig) (res *communicateService, var rpcConf = &rpcsup.RpcConfig{ Typec: conf.RpcConfig.Type, Network: conf.RpcConfig.Network, - Address: fmt.Sprintf("%s:%d", conf.RpcConfig.Host, conf.RpcConfig.BasePort+conf.Instances)} + Address: fmt.Sprintf("%s:%d", conf.RpcConfig.Host, conf.RpcConfig.Port+conf.Instances)} res.rpcService, err = rpcsup.NewRpcService(rpcConf) if err != nil { diff --git a/services/uniss/httpclienthandle.go b/services/uniss/httpclienthandle.go index 188107d..571ea16 100644 --- a/services/uniss/httpclienthandle.go +++ b/services/uniss/httpclienthandle.go @@ -9,27 +9,37 @@ import ( "fmt" "io" "net/http" + "time" "go.uber.org/zap" ) type httpClientHandler struct { - swarm *swarm.Swarm[*models.UnisHttpClientRequest] - httpC chan *models.UnisHttpClientResponse + swarm *swarm.Swarm[*models.UnisHttpClientRequest] + httpC chan *models.UnisHttpClientResponse + client *http.Client } func newhttpClientHandle(clients int) *httpClientHandler { res := new(httpClientHandler) res.swarm = swarm.NewSwarm(clients, res.doRequest, swarm.WithLog(zap.NewStdLog(log.Logger))) res.httpC = make(chan *models.UnisHttpClientResponse) + res.client = http.DefaultClient return res } +// cald by mainthred func (c *httpClientHandler) curl(msg *models.UnisHttpClientRequest) { c.swarm.Submit(msg) } +// cald by mainthred +func (c *httpClientHandler) httpClientHandle(msg *models.UnisHttpClientResponse) { + fmt.Println(string(msg.Msg.([]byte))) +} + +// Note cald by swarm goroutines func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { body, err := json.Marshal(req.Msg) fmt.Println(string(body), err) @@ -40,10 +50,12 @@ func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { return err } - reqs.Header.Add("Content-type", "application/json;charset=utf-8") + c.client.Timeout = time.Second - client := http.Client{} - resp, err := client.Do(reqs) + reqs.Header.Add("Content-type", "application/json;charset=utf-8") + reqs.Header.Add("User-Agent", "avcnetRuntime/0.0.1") + + resp, err := c.client.Do(reqs) if err != nil { fmt.Println(err) return err diff --git a/services/uniss/httpserverhandle.go b/services/uniss/httpserverhandle.go index 95bb81a..1e7e0a7 100644 --- a/services/uniss/httpserverhandle.go +++ b/services/uniss/httpserverhandle.go @@ -2,14 +2,12 @@ package uniss import ( "dashboard/models" - "sync" ) type httpServerHandler struct { httpC chan *models.UnisHttpRequest - mu sync.RWMutex + // mu sync.RWMutex // 没有必要使用锁,应为完全可以保证在一个协程内访问map mapFn map[string]HttpServerHandlerFunc - pool sync.Pool } type HttpServerHandlerFunc func(*models.UnisHttpRequest, *models.UnisHttpResponse) error @@ -19,45 +17,44 @@ func newhttpServerHandler() *httpServerHandler { res.httpC = make(chan *models.UnisHttpRequest) res.mapFn = make(map[string]HttpServerHandlerFunc) - res.pool.New = func() any { return new(models.UnisHttpResponse) } return res } func (u *httpServerHandler) httpHandle(msg *models.UnisHttpRequest) { - res := u.pool.Get().(*models.UnisHttpResponse) + resP := msg.ResP resC := msg.ResC defer func() { if resC != nil { - resC <- res + resC <- resP } }() fn, ok := u.getHandle(msg.Id) if ok { - if err := fn(msg, res); err == nil { + if err := fn(msg, resP); err == nil { } } } func (u *httpServerHandler) pushHandle(key string, handle HttpServerHandlerFunc) { - u.mu.Lock() - defer u.mu.Unlock() + // u.mu.Lock() + // defer u.mu.Unlock() u.mapFn[key] = handle } func (u *httpServerHandler) delHandle(key string) { - u.mu.Lock() - defer u.mu.Unlock() + // u.mu.Lock() + // defer u.mu.Unlock() delete(u.mapFn, key) } func (u *httpServerHandler) getHandle(key string) (HttpServerHandlerFunc, bool) { - u.mu.RLock() - defer u.mu.RUnlock() + // u.mu.RLock() + // defer u.mu.RUnlock() res, ok := u.mapFn[key] if !ok { diff --git a/services/uniss/main.go b/services/uniss/main.go index 3e4c603..f09f8e2 100644 --- a/services/uniss/main.go +++ b/services/uniss/main.go @@ -9,7 +9,6 @@ import ( "dashboard/models" "dashboard/pkg/errgroups" "dashboard/settings" - "fmt" ) var log *logger.Logger @@ -48,7 +47,7 @@ func (u *UnisStation) mainthread(ctx context.Context) error { case httpMsg := <-u.communicateService.httpServerHandler.httpC: u.communicateService.httpServerHandler.httpHandle(httpMsg) case httpc := <-u.communicateService.httpClientHandler.httpC: - fmt.Println(string(httpc.Msg.([]byte))) + u.communicateService.httpClientHandler.httpClientHandle(httpc) // default: } diff --git a/services/uniss/rpcclient.go b/services/uniss/rpcclient.go index 29ceb61..d7bd81c 100644 --- a/services/uniss/rpcclient.go +++ b/services/uniss/rpcclient.go @@ -43,7 +43,7 @@ func (r *rpcClientStub) flushRpcClients(ids []*rpcIdAddres) { } for _, idAddr := range ids { - address := fmt.Sprintf("%s:%d", idAddr.host, idAddr.index+r.conf.BasePort) + address := fmt.Sprintf("%s:%d", idAddr.host, idAddr.index+r.conf.Port) cli, err := rpcsup.RpcClient(&rpcsup.RpcConfig{ Typec: r.conf.Type, diff --git a/settings/type.go b/settings/type.go index d02471b..c8f3192 100644 --- a/settings/type.go +++ b/settings/type.go @@ -42,10 +42,10 @@ type UnisConfig struct { } type RpcConfig struct { - BasePort int `mapstructure:"base_port"` - Network string `mapstructure:"network"` - Host string `mapstructure:"host"` - Type string `mapstructure:"type"` + Port int `mapstructure:"port"` + Network string `mapstructure:"network"` + Host string `mapstructure:"host"` + Type string `mapstructure:"type"` } type HttpClientConfig struct {