From 784a868d05a70eef279b38bbd4df66b1ea095da2 Mon Sep 17 00:00:00 2001 From: redhat <2292650292@qq.com> Date: Tue, 27 May 2025 19:55:35 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E5=A2=9E=E5=8A=A0=E5=BA=A7=E5=B8=AD?= =?UTF-8?q?=E4=B8=9A=E5=8A=A1=E6=A1=86=E6=9E=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config.yaml | 9 ++- controller/unis/commonser.go | 27 ------- controller/unis/datatype.go | 5 -- controller/unisc/httphandle.go | 76 ++++++++++++++++++ controller/unisc/httproute.go | 128 +++++++++++++++++++++++++++++ dao/dadis/unisd/datacenter.go | 16 ++++ dao/dadis/unisd/type.go | 8 ++ go.mod | 11 +-- go.sum | 22 ++--- main.go | 31 +++---- models/type.go | 2 + models/unishttpparams.go | 35 ++++++++ pkg/errgroups/errgroups.go | 115 ++++++++++++++++++++++++++ pkg/errgroups/errgroups_test.go | 78 ++++++++++++++++++ pkg/rpcsup/jsonrpc.go | 50 +++++++++--- routes/middleware.go | 68 ---------------- routes/routes.go | 138 ++++++++++++++++++++++++++++++-- routes/wrapperr.go | 26 ++++++ services/unis/common.go | 46 ----------- services/unis/rpcclient.go | 17 ---- services/unis/rpcservice.go | 35 -------- services/uniss/common.go | 47 +++++++++++ services/uniss/httphandle.go | 66 +++++++++++++++ services/uniss/main.go | 58 ++++++++++++++ services/uniss/rpcclient.go | 71 ++++++++++++++++ services/uniss/rpcservice.go | 54 +++++++++++++ settings/type.go | 6 +- utils/runtime.go | 12 +++ 28 files changed, 1008 insertions(+), 249 deletions(-) delete mode 100644 controller/unis/commonser.go delete mode 100644 controller/unis/datatype.go create mode 100644 controller/unisc/httphandle.go create mode 100644 controller/unisc/httproute.go create mode 100644 dao/dadis/unisd/datacenter.go create mode 100644 dao/dadis/unisd/type.go create mode 100644 models/unishttpparams.go create mode 100644 pkg/errgroups/errgroups.go create mode 100644 pkg/errgroups/errgroups_test.go delete mode 100644 services/unis/common.go delete mode 100644 services/unis/rpcclient.go delete mode 100644 services/unis/rpcservice.go create mode 100644 services/uniss/common.go create mode 100644 services/uniss/httphandle.go create mode 100644 services/uniss/main.go create mode 100644 services/uniss/rpcclient.go create mode 100644 services/uniss/rpcservice.go create mode 100644 utils/runtime.go diff --git a/config/config.yaml b/config/config.yaml index 3d6ce70..ce7d531 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -22,10 +22,11 @@ database: port: 3306 max_conns: 200 -rpc: - type: "json" - host: "" - port: 5501 +unis: + rpc: + type: "json" + host: "" + port: 5501 # 使用令牌桶限流 rate: diff --git a/controller/unis/commonser.go b/controller/unis/commonser.go deleted file mode 100644 index cab78bb..0000000 --- a/controller/unis/commonser.go +++ /dev/null @@ -1,27 +0,0 @@ -package unis - -import ( - "dashboard/models" - "dashboard/services/unis" - "net/http" - - "github.com/gin-gonic/gin" -) - -func StationConfig(c *gin.Context) error { - var config models.StationConfigParams - if err := c.ShouldBindJSON(&config); err != nil { - return err - } - - if err := unis.StationConfig(&config); err != nil { - return err - } - - c.JSON(http.StatusOK, gin.H{ - "code": 0, - "msg": "Server station config ok", - }) - - return nil -} diff --git a/controller/unis/datatype.go b/controller/unis/datatype.go deleted file mode 100644 index d8870ba..0000000 --- a/controller/unis/datatype.go +++ /dev/null @@ -1,5 +0,0 @@ -package unis - -const ( - UNIS_HTTP_URL_CONFIG_ADD = "/api/unis/config/v1/add" -) diff --git a/controller/unisc/httphandle.go b/controller/unisc/httphandle.go new file mode 100644 index 0000000..84d7af6 --- /dev/null +++ b/controller/unisc/httphandle.go @@ -0,0 +1,76 @@ +package unisc + +import ( + "dashboard/dao/dadis/unisd" + "dashboard/models" + "net/http" + "reflect" + "time" + + "github.com/gin-gonic/gin" +) + +type httpHandle struct { + httpC chan *models.UnisHttpRequest +} + +func newhttpHandle(httpc chan *models.UnisHttpRequest) *httpHandle { + res := new(httpHandle) + + res.httpC = httpc + + return res +} + +func httpSendWithExpire(httpc chan *models.UnisHttpRequest, data *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { + resc := make(chan *models.UnisHttpResponse) + data.ResC = resc + + select { + case httpc <- data: + case <-time.After(2 * time.Second): + return nil, models.ErrorTimeOut + } + + select { + case resp := <-resc: + return resp, nil + case <-time.After(2 * time.Second): + return nil, models.ErrorTimeOut + } +} + +func (u *httpHandle) stationConfig(c *gin.Context) error { + config := new(models.StationConfigParams) + if err := c.ShouldBindJSON(config); err != nil { + return err + } + + if !config.StationConfig.ConfigType { + res, ok := unisd.UnisDataGet(string(unisd.DadisKey_UnisStationInfo)) + if ok { + oldCfg := res.(*models.StationConfigParams) + + oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType + if reflect.DeepEqual(config, oldCfg) { + c.JSON(http.StatusOK, models.UnisHttpResponseOk) + + return nil + } + } + } + + req := &models.UnisHttpRequest{ + Id: models.UnisHttpUrl(c.Request.URL.String()).GetMsgId(), + Msg: config, + } + + resp, err := httpSendWithExpire(u.httpC, req) + if err != nil { + return err + } + + c.JSON(http.StatusOK, resp) + + return nil +} diff --git a/controller/unisc/httproute.go b/controller/unisc/httproute.go new file mode 100644 index 0000000..07b57cf --- /dev/null +++ b/controller/unisc/httproute.go @@ -0,0 +1,128 @@ +package unisc + +import ( + "bytes" + "dashboard/dao/sqldb" + "dashboard/models" + "dashboard/routes" + "dashboard/services/uniss" + "dashboard/utils" + "io" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" +) + +type HttpRoute struct { + httpHandle *httpHandle + middleWare *middleWare +} + +func NewHttpRoute(httpc chan *models.UnisHttpRequest) *HttpRoute { + res := new(HttpRoute) + + res.httpHandle = newhttpHandle(httpc) + res.middleWare = newmiddleWare() + + return res +} + +func (u *HttpRoute) AddRoute(r *gin.Engine) { + unisr := r.Group("/api/unis") + unisr.Use(routes.ErrWapper(u.middleWare.GinCheckServerId), routes.ErrWapper(u.middleWare.GinStoreRequest)) + { + unisr.POST("/config/v1/add", routes.ErrWapper(u.httpHandle.stationConfig)) + } +} + +type middleWare struct { +} + +func newmiddleWare() *middleWare { + return new(middleWare) +} + +func (m *middleWare) GinCheckServerId(c *gin.Context) error { + serverId := c.GetHeader("serverId") + + if serverId != serverId { + c.Abort() + return &models.BaseError{ + Code: http.StatusServiceUnavailable, + Msg: "Server Id error", + } + } + + channel, err := strconv.Atoi(c.GetHeader("channel")) + if err != nil { + c.Abort() + return &models.BaseError{ + Code: http.StatusServiceUnavailable, + Msg: "Channel Id error", + } + } + + c.Set(models.GinContextServerId, serverId) + c.Set(models.GinContextChannel, channel) + + c.Next() + + return nil +} + +func (m *middleWare) GinStoreRequest(c *gin.Context) (err error) { + defer func() { + if err != nil { + c.Abort() + return + } + + c.Next() + }() + + log, _ := utils.GetLogFromContext(c) + channel := c.GetInt(models.GinContextChannel) + + oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel) + if _err != nil { + err = _err + } + + bodyBytes, _err := io.ReadAll(c.Request.Body) + if _err != nil { + err = _err + } + + c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + if oldConf != string(bodyBytes) { + log.Sugar().Info("The resful request is not equel local store") + if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil { + err = _err + } + } + + return nil +} + +func (m *middleWare) GinWithUnisObject(unis *uniss.UnisStation) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set(models.GinContextUnis, unis) + c.Next() + } +} + +func (m *middleWare) GetUnisObjectFromContext(c *gin.Context) (*uniss.UnisStation, error) { + res, ok := c.Get(models.GinContextUnis) + if !ok { + return nil, models.ErrorInvalidData + } + + unis, ok := res.(*uniss.UnisStation) + if !ok { + return nil, models.ErrorInvalidData + } + + return unis, nil +} diff --git a/dao/dadis/unisd/datacenter.go b/dao/dadis/unisd/datacenter.go new file mode 100644 index 0000000..f66f603 --- /dev/null +++ b/dao/dadis/unisd/datacenter.go @@ -0,0 +1,16 @@ +package unisd + +import ( + "dashboard/pkg/dadis" + "time" +) + +var unisDataCenter = dadis.NewDadis() + +func UnisDataSet(key string, value interface{}, expire time.Duration) { + unisDataCenter.Set(key, value, expire) +} + +func UnisDataGet(key string) (interface{}, bool) { + return unisDataCenter.Get(key) +} diff --git a/dao/dadis/unisd/type.go b/dao/dadis/unisd/type.go new file mode 100644 index 0000000..b77ebfb --- /dev/null +++ b/dao/dadis/unisd/type.go @@ -0,0 +1,8 @@ +package unisd + +type DadisKey string + +const ( + DadisKey_UnisSystemInfo DadisKey = "UnisSystemInfo" + DadisKey_UnisStationInfo DadisKey = "UnisStationInfo" +) \ No newline at end of file diff --git a/go.mod b/go.mod index e65da7a..7cd4594 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/creack/pty v1.1.24 github.com/fsnotify/fsnotify v1.9.0 github.com/gin-contrib/cors v1.7.5 + github.com/gin-contrib/pprof v1.5.3 github.com/gin-gonic/gin v1.10.0 github.com/go-sql-driver/mysql v1.8.1 github.com/golang-jwt/jwt/v5 v5.2.2 @@ -21,7 +22,7 @@ require ( github.com/shirou/gopsutil/v4 v4.25.4 github.com/spf13/viper v1.20.1 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.12.0 + golang.org/x/sync v0.13.0 ) require ( @@ -60,11 +61,11 @@ require ( github.com/ugorji/go/codec v1.2.12 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/arch v0.15.0 // indirect - golang.org/x/crypto v0.36.0 // indirect + golang.org/x/arch v0.16.0 // indirect + golang.org/x/crypto v0.37.0 // indirect golang.org/x/net v0.38.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 3af0459..8a5e3d7 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3G github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk= github.com/gin-contrib/cors v1.7.5/go.mod h1:4q3yi7xBEDDWKapjT2o1V7mScKDDr8k+jZ0fSquGoy0= +github.com/gin-contrib/pprof v1.5.3 h1:Bj5SxJ3kQDVez/s/+f9+meedJIqLS+xlkIVDe/lcvgM= +github.com/gin-contrib/pprof v1.5.3/go.mod h1:0+LQSZ4SLO0B6+2n6JBzaEygpTBxe/nI+YEYpfQQ6xY= github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E= github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0= github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= @@ -141,23 +143,23 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw= -golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/arch v0.16.0 h1:foMtLTdyOmIniqWCHjY6+JxuC54XP1fDwx4N0ASyW+U= +golang.org/x/arch v0.16.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE= +golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= +golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= diff --git a/main.go b/main.go index b5a087f..609a7bd 100644 --- a/main.go +++ b/main.go @@ -2,21 +2,15 @@ package main import ( "context" + "dashboard/controller/unisc" "dashboard/dao/sqldb" "dashboard/logger" + "dashboard/pkg/errgroups" "dashboard/routes" - "dashboard/services/unis" + "dashboard/services/uniss" "dashboard/settings" - "errors" "flag" "fmt" - "net/http" - "os" - "os/signal" - "syscall" - "time" - - "go.uber.org/zap" ) var config = flag.String("f", "./config/config.yaml", "config file path") @@ -54,16 +48,22 @@ func main() { log.Sugar().Panicf("New db error: %v", err) } - err = unis.RpcListenAndServe(log, sets.RpcConfig) - if err != nil { - log.Sugar().Panicf("New db error: %v", err) - } + errwg := errgroups.NewErrGroup() - r := routes.Setup(log, *sets.RateLimitConfig, *sets.JwtConfig) + // r := routes.Setup(log, *sets.RateLimitConfig, *sets.JwtConfig) - listenAndServe(fmt.Sprintf(":%d", sets.BaseConfig.Port), r, log) + // listenAndServe(fmt.Sprintf(":%d", sets.BaseConfig.Port), r, log) + sunis := uniss.NewUnis(log, sets.UnisConfig) + + errwg.Add(routes.NewRouter(fmt.Sprintf(":%d", sets.BaseConfig.Port), + log, *sets.RateLimitConfig, *sets.JwtConfig, + unisc.NewHttpRoute(sunis.GetHttpChannel()))) + errwg.Add(sunis) + + log.Sugar().Fatalln(errwg.Run(context.Background())) } +/* func listenAndServe(addr string, handle http.Handler, log *logger.Logger) { srv := &http.Server{ Addr: addr, @@ -97,3 +97,4 @@ func listenAndServe(addr string, handle http.Handler, log *logger.Logger) { log.Info("Server exiting") } +*/ diff --git a/models/type.go b/models/type.go index d9d3a70..bbc8af1 100644 --- a/models/type.go +++ b/models/type.go @@ -9,6 +9,7 @@ const ( GinContextLog = "log" GinContextChannel = "channel" GinContextServerId = "serverid" + GinContextUnis = "unis" ) const ( @@ -20,6 +21,7 @@ var ( ErrorInvalidData = errors.New("no such value") ErrorPasswordErr = errors.New("user or password invalid") ErrorSqlInitErr = errors.New("database init err") + ErrorTimeOut = errors.New("time out") ) const ( diff --git a/models/unishttpparams.go b/models/unishttpparams.go new file mode 100644 index 0000000..d68d695 --- /dev/null +++ b/models/unishttpparams.go @@ -0,0 +1,35 @@ +package models + +type UnisHttpRequest struct { + ResC chan *UnisHttpResponse + Id string + Msg interface{} +} + +type UnisHttpResponse struct { + Code int + Msg interface{} + Data interface{} +} + +type UnisHttpUrl string + +const ( + UNIS_HTTP_URL_CONFIG_ADD UnisHttpUrl = "/api/unis/config/v1/add" +) + +func (u UnisHttpUrl) GetMsgId() string { + return mapHttpUrlId[u] +} + +type UnisHttpMsgId string + +const ( + UNIS_HTTP_ID_CONFIG_ADD UnisHttpMsgId = "/api/unis/config/v1/add" +) + +var mapHttpUrlId = map[UnisHttpUrl]string{ + UNIS_HTTP_URL_CONFIG_ADD: string(UNIS_HTTP_ID_CONFIG_ADD), +} + +var UnisHttpResponseOk = &UnisHttpResponse{Code: int(CodeSuccess), Msg: CodeSuccess.String()} diff --git a/pkg/errgroups/errgroups.go b/pkg/errgroups/errgroups.go new file mode 100644 index 0000000..87b04d5 --- /dev/null +++ b/pkg/errgroups/errgroups.go @@ -0,0 +1,115 @@ +package errgroups + +import ( + "context" + "dashboard/utils" + "fmt" + "sync" + + "golang.org/x/sync/errgroup" +) + +type ErrGroupFunc struct { + wg *errgroup.Group + mu sync.RWMutex + funs map[string]FuncWithErrFunc +} + +type FuncWithErrFunc func(context.Context) error + +func NewErrGroupFunc() *ErrGroupFunc { + return &ErrGroupFunc{ + funs: make(map[string]FuncWithErrFunc), + } +} + +func (e *ErrGroupFunc) Run(ctx context.Context) error { + e.mu.RLock() + defer e.mu.RUnlock() + + e.wg, ctx = errgroup.WithContext(ctx) + + for name, fn := range e.funs { + fn := fn + name := name + e.wg.Go(func() error { + defer fmt.Printf("func: %s stop\n", name) + fmt.Printf("func: %s runing...\n", name) + return fn(ctx) + }) + } + + if err := e.wg.Wait(); err != nil { + return err + } + + return nil +} + +func (e *ErrGroupFunc) Add(fn FuncWithErrFunc) { + e.mu.Lock() + defer e.mu.Unlock() + + fnStr := utils.GetFuncName(fn) + + e.funs[fnStr] = fn +} + +func (e *ErrGroupFunc) Del(fn FuncWithErrFunc) { + e.mu.Lock() + defer e.mu.Unlock() + + fnStr := utils.GetFuncName(fn) + + delete(e.funs, fnStr) +} + +type ErrGroup struct { + wg *errgroup.Group + mu sync.RWMutex + funs map[FuncWithErr]struct{} +} + +type FuncWithErr interface { + Run(context.Context) error +} + +func NewErrGroup() *ErrGroup { + return &ErrGroup{ + funs: make(map[FuncWithErr]struct{}), + } +} + +func (e *ErrGroup) Run(ctx context.Context) error { + e.mu.RLock() + defer e.mu.RUnlock() + + e.wg, ctx = errgroup.WithContext(ctx) + + for fn := range e.funs { + fn := fn + e.wg.Go(func() error { + return fn.Run(ctx) + }) + } + + if err := e.wg.Wait(); err != nil { + return err + } + + return nil +} + +func (e *ErrGroup) Add(fn FuncWithErr) { + e.mu.Lock() + defer e.mu.Unlock() + + e.funs[fn] = struct{}{} +} + +func (e *ErrGroup) Del(fn FuncWithErr) { + e.mu.Lock() + defer e.mu.Unlock() + + delete(e.funs, fn) +} diff --git a/pkg/errgroups/errgroups_test.go b/pkg/errgroups/errgroups_test.go new file mode 100644 index 0000000..e703422 --- /dev/null +++ b/pkg/errgroups/errgroups_test.go @@ -0,0 +1,78 @@ +package errgroups + +import ( + "context" + "errors" + "fmt" + "testing" + "time" +) + +func fn_test1(ctx context.Context) error { + select { + case <-ctx.Done(): + fmt.Println("fn_test1 called cancel") + return ctx.Err() + case <-time.After(time.Second): + fmt.Println("fn_test1 called ok") + } + return nil +} + +func fn_test2(ctx context.Context) error { + fmt.Println("fn_test2 called") + return nil +} + +func fn_test3(ctx context.Context) error { + fmt.Println("fn_test3 called") + return errors.New("call fn_test3 get error") +} + +type err_test1 struct{} + +func(e *err_test1)Run(ctx context.Context)error{ + return fn_test1(ctx) +} + +type err_test2 struct{} + +func(e *err_test2)Run(ctx context.Context)error{ + return fn_test2(ctx) +} + +type err_test3 struct{} + +func(e *err_test3)Run(ctx context.Context)error{ + return fn_test3(ctx) +} + +func Test_errGroupsFunc(t *testing.T) { + errgws := NewErrGroupFunc() + + errgws.Add(fn_test1) + errgws.Add(fn_test2) + errgws.Add(fn_test3) + + err := errgws.Run(context.Background()) + if err != nil { + t.Errorf("err: %v", err) + } + + t.Log("ok") +} + +func Test_errGroups(t *testing.T) { + errgws := NewErrGroup() + + errgws.Add(&err_test1{}) + errgws.Add(&err_test2{}) + errgws.Add(&err_test3{}) + + err := errgws.Run(context.Background()) + if err != nil { + t.Errorf("err: %v", err) + } + + t.Log("ok") +} diff --git a/pkg/rpcsup/jsonrpc.go b/pkg/rpcsup/jsonrpc.go index 3764f08..c59eb31 100644 --- a/pkg/rpcsup/jsonrpc.go +++ b/pkg/rpcsup/jsonrpc.go @@ -1,16 +1,22 @@ package rpcsup import ( + "context" "dashboard/logger" + "errors" "net" "net/rpc" "net/rpc/jsonrpc" + + "golang.org/x/sync/errgroup" ) -func JsonServer(log *logger.Logger, address string, service interface{}) error { - if err := rpc.Register(service); err != nil { - log.Sugar().Error(err) - return err +func JsonServer(ctx context.Context, log *logger.Logger, address string, service ...interface{}) error { + for _, re := range service { + if err := rpc.Register(re); err != nil { + log.Sugar().Error(err) + return err + } } listen, err := net.Listen("tcp", address) @@ -21,15 +27,39 @@ func JsonServer(log *logger.Logger, address string, service interface{}) error { log.Sugar().Infof("Rpc server listen on: %s", address) - for { - accept, err := listen.Accept() - if err != nil { - log.Sugar().Error(err) - continue + wg, ctxx := errgroup.WithContext(ctx) + + wg.Go(func() error { + for { + accept, err := listen.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + log.Sugar().Error(err) + + break + } + log.Sugar().Error(err) + continue + } + + go jsonrpc.ServeConn(accept) } - jsonrpc.ServeConn(accept) + return err + }) + + wg.Go(func() error { + <-ctxx.Done() + log.Sugar().Errorf("Unis Rpc Server cancel by ctx") + listen.Close() + return ctxx.Err() + }) + + if err := wg.Wait(); err != nil { + return err } + + return nil } func JsonClient(log *logger.Logger, address string) (*rpc.Client, error) { diff --git a/routes/middleware.go b/routes/middleware.go index c018e19..36b36a9 100644 --- a/routes/middleware.go +++ b/routes/middleware.go @@ -1,21 +1,16 @@ package routes import ( - "bytes" - "dashboard/dao/sqldb" "dashboard/logger" "dashboard/models" "dashboard/pkg/jwt" "dashboard/pkg/rate" "dashboard/settings" - "dashboard/utils" - "io" "net" "net/http" "net/http/httputil" "os" "runtime/debug" - "strconv" "strings" "time" @@ -160,66 +155,3 @@ func GinRateLimit(rateC settings.RateLimitConfig) appHandler { return nil } } - -func GinCheckServerId(c *gin.Context) error { - serverId := c.GetHeader("serverId") - - if serverId != serverId { - c.Abort() - return &models.BaseError{ - Code: http.StatusServiceUnavailable, - Msg: "Server Id error", - } - } - - channel, err := strconv.Atoi(c.GetHeader("channel")) - if err != nil { - c.Abort() - return &models.BaseError{ - Code: http.StatusServiceUnavailable, - Msg: "Channel Id error", - } - } - - c.Set(models.GinContextServerId, serverId) - c.Set(models.GinContextChannel, channel) - - c.Next() - - return nil -} - -func GinStoreRequest(c *gin.Context) (err error) { - defer func() { - if err != nil { - c.Abort() - return - } - - c.Next() - }() - - log, _ := utils.GetLogFromContext(c) - channel := c.GetInt(models.GinContextChannel) - - oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel) - if _err != nil { - err = _err - } - - bodyBytes, _err := io.ReadAll(c.Request.Body) - if _err != nil { - err = _err - } - - c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) - - if oldConf != string(bodyBytes) { - log.Sugar().Info("The resful request is not equel local store") - if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil { - err = _err - } - } - - return nil -} diff --git a/routes/routes.go b/routes/routes.go index 4db3632..bbfb3de 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -1,17 +1,142 @@ package routes import ( + "context" "dashboard/controller" - "dashboard/controller/unis" "dashboard/logger" "dashboard/pkg/jwt" "dashboard/settings" + "errors" + "net/http" + "os" + "os/signal" + "syscall" "time" "github.com/gin-contrib/cors" + "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) +type Router struct { + r *gin.Engine + addr string + log *logger.Logger +} + +type AddRouter interface { + AddRoute(*gin.Engine) +} + +func NewRouter(addr string, log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtConfig, routes ...AddRouter) *Router { + cjwt := jwt.New(jwt.WithSalt(jwtC.Salt), jwt.WithExpire(time.Duration(jwtC.Expire)*time.Second)) + + r := gin.New() + + r.Use(cors.New(cors.Config{ + AllowOrigins: []string{"*"}, + AllowMethods: []string{"GET", "POST"}, + AllowHeaders: []string{"Origin", "Content-Type"}, + ExposeHeaders: []string{"Content-Length"}, + AllowCredentials: true, + MaxAge: 12 * time.Hour, + })) + + r.Use(GinLogger(log), GinRecovery(log, true), GinLog(log), errWapper(GinRateLimit(rate))) + + // 静态文件服务 + r.Static("/static", "./static") + r.StaticFile("/", "./static/index.html") + + r.GET("login", controller.LoginPage) + + r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt))) + r.POST("/api/logout", controller.UserLogOutHandler) + + g1 := r.Group("/api") + g1.Use(errWapper(GinJwtAuthor(cjwt))) + { + g1.GET("/system-info", errWapper(controller.SystemInfoHandle)) + g1.POST("/upload", errWapper(controller.FileUploadHandle)) + g1.GET("/files", errWapper(controller.FileListHandle)) + g1.GET("/download", errWapper(controller.FileDownloadHandle)) + } + + r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle())) + + for _, route := range routes { + route.AddRoute(r) + } + + pprof.Register(r) + + return &Router{ + r: r, + addr: addr, + log: log, + } +} + +func (r *Router) Run(ctx context.Context) error { + srv := &http.Server{ + Addr: r.addr, + Handler: r.r, + } + + r.log.Sugar().Infof("Server listen on %s", r.addr) + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + // 开启一个goroutine启动服务 + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + r.log.Error("listen: %s\n", zap.Error(err)) + + return err + } + + return nil + }) + + wg.Go(func() error { + // 等待中断信号来优雅地关闭服务器,为关闭服务器操作设置一个5秒的超时 + quit := make(chan os.Signal, 1) // 创建一个接收信号的通道 + // kill 默认会发送 syscall.SIGTERM 信号 + // kill -2 发送 syscall.SIGINT 信号,我们常用的Ctrl+C就是触发系统SIGINT信号 + // kill -9 发送 syscall.SIGKILL 信号,但是不能被捕获,所以不需要添加它 + // signal.Notify把收到的 syscall.SIGINT或syscall.SIGTERM 信号转发给quit + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) // 此处不会阻塞 + select { + case <-ctx.Done(): + r.log.Error("Http Server Cancel by ctx") + case <-quit: + } + // 阻塞在此,当接收到上述两种信号时才会往下执行 + r.log.Info("Shutdown Server ...") + // 创建一个5秒超时的context + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + // 5秒内优雅关闭服务(将未处理完的请求处理完再关闭服务),超过5秒就超时退出 + if err := srv.Shutdown(ctx); err != nil { + r.log.Error("Server Shutdown: ", zap.Error(err)) + + return err + } + + r.log.Info("Server exiting") + + return errors.New("Server Shutdown") + }) + + if err := wg.Wait(); err != nil { + return err + } + + return nil +} + +/* func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtConfig) *gin.Engine { cjwt := jwt.New(jwt.WithSalt(jwtC.Salt), jwt.WithExpire(time.Duration(jwtC.Expire)*time.Second)) @@ -48,11 +173,12 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle())) - unisr := r.Group("/api/unis") - unisr.Use(errWapper(GinCheckServerId), errWapper(GinStoreRequest)) - { - unisr.POST("/config/v1/add", errWapper(unis.StationConfig)) - } + // unisr := r.Group("/api/unis") + // unisr.Use(errWapper(GinCheckServerId), errWapper(GinStoreRequest)) + // { + // unisr.POST("/config/v1/add", errWapper(unis.StationConfig)) + // } return r } +*/ diff --git a/routes/wrapperr.go b/routes/wrapperr.go index 44e960a..fbb8d0c 100644 --- a/routes/wrapperr.go +++ b/routes/wrapperr.go @@ -12,6 +12,8 @@ import ( type appHandler func(*gin.Context) error +type AppHandler func(*gin.Context) error + func errWapper(appH appHandler) gin.HandlerFunc { return func(c *gin.Context) { log, _ := utils.GetLogFromContext(c) @@ -35,3 +37,27 @@ func errWapper(appH appHandler) gin.HandlerFunc { } } } + +func ErrWapper(appH appHandler) gin.HandlerFunc { + return func(c *gin.Context) { + log, _ := utils.GetLogFromContext(c) + + err := appH(c) + + if err != nil { + var baseErr *models.BaseError + if errors.As(err, &baseErr) { + log.Error("Base error", zap.Any("baseErr", baseErr)) + + c.JSON(http.StatusBadRequest, baseErr) + + return + } + + log.Error("Other error", zap.Error(err)) + c.String(http.StatusBadGateway, err.Error()) + + return + } + } +} diff --git a/services/unis/common.go b/services/unis/common.go deleted file mode 100644 index dc4be18..0000000 --- a/services/unis/common.go +++ /dev/null @@ -1,46 +0,0 @@ -package unis - -import ( - "dashboard/models" - "dashboard/pkg/rpcsup" - "fmt" - "net/rpc" -) - -type rpcClients map[string]*rpc.Client - -var rpcMapClients rpcClients - -func StationConfig(config *models.StationConfigParams) error { - CreateRpcClients(&rpcMapClients, &config.StationConfig.Screens) - - fmt.Println(GetConfig(models.UnisRpcRequest{Id: "zhangshuo"})) - - return nil -} - -func CreateRpcClients(rawClients *rpcClients, screens *[]models.Screens) { - if *rawClients == nil { - *rawClients = make(map[string]*rpc.Client) - } - - for _, cli := range *rawClients { - if cli != nil { - cli.Close() - } - } - - for _, sc := range *screens { - id := sc.NodeID - address := fmt.Sprintf("%s:%d", sc.NodeIP, 5501) - - cli, err := rpcsup.JsonClient(log, address) - if err != nil { - log.Sugar().Errorf("%s create rpc client error: %s", id, address) - continue - } - (*rawClients)[id] = cli - } - - return -} diff --git a/services/unis/rpcclient.go b/services/unis/rpcclient.go deleted file mode 100644 index d1c4fc0..0000000 --- a/services/unis/rpcclient.go +++ /dev/null @@ -1,17 +0,0 @@ -package unis - -import ( - "dashboard/models" - "fmt" -) - -func GetConfig(req models.UnisRpcRequest) (*models.UnisRpcResponse, error) { - out := &models.UnisRpcResponse{} - - for key, value := range rpcMapClients { - fmt.Println(key) - value.Call(models.UnisStationConfig.Methord(), req, out) - } - - return out, nil -} diff --git a/services/unis/rpcservice.go b/services/unis/rpcservice.go deleted file mode 100644 index df2a3f0..0000000 --- a/services/unis/rpcservice.go +++ /dev/null @@ -1,35 +0,0 @@ -package unis - -import ( - "dashboard/logger" - "dashboard/models" - "dashboard/pkg/rpcsup" - "dashboard/settings" - "fmt" -) - -var log *logger.Logger - -type UnisRpcService struct { -} - -func NewRpcService() *UnisRpcService { - return &UnisRpcService{} -} - -func RpcListenAndServe(_log *logger.Logger, config *settings.RpcConfig) error { - log = _log - addrees := fmt.Sprintf("%s:%d", config.Host, config.Port) - - go rpcsup.JsonServer(log, addrees, NewRpcService()) - - return nil -} - -func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error { - fmt.Println("rpc server get mesage",res) - - rsp.Id = res.Id - - return nil -} diff --git a/services/uniss/common.go b/services/uniss/common.go new file mode 100644 index 0000000..560cbb9 --- /dev/null +++ b/services/uniss/common.go @@ -0,0 +1,47 @@ +package uniss + +import ( + "dashboard/dao/dadis/unisd" + "dashboard/models" + "fmt" +) + +type commonService struct { + rpcClients *rpcClients +} + +func newcommonService(http *httpHandle) *commonService { + res := new(commonService) + + http.pushHandle(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig) + + return res +} + +func (c *commonService) stationConfig(reqest *models.UnisHttpRequest) (*models.UnisHttpResponse, error) { + req := reqest.Msg.(*models.StationConfigParams) + fmt.Println(req) + + var addres []*rpcIdAddres + for _, sc := range req.StationConfig.Screens { + add := new(rpcIdAddres) + + add.host = sc.NodeIP + add.id = sc.NodeID + + addres = append(addres, add) + } + + c.rpcClients.flushRpcClients(addres) + + fmt.Println(c.rpcClients.getConfig(addres[0].id, models.UnisRpcRequest{Id: "zhangshuo"})) + + res := &models.UnisHttpResponse{ + Code: int(models.CodeSuccess), + Msg: "config ok", + } + + unisd.UnisDataSet(string(unisd.DadisKey_UnisStationInfo), req, 0) + + return res, nil +} diff --git a/services/uniss/httphandle.go b/services/uniss/httphandle.go new file mode 100644 index 0000000..1566ad6 --- /dev/null +++ b/services/uniss/httphandle.go @@ -0,0 +1,66 @@ +package uniss + +import ( + "dashboard/models" + "sync" +) + +type httpHandle struct { + httpC chan *models.UnisHttpRequest + mu sync.RWMutex + mapFn map[string]HttpHandler +} + +type HttpHandler func(*models.UnisHttpRequest) (*models.UnisHttpResponse, error) + +func newhttpHandle() *httpHandle { + res := new(httpHandle) + + res.httpC = make(chan *models.UnisHttpRequest) + res.mapFn = make(map[string]HttpHandler) + + return res +} + +func (u *httpHandle) httpHandle(msg *models.UnisHttpRequest) { + var res *models.UnisHttpResponse + resC := msg.ResC + defer func() { + if resC != nil { + resC <- res + } + }() + + fn, ok := u.getHandle(msg.Id) + if ok { + if re, err := fn(msg); err == nil { + res = re + } + } +} + +func (u *httpHandle) pushHandle(key string, handle HttpHandler) { + u.mu.Lock() + defer u.mu.Unlock() + + u.mapFn[key] = handle +} + +func (u *httpHandle) delHandle(key string) { + u.mu.Lock() + defer u.mu.Unlock() + + delete(u.mapFn, key) +} + +func (u *httpHandle) getHandle(key string) (HttpHandler, bool) { + u.mu.RLock() + defer u.mu.RUnlock() + + res, ok := u.mapFn[key] + if !ok { + return nil, false + } + + return res, ok +} diff --git a/services/uniss/main.go b/services/uniss/main.go new file mode 100644 index 0000000..368f69e --- /dev/null +++ b/services/uniss/main.go @@ -0,0 +1,58 @@ +package uniss + +import ( + "context" + "dashboard/logger" + "dashboard/models" + "dashboard/pkg/errgroups" + "dashboard/settings" +) + +type UnisStation struct { + log *logger.Logger + conf *settings.UnisConfig + httpHandle *httpHandle + commonService *commonService +} + +func NewUnis(log *logger.Logger, conf *settings.UnisConfig) *UnisStation { + res := new(UnisStation) + + res.log = log + res.conf = conf + + res.httpHandle = newhttpHandle() + res.commonService = newcommonService(res.httpHandle) + res.commonService.rpcClients = newRpcClients(log, conf.RpcConfig.Port) + + return res +} + +func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest { + return u.httpHandle.httpC +} + +func (u *UnisStation) mesageHandle(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + u.log.Error("Unis mesageHandle cancel by ctx") + return ctx.Err() + case httpMsg := <-u.httpHandle.httpC: + u.httpHandle.httpHandle(httpMsg) + // default: + } + + } +} + +func (u *UnisStation) Run(ctx context.Context) error { + rpc := newUnisRpcServer(u.log, u.conf.RpcConfig) + + ewgs := errgroups.NewErrGroupFunc() + + ewgs.Add(rpc.rpcListenAndServe) + ewgs.Add(u.mesageHandle) + + return ewgs.Run(ctx) +} diff --git a/services/uniss/rpcclient.go b/services/uniss/rpcclient.go new file mode 100644 index 0000000..6390cd2 --- /dev/null +++ b/services/uniss/rpcclient.go @@ -0,0 +1,71 @@ +package uniss + +import ( + "dashboard/logger" + "dashboard/models" + "dashboard/pkg/rpcsup" + "fmt" + "net/rpc" + "sync" +) + +type rpcClients struct { + *logger.Logger + mu sync.RWMutex + rpcMapClients map[string]*rpc.Client + port int +} + +type rpcIdAddres struct { + id string + host string + port int +} + +func newRpcClients(log *logger.Logger, mport int) *rpcClients { + res := new(rpcClients) + + res.Logger = log + res.port = mport + + res.rpcMapClients = make(map[string]*rpc.Client) + + return res +} + +func (r *rpcClients) flushRpcClients(ids []*rpcIdAddres) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, cli := range r.rpcMapClients { + if cli != nil { + cli.Close() + } + } + + for _, idAddr := range ids { + address := fmt.Sprintf("%s:%d", idAddr.host, r.port) + + cli, err := rpcsup.JsonClient(r.Logger, address) + if err != nil { + r.Sugar().Errorf("%s create rpc client error: %s", idAddr.id, address) + continue + } + r.rpcMapClients[idAddr.id] = cli + } +} + +func (r *rpcClients) getConfig(id string, req models.UnisRpcRequest) (*models.UnisRpcResponse, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + out := &models.UnisRpcResponse{} + + if client := r.rpcMapClients[id]; client != nil { + if err := client.Call(models.UnisStationConfig.Methord(), req, out); err != nil { + return nil, err + } + } + + return out, nil +} diff --git a/services/uniss/rpcservice.go b/services/uniss/rpcservice.go new file mode 100644 index 0000000..d7ef948 --- /dev/null +++ b/services/uniss/rpcservice.go @@ -0,0 +1,54 @@ +package uniss + +import ( + "context" + "dashboard/logger" + "dashboard/models" + "dashboard/pkg/rpcsup" + "dashboard/settings" + "fmt" +) + +type UnisRpcService struct { + *logger.Logger +} + +func NewRpcService(log *logger.Logger) *UnisRpcService { + res := new(UnisRpcService) + + res.Logger = log + + return res +} + +func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error { + u.Sugar().Info("rpc server get mesage", res) + + rsp.Id = res.Id + + return nil +} + +type unisRpcServer struct { + log *logger.Logger + port int + host string + typec string +} + +func newUnisRpcServer(log *logger.Logger, conf *settings.RpcConfig) *unisRpcServer { + res := new(unisRpcServer) + res.log = log + + res.port = conf.Port + res.host = conf.Host + res.typec = conf.Type + + return res +} + +func (u *unisRpcServer) rpcListenAndServe(ctx context.Context) error { + addrees := fmt.Sprintf("%s:%d", u.host, u.port) + + return rpcsup.JsonServer(ctx, u.log, addrees, NewRpcService(u.log)) +} diff --git a/settings/type.go b/settings/type.go index f95f466..3177a3d 100644 --- a/settings/type.go +++ b/settings/type.go @@ -7,7 +7,7 @@ type AppConfig struct { *RateLimitConfig `mapstructure:"rate"` *JwtConfig `mapstructure:"jwt"` *SqlConfig `mapstructure:"database"` - *RpcConfig `mapstructure:"rpc"` + *UnisConfig `mapstructure:"unis"` } type BaseConfig struct { @@ -35,6 +35,10 @@ type SqlConfig struct { Type string `mapstructure:"type"` } +type UnisConfig struct { + *RpcConfig `mapstructure:"rpc"` +} + type RpcConfig struct { Port int `mapstructure:"port"` Host string `mapstructure:"host"` diff --git a/utils/runtime.go b/utils/runtime.go new file mode 100644 index 0000000..fe22460 --- /dev/null +++ b/utils/runtime.go @@ -0,0 +1,12 @@ +package utils + +import ( + "reflect" + "runtime" +) + +func GetFuncName(fn interface{}) string { + ptr := reflect.ValueOf(fn).Pointer() + + return runtime.FuncForPC(ptr).Name() +}