From 8fed4c9e74ab03446761fb17f708ab22a64cf5c0 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 10 Jan 2025 10:17:31 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/ioswitch/dag/node.go | 4 ++++ pkgs/ioswitch/dag/var.go | 5 ----- pkgs/ioswitch/plan/ops/store.go | 39 +++++++++++++++++++++++++++++++++ sdks/storage/storage_feature.go | 13 +++++++---- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/pkgs/ioswitch/dag/node.go b/pkgs/ioswitch/dag/node.go index 60f8ea9..e8f0f1d 100644 --- a/pkgs/ioswitch/dag/node.go +++ b/pkgs/ioswitch/dag/node.go @@ -475,6 +475,10 @@ func (s ValueOutputSlot) Var() *ValueVar { return s.Node.OutputValues().Get(s.Index) } +func (s ValueOutputSlot) ToSlot(slot ValueInputSlot) { + s.Var().To(slot.Node, slot.Index) +} + type ValueInputSlot struct { Node Node Index int diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go index 1a2f49c..27168a3 100644 --- a/pkgs/ioswitch/dag/var.go +++ b/pkgs/ioswitch/dag/var.go @@ -64,11 +64,6 @@ func (v *ValueVar) To(to Node, slotIdx int) { to.InputValues().Slots.Set(slotIdx, v) } -func (v *ValueVar) ToSlot(slot ValueInputSlot) { - v.Dst.Add(slot.Node) - slot.Node.InputValues().Slots.Set(slot.Index, v) -} - func (v *ValueVar) NotTo(node Node) { v.Dst.Remove(node) node.InputValues().Slots.Clear(v) diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go index 977af42..4353dc7 100644 --- a/pkgs/ioswitch/plan/ops/store.go +++ b/pkgs/ioswitch/plan/ops/store.go @@ -26,6 +26,20 @@ func (o *Store) String() string { return fmt.Sprintf("Store %v: %v", o.Key, o.Var) } +type StoreConst struct { + Key string + Value exec.VarValue +} + +func (o *StoreConst) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + e.Store(o.Key, o.Value) + return nil +} + +func (o *StoreConst) String() string { + return fmt.Sprintf("StoreConst %v: %v", o.Key, o.Value) +} + type StoreNode struct { dag.NodeBase Key string @@ -53,3 +67,28 @@ func (t *StoreNode) GenerateOp() (exec.Op, error) { // func (t *StoreType) String() string { // return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) // } + +type StoreConstNode struct { + dag.NodeBase + Key string + Value exec.VarValue +} + +func (b *GraphNodeBuilder) NewStoreConst(key string, value exec.VarValue) *StoreConstNode { + node := &StoreConstNode{ + Key: key, + Value: value, + } + b.AddNode(node) + return node +} + +func (t *StoreConstNode) GenerateOp() (exec.Op, error) { + return &StoreConst{ + Key: t.Key, + Value: t.Value, + }, nil +} + +// func (t *StoreConstType) String() string { +// return fmt.Sprintf("StoreConst[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) diff --git a/sdks/storage/storage_feature.go b/sdks/storage/storage_feature.go index 2c6e1d8..5cf85b7 100644 --- a/sdks/storage/storage_feature.go +++ b/sdks/storage/storage_feature.go @@ -80,11 +80,16 @@ func (f *InternalServerlessCallFeature) String() string { } // 存储服务之间直传文件 -type S2SFeature struct { - serder.Metadata `union:"S2S"` +type S2STransferFeature struct { + serder.Metadata `union:"S2STransfer"` Type string `json:"type"` + TempDir string `json:"tempDir"` // 临时文件存放目录 } -func (f *S2SFeature) GetFeatureType() string { - return "S2S" +func (f *S2STransferFeature) GetFeatureType() string { + return "S2STransfer" +} + +func (f *S2STransferFeature) String() string { + return "S2STransfer" }