From 5a24eff7f17e940591ba72b8998d43876a36791f Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Tue, 3 Jun 2025 17:35:50 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E4=B8=AA=E6=95=B0=E8=BF=9B=E8=A1=8C=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=EF=BC=8C=E8=80=8C=E4=B8=8D=E6=98=AF=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=95=B0=E7=BB=84=E3=80=822=E3=80=81errorgroups=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E4=BD=BF=E7=94=A8=E5=8F=82=E6=95=B0=E5=88=97=E8=A1=A8?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E6=B3=A8=E5=86=8C=E3=80=823=E3=80=81?= =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E7=BB=93=E6=9D=9F=E6=97=B6defer=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E6=95=B0=E6=8D=AE=E5=BA=93=E3=80=824=E3=80=81http?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E4=BD=BF=E7=94=A8syncpool=E6=9D=A5=E6=8F=90?= =?UTF-8?q?=E9=AB=98=E6=80=A7=E8=83=BD=E3=80=825=E3=80=81httproute?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E4=BD=BF=E7=94=A8=E6=8E=A5=E5=8F=A3=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E9=80=9A=E4=BF=A1=E7=AE=A1=E9=81=93=E7=9A=84=E4=BC=A0?= =?UTF-8?q?=E9=80=92=E3=80=826=E3=80=81=E4=B8=AD=E9=97=B4=E4=BB=B6?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=9B=9E=E5=A4=8D=E6=95=B0=E6=8D=AE=E7=9A=84?= =?UTF-8?q?=E4=BF=9D=E5=AD=98=E3=80=827=E3=80=81rpc=E5=B0=BD=E9=87=8F?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=8F=AF=E4=BB=A5=E4=BD=BF=E7=94=A8unix?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E3=80=828=E3=80=81rpcserver=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E8=87=AA=E9=80=82=E5=BA=94jsonrpc=E6=88=96=E8=80=85go?= =?UTF-8?q?brpc=E3=80=829=E3=80=81swarm=E6=A8=A1=E5=9D=97=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E9=87=8D=E5=A4=A7=E8=B0=83=E6=95=B4=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=B0=83=E5=BA=A6=E5=86=99=E6=88=90=E7=AD=89=E5=BE=85?= =?UTF-8?q?=E5=B7=A5=E4=BD=9C=E5=8D=8F=E7=A8=8B=E7=BB=93=E6=9D=9F=E4=B9=8B?= =?UTF-8?q?=E5=90=8E=E5=86=8D=E7=BB=93=E6=9D=9F=EF=BC=88=E4=BC=9A=E5=8F=91?= =?UTF-8?q?=E7=94=9F=E6=AD=BB=E9=94=81=E5=AF=BC=E8=87=B4=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E7=BB=93=E6=9D=9F=EF=BC=89=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=B0=83=E5=BA=A6=E5=8D=8F=E7=A8=8B=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=89=A7=E8=A1=8Cdefer=E5=87=BD=E6=95=B0=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=8C=E5=8E=9F=E5=9B=A0=E6=98=AFmain=E7=BB=93=E6=9D=9F?= =?UTF-8?q?=E5=A4=AA=E5=BF=AB=E5=AF=BC=E8=87=B4=E8=B0=83=E5=BA=A6=E5=8D=8F?= =?UTF-8?q?=E7=A8=8B=E7=9A=84defer=E6=9D=A5=E4=B8=8D=E5=8F=8A=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=EF=BC=8C=E4=BD=BF=E7=94=A8=E5=87=BD=E6=95=B0=E9=80=89?= =?UTF-8?q?=E9=A1=B9=E6=9D=A5=E6=B3=A8=E5=85=A5log=E3=80=8210=E3=80=81unis?= =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E8=BF=9B=E8=A1=8C=E9=87=8D=E5=A4=A7=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E3=80=8211=E3=80=81=E6=89=80=E6=9C=89=E7=9A=86?= =?UTF-8?q?=E9=87=87=E7=94=A8=E9=9D=A2=E5=90=91=E5=AF=B9=E8=B1=A1=E7=9A=84?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 10 ++- controller/unisc/httphandle.go | 63 ++++++++++------ controller/unisc/httproute.go | 80 ++++++++++++++++---- dao/dadis/unisd/type.go | 8 +- dao/sqldb/sqlite/tablecreate.go | 7 +- dao/sqldb/unisresfulstore.go | 12 +-- main.go | 38 ++++++++-- models/type.go | 11 ++- models/unishttpparams.go | 29 ++++++- models/unisrpcparams.go | 14 ++-- pkg/dadis/dadis.go | 4 +- pkg/errgroups/errgroups.go | 42 +++++++---- pkg/rpcsup/client.go | 26 +++++++ pkg/rpcsup/jsonrpc.go | 74 ------------------ pkg/rpcsup/rpcsup.go | 103 +++++++++++++++++++++++++ pkg/swarm/option.go | 22 ++++++ pkg/swarm/scheduler.go | 69 +++++++++++++++++ pkg/swarm/swarm.go | 117 +++++++++++++++++++++++++++++ pkg/swarm/swarm_test.go | 32 ++++++++ services/uniss/common.go | 34 ++++++--- services/uniss/communicate.go | 54 +++++++++++++ services/uniss/datacenter.go | 20 +++++ services/uniss/httpclienthandle.go | 72 ++++++++++++++++++ services/uniss/httphandle.go | 66 ---------------- services/uniss/httpserverhandle.go | 68 +++++++++++++++++ services/uniss/main.go | 49 ++++++------ services/uniss/rpcclient.go | 35 +++++---- services/uniss/rpcservice.go | 47 +++--------- settings/type.go | 15 +++- utils/runtime.go | 9 +++ 30 files changed, 912 insertions(+), 318 deletions(-) create mode 100644 pkg/rpcsup/client.go delete mode 100644 pkg/rpcsup/jsonrpc.go create mode 100644 pkg/rpcsup/rpcsup.go create mode 100644 pkg/swarm/option.go create mode 100644 pkg/swarm/scheduler.go create mode 100644 pkg/swarm/swarm.go create mode 100644 pkg/swarm/swarm_test.go create mode 100644 services/uniss/communicate.go create mode 100644 services/uniss/datacenter.go create mode 100644 services/uniss/httpclienthandle.go delete mode 100644 services/uniss/httphandle.go create mode 100644 services/uniss/httpserverhandle.go diff --git a/config/config.yaml b/config/config.yaml index ce7d531..2ecf004 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -2,7 +2,7 @@ base: name: "avcnet_dash" port: 8080 mode: "dev" - version: "v0.0.1" + version: "v0.1.0" log: level: "debug" @@ -14,7 +14,7 @@ log: database: # sqlite只需要指定db_name即可 type: "sqlite" - db_name: "/tmp/data/sqlite/avcnet.db" # 如果是sqlite需要指定完整路径和文件名称 + db_name: "/home/zs/tftpboot/cache/data/sqlite/avcnet.db" # 如果是sqlite需要指定完整路径和文件名称 # 其他远程数据库需要指定下面的内容 user: "root" password: "password" @@ -25,8 +25,12 @@ database: unis: rpc: type: "json" + network: "tcp" host: "" - port: 5501 + base_port: 5500 + httpc: + clients: 5 + instances: 2 # 使用令牌桶限流 rate: diff --git a/controller/unisc/httphandle.go b/controller/unisc/httphandle.go index 84d7af6..f2e0b5c 100644 --- a/controller/unisc/httphandle.go +++ b/controller/unisc/httphandle.go @@ -1,23 +1,25 @@ package unisc import ( - "dashboard/dao/dadis/unisd" "dashboard/models" + "dashboard/utils" "net/http" - "reflect" + "sync" "time" "github.com/gin-gonic/gin" ) type httpHandle struct { - httpC chan *models.UnisHttpRequest + httpC []chan *models.UnisHttpRequest + pool sync.Pool } -func newhttpHandle(httpc chan *models.UnisHttpRequest) *httpHandle { +func newhttpHandle(httpc []chan *models.UnisHttpRequest) *httpHandle { res := new(httpHandle) res.httpC = httpc + res.pool.New = func() any { return new(models.UnisHttpRequest) } return res } @@ -29,44 +31,59 @@ func httpSendWithExpire(httpc chan *models.UnisHttpRequest, data *models.UnisHtt select { case httpc <- data: case <-time.After(2 * time.Second): - return nil, models.ErrorTimeOut + return nil, models.ErrorSendTimeOut } select { case resp := <-resc: return resp, nil case <-time.After(2 * time.Second): - return nil, models.ErrorTimeOut + return nil, models.ErrorReciveTimeOut } } func (u *httpHandle) stationConfig(c *gin.Context) error { + log, _ := utils.GetLogFromContext(c) + config := new(models.StationConfigParams) if err := c.ShouldBindJSON(config); err != nil { + log.Sugar().Errorf("stationConfig bind json err: %v", err) return err } - if !config.StationConfig.ConfigType { - res, ok := unisd.UnisDataGet(string(unisd.DadisKey_UnisStationInfo)) - if ok { - oldCfg := res.(*models.StationConfigParams) - - oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType - if reflect.DeepEqual(config, oldCfg) { - c.JSON(http.StatusOK, models.UnisHttpResponseOk) - - return nil - } - } + channel := c.GetInt(models.GinContextChannel) + if channel > len(u.httpC)-1 { + log.Sugar().Errorf("stationConfig channel err: %d", channel) + return models.ErrorParamsErr } - req := &models.UnisHttpRequest{ - Id: models.UnisHttpUrl(c.Request.URL.String()).GetMsgId(), - Msg: config, - } + // if !config.StationConfig.ConfigType { + // res, ok := unisd.UnisDataGet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(channel)) + // if ok { + // oldCfg := res.(*models.StationConfigParams) - resp, err := httpSendWithExpire(u.httpC, req) + // oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType + // if reflect.DeepEqual(config, oldCfg) { + // log.Sugar().Infof("stationConfig is same") + // c.JSON(http.StatusOK, models.UnisHttpResponseOk) + + // return nil + // } + // } + // } + + req := u.pool.Get().(*models.UnisHttpRequest) + req.Id = models.UnisHttpUrl(c.Request.URL.String()).GetMsgId() + req.Msg = config + + // req := &models.UnisHttpRequest{ + // Id: models.UnisHttpUrl(c.Request.URL.String()).GetMsgId(), + // Msg: config, + // } + + resp, err := httpSendWithExpire(u.httpC[channel], req) if err != nil { + log.Sugar().Errorf("stationConfig send or recive err: %v", err) return err } diff --git a/controller/unisc/httproute.go b/controller/unisc/httproute.go index 07b57cf..0ec8129 100644 --- a/controller/unisc/httproute.go +++ b/controller/unisc/httproute.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "strconv" + "time" "github.com/gin-gonic/gin" ) @@ -19,20 +20,29 @@ type HttpRoute struct { middleWare *middleWare } -func NewHttpRoute(httpc chan *models.UnisHttpRequest) *HttpRoute { +type UnisHttpChanneler interface { + GetHttpChannel() chan *models.UnisHttpRequest +} + +func NewHttpRoute(httpc ...UnisHttpChanneler) *HttpRoute { res := new(HttpRoute) - res.httpHandle = newhttpHandle(httpc) + var httpC []chan *models.UnisHttpRequest + for _, cha := range httpc { + httpC = append(httpC, cha.GetHttpChannel()) + } + + res.httpHandle = newhttpHandle(httpC) res.middleWare = newmiddleWare() return res } func (u *HttpRoute) AddRoute(r *gin.Engine) { - unisr := r.Group("/api/unis") - unisr.Use(routes.ErrWapper(u.middleWare.GinCheckServerId), routes.ErrWapper(u.middleWare.GinStoreRequest)) + unisr := r.Group(models.UnisHttpUrlPrefix) + unisr.Use(routes.ErrWapper(u.middleWare.ginCheckServerId), routes.ErrWapper(u.middleWare.ginStoreRequest)) { - unisr.POST("/config/v1/add", routes.ErrWapper(u.httpHandle.stationConfig)) + unisr.POST(models.UNIS_HTTP_URL_CONFIG_ADD.Url(), routes.ErrWapper(u.httpHandle.stationConfig)) } } @@ -43,7 +53,7 @@ func newmiddleWare() *middleWare { return new(middleWare) } -func (m *middleWare) GinCheckServerId(c *gin.Context) error { +func (m *middleWare) ginCheckServerId(c *gin.Context) error { serverId := c.GetHeader("serverId") if serverId != serverId { @@ -71,14 +81,30 @@ func (m *middleWare) GinCheckServerId(c *gin.Context) error { return nil } -func (m *middleWare) GinStoreRequest(c *gin.Context) (err error) { +// bodyLogWriter 自定义响应写入器,用于捕获响应体 +type bodyLogWriter struct { + gin.ResponseWriter + body *bytes.Buffer +} + +// Write 实现写入接口,同时写入缓冲区和原始响应 +func (w *bodyLogWriter) Write(b []byte) (int, error) { + w.body.Write(b) + return w.ResponseWriter.Write(b) +} + +// WriteString 实现字符串写入接口(优化性能) +func (w *bodyLogWriter) WriteString(s string) (int, error) { + w.body.WriteString(s) + return w.ResponseWriter.WriteString(s) +} + +func (m *middleWare) ginStoreRequest(c *gin.Context) (err error) { defer func() { if err != nil { c.Abort() return } - - c.Next() }() log, _ := utils.GetLogFromContext(c) @@ -87,18 +113,39 @@ func (m *middleWare) GinStoreRequest(c *gin.Context) (err error) { oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel) if _err != nil { err = _err + return } bodyBytes, _err := io.ReadAll(c.Request.Body) if _err != nil { err = _err + return } c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + // 创建自定义响应写入器 + blw := &bodyLogWriter{ + body: bytes.NewBufferString(""), + ResponseWriter: c.Writer, + } + c.Writer = blw + + c.Next() + + // 获取响应信息 + // statusCode := c.Writer.Status() + responseBody := blw.body.String() + + // // 安全日志记录:限制大响应体 + // logBody := responseBody + // if len(responseBody) > 1000 { + // logBody = responseBody[:1000] + "...[TRUNCATED]" + // } + if oldConf != string(bodyBytes) { - log.Sugar().Info("The resful request is not equel local store") - if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil { + log.Sugar().Info("The resful request is not equel local store so will save to database...") + if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), responseBody, string(bodyBytes), channel); _err != nil { err = _err } } @@ -106,14 +153,14 @@ func (m *middleWare) GinStoreRequest(c *gin.Context) (err error) { return nil } -func (m *middleWare) GinWithUnisObject(unis *uniss.UnisStation) gin.HandlerFunc { +func (m *middleWare) ginWithUnisObject(unis *uniss.UnisStation) gin.HandlerFunc { return func(c *gin.Context) { c.Set(models.GinContextUnis, unis) c.Next() } } -func (m *middleWare) GetUnisObjectFromContext(c *gin.Context) (*uniss.UnisStation, error) { +func (m *middleWare) getUnisObjectFromContext(c *gin.Context) (*uniss.UnisStation, error) { res, ok := c.Get(models.GinContextUnis) if !ok { return nil, models.ErrorInvalidData @@ -126,3 +173,10 @@ func (m *middleWare) GetUnisObjectFromContext(c *gin.Context) (*uniss.UnisStatio return unis, nil } + +func (m *middleWare) ginWithExpireTime(expire time.Duration) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set(models.GinContextExpire, expire) + c.Next() + } +} diff --git a/dao/dadis/unisd/type.go b/dao/dadis/unisd/type.go index b77ebfb..39736c4 100644 --- a/dao/dadis/unisd/type.go +++ b/dao/dadis/unisd/type.go @@ -1,8 +1,14 @@ package unisd +import "fmt" + type DadisKey string +func (d DadisKey) KeyWithChannel(channel int) string { + return fmt.Sprintf("%d:%s", channel, string(d)) +} + const ( DadisKey_UnisSystemInfo DadisKey = "UnisSystemInfo" DadisKey_UnisStationInfo DadisKey = "UnisStationInfo" -) \ No newline at end of file +) diff --git a/dao/sqldb/sqlite/tablecreate.go b/dao/sqldb/sqlite/tablecreate.go index d44b859..0431c9d 100644 --- a/dao/sqldb/sqlite/tablecreate.go +++ b/dao/sqldb/sqlite/tablecreate.go @@ -6,13 +6,14 @@ const ( createUserSql = `CREATE TABLE IF NOT EXISTS db_unis_aisub (userid TEXT NOT NULL,is_sub INTEGER,CONSTRAINT db_unis_aisub_pk PRIMARY KEY (userid));` createResStoreSql = `CREATE TABLE IF NOT EXISTS db_unis_res_store ( - key TEXT NOT NULL, - value TEXT, + url TEXT NOT NULL, + body TEXT, + response TEXT, channel INTEGER, methord TEXT, created_at TEXT NOT NULL DEFAULT (DATETIME('now', 'localtime')), updated_at TEXT NOT NULL DEFAULT (DATETIME('now', 'localtime')), - CONSTRAINT db_unis_res_pk PRIMARY KEY (key,channel,methord) + CONSTRAINT db_unis_res_pk PRIMARY KEY (url,channel,methord) ); CREATE TRIGGER IF NOT EXISTS trigger_db_unis_res_store_updated_at AFTER UPDATE ON db_unis_res_store diff --git a/dao/sqldb/unisresfulstore.go b/dao/sqldb/unisresfulstore.go index de38d35..05fcd08 100644 --- a/dao/sqldb/unisresfulstore.go +++ b/dao/sqldb/unisresfulstore.go @@ -5,10 +5,10 @@ import ( "errors" ) -func ResFulDataStore(methord, key, value string, channel int) error { - sqlStr := `INSERT OR REPLACE INTO db_unis_res_store(key,value,channel,methord) VALUES(?,?,?,?)` +func ResFulDataStore(methord, url, response, body string, channel int) error { + sqlStr := `INSERT OR REPLACE INTO db_unis_res_store(url,body,channel,methord,response) VALUES(?,?,?,?,?)` - res, err := db.Exec(sqlStr, key, value, channel,methord) + res, err := db.Exec(sqlStr, url, body, channel, methord, response) if err != nil { return err } @@ -25,11 +25,11 @@ func ResFulDataStore(methord, key, value string, channel int) error { return nil } -func ResFulDataGet(methord, key string, channel int) (string, error) { - sqlStr := `SELECT value FROM db_unis_res_store WHERE key=? AND channel=? AND methord=?` +func ResFulDataGet(methord, url string, channel int) (string, error) { + sqlStr := `SELECT body FROM db_unis_res_store WHERE url=? AND channel=? AND methord=?` var res string - if err := db.Get(&res, sqlStr, key, channel, methord); err != nil { + if err := db.Get(&res, sqlStr, url, channel, methord); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", nil } diff --git a/main.go b/main.go index 609a7bd..8fd78cb 100644 --- a/main.go +++ b/main.go @@ -11,14 +11,25 @@ import ( "dashboard/settings" "flag" "fmt" + + "github.com/fsnotify/fsnotify" ) var config = flag.String("f", "./config/config.yaml", "config file path") +func callBack(e fsnotify.Event) { + // os.Exit(0) + // fmt.Println(e, os.Args) + + // out, err := exec.Command("go", "run", os.Args[0]).Output() + + // fmt.Println(out, err) +} + func main() { flag.Parse() - sets := settings.New(settings.WithName(*config)) + sets := settings.New(settings.WithName(*config), settings.WithCallBack(callBack)) log, err := logger.New( logger.WithFileName(sets.LogConfig.Filename), @@ -36,29 +47,42 @@ func main() { _ = log.Sync() }() - log.Info("Settings and log init ok") + log.Info("Settings and log init ok:)") db, err := sqldb.NewDb(sets.SqlConfig) if err != nil { log.Sugar().Panicf("New db error: %v", err) } + defer func() { + _ = db.GetDb().Close() + }() err = sqldb.SqlDbInit(log, db) if err != nil { log.Sugar().Panicf("New db error: %v", err) } - errwg := errgroups.NewErrGroup() - // r := routes.Setup(log, *sets.RateLimitConfig, *sets.JwtConfig) // listenAndServe(fmt.Sprintf(":%d", sets.BaseConfig.Port), r, log) - sunis := uniss.NewUnis(log, sets.UnisConfig) + var sunisa []errgroups.FuncWithErr + var route []unisc.UnisHttpChanneler + for index := range sets.UnisConfig.Instances { + sets.UnisConfig.Instances = index + unis, err := uniss.NewUnis(log, sets.UnisConfig) + if err != nil { + panic(err) + } + sunisa = append(sunisa, unis) + route = append(route, unis) + } + + errwg := errgroups.NewErrGroup() errwg.Add(routes.NewRouter(fmt.Sprintf(":%d", sets.BaseConfig.Port), log, *sets.RateLimitConfig, *sets.JwtConfig, - unisc.NewHttpRoute(sunis.GetHttpChannel()))) - errwg.Add(sunis) + unisc.NewHttpRoute(route...))) + errwg.Add(sunisa...) log.Sugar().Fatalln(errwg.Run(context.Background())) } diff --git a/models/type.go b/models/type.go index bbc8af1..9850fdc 100644 --- a/models/type.go +++ b/models/type.go @@ -10,6 +10,7 @@ const ( GinContextChannel = "channel" GinContextServerId = "serverid" GinContextUnis = "unis" + GinContextExpire = "expire" ) const ( @@ -18,10 +19,12 @@ const ( ) var ( - ErrorInvalidData = errors.New("no such value") - ErrorPasswordErr = errors.New("user or password invalid") - ErrorSqlInitErr = errors.New("database init err") - ErrorTimeOut = errors.New("time out") + ErrorInvalidData = errors.New("no such value") + ErrorPasswordErr = errors.New("user or password invalid") + ErrorSqlInitErr = errors.New("database init err") + ErrorSendTimeOut = errors.New("send time out") + ErrorReciveTimeOut = errors.New("recive time out") + ErrorParamsErr = errors.New("invalid params") ) const ( diff --git a/models/unishttpparams.go b/models/unishttpparams.go index d68d695..0c4e41f 100644 --- a/models/unishttpparams.go +++ b/models/unishttpparams.go @@ -1,9 +1,12 @@ package models +import "strings" + type UnisHttpRequest struct { - ResC chan *UnisHttpResponse - Id string - Msg interface{} + ResC chan *UnisHttpResponse + Channel int + Id string + Msg interface{} } type UnisHttpResponse struct { @@ -12,8 +15,28 @@ type UnisHttpResponse struct { Data interface{} } +type UnisHttpClientRequest struct { + Url string + Methord string + Id string + Msg interface{} +} + +type UnisHttpClientResponse struct { + Url string + Methord string + Id string + Msg interface{} +} + type UnisHttpUrl string +const UnisHttpUrlPrefix = "/api/unis" + +func (u UnisHttpUrl) Url() string { + return strings.TrimPrefix(string(u), UnisHttpUrlPrefix) +} + const ( UNIS_HTTP_URL_CONFIG_ADD UnisHttpUrl = "/api/unis/config/v1/add" ) diff --git a/models/unisrpcparams.go b/models/unisrpcparams.go index 34c5161..4e9235f 100644 --- a/models/unisrpcparams.go +++ b/models/unisrpcparams.go @@ -2,22 +2,22 @@ package models import "fmt" -const rpcNamePrefix = "UnisRpcService" +const rpcNamePrefix = "RpcServerStub" type UnisRpcMethod string -func (m UnisRpcMethod)Methord()string{ - return fmt.Sprintf("%s.%s",rpcNamePrefix,string(m)) +func (m UnisRpcMethod) Methord() string { + return fmt.Sprintf("%s.%s", rpcNamePrefix, string(m)) } -const( +const ( UnisStationConfig UnisRpcMethod = "Config" ) -type UnisRpcRequest struct{ +type UnisRpcRequest struct { Id string } -type UnisRpcResponse struct{ +type UnisRpcResponse struct { Id string -} \ No newline at end of file +} diff --git a/pkg/dadis/dadis.go b/pkg/dadis/dadis.go index 82a13a2..2e7f4f9 100644 --- a/pkg/dadis/dadis.go +++ b/pkg/dadis/dadis.go @@ -10,7 +10,7 @@ import ( type Dadis struct { mu sync.Mutex store map[string]*item - observe *observe.AsyncEventBus + observe *observe.SyncEventBus } type item struct { @@ -21,7 +21,7 @@ type item struct { func NewDadis() *Dadis { return &Dadis{ store: make(map[string]*item), - observe: observe.NewAsyncEventBus(), + observe: observe.NewSyncEventBus(), } } diff --git a/pkg/errgroups/errgroups.go b/pkg/errgroups/errgroups.go index 87b04d5..cebf35c 100644 --- a/pkg/errgroups/errgroups.go +++ b/pkg/errgroups/errgroups.go @@ -3,9 +3,9 @@ package errgroups import ( "context" "dashboard/utils" - "fmt" "sync" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -17,6 +17,7 @@ type ErrGroupFunc struct { type FuncWithErrFunc func(context.Context) error +// 注意使用func时,如果想要对数组对象执行多个函数,不可以使用此种方法,因为函数名都是一样的,map里面会被替换。此种场景需要使用接口的方式。 func NewErrGroupFunc() *ErrGroupFunc { return &ErrGroupFunc{ funs: make(map[string]FuncWithErrFunc), @@ -33,8 +34,8 @@ func (e *ErrGroupFunc) Run(ctx context.Context) error { fn := fn name := name e.wg.Go(func() error { - defer fmt.Printf("func: %s stop\n", name) - fmt.Printf("func: %s runing...\n", name) + defer zap.L().Sugar().Warnf("func: %s stop", name) + zap.L().Sugar().Infof("func: %s runing...", name) return fn(ctx) }) } @@ -46,22 +47,24 @@ func (e *ErrGroupFunc) Run(ctx context.Context) error { return nil } -func (e *ErrGroupFunc) Add(fn FuncWithErrFunc) { +func (e *ErrGroupFunc) Add(fn ...FuncWithErrFunc) { e.mu.Lock() defer e.mu.Unlock() - fnStr := utils.GetFuncName(fn) - - e.funs[fnStr] = fn + for _, f := range fn { + fnStr := utils.GetFuncName(f) + e.funs[fnStr] = f + } } -func (e *ErrGroupFunc) Del(fn FuncWithErrFunc) { +func (e *ErrGroupFunc) Del(fn ...FuncWithErrFunc) { e.mu.Lock() defer e.mu.Unlock() - fnStr := utils.GetFuncName(fn) - - delete(e.funs, fnStr) + for _, f := range fn { + fnStr := utils.GetFuncName(f) + delete(e.funs, fnStr) + } } type ErrGroup struct { @@ -87,8 +90,11 @@ func (e *ErrGroup) Run(ctx context.Context) error { e.wg, ctx = errgroup.WithContext(ctx) for fn := range e.funs { - fn := fn + fn := fn // shadow + name := utils.GetInterfaceName(fn) e.wg.Go(func() error { + defer zap.L().Sugar().Warnf("Object: %s stop", name) + zap.L().Sugar().Infof("Object: %s runing...", name) return fn.Run(ctx) }) } @@ -100,16 +106,20 @@ func (e *ErrGroup) Run(ctx context.Context) error { return nil } -func (e *ErrGroup) Add(fn FuncWithErr) { +func (e *ErrGroup) Add(fn ...FuncWithErr) { e.mu.Lock() defer e.mu.Unlock() - e.funs[fn] = struct{}{} + for _, f := range fn { + e.funs[f] = struct{}{} + } } -func (e *ErrGroup) Del(fn FuncWithErr) { +func (e *ErrGroup) Del(fn ...FuncWithErr) { e.mu.Lock() defer e.mu.Unlock() - delete(e.funs, fn) + for _, f := range fn { + delete(e.funs, f) + } } diff --git a/pkg/rpcsup/client.go b/pkg/rpcsup/client.go new file mode 100644 index 0000000..bf6300f --- /dev/null +++ b/pkg/rpcsup/client.go @@ -0,0 +1,26 @@ +package rpcsup + +import ( + "errors" + "net" + "net/rpc" + "net/rpc/jsonrpc" + "strings" +) + +func RpcClient(conf *RpcConfig) (*rpc.Client, error) { + if strings.Contains(conf.Typec, "json") { + dial, err := net.Dial(conf.Network, conf.Address) + if err != nil { + return nil, err + } + + return jsonrpc.NewClient(dial), nil + } + + if strings.Contains(conf.Typec, "grpc") { + return nil, errors.New("currently not supported") + } + + return rpc.Dial(conf.Network, conf.Address) +} diff --git a/pkg/rpcsup/jsonrpc.go b/pkg/rpcsup/jsonrpc.go deleted file mode 100644 index c59eb31..0000000 --- a/pkg/rpcsup/jsonrpc.go +++ /dev/null @@ -1,74 +0,0 @@ -package rpcsup - -import ( - "context" - "dashboard/logger" - "errors" - "net" - "net/rpc" - "net/rpc/jsonrpc" - - "golang.org/x/sync/errgroup" -) - -func JsonServer(ctx context.Context, log *logger.Logger, address string, service ...interface{}) error { - for _, re := range service { - if err := rpc.Register(re); err != nil { - log.Sugar().Error(err) - return err - } - } - - listen, err := net.Listen("tcp", address) - if err != nil { - log.Sugar().Error(err) - return err - } - - log.Sugar().Infof("Rpc server listen on: %s", address) - - wg, ctxx := errgroup.WithContext(ctx) - - wg.Go(func() error { - for { - accept, err := listen.Accept() - if err != nil { - if errors.Is(err, net.ErrClosed) { - log.Sugar().Error(err) - - break - } - log.Sugar().Error(err) - continue - } - - go jsonrpc.ServeConn(accept) - } - - return err - }) - - wg.Go(func() error { - <-ctxx.Done() - log.Sugar().Errorf("Unis Rpc Server cancel by ctx") - listen.Close() - return ctxx.Err() - }) - - if err := wg.Wait(); err != nil { - return err - } - - return nil -} - -func JsonClient(log *logger.Logger, address string) (*rpc.Client, error) { - dial, err := net.Dial("tcp", address) - if err != nil { - log.Sugar().Error(err) - - return nil, err - } - - return jsonrpc.NewClient(dial), nil -} diff --git a/pkg/rpcsup/rpcsup.go b/pkg/rpcsup/rpcsup.go new file mode 100644 index 0000000..dc589ab --- /dev/null +++ b/pkg/rpcsup/rpcsup.go @@ -0,0 +1,103 @@ +package rpcsup + +import ( + "context" + "errors" + "net" + "net/rpc" + "net/rpc/jsonrpc" + "strings" + + "golang.org/x/sync/errgroup" +) + +type RpcServicer interface { + Name() string +} + +type RpcService struct { + rpc *rpc.Server + listen net.Listener + handler func(net.Conn) +} + +type RpcConfig struct { + Typec string + Network string + Address string +} + +func NewRpcService(conf *RpcConfig) (res *RpcService, err error) { + res = new(RpcService) + + res.rpc = rpc.NewServer() + res.listen, err = net.Listen(conf.Network, conf.Address) + if err != nil { + return + } + + if strings.Contains(conf.Typec, "json") { + res.handler = func(c net.Conn) { + res.rpc.ServeCodec(jsonrpc.NewServerCodec(c)) + } + + return + } + + if strings.Contains(conf.Typec, "grpc") { + return nil, errors.New("currently not supported") + } + + res.handler = func(c net.Conn) { + res.rpc.ServeConn(c) + } + + return +} + +func (r *RpcService) Register(service ...interface{}) error { + for _, ser := range service { + if nser, ok := ser.(RpcServicer); ok { + if err := r.rpc.RegisterName(nser.Name(), nser); err != nil { + return err + } + continue + } + + if err := r.rpc.Register(ser); err != nil { + return err + } + } + + return nil +} + +func (r *RpcService) Run(ctx context.Context) error { + wg, ctxx := errgroup.WithContext(ctx) + + wg.Go(func() error { + for { + accept, err := r.listen.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + return err + } + continue + } + + go r.handler(accept) + } + }) + + wg.Go(func() error { + <-ctxx.Done() + r.listen.Close() + return ctxx.Err() + }) + + if err := wg.Wait(); err != nil { + return err + } + + return nil +} diff --git a/pkg/swarm/option.go b/pkg/swarm/option.go new file mode 100644 index 0000000..b9474ed --- /dev/null +++ b/pkg/swarm/option.go @@ -0,0 +1,22 @@ +package swarm + +import "log" + +type options struct { + log *log.Logger +} + +type Option func(*options) + +func WithLog(log *log.Logger) Option { + return func(o *options) { + o.log = log + } +} + +func(o *options)repire(){ + if o.log == nil{ + o.log = log.Default() + } +} + diff --git a/pkg/swarm/scheduler.go b/pkg/swarm/scheduler.go new file mode 100644 index 0000000..fbf996c --- /dev/null +++ b/pkg/swarm/scheduler.go @@ -0,0 +1,69 @@ +package swarm + +import ( + "context" + "log" +) + +type defaultScheduler[T any] struct { + worker chan chan T + request chan T + log *log.Logger + calFn context.CancelFunc + ctx context.Context +} + +func newdefaultScheduler[T any](log *log.Logger) *defaultScheduler[T] { + res := new(defaultScheduler[T]) + + res.worker = make(chan chan T) + res.request = make(chan T) + res.log = log + res.ctx, res.calFn = context.WithCancel(context.TODO()) + + return res +} + +func (s *defaultScheduler[T]) Submit(req T) { + s.request <- req +} + +func (s *defaultScheduler[T]) Done(worker chan T) { + s.worker <- worker +} + +func (s *defaultScheduler[T]) GenChannel() chan T { + return make(chan T) +} + +func (s *defaultScheduler[T]) Stop() { + s.calFn() +} + +func (s *defaultScheduler[T]) Run() error { + var workers []chan T + var requests []T + + for { + var work chan T + var req T + + if len(workers) > 0 && len(requests) > 0 { + work = workers[0] + req = requests[0] + } + + select { + case <-s.ctx.Done(): + s.log.Printf("%sScheduler: %p down...", logPrefix, s) + return s.ctx.Err() + case w := <-s.worker: + workers = append(workers, w) + case r := <-s.request: + requests = append(requests, r) + case work <- req: + workers = workers[1:] + requests = requests[1:] + } + } +} diff --git a/pkg/swarm/swarm.go b/pkg/swarm/swarm.go new file mode 100644 index 0000000..021a288 --- /dev/null +++ b/pkg/swarm/swarm.go @@ -0,0 +1,117 @@ +package swarm + +import ( + "context" + "dashboard/pkg/errgroups" + "log" + + "golang.org/x/sync/errgroup" +) + +const logPrefix = "[swarm] " + +type Swarm[T any] struct { + scheduler Scheduler[T] + bees []*beeJob[T] + opts options +} + +func NewSwarm[T any](num int, work func(T) error, opts ...Option) *Swarm[T] { + res := new(Swarm[T]) + + for _, o := range opts { + o(&res.opts) + } + + res.opts.repire() + + res.scheduler = newdefaultScheduler[T](res.opts.log) + + for index := range num { + res.bees = append(res.bees, &beeJob[T]{ + done: func(c chan T) { res.scheduler.Done(c) }, + wc: res.scheduler.GenChannel(), + do: work, + index: index, + log: res.opts.log, + }) + } + + res.opts.log.Printf("%sStart bees number: %d.", logPrefix, len(res.bees)) + + return res +} + +type beeJob[T any] struct { + done func(chan T) + wc chan T + do func(T) error + index int + log *log.Logger +} + +func (b *beeJob[T]) Run(ctx context.Context) error { + for { + b.done(b.wc) + + select { + case <-ctx.Done(): + b.log.Printf("Bee num: %d,down\n", b.index) + return ctx.Err() + case req := <-b.wc: + if err := b.do(req); err != nil { + b.log.Printf("%sBee num: %d, do the work error: %v", logPrefix, b.index, err) + continue + } + + b.log.Printf("%sBee num: %d, do the work success:)", logPrefix, b.index) + } + } +} + +type Scheduler[T any] interface { + Run() error + Submit(T) + Done(chan T) + GenChannel() chan T + Stop() +} + +func (s *Swarm[T]) Submit(sub T) { + s.scheduler.Submit(sub) +} + +// implement FuncWithErr interface +func (s *Swarm[T]) Run(ctx context.Context) error { + wgs, ctx := errgroup.WithContext(ctx) + + wgs.Go(func() error { + defer s.scheduler.Stop() + + ewgs := errgroups.NewErrGroup() + + // // 可以保证bee结束,但是不能保证scheduler正常退出 + // schectx, cancel := context.WithCancel(context.TODO()) + // defer cancel() + + // go s.scheduler.Run(schectx) + + // ewgs.Add(s.scheduler) // 结束的比bee早,导致bee可能会被永久阻塞,导致进程不能退出。 + + for _, bee := range s.bees { + ewgs.Add(bee) + } + + return ewgs.Run(ctx) + }) + + wgs.Go(func() error { + return s.scheduler.Run() + }) + + if err := wgs.Wait(); err != nil { + return err + } + + return nil +} diff --git a/pkg/swarm/swarm_test.go b/pkg/swarm/swarm_test.go new file mode 100644 index 0000000..729366b --- /dev/null +++ b/pkg/swarm/swarm_test.go @@ -0,0 +1,32 @@ +package swarm + +import ( + "context" + "fmt" + "testing" + "time" +) + +func testwork(req string) error { + fmt.Println(req) + + return nil +} + +func Test_swarm(t *testing.T) { + swarm := NewSwarm(10, testwork) + + for range 10 { + go func() { + for { + swarm.scheduler.Submit("nihao") + + time.Sleep(time.Second) + } + }() + } + + ctx, calFn := context.WithTimeout(context.Background(), 5*time.Second) + defer calFn() + swarm.Run(ctx) +} diff --git a/services/uniss/common.go b/services/uniss/common.go index 560cbb9..b4dd28f 100644 --- a/services/uniss/common.go +++ b/services/uniss/common.go @@ -4,21 +4,24 @@ import ( "dashboard/dao/dadis/unisd" "dashboard/models" "fmt" + "net/http" ) type commonService struct { - rpcClients *rpcClients + comm *communicateService } -func newcommonService(http *httpHandle) *commonService { +func newcommonService(comm *communicateService) *commonService { res := new(commonService) - http.pushHandle(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) + res.comm = comm + + res.comm.httpServerHandler.pushHandle(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) return res } -func (c *commonService) stationConfig(reqest *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { +func (c *commonService) stationConfig(reqest *models.UnisHttpRequest, respons *models.UnisHttpResponse) error { req := reqest.Msg.(*models.StationConfigParams) fmt.Println(req) @@ -32,16 +35,23 @@ func (c *commonService) stationConfig(reqest *models.UnisHttpRequest) (*models.U addres = append(addres, add) } - c.rpcClients.flushRpcClients(addres) + c.comm.rpcClientStub.flushRpcClients(addres) - fmt.Println(c.rpcClients.getConfig(addres[0].id, models.UnisRpcRequest{Id: "zhangshuo"})) + fmt.Println(c.comm.rpcClientStub.getConfig(addres[0].id, models.UnisRpcRequest{Id: "zhangshuo"})) - res := &models.UnisHttpResponse{ - Code: int(models.CodeSuccess), - Msg: "config ok", - } + // res := &models.UnisHttpResponse{ + // Code: int(models.CodeSuccess), + // Msg: "config ok", + // } + respons.Code = int(models.CodeSuccess) + respons.Msg = "config ok" - unisd.UnisDataSet(string(unisd.DadisKey_UnisStationInfo), req, 0) + unisd.UnisDataSet(unisd.DadisKey_UnisStationInfo.KeyWithChannel(reqest.Channel), req, 0) - return res, nil + c.comm.httpClientHandler.curl(&models.UnisHttpClientRequest{ + Url: "http://192.168.177.7:8080/api/unis/source-list/v1/update", + Methord: http.MethodGet, + }) + + return nil } diff --git a/services/uniss/communicate.go b/services/uniss/communicate.go new file mode 100644 index 0000000..4a0a91d --- /dev/null +++ b/services/uniss/communicate.go @@ -0,0 +1,54 @@ +package uniss + +import ( + "context" + "dashboard/pkg/errgroups" + "dashboard/pkg/rpcsup" + "dashboard/settings" + "fmt" +) + +type communicateService struct { + rpcClientStub *rpcClientStub + rpcService *rpcsup.RpcService + httpServerHandler *httpServerHandler + httpClientHandler *httpClientHandler +} + +func newcommunicateService(conf *settings.UnisConfig) (res *communicateService, err error) { + res = new(communicateService) + + var rpcConf = &rpcsup.RpcConfig{ + Typec: conf.RpcConfig.Type, + Network: conf.RpcConfig.Network, + Address: fmt.Sprintf("%s:%d", conf.RpcConfig.Host, conf.RpcConfig.BasePort+conf.Instances)} + + res.rpcService, err = rpcsup.NewRpcService(rpcConf) + if err != nil { + log.Sugar().Errorf("New rpc service error: %v", err) + return + } + log.Sugar().Infof("New rpc service ok type: %s, network: %s, address: %s", rpcConf.Typec, rpcConf.Network, rpcConf.Address) + + err = res.rpcService.Register(newrpcServerStub()) + if err != nil { + log.Sugar().Errorf("Rpc service register error: %v", err) + return + } + log.Sugar().Infof("Rpc service register all ok") + + res.httpServerHandler = newhttpServerHandler() + res.rpcClientStub = newRpcClients(conf.RpcConfig) + res.httpClientHandler = newhttpClientHandle(conf.HttpClientConfig.Clients) + + return +} + +func (c *communicateService) Run(ctx context.Context) error { + ewgs := errgroups.NewErrGroup() + + ewgs.Add(c.rpcService) + ewgs.Add(c.httpClientHandler.swarm) + + return ewgs.Run(ctx) +} diff --git a/services/uniss/datacenter.go b/services/uniss/datacenter.go new file mode 100644 index 0000000..1c92a98 --- /dev/null +++ b/services/uniss/datacenter.go @@ -0,0 +1,20 @@ +package uniss + +import ( + "dashboard/pkg/dadis" + "dashboard/settings" +) + +type dataCenterService struct { + conf *settings.UnisConfig + dataMap *dadis.Dadis +} + +func newDataCenterService(conf *settings.UnisConfig) *dataCenterService { + res := new(dataCenterService) + + res.conf = conf + res.dataMap = dadis.NewDadis() + + return res +} diff --git a/services/uniss/httpclienthandle.go b/services/uniss/httpclienthandle.go new file mode 100644 index 0000000..188107d --- /dev/null +++ b/services/uniss/httpclienthandle.go @@ -0,0 +1,72 @@ +package uniss + +import ( + "bytes" + "dashboard/dao/sqldb" + "dashboard/models" + "dashboard/pkg/swarm" + "encoding/json" + "fmt" + "io" + "net/http" + + "go.uber.org/zap" +) + +type httpClientHandler struct { + swarm *swarm.Swarm[*models.UnisHttpClientRequest] + httpC chan *models.UnisHttpClientResponse +} + +func newhttpClientHandle(clients int) *httpClientHandler { + res := new(httpClientHandler) + res.swarm = swarm.NewSwarm(clients, res.doRequest, swarm.WithLog(zap.NewStdLog(log.Logger))) + res.httpC = make(chan *models.UnisHttpClientResponse) + + return res +} + +func (c *httpClientHandler) curl(msg *models.UnisHttpClientRequest) { + c.swarm.Submit(msg) +} + +func (c *httpClientHandler) doRequest(req *models.UnisHttpClientRequest) error { + body, err := json.Marshal(req.Msg) + fmt.Println(string(body), err) + + reqs, err := http.NewRequest(req.Methord, req.Url, bytes.NewReader(body)) + if err != nil { + fmt.Println(err) + return err + } + + reqs.Header.Add("Content-type", "application/json;charset=utf-8") + + client := http.Client{} + resp, err := client.Do(reqs) + if err != nil { + fmt.Println(err) + return err + } + defer resp.Body.Close() + + msg, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if err := sqldb.ResFulDataStore(req.Methord, req.Url, string(body), string(msg), 0); err != nil { + fmt.Println(err) + return err + } + + response := &models.UnisHttpClientResponse{ + Id: req.Id, + Methord: req.Methord, + Msg: msg, + } + + c.httpC <- response + + return nil +} diff --git a/services/uniss/httphandle.go b/services/uniss/httphandle.go deleted file mode 100644 index 1566ad6..0000000 --- a/services/uniss/httphandle.go +++ /dev/null @@ -1,66 +0,0 @@ -package uniss - -import ( - "dashboard/models" - "sync" -) - -type httpHandle struct { - httpC chan *models.UnisHttpRequest - mu sync.RWMutex - mapFn map[string]HttpHandler -} - -type HttpHandler func(*models.UnisHttpRequest) (*models.UnisHttpResponse, error) - -func newhttpHandle() *httpHandle { - res := new(httpHandle) - - res.httpC = make(chan *models.UnisHttpRequest) - res.mapFn = make(map[string]HttpHandler) - - return res -} - -func (u *httpHandle) httpHandle(msg *models.UnisHttpRequest) { - var res *models.UnisHttpResponse - resC := msg.ResC - defer func() { - if resC != nil { - resC <- res - } - }() - - fn, ok := u.getHandle(msg.Id) - if ok { - if re, err := fn(msg); err == nil { - res = re - } - } -} - -func (u *httpHandle) pushHandle(key string, handle HttpHandler) { - u.mu.Lock() - defer u.mu.Unlock() - - u.mapFn[key] = handle -} - -func (u *httpHandle) delHandle(key string) { - u.mu.Lock() - defer u.mu.Unlock() - - delete(u.mapFn, key) -} - -func (u *httpHandle) getHandle(key string) (HttpHandler, bool) { - u.mu.RLock() - defer u.mu.RUnlock() - - res, ok := u.mapFn[key] - if !ok { - return nil, false - } - - return res, ok -} diff --git a/services/uniss/httpserverhandle.go b/services/uniss/httpserverhandle.go new file mode 100644 index 0000000..95bb81a --- /dev/null +++ b/services/uniss/httpserverhandle.go @@ -0,0 +1,68 @@ +package uniss + +import ( + "dashboard/models" + "sync" +) + +type httpServerHandler struct { + httpC chan *models.UnisHttpRequest + mu sync.RWMutex + mapFn map[string]HttpServerHandlerFunc + pool sync.Pool +} + +type HttpServerHandlerFunc func(*models.UnisHttpRequest, *models.UnisHttpResponse) error + +func newhttpServerHandler() *httpServerHandler { + res := new(httpServerHandler) + + res.httpC = make(chan *models.UnisHttpRequest) + res.mapFn = make(map[string]HttpServerHandlerFunc) + res.pool.New = func() any { return new(models.UnisHttpResponse) } + + return res +} + +func (u *httpServerHandler) httpHandle(msg *models.UnisHttpRequest) { + res := u.pool.Get().(*models.UnisHttpResponse) + resC := msg.ResC + + defer func() { + if resC != nil { + resC <- res + } + }() + + fn, ok := u.getHandle(msg.Id) + if ok { + if err := fn(msg, res); err == nil { + } + } +} + +func (u *httpServerHandler) pushHandle(key string, handle HttpServerHandlerFunc) { + u.mu.Lock() + defer u.mu.Unlock() + + u.mapFn[key] = handle +} + +func (u *httpServerHandler) delHandle(key string) { + u.mu.Lock() + defer u.mu.Unlock() + + delete(u.mapFn, key) +} + +func (u *httpServerHandler) getHandle(key string) (HttpServerHandlerFunc, bool) { + u.mu.RLock() + defer u.mu.RUnlock() + + res, ok := u.mapFn[key] + if !ok { + return nil, false + } + + return res, ok +} diff --git a/services/uniss/main.go b/services/uniss/main.go index 368f69e..3e4c603 100644 --- a/services/uniss/main.go +++ b/services/uniss/main.go @@ -1,45 +1,54 @@ package uniss +// TODO +// 1.data center object 2.swarm stop after scheduler + import ( "context" "dashboard/logger" "dashboard/models" "dashboard/pkg/errgroups" "dashboard/settings" + "fmt" ) +var log *logger.Logger + type UnisStation struct { - log *logger.Logger - conf *settings.UnisConfig - httpHandle *httpHandle - commonService *commonService + commonService *commonService + communicateService *communicateService + dataCenterService *dataCenterService } -func NewUnis(log *logger.Logger, conf *settings.UnisConfig) *UnisStation { - res := new(UnisStation) +func NewUnis(_log *logger.Logger, conf *settings.UnisConfig) (res *UnisStation, err error) { + res = new(UnisStation) + log = _log - res.log = log - res.conf = conf + res.dataCenterService = newDataCenterService(conf) + res.communicateService, err = newcommunicateService(conf) + if err != nil { + return + } - res.httpHandle = newhttpHandle() - res.commonService = newcommonService(res.httpHandle) - res.commonService.rpcClients = newRpcClients(log, conf.RpcConfig.Port) + res.commonService = newcommonService(res.communicateService) - return res + return } func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest { - return u.httpHandle.httpC + return u.communicateService.httpServerHandler.httpC } -func (u *UnisStation) mesageHandle(ctx context.Context) error { +func (u *UnisStation) mainthread(ctx context.Context) error { for { select { case <-ctx.Done(): - u.log.Error("Unis mesageHandle cancel by ctx") + log.Error("Unis mesageHandle cancel by ctx") return ctx.Err() - case httpMsg := <-u.httpHandle.httpC: - u.httpHandle.httpHandle(httpMsg) + case httpMsg := <-u.communicateService.httpServerHandler.httpC: + u.communicateService.httpServerHandler.httpHandle(httpMsg) + case httpc := <-u.communicateService.httpClientHandler.httpC: + fmt.Println(string(httpc.Msg.([]byte))) // default: } @@ -47,12 +56,10 @@ func (u *UnisStation) mesageHandle(ctx context.Context) error { } func (u *UnisStation) Run(ctx context.Context) error { - rpc := newUnisRpcServer(u.log, u.conf.RpcConfig) - ewgs := errgroups.NewErrGroupFunc() - ewgs.Add(rpc.rpcListenAndServe) - ewgs.Add(u.mesageHandle) + ewgs.Add(u.communicateService.Run) + ewgs.Add(u.mainthread) return ewgs.Run(ctx) } diff --git a/services/uniss/rpcclient.go b/services/uniss/rpcclient.go index 6390cd2..29ceb61 100644 --- a/services/uniss/rpcclient.go +++ b/services/uniss/rpcclient.go @@ -1,39 +1,38 @@ package uniss import ( - "dashboard/logger" "dashboard/models" "dashboard/pkg/rpcsup" + "dashboard/settings" "fmt" "net/rpc" "sync" ) -type rpcClients struct { - *logger.Logger +// Called by someone else +type rpcClientStub struct { mu sync.RWMutex + conf *settings.RpcConfig rpcMapClients map[string]*rpc.Client - port int } type rpcIdAddres struct { - id string - host string - port int + id string + host string + index int } -func newRpcClients(log *logger.Logger, mport int) *rpcClients { - res := new(rpcClients) +func newRpcClients(conf *settings.RpcConfig) *rpcClientStub { + res := new(rpcClientStub) - res.Logger = log - res.port = mport + res.conf = conf res.rpcMapClients = make(map[string]*rpc.Client) return res } -func (r *rpcClients) flushRpcClients(ids []*rpcIdAddres) { +func (r *rpcClientStub) flushRpcClients(ids []*rpcIdAddres) { r.mu.Lock() defer r.mu.Unlock() @@ -44,18 +43,22 @@ func (r *rpcClients) flushRpcClients(ids []*rpcIdAddres) { } for _, idAddr := range ids { - address := fmt.Sprintf("%s:%d", idAddr.host, r.port) + address := fmt.Sprintf("%s:%d", idAddr.host, idAddr.index+r.conf.BasePort) - cli, err := rpcsup.JsonClient(r.Logger, address) + cli, err := rpcsup.RpcClient(&rpcsup.RpcConfig{ + Typec: r.conf.Type, + Network: r.conf.Network, + Address: address, + }) if err != nil { - r.Sugar().Errorf("%s create rpc client error: %s", idAddr.id, address) + log.Sugar().Errorf("%s create rpc client error: %s", idAddr.id, address) continue } r.rpcMapClients[idAddr.id] = cli } } -func (r *rpcClients) getConfig(id string, req models.UnisRpcRequest) (*models.UnisRpcResponse, error) { +func (r *rpcClientStub) getConfig(id string, req models.UnisRpcRequest) (*models.UnisRpcResponse, error) { r.mu.RLock() defer r.mu.RUnlock() diff --git a/services/uniss/rpcservice.go b/services/uniss/rpcservice.go index d7ef948..baf56ec 100644 --- a/services/uniss/rpcservice.go +++ b/services/uniss/rpcservice.go @@ -1,54 +1,27 @@ package uniss import ( - "context" - "dashboard/logger" "dashboard/models" - "dashboard/pkg/rpcsup" - "dashboard/settings" - "fmt" ) -type UnisRpcService struct { - *logger.Logger +// Alarm concurrency +type RpcServerStub struct { } -func NewRpcService(log *logger.Logger) *UnisRpcService { - res := new(UnisRpcService) - - res.Logger = log +func newrpcServerStub() *RpcServerStub { + res := new(RpcServerStub) return res } -func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error { - u.Sugar().Info("rpc server get mesage", res) +// func (u *RpcServerStub) Name() string { +// return u.name +// } + +func (u *RpcServerStub) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error { + log.Sugar().Info("rpc server get mesage", res) rsp.Id = res.Id return nil } - -type unisRpcServer struct { - log *logger.Logger - port int - host string - typec string -} - -func newUnisRpcServer(log *logger.Logger, conf *settings.RpcConfig) *unisRpcServer { - res := new(unisRpcServer) - res.log = log - - res.port = conf.Port - res.host = conf.Host - res.typec = conf.Type - - return res -} - -func (u *unisRpcServer) rpcListenAndServe(ctx context.Context) error { - addrees := fmt.Sprintf("%s:%d", u.host, u.port) - - return rpcsup.JsonServer(ctx, u.log, addrees, NewRpcService(u.log)) -} diff --git a/settings/type.go b/settings/type.go index 3177a3d..d02471b 100644 --- a/settings/type.go +++ b/settings/type.go @@ -36,13 +36,20 @@ type SqlConfig struct { } type UnisConfig struct { - *RpcConfig `mapstructure:"rpc"` + Instances int `mapstructure:"instances"` + *RpcConfig `mapstructure:"rpc"` + *HttpClientConfig `mapstructure:"httpc"` } type RpcConfig struct { - Port int `mapstructure:"port"` - Host string `mapstructure:"host"` - Type string `mapstructure:"type"` + BasePort int `mapstructure:"base_port"` + Network string `mapstructure:"network"` + Host string `mapstructure:"host"` + Type string `mapstructure:"type"` +} + +type HttpClientConfig struct { + Clients int `mapstructure:"clients"` } type SnowflakeConfig struct { diff --git a/utils/runtime.go b/utils/runtime.go index fe22460..774f209 100644 --- a/utils/runtime.go +++ b/utils/runtime.go @@ -10,3 +10,12 @@ func GetFuncName(fn interface{}) string { return runtime.FuncForPC(ptr).Name() } + +func GetInterfaceName(obj interface{}) string { + t := reflect.TypeOf(obj) + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + + return t.Name() +}