diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index 49d7805..de593ec 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -66,10 +66,7 @@ func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { pr, pw := io.Pipe() varID := exec.VarID(msg.VarID) - sw.PutVars(&exec.StreamVar{ - ID: varID, - Stream: pr, - }) + sw.PutVar(varID, &exec.StreamValue{Stream: pr}) // 然后读取文件数据 var recvSize int64 @@ -132,17 +129,14 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStr return fmt.Errorf("plan not found") } - signal, err := serder.JSONToObjectEx[*exec.SignalVar]([]byte(req.Signal)) + signal, err := serder.JSONToObjectEx[exec.VarValue]([]byte(req.Signal)) if err != nil { return fmt.Errorf("deserializing var: %w", err) } - sw.PutVars(signal) + sw.PutVar(exec.VarID(req.SignalID), signal) - strVar := &exec.StreamVar{ - ID: exec.VarID(req.VarID), - } - err = sw.BindVars(server.Context(), strVar) + strVar, err := exec.BindVar[*exec.StreamValue](sw, server.Context(), exec.VarID(req.VarID)) if err != nil { return fmt.Errorf("binding vars: %w", err) } @@ -205,12 +199,12 @@ func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc. 
return nil, fmt.Errorf("plan not found") } - v, err := serder.JSONToObjectEx[exec.Var]([]byte(req.Var)) + v, err := serder.JSONToObjectEx[exec.VarValue]([]byte(req.VarValue)) if err != nil { return nil, fmt.Errorf("deserializing var: %w", err) } - sw.PutVars(v) + sw.PutVar(exec.VarID(req.VarID), v) return &agtrpc.SendVarResp{}, nil } @@ -223,19 +217,14 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge return nil, fmt.Errorf("plan not found") } - v, err := serder.JSONToObjectEx[exec.Var]([]byte(req.Var)) - if err != nil { - return nil, fmt.Errorf("deserializing var: %w", err) - } - - signal, err := serder.JSONToObjectEx[*exec.SignalVar]([]byte(req.Signal)) + signal, err := serder.JSONToObjectEx[exec.VarValue]([]byte(req.Signal)) if err != nil { return nil, fmt.Errorf("deserializing var: %w", err) } - sw.PutVars(signal) + sw.PutVar(exec.VarID(req.SignalID), signal) - err = sw.BindVars(ctx, v) + v, err := sw.BindVar(ctx, exec.VarID(req.VarID)) if err != nil { return nil, fmt.Errorf("binding vars: %w", err) } diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index 08bec19..ce50847 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -327,9 +327,10 @@ type GetStreamReq struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` - Signal string `protobuf:"bytes,3,opt,name=Signal,proto3" json:"Signal,omitempty"` + PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` + SignalID int32 `protobuf:"varint,3,opt,name=SignalID,proto3" json:"SignalID,omitempty"` + Signal string `protobuf:"bytes,4,opt,name=Signal,proto3" json:"Signal,omitempty"` } func (x *GetStreamReq) Reset() { @@ 
-378,6 +379,13 @@ func (x *GetStreamReq) GetVarID() int32 { return 0 } +func (x *GetStreamReq) GetSignalID() int32 { + if x != nil { + return x.SignalID + } + return 0 +} + func (x *GetStreamReq) GetSignal() string { if x != nil { return x.Signal @@ -390,8 +398,9 @@ type SendVarReq struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - Var string `protobuf:"bytes,2,opt,name=Var,proto3" json:"Var,omitempty"` + PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` + VarValue string `protobuf:"bytes,3,opt,name=VarValue,proto3" json:"VarValue,omitempty"` } func (x *SendVarReq) Reset() { @@ -433,9 +442,16 @@ func (x *SendVarReq) GetPlanID() string { return "" } -func (x *SendVarReq) GetVar() string { +func (x *SendVarReq) GetVarID() int32 { if x != nil { - return x.Var + return x.VarID + } + return 0 +} + +func (x *SendVarReq) GetVarValue() string { + if x != nil { + return x.VarValue } return "" } @@ -483,9 +499,10 @@ type GetVarReq struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` - Var string `protobuf:"bytes,2,opt,name=Var,proto3" json:"Var,omitempty"` - Signal string `protobuf:"bytes,3,opt,name=Signal,proto3" json:"Signal,omitempty"` + PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` + SignalID int32 `protobuf:"varint,3,opt,name=SignalID,proto3" json:"SignalID,omitempty"` + Signal string `protobuf:"bytes,4,opt,name=Signal,proto3" json:"Signal,omitempty"` } func (x *GetVarReq) Reset() { @@ -527,11 +544,18 @@ func (x *GetVarReq) GetPlanID() string { return "" } -func (x *GetVarReq) GetVar() string { +func (x *GetVarReq) GetVarID() 
int32 { if x != nil { - return x.Var + return x.VarID } - return "" + return 0 +} + +func (x *GetVarReq) GetSignalID() int32 { + if x != nil { + return x.SignalID + } + return 0 } func (x *GetVarReq) GetSignal() string { @@ -546,7 +570,7 @@ type GetVarResp struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Var string `protobuf:"bytes,1,opt,name=Var,proto3" json:"Var,omitempty"` // 此处不使用VarID的原因是,Switch的BindVars函数还需要知道Var的类型 + Var string `protobuf:"bytes,1,opt,name=Var,proto3" json:"Var,omitempty"` } func (x *GetVarResp) Reset() { @@ -686,48 +710,54 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, - 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x54, + 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x70, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, - 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, - 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x36, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, - 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, - 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, 0x61, 0x72, 0x22, 0x0d, 0x0a, 
0x0b, - 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x4d, 0x0a, 0x09, 0x47, - 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, - 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, - 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, - 0x61, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, - 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, 0x61, 0x72, 0x22, 0x09, 0x0a, 0x07, 0x50, 0x69, - 0x6e, 0x67, 0x52, 0x65, 0x71, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, - 0x70, 0x2a, 0x37, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, - 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, - 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, - 0x53, 0x65, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0x96, 0x02, 0x0a, 0x05, 0x41, - 0x67, 0x65, 0x6e, 0x74, 0x12, 0x38, 0x0a, 0x0d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, - 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x12, 0x11, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, - 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, - 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x34, - 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, - 0x0f, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, - 0x22, 0x00, 0x28, 0x01, 0x12, 
0x31, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, - 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, - 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x26, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x56, - 0x61, 0x72, 0x12, 0x0b, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x1a, - 0x0c, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, - 0x23, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x12, 0x0a, 0x2e, 0x47, 0x65, 0x74, 0x56, - 0x61, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x0b, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, - 0x73, 0x70, 0x22, 0x00, 0x12, 0x1d, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, - 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x1a, 0x09, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, - 0x70, 0x22, 0x00, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, + 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, + 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x69, 0x67, 0x6e, + 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, + 0x22, 0x56, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, + 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, + 0x56, 0x61, 0x72, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, + 0x56, 0x61, 0x72, 0x56, 0x61, 
0x6c, 0x75, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, + 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x6d, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x56, 0x61, + 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, + 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, + 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x49, 0x44, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x49, 0x44, 0x12, 0x16, + 0x0a, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x56, 0x61, 0x72, 0x22, 0x09, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, + 0x71, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x2a, 0x37, 0x0a, + 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, + 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, + 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, + 0x41, 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0x96, 0x02, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, + 0x12, 0x38, 0x0a, 0x0d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, + 0x6e, 0x12, 0x11, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, + 0x6e, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, + 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x0a, 0x53, 0x65, + 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 
0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, 0x53, 0x65, + 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, + 0x12, 0x31, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0d, 0x2e, + 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, + 0x00, 0x30, 0x01, 0x12, 0x26, 0x0a, 0x07, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x12, 0x0b, + 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x0c, 0x2e, 0x53, 0x65, + 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x12, 0x23, 0x0a, 0x06, 0x47, + 0x65, 0x74, 0x56, 0x61, 0x72, 0x12, 0x0a, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, + 0x71, 0x1a, 0x0b, 0x2e, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, + 0x12, 0x1d, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x71, 0x1a, 0x09, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42, + 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index c4c7627..3233dee 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -37,22 +37,25 @@ message SendStreamResp {} message GetStreamReq { string PlanID = 1; int32 VarID = 2; - string Signal = 3; + int32 SignalID = 3; + string Signal = 4; } message SendVarReq { string PlanID = 1; - string Var = 2; + int32 VarID = 2; + string VarValue = 3; } message SendVarResp {} message GetVarReq { string PlanID = 1; - string Var = 2; - string Signal = 3; + int32 VarID = 2; + int32 SignalID = 3; + string Signal = 4; } message GetVarResp { - string Var = 1; 
// 此处不使用VarID的原因是,Switch的BindVars函数还需要知道Var的类型 + string Var = 1; } message PingReq {} diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index 3c5cdb9..3588002 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -132,7 +132,7 @@ func (c *Client) SendStream(ctx context.Context, planID exec.PlanID, varID exec. } } -func (c *Client) GetStream(ctx context.Context, planID exec.PlanID, varID exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) { +func (c *Client) GetStream(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { ctx, cancel := context.WithCancel(ctx) sdata, err := serder.ObjectToJSONEx(signal) @@ -142,9 +142,10 @@ func (c *Client) GetStream(ctx context.Context, planID exec.PlanID, varID exec.V } stream, err := c.cli.GetStream(ctx, &GetStreamReq{ - PlanID: string(planID), - VarID: int32(varID), - Signal: string(sdata), + PlanID: string(planID), + VarID: int32(varID), + SignalID: int32(signalID), + Signal: string(sdata), }) if err != nil { cancel() @@ -157,50 +158,42 @@ func (c *Client) GetStream(ctx context.Context, planID exec.PlanID, varID exec.V }, nil } -func (c *Client) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error { - data, err := serder.ObjectToJSONEx(v) +func (c *Client) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { + data, err := serder.ObjectToJSONEx(value) if err != nil { return err } _, err = c.cli.SendVar(ctx, &SendVarReq{ - PlanID: string(planID), - Var: string(data), + PlanID: string(planID), + VarID: int32(id), + VarValue: string(data), }) return err } -func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error { - vdata, err := serder.ObjectToJSONEx(v) - if err != nil { - return err - } - +func (c *Client) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID 
exec.VarID, signal exec.VarValue) (exec.VarValue, error) { sdata, err := serder.ObjectToJSONEx(signal) if err != nil { - return err + return nil, err } resp, err := c.cli.GetVar(ctx, &GetVarReq{ - PlanID: string(planID), - Var: string(vdata), - Signal: string(sdata), + PlanID: string(planID), + VarID: int32(varID), + SignalID: int32(signalID), + Signal: string(sdata), }) if err != nil { - return err - } - - v2, err := serder.JSONToObjectEx[exec.Var]([]byte(resp.Var)) - if err != nil { - return err + return nil, err } - err = exec.AssignVar(v2, v) + getVar, err := serder.JSONToObjectEx[exec.VarValue]([]byte(resp.Var)) if err != nil { - return err + return nil, err } - return nil + return getVar, nil } func (c *Client) Ping() error { diff --git a/common/pkgs/ioswitch2/agent_worker.go b/common/pkgs/ioswitch2/agent_worker.go index cdca6d9..c1a8a1a 100644 --- a/common/pkgs/ioswitch2/agent_worker.go +++ b/common/pkgs/ioswitch2/agent_worker.go @@ -49,17 +49,17 @@ type AgentWorkerClient struct { func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { return c.cli.ExecuteIOPlan(ctx, plan) } -func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, str io.ReadCloser) error { - return c.cli.SendStream(ctx, planID, v.ID, str) +func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { + return c.cli.SendStream(ctx, planID, id, stream) } -func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error { - return c.cli.SendVar(ctx, planID, v) +func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { + return c.cli.SendVar(ctx, planID, id, value) } -func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, signal *exec.SignalVar) (io.ReadCloser, error) { - return c.cli.GetStream(ctx, planID, v.ID, signal) +func 
(c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { + return c.cli.GetStream(ctx, planID, streamID, signalID, signal) } -func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error { - return c.cli.GetVar(ctx, planID, v, signal) +func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { + return c.cli.GetVar(ctx, planID, varID, signalID, signal) } func (c *AgentWorkerClient) Close() error { stgglb.AgentRPCPool.Release(c.cli) diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index c8658d5..ef40dbc 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -56,18 +56,17 @@ type HttpHubWorkerClient struct { func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { return c.cli.ExecuteIOPlan(plan) } -func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, str io.ReadCloser) error { - return c.cli.SendStream(planID, v.ID, str) +func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { + return c.cli.SendStream(planID, id, stream) } -func (c *HttpHubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error { - return c.cli.SendVar(planID, v) +func (c *HttpHubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { + return c.cli.SendVar(planID, id, value) } -func (c *HttpHubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, signal *exec.SignalVar) (io.ReadCloser, error) { - return c.cli.GetStream(planID, v.ID, signal) +func (c *HttpHubWorkerClient) GetStream(ctx 
context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { + return c.cli.GetStream(planID, streamID, signalID, signal) } -func (c *HttpHubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error { - return c.cli.GetVar(planID, v, signal) - //return nil +func (c *HttpHubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { + return c.cli.GetVar(planID, varID, signalID, signal) } func (c *HttpHubWorkerClient) Close() error { //stgglb.AgentRPCPool.Release(c.cli) diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index b8b4a11..dfceafa 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -19,20 +19,20 @@ func init() { } type ChunkedSplit struct { - Input *exec.StreamVar `json:"input"` - Outputs []*exec.StreamVar `json:"outputs"` - ChunkSize int `json:"chunkSize"` - PaddingZeros bool `json:"paddingZeros"` + Input exec.VarID `json:"input"` + Outputs []exec.VarID `json:"outputs"` + ChunkSize int `json:"chunkSize"` + PaddingZeros bool `json:"paddingZeros"` } func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() - outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{ + outputs := io2.ChunkedSplit(input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{ PaddingZeros: o.PaddingZeros, }) @@ -40,11 +40,12 @@ func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for i := range outputs { sem.Acquire(ctx.Context, 1) - o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer 
io.ReadCloser) { - sem.Release(1) + e.PutVar(o.Outputs[i], &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sem.Release(1) + }), }) } - exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx.Context, int64(len(outputs))) } @@ -54,38 +55,39 @@ func (o *ChunkedSplit) String() string { "ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)", o.ChunkSize, o.PaddingZeros, - o.Input.ID, + o.Input, utils.FormatVarIDs(o.Outputs), ) } type ChunkedJoin struct { - Inputs []*exec.StreamVar `json:"inputs"` - Output *exec.StreamVar `json:"output"` - ChunkSize int `json:"chunkSize"` + Inputs []exec.VarID `json:"inputs"` + Output exec.VarID `json:"output"` + ChunkSize int `json:"chunkSize"` } func (o *ChunkedJoin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx.Context, o.Inputs) + inputs, err := exec.BindArray[*exec.StreamValue](e, ctx.Context, o.Inputs) if err != nil { return err } var strReaders []io.Reader - for _, s := range o.Inputs { + for _, s := range inputs { strReaders = append(strReaders, s.Stream) } defer func() { - for _, str := range o.Inputs { + for _, str := range inputs { str.Stream.Close() } }() fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) return fut.Wait(ctx.Context) } @@ -95,7 +97,7 @@ func (o *ChunkedJoin) String() string { "ChunkedJoin(chunkSize=%v), (%v) -> %v", o.ChunkSize, utils.FormatVarIDs(o.Inputs), - o.Output.ID, + o.Output, ) } @@ -112,16 +114,16 @@ func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { return node } -func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { +func (t *ChunkedSplitNode) Split(input 
*dag.Var, cnt int) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.OutputStreams().Resize(cnt) for i := 0; i < cnt; i++ { - t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) + t.OutputStreams().Setup(t, t.Graph().NewVar(), i) } } -func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { +func (t *ChunkedSplitNode) SubStream(idx int) *dag.Var { return t.OutputStreams().Get(idx) } @@ -145,9 +147,9 @@ func (t *ChunkedSplitNode) Clear() { func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: t.InputStreams().Get(0).Var, - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { - return v.Var + Input: t.InputStreams().Get(0).VarID, + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), ChunkSize: t.ChunkSize, PaddingZeros: true, @@ -168,16 +170,16 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { ChunkSize: chunkSize, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) + node.OutputStreams().SetupNew(node, b.Graph.NewVar()) return node } -func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { +func (t *ChunkedJoinNode) AddInput(str *dag.Var) { idx := t.InputStreams().EnlargeOne() str.Connect(t, idx) } -func (t *ChunkedJoinNode) Joined() *dag.StreamVar { +func (t *ChunkedJoinNode) Joined() *dag.Var { return t.OutputStreams().Get(0) } @@ -190,10 +192,10 @@ func (t *ChunkedJoinNode) RemoveAllInputs() { func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { - return v.Var + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), - Output: t.OutputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).VarID, ChunkSize: t.ChunkSize, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/clone.go 
b/common/pkgs/ioswitch2/ops2/clone.go index e195057..aed4094 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -18,59 +18,57 @@ func init() { } type CloneStream struct { - Raw *exec.StreamVar `json:"raw"` - Cloneds []*exec.StreamVar `json:"cloneds"` + Raw exec.VarID `json:"raw"` + Cloneds []exec.VarID `json:"cloneds"` } func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Raw) + raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw) if err != nil { return err } - defer o.Raw.Stream.Close() + defer raw.Stream.Close() - cloned := io2.Clone(o.Raw.Stream, len(o.Cloneds)) + cloned := io2.Clone(raw.Stream, len(o.Cloneds)) sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { sem.Acquire(ctx.Context, 1) - o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { - sem.Release(1) + e.PutVar(o.Cloneds[i], &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + sem.Release(1) + }), }) } - exec.PutArrayVars(e, o.Cloneds) return sem.Acquire(ctx.Context, int64(len(o.Cloneds))) } func (o *CloneStream) String() string { - return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.ID, utils.FormatVarIDs(o.Cloneds)) + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw, utils.FormatVarIDs(o.Cloneds)) } type CloneVar struct { - Raw exec.Var `json:"raw"` - Cloneds []exec.Var `json:"cloneds"` + Raw exec.VarID `json:"raw"` + Cloneds []exec.VarID `json:"cloneds"` } func (o *CloneVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Raw) + raw, err := e.BindVar(ctx.Context, o.Raw) if err != nil { return err } - for _, v := range o.Cloneds { - if err := exec.AssignVar(o.Raw, v); err != nil { - return fmt.Errorf("clone var: %w", err) - } + for i := range o.Cloneds { + e.PutVar(o.Cloneds[i], raw.Clone()) } - e.PutVars(o.Cloneds...) 
return nil } func (o *CloneVar) String() string { - return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw, utils.FormatVarIDs(o.Cloneds)) } type CloneStreamType struct { @@ -83,22 +81,22 @@ func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { return node } -func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { +func (t *CloneStreamType) SetInput(raw *dag.Var) { t.InputStreams().EnsureSize(1) raw.Connect(t, 0) } -func (t *CloneStreamType) NewOutput() *dag.StreamVar { - output := t.Graph().NewStreamVar() +func (t *CloneStreamType) NewOutput() *dag.Var { + output := t.Graph().NewVar() t.OutputStreams().SetupNew(t, output) return output } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: t.InputStreams().Get(0).Var, - Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { - return v.Var + Raw: t.InputStreams().Get(0).VarID, + Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), }, nil } @@ -117,22 +115,22 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { return node } -func (t *CloneVarType) SetInput(raw *dag.ValueVar) { +func (t *CloneVarType) SetInput(raw *dag.Var) { t.InputValues().EnsureSize(1) raw.Connect(t, 0) } -func (t *CloneVarType) NewOutput() *dag.ValueVar { - output := t.Graph().NewValueVar(t.InputValues().Get(0).Type) +func (t *CloneVarType) NewOutput() *dag.Var { + output := t.Graph().NewVar() t.OutputValues().SetupNew(t, output) return output } func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: t.InputValues().Get(0).Var, - Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var { - return v.Var + Raw: t.InputValues().Get(0).VarID, + Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), }, nil } diff --git 
a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 0cca461..616e370 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -14,7 +13,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" - "golang.org/x/sync/semaphore" ) func init() { @@ -23,116 +21,118 @@ func init() { exec.UseOp[*ECMultiply]() } -type ECReconstructAny struct { - EC cdssdk.ECRedundancy `json:"ec"` - Inputs []*exec.StreamVar `json:"inputs"` - Outputs []*exec.StreamVar `json:"outputs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` - OutputBlockIndexes []int `json:"outputBlockIndexes"` -} - -func (o *ECReconstructAny) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) - if err != nil { - return fmt.Errorf("new ec: %w", err) +/* + type ECReconstructAny struct { + EC cdssdk.ECRedundancy `json:"ec"` + Inputs []exec.VarID `json:"inputs"` + Outputs []exec.VarID `json:"outputs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` + OutputBlockIndexes []int `json:"outputBlockIndexes"` } - err = exec.BindArrayVars(e, ctx.Context, o.Inputs) - if err != nil { - return err - } - defer func() { + func (o *ECReconstructAny) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + if err != nil { + return fmt.Errorf("new ec: %w", err) + } + + err = exec.BindArrayVars(e, ctx.Context, inputs) + if err != nil { + return err + } + defer func() { + for _, s := range o.Inputs { + s.Stream.Close() + } + }() + + var inputs []io.Reader for _, s := range o.Inputs { - s.Stream.Close() + inputs = append(inputs, s.Stream) } - }() - var inputs []io.Reader - for _, s := range o.Inputs { - inputs = append(inputs, s.Stream) - } + outputs := rs.ReconstructAny(inputs, 
o.InputBlockIndexes, o.OutputBlockIndexes) - outputs := rs.ReconstructAny(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) + sem := semaphore.NewWeighted(int64(len(o.Outputs))) + for i := range o.Outputs { + sem.Acquire(ctx.Context, 1) - sem := semaphore.NewWeighted(int64(len(o.Outputs))) - for i := range o.Outputs { - sem.Acquire(ctx.Context, 1) + o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sem.Release(1) + }) + } + e.PutVar(o.Outputs) - o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { - sem.Release(1) - }) + return sem.Acquire(ctx.Context, int64(len(o.Outputs))) } - exec.PutArrayVars(e, o.Outputs) - return sem.Acquire(ctx.Context, int64(len(o.Outputs))) -} + type ECReconstruct struct { + EC cdssdk.ECRedundancy `json:"ec"` + Inputs []exec.VarID `json:"inputs"` + Outputs []exec.VarID `json:"outputs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` + } -type ECReconstruct struct { - EC cdssdk.ECRedundancy `json:"ec"` - Inputs []*exec.StreamVar `json:"inputs"` - Outputs []*exec.StreamVar `json:"outputs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` -} + func (o *ECReconstruct) Execute(ctx context.Context, e *exec.Executor) error { + rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + if err != nil { + return fmt.Errorf("new ec: %w", err) + } -func (o *ECReconstruct) Execute(ctx context.Context, e *exec.Executor) error { - rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) - if err != nil { - return fmt.Errorf("new ec: %w", err) - } + err = exec.BindArrayVars(e, ctx, o.Inputs) + if err != nil { + return err + } + defer func() { + for _, s := range o.Inputs { + s.Stream.Close() + } + }() - err = exec.BindArrayVars(e, ctx, o.Inputs) - if err != nil { - return err - } - defer func() { + var inputs []io.Reader for _, s := range o.Inputs { - s.Stream.Close() + inputs = append(inputs, s.Stream) } - }() - var inputs []io.Reader - for _, s := range o.Inputs { - 
inputs = append(inputs, s.Stream) - } + outputs := rs.ReconstructData(inputs, o.InputBlockIndexes) - outputs := rs.ReconstructData(inputs, o.InputBlockIndexes) + sem := semaphore.NewWeighted(int64(len(o.Outputs))) + for i := range o.Outputs { + sem.Acquire(ctx, 1) - sem := semaphore.NewWeighted(int64(len(o.Outputs))) - for i := range o.Outputs { - sem.Acquire(ctx, 1) + o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sem.Release(1) + }) + } + e.PutVar(o.Outputs) - o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { - sem.Release(1) - }) + return sem.Acquire(ctx, int64(len(o.Outputs))) } - exec.PutArrayVars(e, o.Outputs) - - return sem.Acquire(ctx, int64(len(o.Outputs))) -} - +*/ type ECMultiply struct { - Coef [][]byte `json:"coef"` - Inputs []*exec.StreamVar `json:"inputs"` - Outputs []*exec.StreamVar `json:"outputs"` - ChunkSize int `json:"chunkSize"` + Coef [][]byte `json:"coef"` + Inputs []exec.VarID `json:"inputs"` + Outputs []exec.VarID `json:"outputs"` + ChunkSize int `json:"chunkSize"` } func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx.Context, o.Inputs) + inputs, err := exec.BindArray[*exec.StreamValue](e, ctx.Context, o.Inputs) if err != nil { return err } defer func() { - for _, s := range o.Inputs { + for _, s := range inputs { s.Stream.Close() } }() outputWrs := make([]*io.PipeWriter, len(o.Outputs)) + outputVars := make([]*exec.StreamValue, len(o.Outputs)) for i := range o.Outputs { rd, wr := io.Pipe() - o.Outputs[i].Stream = rd + outputVars[i] = &exec.StreamValue{Stream: rd} outputWrs[i] = wr } @@ -150,7 +150,7 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } for { - err := sync2.ParallelDo(o.Inputs, func(s *exec.StreamVar, i int) error { + err := sync2.ParallelDo(inputs, func(s *exec.StreamValue, i int) error { _, err := io.ReadFull(s.Stream, inputChunks[i]) return err }) @@ -179,7 +179,8 @@ func (o 
*ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } }() - exec.PutArrayVars(e, o.Outputs) + exec.PutArray(e, o.Outputs, outputVars) + err = fut.Wait(ctx.Context) if err != nil { for _, wr := range outputWrs { @@ -218,7 +219,7 @@ func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode return node } -func (t *ECMultiplyNode) AddInput(str *dag.StreamVar, dataIndex int) { +func (t *ECMultiplyNode) AddInput(str *dag.Var, dataIndex int) { t.InputIndexes = append(t.InputIndexes, dataIndex) idx := t.InputStreams().EnlargeOne() str.Connect(t, idx) @@ -232,9 +233,9 @@ func (t *ECMultiplyNode) RemoveAllInputs() { t.InputIndexes = nil } -func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { +func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.Var { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().SetupNew(t, output) return output } @@ -251,8 +252,8 @@ func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { return &ECMultiply{ Coef: coef, - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), ChunkSize: t.EC.ChunkSize, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index 0c8d79c..61a3694 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -18,16 +18,16 @@ func init() { } type FileWrite struct { - Input *exec.StreamVar `json:"input"` - FilePath string `json:"filePath"` + Input exec.VarID `json:"input"` + FilePath string `json:"filePath"` } func (o *FileWrite) 
Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() dir := path.Dir(o.FilePath) err = os.MkdirAll(dir, 0777) @@ -41,7 +41,7 @@ func (o *FileWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer file.Close() - _, err = io.Copy(file, o.Input.Stream) + _, err = io.Copy(file, input.Stream) if err != nil { return fmt.Errorf("copying data to file: %w", err) } @@ -50,12 +50,12 @@ func (o *FileWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *FileWrite) String() string { - return fmt.Sprintf("FileWrite %v -> %s", o.Input.ID, o.FilePath) + return fmt.Sprintf("FileWrite %v -> %s", o.Input, o.FilePath) } type FileRead struct { - Output *exec.StreamVar `json:"output"` - FilePath string `json:"filePath"` + Output exec.VarID `json:"output"` + FilePath string `json:"filePath"` } func (o *FileRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -65,17 +65,18 @@ func (o *FileRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosed(file, func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterReadClosed(file, func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) fut.Wait(ctx.Context) return nil } func (o *FileRead) String() string { - return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output.ID) + return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output) } type FileReadNode struct { @@ -88,12 +89,12 @@ func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode { FilePath: filePath, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewStreamVar()) + node.OutputStreams().SetupNew(node, b.NewVar()) return node } -func (t *FileReadNode) Output() 
dag.StreamSlot { - return dag.StreamSlot{ +func (t *FileReadNode) Output() dag.Slot { + return dag.Slot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -101,7 +102,7 @@ func (t *FileReadNode) Output() dag.StreamSlot { func (t *FileReadNode) GenerateOp() (exec.Op, error) { return &FileRead{ - Output: t.OutputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).VarID, FilePath: t.FilePath, }, nil } @@ -123,21 +124,21 @@ func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode { return node } -func (t *FileWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ +func (t *FileWriteNode) Input() dag.Slot { + return dag.Slot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *FileWriteNode) SetInput(str *dag.StreamVar) { +func (t *FileWriteNode) SetInput(str *dag.Var) { t.InputStreams().EnsureSize(1) str.Connect(t, 0) } func (t *FileWriteNode) GenerateOp() (exec.Op, error) { return &FileWrite{ - Input: t.InputStreams().Get(0).Var, + Input: t.InputStreams().Get(0).VarID, FilePath: t.FilePath, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/join.go b/common/pkgs/ioswitch2/ops2/join.go index 12f960a..1f30dcc 100644 --- a/common/pkgs/ioswitch2/ops2/join.go +++ b/common/pkgs/ioswitch2/ops2/join.go @@ -1,45 +1,51 @@ package ops2 import ( - "context" + "fmt" "io" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" ) func init() { - // OpUnion.AddT((*Join)(nil)) + exec.UseOp[*Join]() } type Join struct { - Inputs []*exec.StreamVar `json:"inputs"` - Output *exec.StreamVar `json:"output"` - Length int64 `json:"length"` + Inputs []exec.VarID `json:"inputs"` + Output exec.VarID `json:"output"` + Length int64 `json:"length"` } -func (o *Join) Execute(ctx context.Context, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx, o.Inputs) +func (o *Join) Execute(ctx *exec.ExecContext, e *exec.Executor) error { 
+ inputs, err := exec.BindArray[*exec.StreamValue](e, ctx.Context, o.Inputs) if err != nil { return err } var strReaders []io.Reader - for _, s := range o.Inputs { + for _, s := range inputs { strReaders = append(strReaders, s.Stream) } defer func() { - for _, str := range o.Inputs { + for _, str := range inputs { str.Stream.Close() } }() fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(io2.Length(io2.Join(strReaders), o.Length), func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(io2.Length(io2.Join(strReaders), o.Length), func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) - return fut.Wait(ctx) + return fut.Wait(ctx.Context) +} + +func (o *Join) String() string { + return fmt.Sprintf("Join %v->%v", utils.FormatVarIDs(o.Inputs), o.Output) } diff --git a/common/pkgs/ioswitch2/ops2/length.go b/common/pkgs/ioswitch2/ops2/length.go index 92212b4..6ec5403 100644 --- a/common/pkgs/ioswitch2/ops2/length.go +++ b/common/pkgs/ioswitch2/ops2/length.go @@ -1,7 +1,7 @@ package ops2 import ( - "context" + "fmt" "io" "gitlink.org.cn/cloudream/common/pkgs/future" @@ -14,23 +14,28 @@ func init() { } type Length struct { - Input *exec.StreamVar `json:"input"` - Output *exec.StreamVar `json:"output"` - Length int64 `json:"length"` + Input exec.VarID `json:"input"` + Output exec.VarID `json:"output"` + Length int64 `json:"length"` } -func (o *Length) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) +func (o *Length) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + str, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer str.Stream.Close() fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(io2.Length(o.Input.Stream, o.Length), func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: 
io2.AfterReadClosedOnce(io2.Length(str.Stream, o.Length), func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) - return fut.Wait(ctx) + return fut.Wait(ctx.Context) +} + +func (o *Length) String() string { + return fmt.Sprintf("Length(length=%v) %v->%v", o.Length, o.Input, o.Output) } diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index c12ce53..0a4afc6 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -15,13 +15,13 @@ func NewGraphNodeBuilder() *GraphNodeBuilder { type FromNode interface { dag.Node - Output() dag.StreamSlot + Output() dag.Slot } type ToNode interface { dag.Node - Input() dag.StreamSlot - SetInput(input *dag.StreamVar) + Input() dag.Slot + SetInput(input *dag.Var) } // func formatStreamIO(node *dag.Node) string { diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 90e6f5c..a8379a3 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -16,25 +16,25 @@ func init() { } type Range struct { - Input *exec.StreamVar `json:"input"` - Output *exec.StreamVar `json:"output"` - Offset int64 `json:"offset"` - Length *int64 `json:"length"` + Input exec.VarID `json:"input"` + Output exec.VarID `json:"output"` + Offset int64 `json:"offset"` + Length *int64 `json:"length"` } func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() buf := make([]byte, 1024*16) // 跳过前Offset个字节 for o.Offset > 0 { rdCnt := math2.Min(o.Offset, int64(len(buf))) - rd, err := o.Input.Stream.Read(buf[:rdCnt]) + rd, err := input.Stream.Read(buf[:rdCnt]) if err == io.EOF { // 输入流不够长度也不报错,只是产生一个空的流 break @@ -48,30 +48,31 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { fut 
:= future.NewSetVoid() if o.Length == nil { - o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) { - fut.SetVoid() - }) - e.PutVars(o.Output) + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterEOF(input.Stream, func(closer io.ReadCloser, err error) { + fut.SetVoid() + }), + }) return fut.Wait(ctx.Context) } - o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterEOF(io2.Length(input.Stream, *o.Length), func(closer io.ReadCloser, err error) { + fut.SetVoid() + }), }) - - e.PutVars(o.Output) err = fut.Wait(ctx.Context) if err != nil { return err } - io2.DropWithBuf(o.Input.Stream, buf) + io2.DropWithBuf(input.Stream, buf) return nil } func (o *Range) String() string { - return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) + return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input, o.Output) } type RangeNode struct { @@ -85,19 +86,19 @@ func (b *GraphNodeBuilder) NewRange() *RangeNode { return node } -func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { +func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.Range = rng - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output } func (t *RangeNode) GenerateOp() (exec.Op, error) { return &Range{ - Input: t.InputStreams().Get(0).Var, - Output: t.OutputStreams().Get(0).Var, + Input: t.InputStreams().Get(0).VarID, + Output: t.OutputStreams().Get(0).VarID, Offset: t.Range.Offset, Length: t.Range.Length, }, nil diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 15cb69f..cc813fa 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ 
-17,10 +17,19 @@ import ( func init() { exec.UseOp[*ShardRead]() exec.UseOp[*ShardWrite]() + exec.UseVarValue[*FileHashValue]() +} + +type FileHashValue struct { + Hash types.FileHash `json:"hash"` +} + +func (v *FileHashValue) Clone() exec.VarValue { + return &FileHashValue{Hash: v.Hash} } type ShardRead struct { - Output *exec.StreamVar `json:"output"` + Output exec.VarID `json:"output"` StorageID cdssdk.StorageID `json:"storageID"` Open types.OpenOption `json:"option"` } @@ -47,28 +56,29 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) return fut.Wait(ctx.Context) } func (o *ShardRead) String() string { - return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output.ID) + return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output) } type ShardWrite struct { - Input *exec.StreamVar `json:"input"` - FileHash *exec.StringVar `json:"fileHash"` + Input exec.VarID `json:"input"` + FileHash exec.VarID `json:"fileHash"` StorageID cdssdk.StorageID `json:"storageID"` } func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. - WithField("Input", o.Input.ID). - WithField("FileHashVar", o.FileHash.ID). + WithField("Input", o.Input). + WithField("FileHash", o.FileHash). 
Debugf("writting file to shard store") defer logger.Debugf("write to shard store finished") @@ -82,16 +92,16 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("getting shard store %v: %w", o.StorageID, err) } - err = e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() writer := store.New() defer writer.Abort() - _, err = io.Copy(writer, o.Input.Stream) + _, err = io.Copy(writer, input.Stream) if err != nil { return fmt.Errorf("writing file to shard store: %w", err) } @@ -101,14 +111,14 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("finishing writing file to shard store: %w", err) } - o.FileHash.Value = string(fileInfo.Hash) - - e.PutVars(o.FileHash) + e.PutVar(o.FileHash, &FileHashValue{ + Hash: fileInfo.Hash, + }) return nil } func (o *ShardWrite) String() string { - return fmt.Sprintf("ShardWrite %v -> %v", o.Input.ID, o.FileHash.ID) + return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHash) } type ShardReadNode struct { @@ -123,12 +133,12 @@ func (b *GraphNodeBuilder) NewShardRead(stgID cdssdk.StorageID, open types.OpenO Open: open, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewStreamVar()) + node.OutputStreams().SetupNew(node, b.NewVar()) return node } -func (t *ShardReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ +func (t *ShardReadNode) Output() dag.Slot { + return dag.Slot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -136,7 +146,7 @@ func (t *ShardReadNode) Output() dag.StreamSlot { func (t *ShardReadNode) GenerateOp() (exec.Op, error) { return &ShardRead{ - Output: t.OutputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).VarID, StorageID: t.StorageID, Open: t.Open, }, nil @@ -159,27 +169,27 @@ func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNod 
return node } -func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { +func (t *ShardWriteNode) SetInput(input *dag.Var) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) + t.OutputValues().SetupNew(t, t.Graph().NewVar()) } -func (t *ShardWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ +func (t *ShardWriteNode) Input() dag.Slot { + return dag.Slot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { +func (t *ShardWriteNode) FileHashVar() *dag.Var { return t.OutputValues().Get(0) } func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ - Input: t.InputStreams().Get(0).Var, - FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), + Input: t.InputStreams().Get(0).VarID, + FileHash: t.OutputValues().Get(0).VarID, }, nil } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 81f6cdb..f9b150c 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -26,7 +26,7 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { } type IndexedStream struct { - Stream *dag.StreamVar + Stream *dag.Var DataIndex int } @@ -94,8 +94,8 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro return plan.Generate(ctx.DAG.Graph, blder) } -func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.StreamVar { - var ret *dag.StreamVar +func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.Var { + var ret *dag.Var for _, s := range ctx.IndexedStreams { if s.DataIndex == streamIndex { ret = s.Stream @@ -166,7 +166,7 @@ func (p *DefaultParser) extend(ctx *ParseContext) error { } // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 - ecInputStrs := make(map[int]*dag.StreamVar) + ecInputStrs := make(map[int]*dag.Var) for _, s := range ctx.IndexedStreams { if s.DataIndex >= 
0 && ecInputStrs[s.DataIndex] == nil { ecInputStrs[s.DataIndex] = s.Stream diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/agent_worker.go index 122a54a..88e569c 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/agent_worker.go @@ -47,17 +47,17 @@ type AgentWorkerClient struct { func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { return c.cli.ExecuteIOPlan(ctx, plan) } -func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, str io.ReadCloser) error { - return c.cli.SendStream(ctx, planID, v.ID, str) +func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { + return c.cli.SendStream(ctx, planID, id, stream) } -func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error { - return c.cli.SendVar(ctx, planID, v) +func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { + return c.cli.SendVar(ctx, planID, id, value) } -func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, signal *exec.SignalVar) (io.ReadCloser, error) { - return c.cli.GetStream(ctx, planID, v.ID, signal) +func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { + return c.cli.GetStream(ctx, planID, streamID, signalID, signal) } -func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error { - return c.cli.GetVar(ctx, planID, v, signal) +func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { + return c.cli.GetVar(ctx, planID, varID, signalID, signal) } func (c *AgentWorkerClient) Close() 
error { stgglb.AgentRPCPool.Release(c.cli) diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index ab849b2..dfceafa 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -19,20 +19,20 @@ func init() { } type ChunkedSplit struct { - Input *exec.StreamVar `json:"input"` - Outputs []*exec.StreamVar `json:"outputs"` - ChunkSize int `json:"chunkSize"` - PaddingZeros bool `json:"paddingZeros"` + Input exec.VarID `json:"input"` + Outputs []exec.VarID `json:"outputs"` + ChunkSize int `json:"chunkSize"` + PaddingZeros bool `json:"paddingZeros"` } func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() - outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{ + outputs := io2.ChunkedSplit(input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{ PaddingZeros: o.PaddingZeros, }) @@ -40,11 +40,12 @@ func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for i := range outputs { sem.Acquire(ctx.Context, 1) - o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { - sem.Release(1) + e.PutVar(o.Outputs[i], &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sem.Release(1) + }), }) } - exec.PutArrayVars(e, o.Outputs) return sem.Acquire(ctx.Context, int64(len(outputs))) } @@ -54,38 +55,39 @@ func (o *ChunkedSplit) String() string { "ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)", o.ChunkSize, o.PaddingZeros, - o.Input.ID, + o.Input, utils.FormatVarIDs(o.Outputs), ) } type ChunkedJoin struct { - Inputs []*exec.StreamVar `json:"inputs"` - Output *exec.StreamVar `json:"output"` - ChunkSize int `json:"chunkSize"` 
+ Inputs []exec.VarID `json:"inputs"` + Output exec.VarID `json:"output"` + ChunkSize int `json:"chunkSize"` } func (o *ChunkedJoin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx.Context, o.Inputs) + inputs, err := exec.BindArray[*exec.StreamValue](e, ctx.Context, o.Inputs) if err != nil { return err } var strReaders []io.Reader - for _, s := range o.Inputs { + for _, s := range inputs { strReaders = append(strReaders, s.Stream) } defer func() { - for _, str := range o.Inputs { + for _, str := range inputs { str.Stream.Close() } }() fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) return fut.Wait(ctx.Context) } @@ -95,7 +97,7 @@ func (o *ChunkedJoin) String() string { "ChunkedJoin(chunkSize=%v), (%v) -> %v", o.ChunkSize, utils.FormatVarIDs(o.Inputs), - o.Output.ID, + o.Output, ) } @@ -112,16 +114,16 @@ func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { return node } -func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { +func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.OutputStreams().Resize(cnt) for i := 0; i < cnt; i++ { - t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) + t.OutputStreams().Setup(t, t.Graph().NewVar(), i) } } -func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { +func (t *ChunkedSplitNode) SubStream(idx int) *dag.Var { return t.OutputStreams().Get(idx) } @@ -129,11 +131,25 @@ func (t *ChunkedSplitNode) SplitCount() int { return t.OutputStreams().Len() } +func (t *ChunkedSplitNode) Clear() { + if t.InputStreams().Len() == 0 { + return + } + + 
t.InputStreams().Get(0).Disconnect(t, 0) + t.InputStreams().Resize(0) + + for _, out := range t.OutputStreams().RawArray() { + out.DisconnectAll() + } + t.OutputStreams().Resize(0) +} + func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: t.InputStreams().Get(0).Var, - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { - return v.Var + Input: t.InputStreams().Get(0).VarID, + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), ChunkSize: t.ChunkSize, PaddingZeros: true, @@ -154,25 +170,32 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { ChunkSize: chunkSize, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) + node.OutputStreams().SetupNew(node, b.Graph.NewVar()) return node } -func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { +func (t *ChunkedJoinNode) AddInput(str *dag.Var) { idx := t.InputStreams().EnlargeOne() str.Connect(t, idx) } -func (t *ChunkedJoinNode) Joined() *dag.StreamVar { +func (t *ChunkedJoinNode) Joined() *dag.Var { return t.OutputStreams().Get(0) } +func (t *ChunkedJoinNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) +} + func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { - return v.Var + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), - Output: t.OutputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).VarID, ChunkSize: t.ChunkSize, }, nil } diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index e195057..aed4094 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -18,59 +18,57 @@ func init() { } 
type CloneStream struct { - Raw *exec.StreamVar `json:"raw"` - Cloneds []*exec.StreamVar `json:"cloneds"` + Raw exec.VarID `json:"raw"` + Cloneds []exec.VarID `json:"cloneds"` } func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Raw) + raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw) if err != nil { return err } - defer o.Raw.Stream.Close() + defer raw.Stream.Close() - cloned := io2.Clone(o.Raw.Stream, len(o.Cloneds)) + cloned := io2.Clone(raw.Stream, len(o.Cloneds)) sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { sem.Acquire(ctx.Context, 1) - o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { - sem.Release(1) + e.PutVar(o.Cloneds[i], &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + sem.Release(1) + }), }) } - exec.PutArrayVars(e, o.Cloneds) return sem.Acquire(ctx.Context, int64(len(o.Cloneds))) } func (o *CloneStream) String() string { - return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.ID, utils.FormatVarIDs(o.Cloneds)) + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw, utils.FormatVarIDs(o.Cloneds)) } type CloneVar struct { - Raw exec.Var `json:"raw"` - Cloneds []exec.Var `json:"cloneds"` + Raw exec.VarID `json:"raw"` + Cloneds []exec.VarID `json:"cloneds"` } func (o *CloneVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Raw) + raw, err := e.BindVar(ctx.Context, o.Raw) if err != nil { return err } - for _, v := range o.Cloneds { - if err := exec.AssignVar(o.Raw, v); err != nil { - return fmt.Errorf("clone var: %w", err) - } + for i := range o.Cloneds { + e.PutVar(o.Cloneds[i], raw.Clone()) } - e.PutVars(o.Cloneds...) 
return nil } func (o *CloneVar) String() string { - return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw, utils.FormatVarIDs(o.Cloneds)) } type CloneStreamType struct { @@ -83,22 +81,22 @@ func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { return node } -func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { +func (t *CloneStreamType) SetInput(raw *dag.Var) { t.InputStreams().EnsureSize(1) raw.Connect(t, 0) } -func (t *CloneStreamType) NewOutput() *dag.StreamVar { - output := t.Graph().NewStreamVar() +func (t *CloneStreamType) NewOutput() *dag.Var { + output := t.Graph().NewVar() t.OutputStreams().SetupNew(t, output) return output } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: t.InputStreams().Get(0).Var, - Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { - return v.Var + Raw: t.InputStreams().Get(0).VarID, + Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), }, nil } @@ -117,22 +115,22 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { return node } -func (t *CloneVarType) SetInput(raw *dag.ValueVar) { +func (t *CloneVarType) SetInput(raw *dag.Var) { t.InputValues().EnsureSize(1) raw.Connect(t, 0) } -func (t *CloneVarType) NewOutput() *dag.ValueVar { - output := t.Graph().NewValueVar(t.InputValues().Get(0).Type) +func (t *CloneVarType) NewOutput() *dag.Var { + output := t.Graph().NewVar() t.OutputValues().SetupNew(t, output) return output } func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: t.InputValues().Get(0).Var, - Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var { - return v.Var + Raw: t.InputValues().Get(0).VarID, + Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.Var, idx int) exec.VarID { + return v.VarID }), }, nil } diff --git 
a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 6ce0d7e..1d6d5e3 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -21,28 +21,28 @@ func init() { } type GalMultiply struct { - Coef [][]byte `json:"coef"` - Inputs []*exec.StreamVar `json:"inputs"` - Outputs []*exec.StreamVar `json:"outputs"` - ChunkSize int `json:"chunkSize"` + Coef [][]byte `json:"coef"` + Inputs []exec.VarID `json:"inputs"` + Outputs []exec.VarID `json:"outputs"` + ChunkSize int `json:"chunkSize"` } func (o *GalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx.Context, o.Inputs) + inputs, err := exec.BindArray[*exec.StreamValue](e, ctx.Context, o.Inputs) if err != nil { return err } defer func() { - for _, s := range o.Inputs { + for _, s := range inputs { s.Stream.Close() } }() outputWrs := make([]*io.PipeWriter, len(o.Outputs)) - + outputVars := make([]*exec.StreamValue, len(o.Outputs)) for i := range o.Outputs { rd, wr := io.Pipe() - o.Outputs[i].Stream = rd + outputVars[i] = &exec.StreamValue{Stream: rd} outputWrs[i] = wr } @@ -60,7 +60,7 @@ func (o *GalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } for { - err := sync2.ParallelDo(o.Inputs, func(s *exec.StreamVar, i int) error { + err := sync2.ParallelDo(inputs, func(s *exec.StreamValue, i int) error { _, err := io.ReadFull(s.Stream, inputChunks[i]) return err }) @@ -89,7 +89,7 @@ func (o *GalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } }() - exec.PutArrayVars(e, o.Outputs) + exec.PutArray(e, o.Outputs, outputVars) err = fut.Wait(ctx.Context) if err != nil { for _, wr := range outputWrs { @@ -128,7 +128,7 @@ func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCCons return node } -func (t *LRCConstructAnyNode) AddInput(str *dag.StreamVar, dataIndex int) { +func (t *LRCConstructAnyNode) AddInput(str *dag.Var, dataIndex int) { t.InputIndexes = append(t.InputIndexes,
dataIndex) idx := t.InputStreams().EnlargeOne() str.Connect(t, idx) @@ -142,9 +142,9 @@ func (t *LRCConstructAnyNode) RemoveAllInputs() { t.InputIndexes = nil } -func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { +func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.Var { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().SetupNew(t, output) return output } @@ -161,8 +161,8 @@ func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), ChunkSize: t.LRC.ChunkSize, }, nil } @@ -185,7 +185,7 @@ func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc cdssdk.LRCRedundancy) *LRCCo return node } -func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.StreamVar) *dag.StreamVar { +func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.Var) *dag.Var { t.TargetBlockIndex = blockIdx t.InputStreams().Resize(0) @@ -194,7 +194,7 @@ func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.Strea in.Connect(t, idx) } - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output } @@ -211,8 +211,8 @@ func (t *LRCConstructGroupNode) GenerateOp() (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) 
*exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), ChunkSize: t.LRC.ChunkSize, }, nil } diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index a41ec08..83f31b8 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -15,13 +15,13 @@ func NewGraphNodeBuilder() *GraphNodeBuilder { type FromNode interface { dag.Node - Output() dag.StreamSlot + Output() dag.Slot } type ToNode interface { dag.Node - Input() dag.StreamSlot - SetInput(input *dag.StreamVar) + Input() dag.Slot + SetInput(input *dag.Var) } // func formatStreamIO(node *dag.Node) string { diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 90e6f5c..a8379a3 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -16,25 +16,25 @@ func init() { } type Range struct { - Input *exec.StreamVar `json:"input"` - Output *exec.StreamVar `json:"output"` - Offset int64 `json:"offset"` - Length *int64 `json:"length"` + Input exec.VarID `json:"input"` + Output exec.VarID `json:"output"` + Offset int64 `json:"offset"` + Length *int64 `json:"length"` } func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() buf := make([]byte, 1024*16) // 跳过前Offset个字节 for o.Offset > 0 { rdCnt := math2.Min(o.Offset, int64(len(buf))) - rd, err := o.Input.Stream.Read(buf[:rdCnt]) + rd, err := input.Stream.Read(buf[:rdCnt]) if err == io.EOF { // 输入流不够长度也不报错,只是产生一个空的流 break @@ -48,30 +48,31 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { fut := 
future.NewSetVoid() if o.Length == nil { - o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) { - fut.SetVoid() - }) - e.PutVars(o.Output) + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterEOF(input.Stream, func(closer io.ReadCloser, err error) { + fut.SetVoid() + }), + }) return fut.Wait(ctx.Context) } - o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterEOF(io2.Length(input.Stream, *o.Length), func(closer io.ReadCloser, err error) { + fut.SetVoid() + }), }) - - e.PutVars(o.Output) err = fut.Wait(ctx.Context) if err != nil { return err } - io2.DropWithBuf(o.Input.Stream, buf) + io2.DropWithBuf(input.Stream, buf) return nil } func (o *Range) String() string { - return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) + return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input, o.Output) } type RangeNode struct { @@ -85,19 +86,19 @@ func (b *GraphNodeBuilder) NewRange() *RangeNode { return node } -func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { +func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.Range = rng - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output } func (t *RangeNode) GenerateOp() (exec.Op, error) { return &Range{ - Input: t.InputStreams().Get(0).Var, - Output: t.OutputStreams().Get(0).Var, + Input: t.InputStreams().Get(0).VarID, + Output: t.OutputStreams().Get(0).VarID, Offset: t.Range.Offset, Length: t.Range.Length, }, nil diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index 15cb69f..cc813fa 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go 
@@ -17,10 +17,19 @@ import ( func init() { exec.UseOp[*ShardRead]() exec.UseOp[*ShardWrite]() + exec.UseVarValue[*FileHashValue]() +} + +type FileHashValue struct { + Hash types.FileHash `json:"hash"` +} + +func (v *FileHashValue) Clone() exec.VarValue { + return &FileHashValue{Hash: v.Hash} } type ShardRead struct { - Output *exec.StreamVar `json:"output"` + Output exec.VarID `json:"output"` StorageID cdssdk.StorageID `json:"storageID"` Open types.OpenOption `json:"option"` } @@ -47,28 +56,29 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { - fut.SetVoid() + e.PutVar(o.Output, &exec.StreamValue{ + Stream: io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { + fut.SetVoid() + }), }) - e.PutVars(o.Output) return fut.Wait(ctx.Context) } func (o *ShardRead) String() string { - return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output.ID) + return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output) } type ShardWrite struct { - Input *exec.StreamVar `json:"input"` - FileHash *exec.StringVar `json:"fileHash"` + Input exec.VarID `json:"input"` + FileHash exec.VarID `json:"fileHash"` StorageID cdssdk.StorageID `json:"storageID"` } func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. - WithField("Input", o.Input.ID). - WithField("FileHashVar", o.FileHash.ID). + WithField("Input", o.Input). + WithField("FileHash", o.FileHash). 
Debugf("writting file to shard store") defer logger.Debugf("write to shard store finished") @@ -82,16 +92,16 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("getting shard store %v: %w", o.StorageID, err) } - err = e.BindVars(ctx.Context, o.Input) + input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer input.Stream.Close() writer := store.New() defer writer.Abort() - _, err = io.Copy(writer, o.Input.Stream) + _, err = io.Copy(writer, input.Stream) if err != nil { return fmt.Errorf("writing file to shard store: %w", err) } @@ -101,14 +111,14 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("finishing writing file to shard store: %w", err) } - o.FileHash.Value = string(fileInfo.Hash) - - e.PutVars(o.FileHash) + e.PutVar(o.FileHash, &FileHashValue{ + Hash: fileInfo.Hash, + }) return nil } func (o *ShardWrite) String() string { - return fmt.Sprintf("ShardWrite %v -> %v", o.Input.ID, o.FileHash.ID) + return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHash) } type ShardReadNode struct { @@ -123,12 +133,12 @@ func (b *GraphNodeBuilder) NewShardRead(stgID cdssdk.StorageID, open types.OpenO Open: open, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewStreamVar()) + node.OutputStreams().SetupNew(node, b.NewVar()) return node } -func (t *ShardReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ +func (t *ShardReadNode) Output() dag.Slot { + return dag.Slot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -136,7 +146,7 @@ func (t *ShardReadNode) Output() dag.StreamSlot { func (t *ShardReadNode) GenerateOp() (exec.Op, error) { return &ShardRead{ - Output: t.OutputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).VarID, StorageID: t.StorageID, Open: t.Open, }, nil @@ -159,27 +169,27 @@ func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNod 
return node } -func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { +func (t *ShardWriteNode) SetInput(input *dag.Var) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) + t.OutputValues().SetupNew(t, t.Graph().NewVar()) } -func (t *ShardWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ +func (t *ShardWriteNode) Input() dag.Slot { + return dag.Slot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { +func (t *ShardWriteNode) FileHashVar() *dag.Var { return t.OutputValues().Get(0) } func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ - Input: t.InputStreams().Get(0).Var, - FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), + Input: t.InputStreams().Get(0).VarID, + FileHash: t.OutputValues().Get(0).VarID, }, nil } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index 33593a3..0ce31ce 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -271,7 +271,7 @@ func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { - var inputs []*dag.StreamVar + var inputs []*dag.Var for _, fr := range frs { frNode, err := buildFromNode(ctx, fr) if err != nil {