diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go index af74643..b8928cd 100644 --- a/pkgs/ioswitch/dag/var.go +++ b/pkgs/ioswitch/dag/var.go @@ -64,17 +64,34 @@ func (v *Var) To() *EndPointSlots { return &v.to } -func (v *Var) Connect(to Node, slotIdx int) { +func (v *Var) ValueTo(to Node, slotIdx int) { v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) to.InputValues().Set(slotIdx, v) } -func (v *Var) Disconnect(node Node, slotIdx int) { +func (v *Var) ValueNotTo(node Node, slotIdx int) { v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) node.InputValues().Set(slotIdx, nil) } -func (v *Var) DisconnectAll() { +func (v *Var) StreamTo(to Node, slotIdx int) { + v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) + to.InputStreams().Set(slotIdx, v) +} + +func (v *Var) StreamNotTo(node Node, slotIdx int) { + v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) + node.InputStreams().Set(slotIdx, nil) +} + +func (v *Var) NoInputAllValue() { + for _, ed := range v.to { + ed.Node.InputValues().Set(ed.SlotIndex, nil) + } + v.to = nil +} + +func (v *Var) NoInputAllStream() { for _, ed := range v.to { ed.Node.InputStreams().Set(ed.SlotIndex, nil) } diff --git a/pkgs/ioswitch/plan/generate.go b/pkgs/ioswitch/plan/generate.go index db32a84..6d155d9 100644 --- a/pkgs/ioswitch/plan/generate.go +++ b/pkgs/ioswitch/plan/generate.go @@ -54,7 +54,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 getNode.Get(holdType.HoldStream(out)). // 将Get指令的输出送到目的地 - Connect(to.Node, to.SlotIndex) + StreamTo(to.Node, to.SlotIndex) case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 @@ -62,7 +62,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { *n.Env() = *node.Env() out.To().RemoveAt(0) - n.Send(out).Connect(to.Node, to.SlotIndex) + n.Send(out).StreamTo(to.Node, to.SlotIndex) } } @@ -96,7 +96,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 getNode.Get(holdNode.HoldVar(out)). // 将Get指令的输出送到目的地 - Connect(to.Node, to.SlotIndex) + ValueTo(to.Node, to.SlotIndex) case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 @@ -105,7 +105,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { out.To().RemoveAt(0) - t.Send(out).Connect(to.Node, to.SlotIndex) + t.Send(out).ValueTo(to.Node, to.SlotIndex) } } diff --git a/pkgs/ioswitch/plan/ops/driver.go b/pkgs/ioswitch/plan/ops/driver.go index 192e665..392b88a 100644 --- a/pkgs/ioswitch/plan/ops/driver.go +++ b/pkgs/ioswitch/plan/ops/driver.go @@ -54,7 +54,7 @@ func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverN func (t *ToDriverNode) SetInput(v *dag.Var) { t.InputStreams().EnsureSize(1) - v.Connect(t, 0) + v.StreamTo(t, 0) } func (t *ToDriverNode) Input() dag.Slot { diff --git a/pkgs/ioswitch/plan/ops/drop.go b/pkgs/ioswitch/plan/ops/drop.go index dcfdc78..f03283a 100644 --- a/pkgs/ioswitch/plan/ops/drop.go +++ b/pkgs/ioswitch/plan/ops/drop.go @@ -51,7 +51,7 @@ func (b *GraphNodeBuilder) NewDropStream() *DropNode { func (t *DropNode) SetInput(v *dag.Var) { t.InputStreams().EnsureSize(1) - v.Connect(t, 0) + v.StreamTo(t, 0) } func (t *DropNode) GenerateOp() (exec.Op, error) { diff --git a/pkgs/ioswitch/plan/ops/send.go b/pkgs/ioswitch/plan/ops/send.go index 482630d..625f701 100644 --- a/pkgs/ioswitch/plan/ops/send.go +++ b/pkgs/ioswitch/plan/ops/send.go @@ -155,7 +155,7 @@ func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode { func (t *SendStreamNode) Send(v *dag.Var) *dag.Var { t.InputStreams().EnsureSize(1) - v.Connect(t, 0) + v.StreamTo(t, 0) output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output @@ -188,7 +188,7 @@ func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode { func (t *SendValueNode) Send(v *dag.Var) *dag.Var { t.InputValues().EnsureSize(1) - v.Connect(t, 0) + v.ValueTo(t, 0) output := t.Graph().NewVar() t.OutputValues().Setup(t, output, 0) return output @@ -222,7 +222,7 @@ func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode { func (t *GetStreamNode) Get(v *dag.Var) *dag.Var { t.InputStreams().EnsureSize(1) - v.Connect(t, 0) + v.StreamTo(t, 0) output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output @@ -261,7 +261,7 @@ func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode { func (t *GetValueNode) Get(v *dag.Var) *dag.Var { t.InputValues().EnsureSize(1) - v.Connect(t, 0) + v.ValueTo(t, 0) output := t.Graph().NewVar() t.OutputValues().Setup(t, output, 1) return output diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go index f51bbe4..f2a7f74 100644 --- a/pkgs/ioswitch/plan/ops/store.go +++ b/pkgs/ioswitch/plan/ops/store.go @@ -40,7 +40,7 @@ func (b *GraphNodeBuilder) NewStore() *StoreNode { func (t *StoreNode) Store(key string, v *dag.Var) { t.Key = key t.InputValues().EnsureSize(1) - v.Connect(t, 0) + v.ValueTo(t, 0) } func (t *StoreNode) GenerateOp() (exec.Op, error) { diff --git a/pkgs/ioswitch/plan/ops/sync.go b/pkgs/ioswitch/plan/ops/sync.go index d9ac791..b4437f1 100644 --- a/pkgs/ioswitch/plan/ops/sync.go +++ b/pkgs/ioswitch/plan/ops/sync.go @@ -170,18 +170,18 @@ func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode { func (t *HoldUntilNode) SetSignal(s *dag.Var) { t.InputValues().EnsureSize(1) - s.Connect(t, 0) + s.ValueTo(t, 0) } func (t *HoldUntilNode) HoldStream(str *dag.Var) *dag.Var { - str.Connect(t, t.InputStreams().EnlargeOne()) + str.StreamTo(t, t.InputStreams().EnlargeOne()) output := t.Graph().NewVar() t.OutputStreams().SetupNew(t, output) return output } func (t *HoldUntilNode) HoldVar(v *dag.Var) *dag.Var { - v.Connect(t, t.InputValues().EnlargeOne()) + v.ValueTo(t, t.InputValues().EnlargeOne()) output := t.Graph().NewVar() t.OutputValues().SetupNew(t, output) return output diff --git a/sdks/storage/shard_storage.go b/sdks/storage/shard_storage.go index c5e6670..1c5dab0 100644 --- a/sdks/storage/shard_storage.go +++ b/sdks/storage/shard_storage.go @@ -20,8 +20,6 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[ShardSt type ShardStorage struct { StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint"` - // Shard存储空间在存储服务的目录 - Root string `json:"root" gorm:"column:Root; type:varchar(1024); not null"` // ShardStore配置数据 Config ShardStoreConfig `json:"config" gorm:"column:Config; type:json; not null; serializer:union"` } @@ -32,6 +30,7 @@ func (ShardStorage) TableName() string { type LocalShardStorage struct { serder.Metadata `union:"Local"` + Type string `json:"type"` Root string `json:"root"` MaxSize int64 `json:"maxSize"` } diff --git a/sdks/storage/storage.go b/sdks/storage/storage.go index da0180e..0477b4f 100644 --- a/sdks/storage/storage.go +++ b/sdks/storage/storage.go @@ -20,6 +20,7 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Storage type LocalStorageAddress struct { serder.Metadata `union:"Local"` + Type string `json:"type"` } func (a *LocalStorageAddress) GetType() string { diff --git a/sdks/storage/storage_feature.go b/sdks/storage/storage_feature.go index b5787df..a401500 100644 --- a/sdks/storage/storage_feature.go +++ b/sdks/storage/storage_feature.go @@ -20,6 +20,7 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Storage // 存储服务支持被非MasterHub直接上传文件 type BypassUploadFeature struct { serder.Metadata `union:"BypassUpload"` + Type string `json:"type"` // 存放上传文件的临时目录 TempRoot string `json:"tempRoot"` } @@ -35,6 +36,7 @@ func (f *BypassUploadFeature) String() string { // 存储服务支持分段上传 type MultipartUploadFeature struct { serder.Metadata `union:"MultipartUpload"` + Type string `json:"type"` } func (f *MultipartUploadFeature) GetType() string {