| @@ -1,6 +1,7 @@ | |||||
| package http | package http | ||||
| import ( | import ( | ||||
| "bytes" | |||||
| "context" | "context" | ||||
| "fmt" | "fmt" | ||||
| "github.com/gin-gonic/gin" | "github.com/gin-gonic/gin" | ||||
| @@ -11,13 +12,13 @@ import ( | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| "gitlink.org.cn/cloudream/common/utils/serder" | "gitlink.org.cn/cloudream/common/utils/serder" | ||||
| "io" | "io" | ||||
| "io/ioutil" | |||||
| "net/http" | "net/http" | ||||
| "time" | "time" | ||||
| ) | ) | ||||
| type IOService struct { | type IOService struct { | ||||
| *Server | *Server | ||||
| swWorker *exec.Worker | |||||
| } | } | ||||
| func (s *Server) IOSvc() *IOService { | func (s *Server) IOSvc() *IOService { | ||||
| @@ -27,12 +28,8 @@ func (s *Server) IOSvc() *IOService { | |||||
| } | } | ||||
| func (s *IOService) GetStream(ctx *gin.Context) { | func (s *IOService) GetStream(ctx *gin.Context) { | ||||
| //planID := ctx.Query("plan_id") | |||||
| //varID := ctx.Query("var_id") | |||||
| //signalData := ctx.Query("signal") | |||||
| var req cdssdk.GetStreamReq | var req cdssdk.GetStreamReq | ||||
| if err := ctx.ShouldBindQuery(&req); err != nil { | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| logger.Warnf("binding body: %s", err.Error()) | logger.Warnf("binding body: %s", err.Error()) | ||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | ||||
| return | return | ||||
| @@ -47,7 +44,7 @@ func (s *IOService) GetStream(ctx *gin.Context) { | |||||
| c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | ||||
| defer cancel() | defer cancel() | ||||
| sw := s.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| if sw == nil { | if sw == nil { | ||||
| ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ||||
| return | return | ||||
| @@ -145,7 +142,7 @@ func (s *IOService) SendStream(ctx *gin.Context) { | |||||
| c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | ||||
| defer cancel() | defer cancel() | ||||
| sw := s.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| if sw == nil { | if sw == nil { | ||||
| ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ||||
| return | return | ||||
| @@ -197,6 +194,15 @@ func (s *IOService) SendStream(ctx *gin.Context) { | |||||
| } | } | ||||
| func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { | func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { | ||||
| bodyBytes, err := ioutil.ReadAll(ctx.Request.Body) | |||||
| if err != nil { | |||||
| logger.Warnf("reading body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusInternalServerError, Failed("400", "internal error")) | |||||
| return | |||||
| } | |||||
| println("Received body: %s", string(bodyBytes)) | |||||
| ctx.Request.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) // Reset body for subsequent reads | |||||
| var req cdssdk.ExecuteIOPlanReq | var req cdssdk.ExecuteIOPlanReq | ||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | if err := ctx.ShouldBindJSON(&req); err != nil { | ||||
| logger.Warnf("binding body: %s", err.Error()) | logger.Warnf("binding body: %s", err.Error()) | ||||
| @@ -217,8 +223,8 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { | |||||
| sw := exec.NewExecutor(plan) | sw := exec.NewExecutor(plan) | ||||
| s.swWorker.Add(sw) | |||||
| defer s.swWorker.Remove(sw) | |||||
| s.svc.swWorker.Add(sw) | |||||
| defer s.svc.swWorker.Remove(sw) | |||||
| // 设置上下文超时 | // 设置上下文超时 | ||||
| c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | ||||
| @@ -244,7 +250,7 @@ func (s *IOService) SendVar(ctx *gin.Context) { | |||||
| c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | ||||
| defer cancel() | defer cancel() | ||||
| sw := s.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| if sw == nil { | if sw == nil { | ||||
| ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ||||
| return | return | ||||
| @@ -272,7 +278,7 @@ func (s *IOService) GetVar(ctx *gin.Context) { | |||||
| c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) | ||||
| defer cancel() | defer cancel() | ||||
| sw := s.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) | |||||
| if sw == nil { | if sw == nil { | ||||
| ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) | ||||
| return | return | ||||
| @@ -1,8 +1,13 @@ | |||||
| package http | package http | ||||
| import "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| type Service struct { | type Service struct { | ||||
| swWorker *exec.Worker | |||||
| } | } | ||||
| func NewService() (*Service, error) { | |||||
| return &Service{}, nil | |||||
| func NewService(swWorker *exec.Worker) *Service { | |||||
| return &Service{ | |||||
| swWorker: swWorker, | |||||
| } | |||||
| } | } | ||||
| @@ -52,7 +52,9 @@ func main() { | |||||
| stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) | stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) | ||||
| stgglb.InitIPFSPool(&config.Cfg().IPFS) | stgglb.InitIPFSPool(&config.Cfg().IPFS) | ||||
| svc, err := http.NewService() | |||||
| sw := exec.NewWorker() | |||||
| svc := http.NewService(&sw) | |||||
| if err != nil { | if err != nil { | ||||
| logger.Fatalf("new http service failed, err: %s", err.Error()) | logger.Fatalf("new http service failed, err: %s", err.Error()) | ||||
| } | } | ||||
| @@ -108,8 +110,6 @@ func main() { | |||||
| logger.Fatalf("new ipfs failed, err: %s", err.Error()) | logger.Fatalf("new ipfs failed, err: %s", err.Error()) | ||||
| } | } | ||||
| sw := exec.NewWorker() | |||||
| dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) | dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) | ||||
| taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat) | taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat) | ||||
| @@ -0,0 +1,24 @@ | |||||
| package db2 | |||||
| import ( | |||||
| _ "github.com/go-sql-driver/mysql" | |||||
| "github.com/sirupsen/logrus" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db/config" | |||||
| "gorm.io/driver/mysql" | |||||
| "gorm.io/gorm" | |||||
| ) | |||||
| type DB struct { | |||||
| db *gorm.DB | |||||
| } | |||||
| func NewDB(cfg *config.Config) (*DB, error) { | |||||
| mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{}) | |||||
| if err != nil { | |||||
| logrus.Fatalf("failed to connect to database: %v", err) | |||||
| } | |||||
| return &DB{ | |||||
| db: mydb, | |||||
| }, nil | |||||
| } | |||||
| @@ -0,0 +1,52 @@ | |||||
| package db2 | |||||
| import ( | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "time" | |||||
| ) | |||||
| type NodeDB struct { | |||||
| *DB | |||||
| } | |||||
| func (nodeDB *DB) Node() *NodeDB { | |||||
| return &NodeDB{DB: nodeDB} | |||||
| } | |||||
| func (nodeDB *NodeDB) GetAllNodes() ([]cdssdk.Node, error) { | |||||
| var ret []cdssdk.Node | |||||
| err := nodeDB.DB.db.Table("node").Find(&ret).Error | |||||
| return ret, err | |||||
| } | |||||
| func (nodeDB *NodeDB) GetByID(nodeID cdssdk.NodeID) (cdssdk.Node, error) { | |||||
| var ret cdssdk.Node | |||||
| err := nodeDB.DB.db.Table("node").Where("NodeID = ?", nodeID).Find(&ret).Error | |||||
| return ret, err | |||||
| } | |||||
| // GetUserNodes 根据用户id查询可用node | |||||
| func (nodeDB *NodeDB) GetUserNodes(userID cdssdk.UserID) ([]cdssdk.Node, error) { | |||||
| var nodes []cdssdk.Node | |||||
| err := nodeDB.DB.db. | |||||
| Table("Node"). | |||||
| Select("Node.*"). | |||||
| Joins("JOIN UserNode ON UserNode.NodeID = Node.NodeID"). | |||||
| Where("UserNode.UserID = ?", userID). | |||||
| Find(&nodes).Error | |||||
| return nodes, err | |||||
| } | |||||
| // UpdateState 更新状态,并且设置上次上报时间为现在 | |||||
| func (nodeDB *NodeDB) UpdateState(nodeID cdssdk.NodeID, state string) error { | |||||
| err := nodeDB.DB.db. | |||||
| Model(&cdssdk.Node{}). | |||||
| Where("NodeID = ?", nodeID). | |||||
| Updates(map[string]interface{}{ | |||||
| "State": state, | |||||
| "LastReportTime": time.Now(), | |||||
| }).Error | |||||
| return err | |||||
| } | |||||
| @@ -3,6 +3,7 @@ package ioswitch2 | |||||
| import ( | import ( | ||||
| "context" | "context" | ||||
| "io" | "io" | ||||
| "strconv" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/types" | "gitlink.org.cn/cloudream/common/pkgs/types" | ||||
| @@ -19,13 +20,10 @@ type HttpHubWorker struct { | |||||
| } | } | ||||
| func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { | func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { | ||||
| //cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&w.Node)) | |||||
| //if err != nil { | |||||
| // return nil, err | |||||
| //} | |||||
| addressInfo := w.Node.Address.(*cdssdk.HttpAddressInfo) | |||||
| baseUrl := "http://" + addressInfo.ExternalIP + ":" + strconv.Itoa(addressInfo.Port) | |||||
| config := cdssdk.Config{ | config := cdssdk.Config{ | ||||
| URL: "", | |||||
| URL: baseUrl, | |||||
| } | } | ||||
| pool := cdssdk.NewPool(&config) | pool := cdssdk.NewPool(&config) | ||||
| cli, err := pool.Acquire() | cli, err := pool.Acquire() | ||||
| @@ -253,8 +253,18 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2 | |||||
| } | } | ||||
| if f.Node != nil { | if f.Node != nil { | ||||
| t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) | |||||
| t.Env().Pinned = true | |||||
| switch typeInfo := f.Node.Address.(type) { | |||||
| case *cdssdk.HttpAddressInfo: | |||||
| t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Node: *f.Node}) | |||||
| t.Env().Pinned = true | |||||
| case *cdssdk.GRPCAddressInfo: | |||||
| t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) | |||||
| t.Env().Pinned = true | |||||
| default: | |||||
| return nil, fmt.Errorf("unsupported node address type %T", typeInfo) | |||||
| } | |||||
| } | } | ||||
| return t, nil | return t, nil | ||||
| @@ -27,7 +27,7 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co | |||||
| if msg.NodeIDs == nil { | if msg.NodeIDs == nil { | ||||
| var err error | var err error | ||||
| nodes, err = svc.db.Node().GetAllNodes(svc.db.SQLCtx()) | |||||
| nodes, err = svc.db2.Node().GetAllNodes() | |||||
| if err != nil { | if err != nil { | ||||
| logger.Warnf("getting all nodes: %s", err.Error()) | logger.Warnf("getting all nodes: %s", err.Error()) | ||||
| return nil, mq.Failed(errorcode.OperationFailed, "get all node failed") | return nil, mq.Failed(errorcode.OperationFailed, "get all node failed") | ||||
| @@ -36,7 +36,7 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co | |||||
| } else { | } else { | ||||
| // 可以不用事务 | // 可以不用事务 | ||||
| for _, id := range msg.NodeIDs { | for _, id := range msg.NodeIDs { | ||||
| node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) | |||||
| node, err := svc.db2.Node().GetByID(id) | |||||
| if err != nil { | if err != nil { | ||||
| logger.WithField("NodeID", id). | logger.WithField("NodeID", id). | ||||
| Warnf("query node failed, err: %s", err.Error()) | Warnf("query node failed, err: %s", err.Error()) | ||||
| @@ -2,14 +2,17 @@ package mq | |||||
| import ( | import ( | ||||
| mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" | mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" | ||||
| mydb2 "gitlink.org.cn/cloudream/storage/common/pkgs/db2" | |||||
| ) | ) | ||||
| type Service struct { | type Service struct { | ||||
| db *mydb.DB | |||||
| db *mydb.DB | |||||
| db2 *mydb2.DB | |||||
| } | } | ||||
| func NewService(db *mydb.DB) *Service { | |||||
| func NewService(db *mydb.DB, db2 *mydb2.DB) *Service { | |||||
| return &Service{ | return &Service{ | ||||
| db: db, | |||||
| db: db, | |||||
| db2: db2, | |||||
| } | } | ||||
| } | } | ||||
| @@ -6,6 +6,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" | mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" | ||||
| mydb2 "gitlink.org.cn/cloudream/storage/common/pkgs/db2" | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | ||||
| "gitlink.org.cn/cloudream/storage/coordinator/internal/config" | "gitlink.org.cn/cloudream/storage/coordinator/internal/config" | ||||
| "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" | "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" | ||||
| @@ -29,7 +30,12 @@ func main() { | |||||
| logger.Fatalf("new db failed, err: %s", err.Error()) | logger.Fatalf("new db failed, err: %s", err.Error()) | ||||
| } | } | ||||
| coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) | |||||
| db2, err := mydb2.NewDB(&config.Cfg().DB) | |||||
| if err != nil { | |||||
| logger.Fatalf("new db failed, err: %s", err.Error()) | |||||
| } | |||||
| coorSvr, err := coormq.NewServer(mq.NewService(db, db2), &config.Cfg().RabbitMQ) | |||||
| if err != nil { | if err != nil { | ||||
| logger.Fatalf("new coordinator server failed, err: %s", err.Error()) | logger.Fatalf("new coordinator server failed, err: %s", err.Error()) | ||||
| } | } | ||||