1、增加座席业务框架。

This commit is contained in:
redhat 2025-05-27 19:55:35 +08:00
parent 38fb2a6df8
commit 784a868d05
28 changed files with 1008 additions and 249 deletions

View File

@ -22,7 +22,8 @@ database:
port: 3306
max_conns: 200
rpc:
unis:
rpc:
type: "json"
host: ""
port: 5501

View File

@ -1,27 +0,0 @@
package unis
import (
"dashboard/models"
"dashboard/services/unis"
"net/http"
"github.com/gin-gonic/gin"
)
func StationConfig(c *gin.Context) error {
var config models.StationConfigParams
if err := c.ShouldBindJSON(&config); err != nil {
return err
}
if err := unis.StationConfig(&config); err != nil {
return err
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"msg": "Server station config ok",
})
return nil
}

View File

@ -1,5 +0,0 @@
package unis
const (
UNIS_HTTP_URL_CONFIG_ADD = "/api/unis/config/v1/add"
)

View File

@ -0,0 +1,76 @@
package unisc
import (
"dashboard/dao/dadis/unisd"
"dashboard/models"
"net/http"
"reflect"
"time"
"github.com/gin-gonic/gin"
)
type httpHandle struct {
httpC chan *models.UnisHttpRequest
}
func newhttpHandle(httpc chan *models.UnisHttpRequest) *httpHandle {
res := new(httpHandle)
res.httpC = httpc
return res
}
func httpSendWithExpire(httpc chan *models.UnisHttpRequest, data *models.UnisHttpRequest) (*models.UnisHttpResponse, error) {
resc := make(chan *models.UnisHttpResponse)
data.ResC = resc
select {
case httpc <- data:
case <-time.After(2 * time.Second):
return nil, models.ErrorTimeOut
}
select {
case resp := <-resc:
return resp, nil
case <-time.After(2 * time.Second):
return nil, models.ErrorTimeOut
}
}
func (u *httpHandle) stationConfig(c *gin.Context) error {
config := new(models.StationConfigParams)
if err := c.ShouldBindJSON(config); err != nil {
return err
}
if !config.StationConfig.ConfigType {
res, ok := unisd.UnisDataGet(string(unisd.DadisKey_UnisStationInfo))
if ok {
oldCfg := res.(*models.StationConfigParams)
oldCfg.StationConfig.ConfigType = config.StationConfig.ConfigType
if reflect.DeepEqual(config, oldCfg) {
c.JSON(http.StatusOK, models.UnisHttpResponseOk)
return nil
}
}
}
req := &models.UnisHttpRequest{
Id: models.UnisHttpUrl(c.Request.URL.String()).GetMsgId(),
Msg: config,
}
resp, err := httpSendWithExpire(u.httpC, req)
if err != nil {
return err
}
c.JSON(http.StatusOK, resp)
return nil
}

View File

@ -0,0 +1,128 @@
package unisc
import (
"bytes"
"dashboard/dao/sqldb"
"dashboard/models"
"dashboard/routes"
"dashboard/services/uniss"
"dashboard/utils"
"io"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
)
type HttpRoute struct {
httpHandle *httpHandle
middleWare *middleWare
}
func NewHttpRoute(httpc chan *models.UnisHttpRequest) *HttpRoute {
res := new(HttpRoute)
res.httpHandle = newhttpHandle(httpc)
res.middleWare = newmiddleWare()
return res
}
func (u *HttpRoute) AddRoute(r *gin.Engine) {
unisr := r.Group("/api/unis")
unisr.Use(routes.ErrWapper(u.middleWare.GinCheckServerId), routes.ErrWapper(u.middleWare.GinStoreRequest))
{
unisr.POST("/config/v1/add", routes.ErrWapper(u.httpHandle.stationConfig))
}
}
type middleWare struct {
}
func newmiddleWare() *middleWare {
return new(middleWare)
}
func (m *middleWare) GinCheckServerId(c *gin.Context) error {
serverId := c.GetHeader("serverId")
if serverId != serverId {
c.Abort()
return &models.BaseError{
Code: http.StatusServiceUnavailable,
Msg: "Server Id error",
}
}
channel, err := strconv.Atoi(c.GetHeader("channel"))
if err != nil {
c.Abort()
return &models.BaseError{
Code: http.StatusServiceUnavailable,
Msg: "Channel Id error",
}
}
c.Set(models.GinContextServerId, serverId)
c.Set(models.GinContextChannel, channel)
c.Next()
return nil
}
func (m *middleWare) GinStoreRequest(c *gin.Context) (err error) {
defer func() {
if err != nil {
c.Abort()
return
}
c.Next()
}()
log, _ := utils.GetLogFromContext(c)
channel := c.GetInt(models.GinContextChannel)
oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel)
if _err != nil {
err = _err
}
bodyBytes, _err := io.ReadAll(c.Request.Body)
if _err != nil {
err = _err
}
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
if oldConf != string(bodyBytes) {
log.Sugar().Info("The resful request is not equel local store")
if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil {
err = _err
}
}
return nil
}
func (m *middleWare) GinWithUnisObject(unis *uniss.UnisStation) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set(models.GinContextUnis, unis)
c.Next()
}
}
func (m *middleWare) GetUnisObjectFromContext(c *gin.Context) (*uniss.UnisStation, error) {
res, ok := c.Get(models.GinContextUnis)
if !ok {
return nil, models.ErrorInvalidData
}
unis, ok := res.(*uniss.UnisStation)
if !ok {
return nil, models.ErrorInvalidData
}
return unis, nil
}

View File

@ -0,0 +1,16 @@
package unisd
import (
"dashboard/pkg/dadis"
"time"
)
var unisDataCenter = dadis.NewDadis()
func UnisDataSet(key string, value interface{}, expire time.Duration) {
unisDataCenter.Set(key, value, expire)
}
func UnisDataGet(key string) (interface{}, bool) {
return unisDataCenter.Get(key)
}

8
dao/dadis/unisd/type.go Normal file
View File

@ -0,0 +1,8 @@
package unisd
type DadisKey string
const (
DadisKey_UnisSystemInfo DadisKey = "UnisSystemInfo"
DadisKey_UnisStationInfo DadisKey = "UnisStationInfo"
)

11
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/creack/pty v1.1.24
github.com/fsnotify/fsnotify v1.9.0
github.com/gin-contrib/cors v1.7.5
github.com/gin-contrib/pprof v1.5.3
github.com/gin-gonic/gin v1.10.0
github.com/go-sql-driver/mysql v1.8.1
github.com/golang-jwt/jwt/v5 v5.2.2
@ -21,7 +22,7 @@ require (
github.com/shirou/gopsutil/v4 v4.25.4
github.com/spf13/viper v1.20.1
go.uber.org/zap v1.27.0
golang.org/x/sync v0.12.0
golang.org/x/sync v0.13.0
)
require (
@ -60,11 +61,11 @@ require (
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/arch v0.15.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/arch v0.16.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect

22
go.sum
View File

@ -27,6 +27,8 @@ github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3G
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk=
github.com/gin-contrib/cors v1.7.5/go.mod h1:4q3yi7xBEDDWKapjT2o1V7mScKDDr8k+jZ0fSquGoy0=
github.com/gin-contrib/pprof v1.5.3 h1:Bj5SxJ3kQDVez/s/+f9+meedJIqLS+xlkIVDe/lcvgM=
github.com/gin-contrib/pprof v1.5.3/go.mod h1:0+LQSZ4SLO0B6+2n6JBzaEygpTBxe/nI+YEYpfQQ6xY=
github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E=
github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0=
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
@ -141,23 +143,23 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw=
golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/arch v0.16.0 h1:foMtLTdyOmIniqWCHjY6+JxuC54XP1fDwx4N0ASyW+U=
golang.org/x/arch v0.16.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=

31
main.go
View File

@ -2,21 +2,15 @@ package main
import (
"context"
"dashboard/controller/unisc"
"dashboard/dao/sqldb"
"dashboard/logger"
"dashboard/pkg/errgroups"
"dashboard/routes"
"dashboard/services/unis"
"dashboard/services/uniss"
"dashboard/settings"
"errors"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
)
var config = flag.String("f", "./config/config.yaml", "config file path")
@ -54,16 +48,22 @@ func main() {
log.Sugar().Panicf("New db error: %v", err)
}
err = unis.RpcListenAndServe(log, sets.RpcConfig)
if err != nil {
log.Sugar().Panicf("New db error: %v", err)
}
errwg := errgroups.NewErrGroup()
r := routes.Setup(log, *sets.RateLimitConfig, *sets.JwtConfig)
// r := routes.Setup(log, *sets.RateLimitConfig, *sets.JwtConfig)
listenAndServe(fmt.Sprintf(":%d", sets.BaseConfig.Port), r, log)
// listenAndServe(fmt.Sprintf(":%d", sets.BaseConfig.Port), r, log)
sunis := uniss.NewUnis(log, sets.UnisConfig)
errwg.Add(routes.NewRouter(fmt.Sprintf(":%d", sets.BaseConfig.Port),
log, *sets.RateLimitConfig, *sets.JwtConfig,
unisc.NewHttpRoute(sunis.GetHttpChannel())))
errwg.Add(sunis)
log.Sugar().Fatalln(errwg.Run(context.Background()))
}
/*
func listenAndServe(addr string, handle http.Handler, log *logger.Logger) {
srv := &http.Server{
Addr: addr,
@ -97,3 +97,4 @@ func listenAndServe(addr string, handle http.Handler, log *logger.Logger) {
log.Info("Server exiting")
}
*/

View File

@ -9,6 +9,7 @@ const (
GinContextLog = "log"
GinContextChannel = "channel"
GinContextServerId = "serverid"
GinContextUnis = "unis"
)
const (
@ -20,6 +21,7 @@ var (
ErrorInvalidData = errors.New("no such value")
ErrorPasswordErr = errors.New("user or password invalid")
ErrorSqlInitErr = errors.New("database init err")
ErrorTimeOut = errors.New("time out")
)
const (

35
models/unishttpparams.go Normal file
View File

@ -0,0 +1,35 @@
package models
type UnisHttpRequest struct {
ResC chan *UnisHttpResponse
Id string
Msg interface{}
}
type UnisHttpResponse struct {
Code int
Msg interface{}
Data interface{}
}
type UnisHttpUrl string
const (
UNIS_HTTP_URL_CONFIG_ADD UnisHttpUrl = "/api/unis/config/v1/add"
)
func (u UnisHttpUrl) GetMsgId() string {
return mapHttpUrlId[u]
}
type UnisHttpMsgId string
const (
UNIS_HTTP_ID_CONFIG_ADD UnisHttpMsgId = "/api/unis/config/v1/add"
)
var mapHttpUrlId = map[UnisHttpUrl]string{
UNIS_HTTP_URL_CONFIG_ADD: string(UNIS_HTTP_ID_CONFIG_ADD),
}
var UnisHttpResponseOk = &UnisHttpResponse{Code: int(CodeSuccess), Msg: CodeSuccess.String()}

115
pkg/errgroups/errgroups.go Normal file
View File

@ -0,0 +1,115 @@
package errgroups
import (
"context"
"dashboard/utils"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
type ErrGroupFunc struct {
wg *errgroup.Group
mu sync.RWMutex
funs map[string]FuncWithErrFunc
}
type FuncWithErrFunc func(context.Context) error
func NewErrGroupFunc() *ErrGroupFunc {
return &ErrGroupFunc{
funs: make(map[string]FuncWithErrFunc),
}
}
func (e *ErrGroupFunc) Run(ctx context.Context) error {
e.mu.RLock()
defer e.mu.RUnlock()
e.wg, ctx = errgroup.WithContext(ctx)
for name, fn := range e.funs {
fn := fn
name := name
e.wg.Go(func() error {
defer fmt.Printf("func: %s stop\n", name)
fmt.Printf("func: %s runing...\n", name)
return fn(ctx)
})
}
if err := e.wg.Wait(); err != nil {
return err
}
return nil
}
func (e *ErrGroupFunc) Add(fn FuncWithErrFunc) {
e.mu.Lock()
defer e.mu.Unlock()
fnStr := utils.GetFuncName(fn)
e.funs[fnStr] = fn
}
func (e *ErrGroupFunc) Del(fn FuncWithErrFunc) {
e.mu.Lock()
defer e.mu.Unlock()
fnStr := utils.GetFuncName(fn)
delete(e.funs, fnStr)
}
type ErrGroup struct {
wg *errgroup.Group
mu sync.RWMutex
funs map[FuncWithErr]struct{}
}
type FuncWithErr interface {
Run(context.Context) error
}
func NewErrGroup() *ErrGroup {
return &ErrGroup{
funs: make(map[FuncWithErr]struct{}),
}
}
func (e *ErrGroup) Run(ctx context.Context) error {
e.mu.RLock()
defer e.mu.RUnlock()
e.wg, ctx = errgroup.WithContext(ctx)
for fn := range e.funs {
fn := fn
e.wg.Go(func() error {
return fn.Run(ctx)
})
}
if err := e.wg.Wait(); err != nil {
return err
}
return nil
}
func (e *ErrGroup) Add(fn FuncWithErr) {
e.mu.Lock()
defer e.mu.Unlock()
e.funs[fn] = struct{}{}
}
func (e *ErrGroup) Del(fn FuncWithErr) {
e.mu.Lock()
defer e.mu.Unlock()
delete(e.funs, fn)
}

View File

@ -0,0 +1,78 @@
package errgroups
import (
"context"
"errors"
"fmt"
"testing"
"time"
)
func fn_test1(ctx context.Context) error {
select {
case <-ctx.Done():
fmt.Println("fn_test1 called cancel")
return ctx.Err()
case <-time.After(time.Second):
fmt.Println("fn_test1 called ok")
}
return nil
}
func fn_test2(ctx context.Context) error {
fmt.Println("fn_test2 called")
return nil
}
func fn_test3(ctx context.Context) error {
fmt.Println("fn_test3 called")
return errors.New("call fn_test3 get error")
}
type err_test1 struct{}
func(e *err_test1)Run(ctx context.Context)error{
return fn_test1(ctx)
}
type err_test2 struct{}
func(e *err_test2)Run(ctx context.Context)error{
return fn_test2(ctx)
}
type err_test3 struct{}
func(e *err_test3)Run(ctx context.Context)error{
return fn_test3(ctx)
}
func Test_errGroupsFunc(t *testing.T) {
errgws := NewErrGroupFunc()
errgws.Add(fn_test1)
errgws.Add(fn_test2)
errgws.Add(fn_test3)
err := errgws.Run(context.Background())
if err != nil {
t.Errorf("err: %v", err)
}
t.Log("ok")
}
func Test_errGroups(t *testing.T) {
errgws := NewErrGroup()
errgws.Add(&err_test1{})
errgws.Add(&err_test2{})
errgws.Add(&err_test3{})
err := errgws.Run(context.Background())
if err != nil {
t.Errorf("err: %v", err)
}
t.Log("ok")
}

View File

@ -1,17 +1,23 @@
package rpcsup
import (
"context"
"dashboard/logger"
"errors"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"golang.org/x/sync/errgroup"
)
func JsonServer(log *logger.Logger, address string, service interface{}) error {
if err := rpc.Register(service); err != nil {
func JsonServer(ctx context.Context, log *logger.Logger, address string, service ...interface{}) error {
for _, re := range service {
if err := rpc.Register(re); err != nil {
log.Sugar().Error(err)
return err
}
}
listen, err := net.Listen("tcp", address)
if err != nil {
@ -21,15 +27,39 @@ func JsonServer(log *logger.Logger, address string, service interface{}) error {
log.Sugar().Infof("Rpc server listen on: %s", address)
wg, ctxx := errgroup.WithContext(ctx)
wg.Go(func() error {
for {
accept, err := listen.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
log.Sugar().Error(err)
break
}
log.Sugar().Error(err)
continue
}
jsonrpc.ServeConn(accept)
go jsonrpc.ServeConn(accept)
}
return err
})
wg.Go(func() error {
<-ctxx.Done()
log.Sugar().Errorf("Unis Rpc Server cancel by ctx")
listen.Close()
return ctxx.Err()
})
if err := wg.Wait(); err != nil {
return err
}
return nil
}
func JsonClient(log *logger.Logger, address string) (*rpc.Client, error) {

View File

@ -1,21 +1,16 @@
package routes
import (
"bytes"
"dashboard/dao/sqldb"
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/jwt"
"dashboard/pkg/rate"
"dashboard/settings"
"dashboard/utils"
"io"
"net"
"net/http"
"net/http/httputil"
"os"
"runtime/debug"
"strconv"
"strings"
"time"
@ -160,66 +155,3 @@ func GinRateLimit(rateC settings.RateLimitConfig) appHandler {
return nil
}
}
func GinCheckServerId(c *gin.Context) error {
serverId := c.GetHeader("serverId")
if serverId != serverId {
c.Abort()
return &models.BaseError{
Code: http.StatusServiceUnavailable,
Msg: "Server Id error",
}
}
channel, err := strconv.Atoi(c.GetHeader("channel"))
if err != nil {
c.Abort()
return &models.BaseError{
Code: http.StatusServiceUnavailable,
Msg: "Channel Id error",
}
}
c.Set(models.GinContextServerId, serverId)
c.Set(models.GinContextChannel, channel)
c.Next()
return nil
}
func GinStoreRequest(c *gin.Context) (err error) {
defer func() {
if err != nil {
c.Abort()
return
}
c.Next()
}()
log, _ := utils.GetLogFromContext(c)
channel := c.GetInt(models.GinContextChannel)
oldConf, _err := sqldb.ResFulDataGet(c.Request.Method, c.Request.URL.String(), channel)
if _err != nil {
err = _err
}
bodyBytes, _err := io.ReadAll(c.Request.Body)
if _err != nil {
err = _err
}
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
if oldConf != string(bodyBytes) {
log.Sugar().Info("The resful request is not equel local store")
if _err := sqldb.ResFulDataStore(c.Request.Method, c.Request.URL.String(), string(bodyBytes), channel); err != nil {
err = _err
}
}
return nil
}

View File

@ -1,17 +1,142 @@
package routes
import (
"context"
"dashboard/controller"
"dashboard/controller/unis"
"dashboard/logger"
"dashboard/pkg/jwt"
"dashboard/settings"
"errors"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type Router struct {
r *gin.Engine
addr string
log *logger.Logger
}
type AddRouter interface {
AddRoute(*gin.Engine)
}
func NewRouter(addr string, log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtConfig, routes ...AddRouter) *Router {
cjwt := jwt.New(jwt.WithSalt(jwtC.Salt), jwt.WithExpire(time.Duration(jwtC.Expire)*time.Second))
r := gin.New()
r.Use(cors.New(cors.Config{
AllowOrigins: []string{"*"},
AllowMethods: []string{"GET", "POST"},
AllowHeaders: []string{"Origin", "Content-Type"},
ExposeHeaders: []string{"Content-Length"},
AllowCredentials: true,
MaxAge: 12 * time.Hour,
}))
r.Use(GinLogger(log), GinRecovery(log, true), GinLog(log), errWapper(GinRateLimit(rate)))
// 静态文件服务
r.Static("/static", "./static")
r.StaticFile("/", "./static/index.html")
r.GET("login", controller.LoginPage)
r.POST("/api/login", errWapper(controller.UserSignInHandler(cjwt)))
r.POST("/api/logout", controller.UserLogOutHandler)
g1 := r.Group("/api")
g1.Use(errWapper(GinJwtAuthor(cjwt)))
{
g1.GET("/system-info", errWapper(controller.SystemInfoHandle))
g1.POST("/upload", errWapper(controller.FileUploadHandle))
g1.GET("/files", errWapper(controller.FileListHandle))
g1.GET("/download", errWapper(controller.FileDownloadHandle))
}
r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle()))
for _, route := range routes {
route.AddRoute(r)
}
pprof.Register(r)
return &Router{
r: r,
addr: addr,
log: log,
}
}
func (r *Router) Run(ctx context.Context) error {
srv := &http.Server{
Addr: r.addr,
Handler: r.r,
}
r.log.Sugar().Infof("Server listen on %s", r.addr)
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
// 开启一个goroutine启动服务
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
r.log.Error("listen: %s\n", zap.Error(err))
return err
}
return nil
})
wg.Go(func() error {
// 等待中断信号来优雅地关闭服务器为关闭服务器操作设置一个5秒的超时
quit := make(chan os.Signal, 1) // 创建一个接收信号的通道
// kill 默认会发送 syscall.SIGTERM 信号
// kill -2 发送 syscall.SIGINT 信号我们常用的Ctrl+C就是触发系统SIGINT信号
// kill -9 发送 syscall.SIGKILL 信号,但是不能被捕获,所以不需要添加它
// signal.Notify把收到的 syscall.SIGINT或syscall.SIGTERM 信号转发给quit
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) // 此处不会阻塞
select {
case <-ctx.Done():
r.log.Error("Http Server Cancel by ctx")
case <-quit:
}
// 阻塞在此,当接收到上述两种信号时才会往下执行
r.log.Info("Shutdown Server ...")
// 创建一个5秒超时的context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 5秒内优雅关闭服务将未处理完的请求处理完再关闭服务超过5秒就超时退出
if err := srv.Shutdown(ctx); err != nil {
r.log.Error("Server Shutdown: ", zap.Error(err))
return err
}
r.log.Info("Server exiting")
return errors.New("Server Shutdown")
})
if err := wg.Wait(); err != nil {
return err
}
return nil
}
/*
func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtConfig) *gin.Engine {
cjwt := jwt.New(jwt.WithSalt(jwtC.Salt), jwt.WithExpire(time.Duration(jwtC.Expire)*time.Second))
@ -48,11 +173,12 @@ func Setup(log *logger.Logger, rate settings.RateLimitConfig, jwtC settings.JwtC
r.GET("/ws/terminal", errWapper(GinJwtAuthor(cjwt)), errWapper(controller.TerminalHandle()))
unisr := r.Group("/api/unis")
unisr.Use(errWapper(GinCheckServerId), errWapper(GinStoreRequest))
{
unisr.POST("/config/v1/add", errWapper(unis.StationConfig))
}
// unisr := r.Group("/api/unis")
// unisr.Use(errWapper(GinCheckServerId), errWapper(GinStoreRequest))
// {
// unisr.POST("/config/v1/add", errWapper(unis.StationConfig))
// }
return r
}
*/

View File

@ -12,6 +12,8 @@ import (
type appHandler func(*gin.Context) error
type AppHandler func(*gin.Context) error
func errWapper(appH appHandler) gin.HandlerFunc {
return func(c *gin.Context) {
log, _ := utils.GetLogFromContext(c)
@ -35,3 +37,27 @@ func errWapper(appH appHandler) gin.HandlerFunc {
}
}
}
func ErrWapper(appH appHandler) gin.HandlerFunc {
return func(c *gin.Context) {
log, _ := utils.GetLogFromContext(c)
err := appH(c)
if err != nil {
var baseErr *models.BaseError
if errors.As(err, &baseErr) {
log.Error("Base error", zap.Any("baseErr", baseErr))
c.JSON(http.StatusBadRequest, baseErr)
return
}
log.Error("Other error", zap.Error(err))
c.String(http.StatusBadGateway, err.Error())
return
}
}
}

View File

@ -1,46 +0,0 @@
package unis
import (
"dashboard/models"
"dashboard/pkg/rpcsup"
"fmt"
"net/rpc"
)
type rpcClients map[string]*rpc.Client
var rpcMapClients rpcClients
func StationConfig(config *models.StationConfigParams) error {
CreateRpcClients(&rpcMapClients, &config.StationConfig.Screens)
fmt.Println(GetConfig(models.UnisRpcRequest{Id: "zhangshuo"}))
return nil
}
func CreateRpcClients(rawClients *rpcClients, screens *[]models.Screens) {
if *rawClients == nil {
*rawClients = make(map[string]*rpc.Client)
}
for _, cli := range *rawClients {
if cli != nil {
cli.Close()
}
}
for _, sc := range *screens {
id := sc.NodeID
address := fmt.Sprintf("%s:%d", sc.NodeIP, 5501)
cli, err := rpcsup.JsonClient(log, address)
if err != nil {
log.Sugar().Errorf("%s create rpc client error: %s", id, address)
continue
}
(*rawClients)[id] = cli
}
return
}

View File

@ -1,17 +0,0 @@
package unis
import (
"dashboard/models"
"fmt"
)
func GetConfig(req models.UnisRpcRequest) (*models.UnisRpcResponse, error) {
out := &models.UnisRpcResponse{}
for key, value := range rpcMapClients {
fmt.Println(key)
value.Call(models.UnisStationConfig.Methord(), req, out)
}
return out, nil
}

View File

@ -1,35 +0,0 @@
package unis
import (
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/rpcsup"
"dashboard/settings"
"fmt"
)
var log *logger.Logger
type UnisRpcService struct {
}
func NewRpcService() *UnisRpcService {
return &UnisRpcService{}
}
func RpcListenAndServe(_log *logger.Logger, config *settings.RpcConfig) error {
log = _log
addrees := fmt.Sprintf("%s:%d", config.Host, config.Port)
go rpcsup.JsonServer(log, addrees, NewRpcService())
return nil
}
func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error {
fmt.Println("rpc server get mesage",res)
rsp.Id = res.Id
return nil
}

47
services/uniss/common.go Normal file
View File

@ -0,0 +1,47 @@
package uniss
import (
"dashboard/dao/dadis/unisd"
"dashboard/models"
"fmt"
)
type commonService struct {
rpcClients *rpcClients
}
func newcommonService(http *httpHandle) *commonService {
res := new(commonService)
http.pushHandle(string(models.UNIS_HTTP_ID_CONFIG_ADD), res.stationConfig)
return res
}
func (c *commonService) stationConfig(reqest *models.UnisHttpRequest) (*models.UnisHttpResponse, error) {
req := reqest.Msg.(*models.StationConfigParams)
fmt.Println(req)
var addres []*rpcIdAddres
for _, sc := range req.StationConfig.Screens {
add := new(rpcIdAddres)
add.host = sc.NodeIP
add.id = sc.NodeID
addres = append(addres, add)
}
c.rpcClients.flushRpcClients(addres)
fmt.Println(c.rpcClients.getConfig(addres[0].id, models.UnisRpcRequest{Id: "zhangshuo"}))
res := &models.UnisHttpResponse{
Code: int(models.CodeSuccess),
Msg: "config ok",
}
unisd.UnisDataSet(string(unisd.DadisKey_UnisStationInfo), req, 0)
return res, nil
}

View File

@ -0,0 +1,66 @@
package uniss
import (
"dashboard/models"
"sync"
)
type httpHandle struct {
httpC chan *models.UnisHttpRequest
mu sync.RWMutex
mapFn map[string]HttpHandler
}
type HttpHandler func(*models.UnisHttpRequest) (*models.UnisHttpResponse, error)
func newhttpHandle() *httpHandle {
res := new(httpHandle)
res.httpC = make(chan *models.UnisHttpRequest)
res.mapFn = make(map[string]HttpHandler)
return res
}
func (u *httpHandle) httpHandle(msg *models.UnisHttpRequest) {
var res *models.UnisHttpResponse
resC := msg.ResC
defer func() {
if resC != nil {
resC <- res
}
}()
fn, ok := u.getHandle(msg.Id)
if ok {
if re, err := fn(msg); err == nil {
res = re
}
}
}
func (u *httpHandle) pushHandle(key string, handle HttpHandler) {
u.mu.Lock()
defer u.mu.Unlock()
u.mapFn[key] = handle
}
func (u *httpHandle) delHandle(key string) {
u.mu.Lock()
defer u.mu.Unlock()
delete(u.mapFn, key)
}
func (u *httpHandle) getHandle(key string) (HttpHandler, bool) {
u.mu.RLock()
defer u.mu.RUnlock()
res, ok := u.mapFn[key]
if !ok {
return nil, false
}
return res, ok
}

58
services/uniss/main.go Normal file
View File

@ -0,0 +1,58 @@
package uniss
import (
"context"
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/errgroups"
"dashboard/settings"
)
type UnisStation struct {
log *logger.Logger
conf *settings.UnisConfig
httpHandle *httpHandle
commonService *commonService
}
func NewUnis(log *logger.Logger, conf *settings.UnisConfig) *UnisStation {
res := new(UnisStation)
res.log = log
res.conf = conf
res.httpHandle = newhttpHandle()
res.commonService = newcommonService(res.httpHandle)
res.commonService.rpcClients = newRpcClients(log, conf.RpcConfig.Port)
return res
}
func (u *UnisStation) GetHttpChannel() chan *models.UnisHttpRequest {
return u.httpHandle.httpC
}
func (u *UnisStation) mesageHandle(ctx context.Context) error {
for {
select {
case <-ctx.Done():
u.log.Error("Unis mesageHandle cancel by ctx")
return ctx.Err()
case httpMsg := <-u.httpHandle.httpC:
u.httpHandle.httpHandle(httpMsg)
// default:
}
}
}
func (u *UnisStation) Run(ctx context.Context) error {
rpc := newUnisRpcServer(u.log, u.conf.RpcConfig)
ewgs := errgroups.NewErrGroupFunc()
ewgs.Add(rpc.rpcListenAndServe)
ewgs.Add(u.mesageHandle)
return ewgs.Run(ctx)
}

View File

@ -0,0 +1,71 @@
package uniss
import (
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/rpcsup"
"fmt"
"net/rpc"
"sync"
)
type rpcClients struct {
*logger.Logger
mu sync.RWMutex
rpcMapClients map[string]*rpc.Client
port int
}
type rpcIdAddres struct {
id string
host string
port int
}
func newRpcClients(log *logger.Logger, mport int) *rpcClients {
res := new(rpcClients)
res.Logger = log
res.port = mport
res.rpcMapClients = make(map[string]*rpc.Client)
return res
}
func (r *rpcClients) flushRpcClients(ids []*rpcIdAddres) {
r.mu.Lock()
defer r.mu.Unlock()
for _, cli := range r.rpcMapClients {
if cli != nil {
cli.Close()
}
}
for _, idAddr := range ids {
address := fmt.Sprintf("%s:%d", idAddr.host, r.port)
cli, err := rpcsup.JsonClient(r.Logger, address)
if err != nil {
r.Sugar().Errorf("%s create rpc client error: %s", idAddr.id, address)
continue
}
r.rpcMapClients[idAddr.id] = cli
}
}
func (r *rpcClients) getConfig(id string, req models.UnisRpcRequest) (*models.UnisRpcResponse, error) {
r.mu.RLock()
defer r.mu.RUnlock()
out := &models.UnisRpcResponse{}
if client := r.rpcMapClients[id]; client != nil {
if err := client.Call(models.UnisStationConfig.Methord(), req, out); err != nil {
return nil, err
}
}
return out, nil
}

View File

@ -0,0 +1,54 @@
package uniss
import (
"context"
"dashboard/logger"
"dashboard/models"
"dashboard/pkg/rpcsup"
"dashboard/settings"
"fmt"
)
type UnisRpcService struct {
*logger.Logger
}
func NewRpcService(log *logger.Logger) *UnisRpcService {
res := new(UnisRpcService)
res.Logger = log
return res
}
func (u *UnisRpcService) Config(res models.UnisRpcRequest, rsp *models.UnisRpcResponse) error {
u.Sugar().Info("rpc server get mesage", res)
rsp.Id = res.Id
return nil
}
type unisRpcServer struct {
log *logger.Logger
port int
host string
typec string
}
func newUnisRpcServer(log *logger.Logger, conf *settings.RpcConfig) *unisRpcServer {
res := new(unisRpcServer)
res.log = log
res.port = conf.Port
res.host = conf.Host
res.typec = conf.Type
return res
}
func (u *unisRpcServer) rpcListenAndServe(ctx context.Context) error {
addrees := fmt.Sprintf("%s:%d", u.host, u.port)
return rpcsup.JsonServer(ctx, u.log, addrees, NewRpcService(u.log))
}

View File

@ -7,7 +7,7 @@ type AppConfig struct {
*RateLimitConfig `mapstructure:"rate"`
*JwtConfig `mapstructure:"jwt"`
*SqlConfig `mapstructure:"database"`
*RpcConfig `mapstructure:"rpc"`
*UnisConfig `mapstructure:"unis"`
}
type BaseConfig struct {
@ -35,6 +35,10 @@ type SqlConfig struct {
Type string `mapstructure:"type"`
}
type UnisConfig struct {
*RpcConfig `mapstructure:"rpc"`
}
type RpcConfig struct {
Port int `mapstructure:"port"`
Host string `mapstructure:"host"`

12
utils/runtime.go Normal file
View File

@ -0,0 +1,12 @@
package utils
import (
"reflect"
"runtime"
)
func GetFuncName(fn interface{}) string {
ptr := reflect.ValueOf(fn).Pointer()
return runtime.FuncForPC(ptr).Name()
}