diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index eb71027..b7977de 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -9,7 +9,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" - agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -21,9 +20,12 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe } logger.WithField("PlanID", plan.ID).Infof("begin execute io plan") + defer logger.WithField("PlanID", plan.ID).Infof("plan finished") sw := ioswitch.NewSwitch(plan) + s.swMgr.Add(sw) + defer s.swMgr.Remove(sw) err = sw.Run(ctx) if err != nil { @@ -33,12 +35,12 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe return &agtrpc.ExecuteIOPlanResp{}, nil } -func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { +func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { msg, err := server.Recv() if err != nil { return fmt.Errorf("recving stream id packet: %w", err) } - if msg.Type != agentserver.StreamDataPacketType_SendArgs { + if msg.Type != agtrpc.StreamDataPacketType_SendArgs { return fmt.Errorf("first packet must be a SendArgs packet") } @@ -58,8 +60,9 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { pr, pw := io.Pipe() + varID := ioswitch.VarID(msg.VarID) sw.PutVars(&ioswitch.StreamVar{ - ID: ioswitch.VarID(msg.VarID), + ID: varID, Stream: pr, }) @@ -74,6 +77,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { // 关闭文件写入 pw.CloseWithError(io.ErrClosedPipe) logger.WithField("ReceiveSize", recvSize). + WithField("VarID", varID). Warnf("recv message failed, err: %s", err.Error()) return fmt.Errorf("recv message failed, err: %w", err) } @@ -88,7 +92,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { recvSize += int64(len(msg.Data)) - if msg.Type == agentserver.StreamDataPacketType_EOF { + if msg.Type == agtrpc.StreamDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash err := pw.Close() if err != nil { @@ -97,7 +101,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { } // 并将结果返回到客户端 - err = server.SendAndClose(&agentserver.SendStreamResp{}) + err = server.SendAndClose(&agtrpc.SendStreamResp{}) if err != nil { logger.Warnf("send response failed, err: %s", err.Error()) return fmt.Errorf("send response failed, err: %w", err) @@ -108,7 +112,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { } } -func (s *Service) GetStream(req *agentserver.GetStreamReq, server agentserver.Agent_GetStreamServer) error { +func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStreamServer) error { logger. WithField("PlanID", req.PlanID). WithField("VarID", req.VarID). @@ -141,8 +145,8 @@ func (s *Service) GetStream(req *agentserver.GetStreamReq, server agentserver.Ag if readCnt > 0 { readAllCnt += readCnt - err = server.Send(&agentserver.StreamDataPacket{ - Type: agentserver.StreamDataPacketType_Data, + err = server.Send(&agtrpc.StreamDataPacket{ + Type: agtrpc.StreamDataPacketType_Data, Data: buf[:readCnt], }) if err != nil { @@ -161,8 +165,8 @@ func (s *Service) GetStream(req *agentserver.GetStreamReq, server agentserver.Ag WithField("VarID", req.VarID). Debugf("send data size %d", readAllCnt) // 发送EOF消息 - server.Send(&agentserver.StreamDataPacket{ - Type: agentserver.StreamDataPacketType_EOF, + server.Send(&agtrpc.StreamDataPacket{ + Type: agtrpc.StreamDataPacketType_EOF, }) return nil } @@ -197,10 +201,10 @@ func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc. } func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.GetVarResp, error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx2, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - sw := s.swMgr.FindByIDContexted(ctx, ioswitch.PlanID(req.PlanID)) + sw := s.swMgr.FindByIDContexted(ctx2, ioswitch.PlanID(req.PlanID)) if sw == nil { return nil, fmt.Errorf("plan not found") } @@ -215,7 +219,7 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge return nil, fmt.Errorf("binding vars: %w", err) } - vd, err := serder.ObjectToJSON(v) + vd, err := serder.ObjectToJSONEx(v) if err != nil { return nil, fmt.Errorf("serializing var: %w", err) } diff --git a/common/pkgs/ioswitch/manager.go b/common/pkgs/ioswitch/manager.go index f2cb3bb..940aa66 100644 --- a/common/pkgs/ioswitch/manager.go +++ b/common/pkgs/ioswitch/manager.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/lo2" ) @@ -30,14 +31,15 @@ func (s *Manager) Add(sw *Switch) { defer s.lock.Unlock() s.switchs[sw.Plan().ID] = sw - for i := range s.findings { - if s.findings[i].PlanID != sw.Plan().ID { - continue + + s.findings = lo.Reject(s.findings, func(f *finding, idx int) bool { + if f.PlanID != sw.Plan().ID { + return false } - s.findings[i].Callback.SetValue(sw) - s.findings = lo2.RemoveAt(s.findings, i) - } + f.Callback.SetValue(sw) + return true + }) } func (s *Manager) Remove(sw *Switch) { diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go index 4c26328..d1a6c97 100644 --- a/common/pkgs/ioswitch/ops/grpc.go +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -6,6 +6,7 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" @@ -30,6 +31,8 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) + logger.Debugf("sending stream %v to node %v", o.Stream.ID, o.Node) + err = agtCli.SendStream(ctx, sw.Plan().ID, o.Stream.ID, o.Stream.Stream) if err != nil { return fmt.Errorf("sending stream: %w", err) @@ -50,6 +53,8 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) + logger.Debugf("getting stream %v from node %v", o.Stream.ID, o.Node) + str, err := agtCli.GetStream(sw.Plan().ID, o.Stream.ID) if err != nil { return fmt.Errorf("getting stream: %w", err) @@ -81,6 +86,8 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) + logger.Debugf("sending var %v to node %v", o.Var.GetID(), o.Node) + err = agtCli.SendVar(ctx, sw.Plan().ID, o.Var) if err != nil { return fmt.Errorf("sending var: %w", err) @@ -101,6 +108,8 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } defer stgglb.AgentRPCPool.Release(agtCli) + logger.Debugf("getting var %v from node %v", o.Var.GetID(), o.Node) + v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Var) if err != nil { return fmt.Errorf("getting var: %w", err) diff --git a/common/pkgs/ioswitch/ops/ipfs.go b/common/pkgs/ioswitch/ops/ipfs.go index 2c1d325..2a485bb 100644 --- a/common/pkgs/ioswitch/ops/ipfs.go +++ b/common/pkgs/ioswitch/ops/ipfs.go @@ -22,7 +22,6 @@ type IPFSRead struct { func (o *IPFSRead) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger. WithField("FileHash", o.FileHash). - WithField("Output", o.Output). Debugf("ipfs read op") defer logger.Debugf("ipfs read op finished") @@ -54,8 +53,8 @@ type IPFSWrite struct { func (o *IPFSWrite) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger. - WithField("ResultKey", o.FileHash). - WithField("Input", o.Input). + WithField("Input", o.Input.ID). + WithField("FileHashVar", o.FileHash.ID). Debugf("ipfs write op") ipfsCli, err := stgglb.IPFSPool.Acquire() diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index c1578bf..e1de800 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -87,6 +87,8 @@ func (e *Executor) execute() { } wg.Wait() + + e.callback.SetVoid() } func (e *Executor) stopWith(err error) { diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index 65ff671..ccb709f 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -16,10 +16,14 @@ type PlanBuilder struct { storeMap *sync.Map } -func NewPlanBuilder() PlanBuilder { - return PlanBuilder{ +func NewPlanBuilder() *PlanBuilder { + bld := &PlanBuilder{ agentPlans: make(map[cdssdk.NodeID]*AgentPlanBuilder), + storeMap: &sync.Map{}, } + bld.executorPlan.blder = bld + + return bld } func (b *PlanBuilder) AtExecutor() *ExecutorPlanBuilder { diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go index 9760ff2..1dceeab 100644 --- a/common/pkgs/ioswitch/switch.go +++ b/common/pkgs/ioswitch/switch.go @@ -102,23 +102,23 @@ func (s *Switch) PutVars(vs ...Var) { loop: for _, v := range vs { - for _, b := range s.bindings { - for i, w := range b.Waittings { + for ib, b := range s.bindings { + for iw, w := range b.Waittings { if w.GetID() != v.GetID() { continue } - if err := AssignVar(w, v); err != nil { + if err := AssignVar(v, w); err != nil { b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err)) // 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败 continue loop } b.Bindeds = append(b.Bindeds, w) - b.Waittings = lo2.RemoveAt(b.Waittings, i) + b.Waittings = lo2.RemoveAt(b.Waittings, iw) if len(b.Waittings) == 0 { b.Callback.SetVoid() - s.bindings = lo2.RemoveAt(s.bindings, i) + s.bindings = lo2.RemoveAt(s.bindings, ib) } // 绑定成功,继续最外层循环 diff --git a/common/pkgs/ioswitch/utils.go b/common/pkgs/ioswitch/utils.go index e5a9ee1..e912053 100644 --- a/common/pkgs/ioswitch/utils.go +++ b/common/pkgs/ioswitch/utils.go @@ -11,6 +11,8 @@ func AssignVar(from Var, to Var) error { } switch from := from.(type) { + case *StreamVar: + to.(*StreamVar).Stream = from.Stream case *IntVar: to.(*IntVar).Value = from.Value case *StringVar: diff --git a/scanner/internal/config/config.go b/scanner/internal/config/config.go index 6fc6830..353e2e4 100644 --- a/scanner/internal/config/config.go +++ b/scanner/internal/config/config.go @@ -9,13 +9,12 @@ import ( ) type Config struct { - ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` - NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 - - Logger log.Config `json:"logger"` - DB db.Config `json:"db"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` + ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` + NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 + Logger log.Config `json:"logger"` + DB db.Config `json:"db"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 1994fc8..664acb5 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -133,10 +133,10 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { pinnedAt: obj.PinnedAt, blocks: obj.Blocks, }) - ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld, plnningNodeIDs)) + ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, planBld, plnningNodeIDs)) } - ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld, plnningNodeIDs) + ioSwRets, err := t.executePlans(execCtx, pinPlans, planBld, plnningNodeIDs) if err != nil { log.Warn(err.Error()) return diff --git a/scanner/main.go b/scanner/main.go index c89b846..f17e269 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -8,6 +8,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/scanner/internal/config" "gitlink.org.cn/cloudream/storage/scanner/internal/event" @@ -35,6 +36,8 @@ func main() { stgglb.InitMQPool(&config.Cfg().RabbitMQ) + stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) + distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { logger.Warnf("new distlock service failed, err: %s", err.Error())