| @@ -9,7 +9,6 @@ import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/utils/io2" | |||
| "gitlink.org.cn/cloudream/common/utils/serder" | |||
| agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" | |||
| agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" | |||
| ) | |||
| @@ -21,9 +20,12 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe | |||
| } | |||
| logger.WithField("PlanID", plan.ID).Infof("begin execute io plan") | |||
| defer logger.WithField("PlanID", plan.ID).Infof("plan finished") | |||
| sw := ioswitch.NewSwitch(plan) | |||
| s.swMgr.Add(sw) | |||
| defer s.swMgr.Remove(sw) | |||
| err = sw.Run(ctx) | |||
| if err != nil { | |||
| @@ -33,12 +35,12 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe | |||
| return &agtrpc.ExecuteIOPlanResp{}, nil | |||
| } | |||
| func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { | |||
| func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { | |||
| msg, err := server.Recv() | |||
| if err != nil { | |||
| return fmt.Errorf("recving stream id packet: %w", err) | |||
| } | |||
| if msg.Type != agentserver.StreamDataPacketType_SendArgs { | |||
| if msg.Type != agtrpc.StreamDataPacketType_SendArgs { | |||
| return fmt.Errorf("first packet must be a SendArgs packet") | |||
| } | |||
| @@ -58,8 +60,9 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { | |||
| pr, pw := io.Pipe() | |||
| varID := ioswitch.VarID(msg.VarID) | |||
| sw.PutVars(&ioswitch.StreamVar{ | |||
| ID: ioswitch.VarID(msg.VarID), | |||
| ID: varID, | |||
| Stream: pr, | |||
| }) | |||
| @@ -74,6 +77,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { | |||
| // 关闭文件写入 | |||
| pw.CloseWithError(io.ErrClosedPipe) | |||
| logger.WithField("ReceiveSize", recvSize). | |||
| WithField("VarID", varID). | |||
| Warnf("recv message failed, err: %s", err.Error()) | |||
| return fmt.Errorf("recv message failed, err: %w", err) | |||
| } | |||
| @@ -88,7 +92,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { | |||
| recvSize += int64(len(msg.Data)) | |||
| if msg.Type == agentserver.StreamDataPacketType_EOF { | |||
| if msg.Type == agtrpc.StreamDataPacketType_EOF { | |||
| // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash | |||
| err := pw.Close() | |||
| if err != nil { | |||
| @@ -97,7 +101,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { | |||
| } | |||
| // 并将结果返回到客户端 | |||
| err = server.SendAndClose(&agentserver.SendStreamResp{}) | |||
| err = server.SendAndClose(&agtrpc.SendStreamResp{}) | |||
| if err != nil { | |||
| logger.Warnf("send response failed, err: %s", err.Error()) | |||
| return fmt.Errorf("send response failed, err: %w", err) | |||
| @@ -108,7 +112,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { | |||
| } | |||
| } | |||
| func (s *Service) GetStream(req *agentserver.GetStreamReq, server agentserver.Agent_GetStreamServer) error { | |||
| func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStreamServer) error { | |||
| logger. | |||
| WithField("PlanID", req.PlanID). | |||
| WithField("VarID", req.VarID). | |||
| @@ -141,8 +145,8 @@ func (s *Service) GetStream(req *agentserver.GetStreamReq, server agentserver.Ag | |||
| if readCnt > 0 { | |||
| readAllCnt += readCnt | |||
| err = server.Send(&agentserver.StreamDataPacket{ | |||
| Type: agentserver.StreamDataPacketType_Data, | |||
| err = server.Send(&agtrpc.StreamDataPacket{ | |||
| Type: agtrpc.StreamDataPacketType_Data, | |||
| Data: buf[:readCnt], | |||
| }) | |||
| if err != nil { | |||
| @@ -161,8 +165,8 @@ func (s *Service) GetStream(req *agentserver.GetStreamReq, server agentserver.Ag | |||
| WithField("VarID", req.VarID). | |||
| Debugf("send data size %d", readAllCnt) | |||
| // 发送EOF消息 | |||
| server.Send(&agentserver.StreamDataPacket{ | |||
| Type: agentserver.StreamDataPacketType_EOF, | |||
| server.Send(&agtrpc.StreamDataPacket{ | |||
| Type: agtrpc.StreamDataPacketType_EOF, | |||
| }) | |||
| return nil | |||
| } | |||
| @@ -197,10 +201,10 @@ func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc. | |||
| } | |||
| func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.GetVarResp, error) { | |||
| ctx, cancel := context.WithTimeout(ctx, time.Second*30) | |||
| ctx2, cancel := context.WithTimeout(ctx, time.Second*30) | |||
| defer cancel() | |||
| sw := s.swMgr.FindByIDContexted(ctx, ioswitch.PlanID(req.PlanID)) | |||
| sw := s.swMgr.FindByIDContexted(ctx2, ioswitch.PlanID(req.PlanID)) | |||
| if sw == nil { | |||
| return nil, fmt.Errorf("plan not found") | |||
| } | |||
| @@ -215,7 +219,7 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge | |||
| return nil, fmt.Errorf("binding vars: %w", err) | |||
| } | |||
| vd, err := serder.ObjectToJSON(v) | |||
| vd, err := serder.ObjectToJSONEx(v) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("serializing var: %w", err) | |||
| } | |||
| @@ -4,6 +4,7 @@ import ( | |||
| "context" | |||
| "sync" | |||
| "github.com/samber/lo" | |||
| "gitlink.org.cn/cloudream/common/pkgs/future" | |||
| "gitlink.org.cn/cloudream/common/utils/lo2" | |||
| ) | |||
| @@ -30,14 +31,15 @@ func (s *Manager) Add(sw *Switch) { | |||
| defer s.lock.Unlock() | |||
| s.switchs[sw.Plan().ID] = sw | |||
| for i := range s.findings { | |||
| if s.findings[i].PlanID != sw.Plan().ID { | |||
| continue | |||
| s.findings = lo.Reject(s.findings, func(f *finding, idx int) bool { | |||
| if f.PlanID != sw.Plan().ID { | |||
| return false | |||
| } | |||
| s.findings[i].Callback.SetValue(sw) | |||
| s.findings = lo2.RemoveAt(s.findings, i) | |||
| } | |||
| f.Callback.SetValue(sw) | |||
| return true | |||
| }) | |||
| } | |||
| func (s *Manager) Remove(sw *Switch) { | |||
| @@ -6,6 +6,7 @@ import ( | |||
| "io" | |||
| "gitlink.org.cn/cloudream/common/pkgs/future" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/common/utils/io2" | |||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||
| @@ -30,6 +31,8 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { | |||
| } | |||
| defer stgglb.AgentRPCPool.Release(agtCli) | |||
| logger.Debugf("sending stream %v to node %v", o.Stream.ID, o.Node) | |||
| err = agtCli.SendStream(ctx, sw.Plan().ID, o.Stream.ID, o.Stream.Stream) | |||
| if err != nil { | |||
| return fmt.Errorf("sending stream: %w", err) | |||
| @@ -50,6 +53,8 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { | |||
| } | |||
| defer stgglb.AgentRPCPool.Release(agtCli) | |||
| logger.Debugf("getting stream %v from node %v", o.Stream.ID, o.Node) | |||
| str, err := agtCli.GetStream(sw.Plan().ID, o.Stream.ID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting stream: %w", err) | |||
| @@ -81,6 +86,8 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { | |||
| } | |||
| defer stgglb.AgentRPCPool.Release(agtCli) | |||
| logger.Debugf("sending var %v to node %v", o.Var.GetID(), o.Node) | |||
| err = agtCli.SendVar(ctx, sw.Plan().ID, o.Var) | |||
| if err != nil { | |||
| return fmt.Errorf("sending var: %w", err) | |||
| @@ -101,6 +108,8 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { | |||
| } | |||
| defer stgglb.AgentRPCPool.Release(agtCli) | |||
| logger.Debugf("getting var %v from node %v", o.Var.GetID(), o.Node) | |||
| v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Var) | |||
| if err != nil { | |||
| return fmt.Errorf("getting var: %w", err) | |||
| @@ -22,7 +22,6 @@ type IPFSRead struct { | |||
| func (o *IPFSRead) Execute(ctx context.Context, sw *ioswitch.Switch) error { | |||
| logger. | |||
| WithField("FileHash", o.FileHash). | |||
| WithField("Output", o.Output). | |||
| Debugf("ipfs read op") | |||
| defer logger.Debugf("ipfs read op finished") | |||
| @@ -54,8 +53,8 @@ type IPFSWrite struct { | |||
| func (o *IPFSWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { | |||
| logger. | |||
| WithField("ResultKey", o.FileHash). | |||
| WithField("Input", o.Input). | |||
| WithField("Input", o.Input.ID). | |||
| WithField("FileHashVar", o.FileHash.ID). | |||
| Debugf("ipfs write op") | |||
| ipfsCli, err := stgglb.IPFSPool.Acquire() | |||
| @@ -87,6 +87,8 @@ func (e *Executor) execute() { | |||
| } | |||
| wg.Wait() | |||
| e.callback.SetVoid() | |||
| } | |||
| func (e *Executor) stopWith(err error) { | |||
| @@ -16,10 +16,14 @@ type PlanBuilder struct { | |||
| storeMap *sync.Map | |||
| } | |||
| func NewPlanBuilder() PlanBuilder { | |||
| return PlanBuilder{ | |||
| func NewPlanBuilder() *PlanBuilder { | |||
| bld := &PlanBuilder{ | |||
| agentPlans: make(map[cdssdk.NodeID]*AgentPlanBuilder), | |||
| storeMap: &sync.Map{}, | |||
| } | |||
| bld.executorPlan.blder = bld | |||
| return bld | |||
| } | |||
| func (b *PlanBuilder) AtExecutor() *ExecutorPlanBuilder { | |||
| @@ -102,23 +102,23 @@ func (s *Switch) PutVars(vs ...Var) { | |||
| loop: | |||
| for _, v := range vs { | |||
| for _, b := range s.bindings { | |||
| for i, w := range b.Waittings { | |||
| for ib, b := range s.bindings { | |||
| for iw, w := range b.Waittings { | |||
| if w.GetID() != v.GetID() { | |||
| continue | |||
| } | |||
| if err := AssignVar(w, v); err != nil { | |||
| if err := AssignVar(v, w); err != nil { | |||
| b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err)) | |||
| // 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败 | |||
| continue loop | |||
| } | |||
| b.Bindeds = append(b.Bindeds, w) | |||
| b.Waittings = lo2.RemoveAt(b.Waittings, i) | |||
| b.Waittings = lo2.RemoveAt(b.Waittings, iw) | |||
| if len(b.Waittings) == 0 { | |||
| b.Callback.SetVoid() | |||
| s.bindings = lo2.RemoveAt(s.bindings, i) | |||
| s.bindings = lo2.RemoveAt(s.bindings, ib) | |||
| } | |||
| // 绑定成功,继续最外层循环 | |||
| @@ -11,6 +11,8 @@ func AssignVar(from Var, to Var) error { | |||
| } | |||
| switch from := from.(type) { | |||
| case *StreamVar: | |||
| to.(*StreamVar).Stream = from.Stream | |||
| case *IntVar: | |||
| to.(*IntVar).Value = from.Value | |||
| case *StringVar: | |||
| @@ -9,13 +9,12 @@ import ( | |||
| ) | |||
| type Config struct { | |||
| ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` | |||
| NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 | |||
| Logger log.Config `json:"logger"` | |||
| DB db.Config `json:"db"` | |||
| RabbitMQ stgmq.Config `json:"rabbitMQ"` | |||
| DistLock distlock.Config `json:"distlock"` | |||
| ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` | |||
| NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 | |||
| Logger log.Config `json:"logger"` | |||
| DB db.Config `json:"db"` | |||
| RabbitMQ stgmq.Config `json:"rabbitMQ"` | |||
| DistLock distlock.Config `json:"distlock"` | |||
| } | |||
| var cfg Config | |||
| @@ -133,10 +133,10 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { | |||
| pinnedAt: obj.PinnedAt, | |||
| blocks: obj.Blocks, | |||
| }) | |||
| ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld, plnningNodeIDs)) | |||
| ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, planBld, plnningNodeIDs)) | |||
| } | |||
| ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld, plnningNodeIDs) | |||
| ioSwRets, err := t.executePlans(execCtx, pinPlans, planBld, plnningNodeIDs) | |||
| if err != nil { | |||
| log.Warn(err.Error()) | |||
| return | |||
| @@ -8,6 +8,7 @@ import ( | |||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" | |||
| agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" | |||
| scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" | |||
| "gitlink.org.cn/cloudream/storage/scanner/internal/config" | |||
| "gitlink.org.cn/cloudream/storage/scanner/internal/event" | |||
| @@ -35,6 +36,8 @@ func main() { | |||
| stgglb.InitMQPool(&config.Cfg().RabbitMQ) | |||
| stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) | |||
| distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) | |||
| if err != nil { | |||
| logger.Warnf("new distlock service failed, err: %s", err.Error()) | |||