From c87233acb0e15f181edaee184c772e3df5838ab8 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 10 Jul 2025 10:50:36 +0800 Subject: [PATCH] =?UTF-8?q?ioswitch=E6=A8=A1=E5=9D=97=E4=BB=8Ecommon?= =?UTF-8?q?=E4=BB=93=E5=BA=93=E7=A7=BB=E5=8A=A8=E5=88=B0jcs-pub=E4=BB=93?= =?UTF-8?q?=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 +- README_en.md | 8 +- client/internal/cmdline/test.go | 2 +- client/internal/downloader/iterator.go | 2 +- .../internal/downloader/lrc_strip_iterator.go | 2 +- client/internal/downloader/strip_iterator.go | 2 +- client/internal/services/object.go | 2 +- client/internal/services/user_space.go | 2 +- client/internal/spacesyncer/execute_diff.go | 2 +- client/internal/spacesyncer/execute_full.go | 2 +- .../internal/ticktock/redundancy_recover.go | 2 +- client/internal/ticktock/redundancy_shrink.go | 2 +- client/internal/uploader/create_load.go | 2 +- client/internal/uploader/update.go | 2 +- client/internal/uploader/uploader.go | 2 +- client/internal/uploader/user_space_upload.go | 2 +- common/pkgs/ioswitch/dag/graph.go | 170 ++++++ common/pkgs/ioswitch/dag/node.go | 517 ++++++++++++++++++ common/pkgs/ioswitch/dag/var.go | 122 +++++ common/pkgs/ioswitch/exec/config.go | 46 ++ common/pkgs/ioswitch/exec/context.go | 56 ++ common/pkgs/ioswitch/exec/driver.go | 118 ++++ common/pkgs/ioswitch/exec/exec.go | 25 + common/pkgs/ioswitch/exec/executor.go | 181 ++++++ common/pkgs/ioswitch/exec/plan_builder.go | 122 +++++ common/pkgs/ioswitch/exec/utils.go | 9 + common/pkgs/ioswitch/exec/var.go | 81 +++ common/pkgs/ioswitch/exec/worker.go | 106 ++++ common/pkgs/ioswitch/plan/compile.go | 199 +++++++ common/pkgs/ioswitch/plan/ops/driver.go | 75 +++ common/pkgs/ioswitch/plan/ops/drop.go | 66 +++ common/pkgs/ioswitch/plan/ops/ops.go | 13 + common/pkgs/ioswitch/plan/ops/send.go | 284 ++++++++++ common/pkgs/ioswitch/plan/ops/store.go | 94 ++++ common/pkgs/ioswitch/plan/ops/sync.go | 206 +++++++ common/pkgs/ioswitch/plan/ops/utils.go | 75 +++ common/pkgs/ioswitch/plan/ops/var.go | 52 ++ common/pkgs/ioswitch/utils/utils.go | 19 + common/pkgs/ioswitch2/fromto.go | 2 +- common/pkgs/ioswitch2/http_hub_worker.go | 22 +- common/pkgs/ioswitch2/hub_worker.go | 2 +- common/pkgs/ioswitch2/ops2/base_store.go | 4 +- common/pkgs/ioswitch2/ops2/bypass.go | 4 +- common/pkgs/ioswitch2/ops2/chunked.go | 6 +- common/pkgs/ioswitch2/ops2/clone.go | 6 +- common/pkgs/ioswitch2/ops2/driver.go | 4 +- common/pkgs/ioswitch2/ops2/ec.go | 6 +- common/pkgs/ioswitch2/ops2/join.go | 4 +- common/pkgs/ioswitch2/ops2/length.go | 2 +- common/pkgs/ioswitch2/ops2/multipart.go | 4 +- common/pkgs/ioswitch2/ops2/ops.go | 6 +- common/pkgs/ioswitch2/ops2/range.go | 4 +- common/pkgs/ioswitch2/ops2/s2s.go | 4 +- common/pkgs/ioswitch2/ops2/segment.go | 6 +- common/pkgs/ioswitch2/ops2/shard_store.go | 4 +- common/pkgs/ioswitch2/parser/gen/generator.go | 2 +- common/pkgs/ioswitch2/parser/opt/chunked.go | 2 +- common/pkgs/ioswitch2/parser/opt/ec.go | 2 +- common/pkgs/ioswitch2/parser/opt/misc.go | 2 +- common/pkgs/ioswitch2/parser/opt/multipart.go | 2 +- common/pkgs/ioswitch2/parser/opt/pin.go | 2 +- common/pkgs/ioswitch2/parser/opt/s2s.go | 2 +- common/pkgs/ioswitch2/parser/opt/segment.go | 2 +- common/pkgs/ioswitch2/parser/opt/utils.go | 2 +- common/pkgs/ioswitch2/parser/parser.go | 4 +- common/pkgs/ioswitch2/parser/state/state.go | 2 +- .../ioswitch2/plans/complete_multipart.go | 4 +- common/pkgs/ioswitch2/plans/utils.go | 2 +- common/pkgs/ioswitchlrc/fromto.go | 2 +- common/pkgs/ioswitchlrc/hub_worker.go | 2 +- common/pkgs/ioswitchlrc/ops2/base_store.go | 4 +- common/pkgs/ioswitchlrc/ops2/chunked.go | 6 +- common/pkgs/ioswitchlrc/ops2/clone.go | 6 +- common/pkgs/ioswitchlrc/ops2/ec.go | 6 +- common/pkgs/ioswitchlrc/ops2/ops.go | 6 +- common/pkgs/ioswitchlrc/ops2/range.go | 4 +- common/pkgs/ioswitchlrc/ops2/shard_store.go | 4 +- common/pkgs/ioswitchlrc/parser/generator.go | 6 +- common/pkgs/ioswitchlrc/parser/passes.go | 2 +- common/pkgs/ioswitchlrc/parser/utils.go | 2 +- common/pkgs/rpc/hub/ioswitch.go | 2 +- hub/internal/cmd/serve.go | 2 +- hub/internal/http/hub_io.go | 2 +- hub/internal/http/service.go | 2 +- hub/internal/rpc/ioswitch.go | 2 +- hub/internal/rpc/rpc.go | 2 +- hub/sdk/api/hub_io.go | 2 +- 87 files changed, 2750 insertions(+), 114 deletions(-) create mode 100644 common/pkgs/ioswitch/dag/graph.go create mode 100644 common/pkgs/ioswitch/dag/node.go create mode 100644 common/pkgs/ioswitch/dag/var.go create mode 100644 common/pkgs/ioswitch/exec/config.go create mode 100644 common/pkgs/ioswitch/exec/context.go create mode 100644 common/pkgs/ioswitch/exec/driver.go create mode 100644 common/pkgs/ioswitch/exec/exec.go create mode 100644 common/pkgs/ioswitch/exec/executor.go create mode 100644 common/pkgs/ioswitch/exec/plan_builder.go create mode 100644 common/pkgs/ioswitch/exec/utils.go create mode 100644 common/pkgs/ioswitch/exec/var.go create mode 100644 common/pkgs/ioswitch/exec/worker.go create mode 100644 common/pkgs/ioswitch/plan/compile.go create mode 100644 common/pkgs/ioswitch/plan/ops/driver.go create mode 100644 common/pkgs/ioswitch/plan/ops/drop.go create mode 100644 common/pkgs/ioswitch/plan/ops/ops.go create mode 100644 common/pkgs/ioswitch/plan/ops/send.go create mode 100644 common/pkgs/ioswitch/plan/ops/store.go create mode 100644 common/pkgs/ioswitch/plan/ops/sync.go create mode 100644 common/pkgs/ioswitch/plan/ops/utils.go create mode 100644 common/pkgs/ioswitch/plan/ops/var.go create mode 100644 common/pkgs/ioswitch/utils/utils.go diff --git a/README.md b/README.md index f399708..52db42f 100644 --- a/README.md +++ b/README.md @@ -281,8 +281,8 @@ type Op interface { - `String()`:用于调试时打印指令的内容。 - `Execute(ctx *ExecContext, e *Executor)error`:指令的执行逻辑,该函数包含两个参数: - - `ctx`:执行指令的上下文,其中的Context字段用于支持中断指令操作,Values则包含执行计划时提供的额外参数,可以通过`gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec`包中的GetValueByType、SetValueByType等函数修改。 - - `e`:执行器,可以通过BindVar和PutVar函数来访问内部的变量表,实现数据在不同指令之间的传递。也可以使用`gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec`中提供的泛型版本的BindVar、BindArray等函数。 + - `ctx`:执行指令的上下文,其中的Context字段用于支持中断指令操作,Values则包含执行计划时提供的额外参数,可以通过`gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec`包中的GetValueByType、SetValueByType等函数修改。 + - `e`:执行器,可以通过BindVar和PutVar函数来访问内部的变量表,实现数据在不同指令之间的传递。也可以使用`gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec`中提供的泛型版本的BindVar、BindArray等函数。 当Execute返回的error不为nil时,整个计划会被视为执行失败,所有正在执行的指令都会被中断。 @@ -306,7 +306,7 @@ type VarValue interface { 当指令的Execute函数返回了nil后,这个指令就执行结束了,而当某个节点上的指令都执行结束,则这个节点上的计划也会结束,因此如果你的指令会产生流,那么建议使用WaitGroup等方式,在产生的流被读取结束后再结束Execute函数。 -最后,当你定义好了指令和数据后,记得使用`gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec`中的UseOp和UseVarValue函数来将它们注册到模块内。 +最后,当你定义好了指令和数据后,记得使用`gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec`中的UseOp和UseVarValue函数来将它们注册到模块内。 ### 2、编写指令对应DAG节点 @@ -331,7 +331,7 @@ type Node interface { - Env代表这个指令将在哪个环境里执行,比如在Driver(发起计划的服务,即JCS客户端)、Hub(代理节点)、Any。大多数指令的Env设置成Any即可,除非你确定这个指令必须在哪个环境执行。 - 最后的GenerateOp函数是计划DAG优化结束后生成指令时被调用的。 -推荐嵌入`gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag`包中的NodeBase结构体,它实现了除GenerateOp以外的其他函数。 +推荐嵌入`gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag`包中的NodeBase结构体,它实现了除GenerateOp以外的其他函数。 ### 3、使用指令 diff --git a/README_en.md b/README_en.md index 5a96ff9..4b36619 100644 --- a/README_en.md +++ b/README_en.md @@ -294,8 +294,8 @@ type Op interface { - `String()`: Used for debugging, returns a description of the instruction. - `Execute(ctx *ExecContext, e *Executor)error`: The core execution logic. - - `ctx`: Execution context, which includes a cancelable context(`Context`) and custom values(`Values`) that can be accessed using functions like `GetValueByType` and `SetValueByType` from the `gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec` package. - - `e`: The executor, which provides access to a variable table through functions like `BindVar`, `PutVar`, or generic variants such as `BindVar` and `BindArray` from the `gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec` package. + - `ctx`: Execution context, which includes a cancelable context(`Context`) and custom values(`Values`) that can be accessed using functions like `GetValueByType` and `SetValueByType` from the `gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec` package. + - `e`: The executor, which provides access to a variable table through functions like `BindVar`, `PutVar`, or generic variants such as `BindVar` and `BindArray` from the `gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec` package. If `Execute` returns a non-nil error, the entire plan is considered failed and all ongoing instructions will be aborted. @@ -320,7 +320,7 @@ If you want to pass a stream (`io.ReadCloser`) between instructions, use the bui If an instruction produces a stream, consider using `WaitGroup` or similar mechanisms to ensure the stream has been fully consumed before `Execute` returns. -Finally, register your new instruction and data types using `UseOp` and `UseVarValue` from the `gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec` package. +Finally, register your new instruction and data types using `UseOp` and `UseVarValue` from the `gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec` package. ### 2. Writing DAG Nodes for Instructions @@ -345,7 +345,7 @@ Key concepts: - Each input slot accepts data from one node; output slots can fan out to multiple nodes. - `Env()` specifies the environment where the instruction should run (e.g., Driver, Hub, Any). Most instructions can use Any. -You may embed the `NodeBase` struct from the `gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag` package, which implements all functions except `GenerateOp`. +You may embed the `NodeBase` struct from the `gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag` package, which implements all functions except `GenerateOp`. ### 3. Using Custom Instructions diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 07b1d94..f6966fd 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/spf13/cobra" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" @@ -23,6 +22,7 @@ import ( stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index 11eeeab..18a5d9b 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -6,8 +6,8 @@ import ( "io" "reflect" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go index bff86d2..1d067d5 100644 --- a/client/internal/downloader/lrc_strip_iterator.go +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -5,11 +5,11 @@ import ( "io" "sync" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/parser" ) diff --git a/client/internal/downloader/strip_iterator.go b/client/internal/downloader/strip_iterator.go index 58e938a..7a57b6b 100644 --- a/client/internal/downloader/strip_iterator.go +++ b/client/internal/downloader/strip_iterator.go @@ -5,11 +5,11 @@ import ( "io" "sync" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" ) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index e4b307b..e4d13da 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -7,7 +7,6 @@ import ( "time" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" @@ -15,6 +14,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/plans" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index dc3352c..037c2c7 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -6,9 +6,9 @@ import ( "time" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gorm.io/gorm" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" diff --git a/client/internal/spacesyncer/execute_diff.go b/client/internal/spacesyncer/execute_diff.go index ceb4b16..8373043 100644 --- a/client/internal/spacesyncer/execute_diff.go +++ b/client/internal/spacesyncer/execute_diff.go @@ -5,11 +5,11 @@ import ( "io" "time" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" diff --git a/client/internal/spacesyncer/execute_full.go b/client/internal/spacesyncer/execute_full.go index f80eb67..ba6da3f 100644 --- a/client/internal/spacesyncer/execute_full.go +++ b/client/internal/spacesyncer/execute_full.go @@ -6,10 +6,10 @@ import ( "io" "time" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" diff --git a/client/internal/ticktock/redundancy_recover.go b/client/internal/ticktock/redundancy_recover.go index 14348ec..f98fbc5 100644 --- a/client/internal/ticktock/redundancy_recover.go +++ b/client/internal/ticktock/redundancy_recover.go @@ -6,13 +6,13 @@ import ( "strconv" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" diff --git a/client/internal/ticktock/redundancy_shrink.go b/client/internal/ticktock/redundancy_shrink.go index b780615..b33139f 100644 --- a/client/internal/ticktock/redundancy_shrink.go +++ b/client/internal/ticktock/redundancy_shrink.go @@ -9,7 +9,6 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" @@ -18,6 +17,7 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/consts" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index 4f749a8..c4973ff 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -7,9 +7,9 @@ import ( "sync" "time" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index 25f7117..d0b2627 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -8,9 +8,9 @@ import ( "time" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 0bfd7ba..19676b3 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -9,7 +9,6 @@ import ( "time" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" @@ -17,6 +16,7 @@ import ( clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index d181124..3cdb7b5 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -8,10 +8,10 @@ import ( "time" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" diff --git a/common/pkgs/ioswitch/dag/graph.go b/common/pkgs/ioswitch/dag/graph.go new file mode 100644 index 0000000..99ed675 --- /dev/null +++ b/common/pkgs/ioswitch/dag/graph.go @@ -0,0 +1,170 @@ +package dag + +import ( + "fmt" + "reflect" + "strings" + + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/reflect2" +) + +type Graph struct { + Nodes []Node + isWalking bool +} + +func NewGraph() *Graph { + return &Graph{} +} + +func (g *Graph) AddNode(node Node) { + g.Nodes = append(g.Nodes, node) + node.SetGraph(g) +} + +func (g *Graph) RemoveNode(node Node) { + for i, n := range g.Nodes { + if n == node { + if g.isWalking { + g.Nodes[i] = nil + } else { + g.Nodes = lo2.RemoveAt(g.Nodes, i) + } + break + } + } +} + +func (g *Graph) Walk(cb func(node Node) bool) { + g.isWalking = true + for i := 0; i < len(g.Nodes); i++ { + if g.Nodes[i] == nil { + continue + } + + if !cb(g.Nodes[i]) { + break + } + } + g.isWalking = false + + g.Nodes = lo2.RemoveAllDefault(g.Nodes) +} + +func (g *Graph) NewStreamVar() *StreamVar { + return &StreamVar{} +} + +func (g *Graph) NewValueVar() *ValueVar { + return &ValueVar{} +} + +func (g *Graph) Dump() string { + nodeIDs := make(map[Node]int) + for i, node := range g.Nodes { + nodeIDs[node] = i + } + + var sb strings.Builder + for _, node := range g.Nodes { + id, ok := nodeIDs[node] + if !ok { + id = len(nodeIDs) + nodeIDs[node] = id + } + sb.WriteString(fmt.Sprintf("[%v]%v\n", id, nodeTypeName(node))) + if node.InputStreams().Len() > 0 { + sb.WriteString("SIn: ") + for i, in := range node.InputStreams().Slots { + if i > 0 { + sb.WriteString(", ") + } + + if in == nil { + sb.WriteString("?") + } else { + sb.WriteString(fmt.Sprintf("%v", nodeIDs[in.Src])) + } + } + sb.WriteString("\n") + } + if node.OutputStreams().Len() > 0 { + sb.WriteString("SOut: ") + for i, out := range node.OutputStreams().Slots { + if i > 0 { + sb.WriteString(", ") + } + + sb.WriteString("(") + for i2, dst := range out.Dst { + if i2 > 0 { + sb.WriteString(", ") + } + sb.WriteString(fmt.Sprintf("%v", nodeIDs[dst])) + } + sb.WriteString(")") + } + sb.WriteString("\n") + } + + if node.InputValues().Len() > 0 { + sb.WriteString("VIn: ") + for i, in := range node.InputValues().Slots { + if i > 0 { + sb.WriteString(", ") + } + + if in == nil { + sb.WriteString("?") + } else { + sb.WriteString(fmt.Sprintf("%v", nodeIDs[in.Src])) + } + } + sb.WriteString("\n") + } + if node.OutputValues().Len() > 0 { + sb.WriteString("VOut: ") + for i, out := range node.OutputValues().Slots { + if i > 0 { + sb.WriteString(", ") + } + + sb.WriteString("(") + for i2, dst := range out.Dst { + if i2 > 0 { + sb.WriteString(", ") + } + sb.WriteString(fmt.Sprintf("%v", nodeIDs[dst])) + } + sb.WriteString(")") + } + sb.WriteString("\n") + } + + } + return sb.String() +} + +func nodeTypeName(node Node) string { + typ := reflect2.TypeOfValue(node) + for typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + return typ.Name() +} + +func AddNode[N Node](graph *Graph, typ N) N { + graph.AddNode(typ) + return typ +} + +func WalkOnlyType[N Node](g *Graph, cb func(node N) bool) { + g.Walk(func(n Node) bool { + node, ok := n.(N) + if ok { + return cb(node) + } + return true + }) +} diff --git a/common/pkgs/ioswitch/dag/node.go b/common/pkgs/ioswitch/dag/node.go new file mode 100644 index 0000000..352b3c8 --- /dev/null +++ b/common/pkgs/ioswitch/dag/node.go @@ -0,0 +1,517 @@ +package dag + +import ( + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +type NodeEnvType string + +const ( + EnvAny NodeEnvType = "" + EnvDriver NodeEnvType = "Driver" + EnvWorker NodeEnvType = "Worker" +) + +type NodeEnv struct { + Type NodeEnvType + Worker exec.WorkerInfo + Pinned bool // 如果为true,则不应该改变这个节点的执行环境 +} + +func (e *NodeEnv) ToEnvUnknown(pinned bool) { + e.Type = EnvAny + e.Worker = nil + e.Pinned = pinned +} + +func (e *NodeEnv) ToEnvDriver(pinned bool) { + e.Type = EnvDriver + e.Worker = nil + e.Pinned = pinned +} + +func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo, pinned bool) { + e.Type = EnvWorker + e.Worker = worker + e.Pinned = pinned +} + +func (e *NodeEnv) CopyFrom(other *NodeEnv) { + e.Type = other.Type + e.Worker = other.Worker + e.Pinned = other.Pinned +} + +func (e *NodeEnv) Equals(other *NodeEnv) bool { + if e.Type != other.Type { + return false + } + + if e.Type != EnvWorker { + return true + } + + return e.Worker.Equals(other.Worker) +} + +type Node interface { + Graph() *Graph + SetGraph(graph *Graph) + Env() *NodeEnv + InputStreams() *StreamInputSlots + OutputStreams() *StreamOutputSlots + InputValues() *ValueInputSlots + OutputValues() *ValueOutputSlots + GenerateOp() (exec.Op, error) + // String() string +} + +type VarSlots[T any] []*T + +func (s *VarSlots[T]) Len() int { + return len(*s) +} + +func (s *VarSlots[T]) Get(idx int) *T { + return (*s)[idx] +} + +func (s *VarSlots[T]) Set(idx int, val *T) *T { + old := (*s)[idx] + (*s)[idx] = val + return old +} + +func (s *VarSlots[T]) IndexOf(v *T) int { + return lo.IndexOf(*s, v) +} + +func (s *VarSlots[T]) Append(val *T) int { + *s = append(*s, val) + return s.Len() - 1 +} + +func (s *VarSlots[T]) Clear(val *T) { + for i := 0; i < s.Len(); i++ { + if (*s)[i] == val { + (*s)[i] = nil + } + } +} + +func (s *VarSlots[T]) RemoveAt(idx int) { + *s = lo2.RemoveAt(*s, idx) +} + +func (s *VarSlots[T]) RemoveRange(start int, cnt int) { + *s = lo2.RemoveRange(*s, start, cnt) +} + +func (s *VarSlots[T]) Resize(size int) { + if s.Len() < size { + *s = append(*s, make([]*T, size-s.Len())...) + } else if s.Len() > size { + *s = (*s)[:size] + } +} + +func (s *VarSlots[T]) SetRawArray(arr []*T) { + *s = arr +} + +func (s *VarSlots[T]) RawArray() []*T { + return *s +} + +type StreamInputSlots struct { + Slots VarSlots[StreamVar] +} + +func (s *StreamInputSlots) Len() int { + return s.Slots.Len() +} + +func (s *StreamInputSlots) Get(idx int) *StreamVar { + return s.Slots.Get(idx) +} + +func (s *StreamInputSlots) IndexOf(v *StreamVar) int { + return s.Slots.IndexOf(v) +} + +// 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil) +func (s *StreamInputSlots) Init(cnt int) { + s.Slots.Resize(cnt) +} + +func (s *StreamInputSlots) EnlargeOne() int { + s.Slots.Append(nil) + return s.Len() - 1 +} + +func (s *StreamInputSlots) ClearInputAt(my Node, idx int) { + v := s.Get(idx) + if v == nil { + return + } + s.Slots.Set(idx, nil) + + v.Dst.Remove(my) +} + +func (s *StreamInputSlots) ClearAllInput(my Node) { + for i := 0; i < s.Len(); i++ { + v := s.Get(i) + if v == nil { + continue + } + s.Slots.Set(i, nil) + + v.Dst.Remove(my) + } +} + +func (s *StreamInputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *StreamInputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type ValueInputSlots struct { + Slots VarSlots[ValueVar] +} + +func (s *ValueInputSlots) Len() int { + return s.Slots.Len() +} + +func (s *ValueInputSlots) Get(idx int) *ValueVar { + return s.Slots.Get(idx) +} + +func (s *ValueInputSlots) IndexOf(v *ValueVar) int { + return s.Slots.IndexOf(v) +} + +// 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil) +func (s *ValueInputSlots) Init(cnt int) { + if s.Len() < cnt { + s.Slots = append(s.Slots, make([]*ValueVar, cnt-s.Len())...) + } +} + +func (s *ValueInputSlots) EnlargeOne() int { + s.Slots.Append(nil) + return s.Len() - 1 +} + +func (s *ValueInputSlots) ClearInputAt(my Node, idx int) { + v := s.Get(idx) + if v == nil { + return + } + s.Slots.Set(idx, nil) + + v.Dst.Remove(my) +} + +func (s *ValueInputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *ValueInputSlots) GetVarIDsStart(start int) []exec.VarID { + return s.GetVarIDsRanged(start, s.Len()) +} + +func (s *ValueInputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type StreamOutputSlots struct { + Slots VarSlots[StreamVar] +} + +func (s *StreamOutputSlots) Len() int { + return s.Slots.Len() +} + +func (s *StreamOutputSlots) Get(idx int) *StreamVar { + return s.Slots.Get(idx) +} + +func (s *StreamOutputSlots) IndexOf(v *StreamVar) int { + return s.Slots.IndexOf(v) +} + +// 设置Slots大小,并为每个Slot创建一个StreamVar。 +// 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。 +func (s *StreamOutputSlots) Init(my Node, size int) { + s.Slots.Resize(size) + for i := 0; i < size; i++ { + v := my.Graph().NewStreamVar() + v.Src = my + s.Slots.Set(i, v) + } +} + +// 在Slots末尾增加一个StreamVar,并返回它的索引 +func (s *StreamOutputSlots) AppendNew(my Node) StreamOutputSlot { + v := my.Graph().NewStreamVar() + v.Src = my + s.Slots.Append(v) + return StreamOutputSlot{Node: my, Index: s.Len() - 1} +} + +// 断开指定位置的输出流到指定节点的连接 +func (s *StreamOutputSlots) ClearOutputAt(idx int, dst Node) { + v := s.Get(idx) + v.Dst.Remove(dst) + dst.InputStreams().Slots.Clear(v) +} + +// 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量 +func (s *StreamOutputSlots) ClearAllOutput(my Node) { + for i := 0; i < s.Len(); i++ { + v := s.Get(i) + v.ClearAllDst() + } +} + +func (s *StreamOutputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *StreamOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type ValueOutputSlots struct { + Slots VarSlots[ValueVar] +} + +func (s *ValueOutputSlots) Len() int { + return s.Slots.Len() +} + +func (s *ValueOutputSlots) Get(idx int) *ValueVar { + return s.Slots.Get(idx) +} + +func (s *ValueOutputSlots) IndexOf(v *ValueVar) int { + return s.Slots.IndexOf(v) +} + +// 设置Slots大小,并为每个Slot创建一个StreamVar +// 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。 +func (s *ValueOutputSlots) Init(my Node, size int) { + s.Slots.Resize(size) + for i := 0; i < size; i++ { + v := my.Graph().NewValueVar() + v.Src = my + s.Slots.Set(i, v) + } +} + +// 在Slots末尾增加一个StreamVar,并返回它的索引 +func (s *ValueOutputSlots) AppendNew(my Node) ValueOutputSlot { + v := my.Graph().NewValueVar() + v.Src = my + s.Slots.Append(v) + return ValueOutputSlot{Node: my, Index: s.Len() - 1} +} + +// 断开指定位置的输出流到指定节点的连接 +func (s *ValueOutputSlots) ClearOutputAt(idx int, dst Node) { + v := s.Get(idx) + v.Dst.Remove(dst) + dst.InputValues().Slots.Clear(v) +} + +// 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量 +func (s *ValueOutputSlots) ClearAllOutput(my Node) { + for i := 0; i < s.Len(); i++ { + v := s.Get(i) + v.ClearAllDst() + } +} + +func (s *ValueOutputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *ValueOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type NodeBase struct { + env NodeEnv + inputStreams StreamInputSlots + outputStreams StreamOutputSlots + inputValues ValueInputSlots + outputValues ValueOutputSlots + graph *Graph +} + +func (n *NodeBase) Graph() *Graph { + return n.graph +} + +func (n *NodeBase) SetGraph(graph *Graph) { + n.graph = graph +} + +func (n *NodeBase) Env() *NodeEnv { + return &n.env +} + +func (n *NodeBase) InputStreams() *StreamInputSlots { + return &n.inputStreams +} + +func (n *NodeBase) OutputStreams() *StreamOutputSlots { + return &n.outputStreams +} + +func (n *NodeBase) InputValues() *ValueInputSlots { + return &n.inputValues +} + +func (n *NodeBase) OutputValues() *ValueOutputSlots { + return &n.outputValues +} + +type StreamOutputSlot struct { + Node Node + Index int +} + +func (s StreamOutputSlot) Var() *StreamVar { + return s.Node.OutputStreams().Get(s.Index) +} + +func (s StreamOutputSlot) ToSlot(slot StreamInputSlot) { + s.Var().To(slot.Node, slot.Index) +} + +// 查询所有输出的连接的输入槽位 +func (s StreamOutputSlot) ListDstSlots() []StreamInputSlot { + slots := make([]StreamInputSlot, s.Var().Dst.Len()) + myVar := s.Var() + for i, dst := range s.Var().Dst { + slots[i] = StreamInputSlot{Node: dst, Index: dst.InputStreams().IndexOf(myVar)} + } + return slots +} + +type StreamInputSlot struct { + Node Node + Index int +} + +func (s StreamInputSlot) Var() *StreamVar { + return s.Node.InputStreams().Get(s.Index) +} + +type ValueOutputSlot struct { + Node Node + Index int +} + +func (s ValueOutputSlot) Var() *ValueVar { + return s.Node.OutputValues().Get(s.Index) +} + +func (s ValueOutputSlot) ToSlot(slot ValueInputSlot) { + s.Var().To(slot.Node, slot.Index) +} + +// 查询所有输出的连接的输入槽位 +func (s ValueOutputSlot) ListDstSlots() []ValueInputSlot { + slots := make([]ValueInputSlot, s.Var().Dst.Len()) + myVar := s.Var() + for i, dst := range s.Var().Dst { + slots[i] = ValueInputSlot{Node: dst, Index: dst.InputValues().IndexOf(myVar)} + } + return slots +} + +type ValueInputSlot struct { + Node Node + Index int +} + +func (s ValueInputSlot) Var() *ValueVar { + return s.Node.InputValues().Get(s.Index) +} diff --git a/common/pkgs/ioswitch/dag/var.go b/common/pkgs/ioswitch/dag/var.go new file mode 100644 index 0000000..2783e85 --- /dev/null +++ b/common/pkgs/ioswitch/dag/var.go @@ -0,0 +1,122 @@ +package dag + +import ( + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +type Var interface { + GetVarID() exec.VarID +} + +type StreamVar struct { + VarID exec.VarID + Src Node + Dst DstList +} + +func (v *StreamVar) GetVarID() exec.VarID { + return v.VarID +} + +func (v *StreamVar) IndexAtSrc() int { + return v.Src.OutputStreams().IndexOf(v) +} + +func (v *StreamVar) To(to Node, slotIdx int) { + v.Dst.Add(to) + to.InputStreams().Slots.Set(slotIdx, v) +} + +func (v *StreamVar) ToSlot(slot StreamInputSlot) { + v.Dst.Add(slot.Node) + slot.Node.InputStreams().Slots.Set(slot.Index, v) +} + +func (v *StreamVar) NotTo(node Node) { + v.Dst.Remove(node) + node.InputStreams().Slots.Clear(v) +} + +func (v *StreamVar) ClearAllDst() { + for _, n := range v.Dst { + n.InputStreams().Slots.Clear(v) + } + v.Dst = nil +} + +type ValueVar struct { + VarID exec.VarID + Src Node + Dst DstList +} + +func (v *ValueVar) GetVarID() exec.VarID { + return v.VarID +} + +func (v *ValueVar) IndexAtSrc() int { + return v.Src.InputValues().IndexOf(v) +} + +func (v *ValueVar) To(to Node, slotIdx int) { + v.Dst.Add(to) + to.InputValues().Slots.Set(slotIdx, v) +} + +func (v *ValueVar) ToSlot(slot ValueInputSlot) { + v.Dst.Add(slot.Node) + slot.Node.InputValues().Slots.Set(slot.Index, v) +} + +func (v *ValueVar) NotTo(node Node) { + v.Dst.Remove(node) + node.InputValues().Slots.Clear(v) +} + +func (v *ValueVar) ClearAllDst() { + for _, n := range v.Dst { + n.InputValues().Slots.Clear(v) + } + v.Dst = nil +} + +type DstList []Node + +func (s *DstList) Len() int { + return len(*s) +} + +func (s *DstList) Get(idx int) Node { + return (*s)[idx] +} + +func (s *DstList) Add(n Node) int { + *s = append(*s, n) + return len(*s) - 1 +} + +func (s *DstList) Remove(n Node) { + for i, e := range *s { + if e == n { + *s = lo2.RemoveAt(*s, i) + return + } + } +} + +func (s *DstList) RemoveAt(idx int) { + lo2.RemoveAt(*s, idx) +} + +func (s *DstList) Resize(size int) { + if s.Len() < size { + *s = append(*s, make([]Node, size-s.Len())...) + } else if s.Len() > size { + *s = (*s)[:size] + } +} + +func (s *DstList) RawArray() []Node { + return *s +} diff --git a/common/pkgs/ioswitch/exec/config.go b/common/pkgs/ioswitch/exec/config.go new file mode 100644 index 0000000..0e15df5 --- /dev/null +++ b/common/pkgs/ioswitch/exec/config.go @@ -0,0 +1,46 @@ +package exec + +import ( + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/common/utils/serder/json" +) + +type ConfigBuilder struct { + unions []*types.AnyTypeUnion + opUnion types.TypeUnion[Op] + workerInfoType reflect2.Type +} + +func (c *ConfigBuilder) UseUnion(u *types.AnyTypeUnion) *ConfigBuilder { + c.unions = append(c.unions, u) + return c +} + +func (c *ConfigBuilder) UseOpType(nilValue Op) *ConfigBuilder { + c.opUnion.Add(reflect2.TypeOfValue(nilValue)) + return c +} + +func (c *ConfigBuilder) UseWorkerInfoType(nilValue WorkerInfo) *ConfigBuilder { + c.workerInfoType = reflect2.TypeOfValue(nilValue) + return c +} + +func (c *ConfigBuilder) Build() Config { + b := json.New().UseUnionExternallyTagged(c.opUnion.ToAny()) + for _, u := range c.unions { + b.UseUnionExternallyTagged(u) + } + + // b.UseExtension(&workerInfoJSONExt{workerInfoType: c.workerInfoType}) + + ser := b.Build() + return Config{ + Serder: ser, + } +} + +type Config struct { + Serder json.Serder +} diff --git a/common/pkgs/ioswitch/exec/context.go b/common/pkgs/ioswitch/exec/context.go new file mode 100644 index 0000000..efeb0fa --- /dev/null +++ b/common/pkgs/ioswitch/exec/context.go @@ -0,0 +1,56 @@ +package exec + +import ( + "context" + "fmt" + + "gitlink.org.cn/cloudream/common/utils/reflect2" +) + +var ErrValueNotFound = fmt.Errorf("value not found") + +type ExecContext struct { + Context context.Context + Values map[any]any +} + +func NewExecContext() *ExecContext { + return NewWithContext(context.Background()) +} + +func NewWithContext(ctx context.Context) *ExecContext { + return &ExecContext{Context: ctx, Values: make(map[any]any)} +} + +// error只会是ErrValueNotFound +func (c *ExecContext) Value(key any) (any, error) { + value, ok := c.Values[key] + if !ok { + return nil, ErrValueNotFound + } + return value, nil +} + +func (c *ExecContext) SetValue(key any, value any) { + c.Values[key] = value +} + +func GetValueByType[T any](ctx *ExecContext) (T, error) { + var ret T + + value, err := ctx.Value(reflect2.TypeOf[T]()) + if err != nil { + return ret, err + } + + ret, ok := value.(T) + if !ok { + return ret, fmt.Errorf("value is %T, not %T", value, ret) + } + + return ret, nil +} + +func SetValueByType[T any](ctx *ExecContext, value T) { + ctx.SetValue(reflect2.TypeOf[T](), value) +} diff --git a/common/pkgs/ioswitch/exec/driver.go b/common/pkgs/ioswitch/exec/driver.go new file mode 100644 index 0000000..433b7c3 --- /dev/null +++ b/common/pkgs/ioswitch/exec/driver.go @@ -0,0 +1,118 @@ +package exec + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/hashicorp/go-multierror" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" +) + +type Driver struct { + planID PlanID + planBlder *PlanBuilder + callback *future.SetValueFuture[map[string]VarValue] + ctx *ExecContext + cancel context.CancelFunc + driverExec *Executor +} + +// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 +func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) { + e.driverExec.PutVar(handle.ID, &StreamValue{Stream: io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)}) +} + +// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 +func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) { + e.driverExec.PutVar(handle.ID, &StreamValue{Stream: str}) +} + +func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) { + str, err := BindVar[*StreamValue](e.driverExec, e.ctx.Context, handle.ID) + if err != nil { + return nil, fmt.Errorf("bind vars: %w", err) + } + + return str.Stream, nil +} + +func (e *Driver) Signal(signal *DriverSignalVar) { + e.driverExec.PutVar(signal.ID, &SignalValue{}) +} + +func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) { + stored, err := e.callback.Wait(ctx) + if err != nil { + return nil, err + } + + return stored, nil +} + +func (e *Driver) execute() { + wg := sync.WaitGroup{} + + errLock := sync.Mutex{} + var execErr error + for _, p := range e.planBlder.WorkerPlans { + wg.Add(1) + + go func(p *WorkerPlanBuilder, ctx context.Context, cancel context.CancelFunc) { + defer wg.Done() + + plan := Plan{ + ID: e.planID, + Ops: p.Ops, + } + + cli, err := p.Worker.NewClient() + if err != nil { + errLock.Lock() + execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err)) + errLock.Unlock() + cancel() + return + } + defer cli.Close() + + err = cli.ExecutePlan(ctx, plan) + if err != nil { + errLock.Lock() + execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err)) + errLock.Unlock() + cancel() + return + } + }(p, e.ctx.Context, e.cancel) + } + + stored, err := e.driverExec.Run(e.ctx) + if err != nil { + errLock.Lock() + execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err)) + errLock.Unlock() + e.cancel() + } + + wg.Wait() + + e.callback.SetComplete(stored, execErr) +} + +type DriverWriteStream struct { + ID VarID + RangeHint *math2.Range +} + +type DriverReadStream struct { + ID VarID +} + +type DriverSignalVar struct { + ID VarID + Signal SignalValue +} diff --git a/common/pkgs/ioswitch/exec/exec.go b/common/pkgs/ioswitch/exec/exec.go new file mode 100644 index 0000000..51de4ea --- /dev/null +++ b/common/pkgs/ioswitch/exec/exec.go @@ -0,0 +1,25 @@ +package exec + +import ( + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type PlanID string + +type Plan struct { + ID PlanID `json:"id"` + Ops []Op `json:"ops"` +} + +var opUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Op]())) + +type Op interface { + Execute(ctx *ExecContext, e *Executor) error + String() string +} + +func UseOp[T Op]() { + opUnion.Add(reflect2.TypeOf[T]()) +} diff --git a/common/pkgs/ioswitch/exec/executor.go b/common/pkgs/ioswitch/exec/executor.go new file mode 100644 index 0000000..50ed2d3 --- /dev/null +++ b/common/pkgs/ioswitch/exec/executor.go @@ -0,0 +1,181 @@ +package exec + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/hashicorp/go-multierror" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type binding struct { + ID VarID + Callback *future.SetValueFuture[VarValue] +} + +type freeVar struct { + ID VarID + Value VarValue +} + +type Executor struct { + plan Plan + vars map[VarID]freeVar + bindings []*binding + lock sync.Mutex + store map[string]VarValue +} + +func NewExecutor(plan Plan) *Executor { + planning := Executor{ + plan: plan, + vars: make(map[VarID]freeVar), + store: make(map[string]VarValue), + } + + return &planning +} + +func (s *Executor) Plan() *Plan { + return &s.plan +} + +func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) { + c, cancel := context.WithCancel(ctx.Context) + ctx = &ExecContext{ + Context: c, + Values: ctx.Values, + } + + defer cancel() + + err := s.runOps(s.plan.Ops, ctx, cancel) + if err != nil { + return nil, err + } + + return s.store, nil +} + +func (s *Executor) runOps(ops []Op, ctx *ExecContext, cancel context.CancelFunc) error { + lock := sync.Mutex{} + + var err error + + var wg sync.WaitGroup + wg.Add(len(ops)) + for i, arg := range ops { + go func(op Op, index int) { + defer wg.Done() + + if e := op.Execute(ctx, s); e != nil { + lock.Lock() + // 尽量不记录 Canceled 错误,除非没有其他错误 + if errors.Is(e, context.Canceled) { + if err == nil { + err = context.Canceled + } + } else { + err = multierror.Append(err, fmt.Errorf("%T: %w", op, e)) + } + lock.Unlock() + + cancel() + } + }(arg, i) + } + wg.Wait() + + return err +} + +func (s *Executor) BindVar(ctx context.Context, id VarID) (VarValue, error) { + s.lock.Lock() + + gv, ok := s.vars[id] + if ok { + delete(s.vars, id) + s.lock.Unlock() + return gv.Value, nil + } + + callback := future.NewSetValue[VarValue]() + s.bindings = append(s.bindings, &binding{ + ID: id, + Callback: callback, + }) + + s.lock.Unlock() + return callback.Wait(ctx) +} + +func (s *Executor) PutVar(id VarID, value VarValue) *Executor { + s.lock.Lock() + defer s.lock.Unlock() + + for ib, b := range s.bindings { + if b.ID != id { + continue + } + + b.Callback.SetValue(value) + s.bindings = lo2.RemoveAt(s.bindings, ib) + + return s + } + + // 如果没有绑定,则直接放入变量表中 + s.vars[id] = freeVar{ID: id, Value: value} + return s +} + +func (s *Executor) Store(key string, val VarValue) { + s.lock.Lock() + defer s.lock.Unlock() + + s.store[key] = val +} + +func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) { + v, err := e.BindVar(ctx, id) + if err != nil { + var def T + return def, err + } + + ret, ok := v.(T) + if !ok { + var def T + return def, fmt.Errorf("binded var %v is %T, not %T", id, v, def) + } + + return ret, nil +} + +func BindArray[T VarValue](e *Executor, ctx context.Context, ids []VarID) ([]T, error) { + ret := make([]T, len(ids)) + for i := range ids { + v, err := e.BindVar(ctx, ids[i]) + if err != nil { + return nil, err + } + + v2, ok := v.(T) + if !ok { + var def T + return nil, fmt.Errorf("binded var %v is %T, not %T", ids[i], v, def) + } + + ret[i] = v2 + } + return ret, nil +} + +func PutArray[T VarValue](e *Executor, ids []VarID, values []T) { + for i := range ids { + e.PutVar(ids[i], values[i]) + } +} diff --git a/common/pkgs/ioswitch/exec/plan_builder.go b/common/pkgs/ioswitch/exec/plan_builder.go new file mode 100644 index 0000000..ac7a263 --- /dev/null +++ b/common/pkgs/ioswitch/exec/plan_builder.go @@ -0,0 +1,122 @@ +package exec + +import ( + "context" + "strings" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type PlanBuilder struct { + NextVarID VarID + WorkerPlans []*WorkerPlanBuilder + DriverPlan DriverPlanBuilder +} + +func NewPlanBuilder() *PlanBuilder { + bld := &PlanBuilder{ + NextVarID: VarID(1), + DriverPlan: DriverPlanBuilder{}, + } + + return bld +} + +func (b *PlanBuilder) AtDriver() *DriverPlanBuilder { + return &b.DriverPlan +} + +func (b *PlanBuilder) AtWorker(worker WorkerInfo) *WorkerPlanBuilder { + for _, p := range b.WorkerPlans { + if p.Worker.Equals(worker) { + return p + } + } + + p := &WorkerPlanBuilder{ + Worker: worker, + } + b.WorkerPlans = append(b.WorkerPlans, p) + + return p +} + +func (b *PlanBuilder) NewVar() VarID { + id := b.NextVarID + b.NextVarID++ + + return id +} + +func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver { + c, cancel := context.WithCancel(ctx.Context) + ctx.Context = c + + planID := genRandomPlanID() + + execPlan := Plan{ + ID: planID, + Ops: b.DriverPlan.Ops, + } + + exec := Driver{ + planID: planID, + planBlder: b, + callback: future.NewSetValue[map[string]VarValue](), + ctx: ctx, + cancel: cancel, + driverExec: NewExecutor(execPlan), + } + go exec.execute() + + return &exec +} + +func (b *PlanBuilder) String() string { + sb := strings.Builder{} + sb.WriteString("Driver:\n") + for _, op := range b.DriverPlan.Ops { + sb.WriteString(op.String()) + sb.WriteRune('\n') + } + sb.WriteRune('\n') + + for _, w := range b.WorkerPlans { + sb.WriteString("Worker(") + sb.WriteString(w.Worker.String()) + sb.WriteString("):\n") + for _, op := range w.Ops { + sb.WriteString(op.String()) + sb.WriteRune('\n') + } + sb.WriteRune('\n') + } + + return sb.String() +} + +type WorkerPlanBuilder struct { + Worker WorkerInfo + Ops []Op +} + +func (b *WorkerPlanBuilder) AddOp(op Op) { + b.Ops = append(b.Ops, op) +} + +func (b *WorkerPlanBuilder) RemoveOp(op Op) { + b.Ops = lo2.Remove(b.Ops, op) +} + +type DriverPlanBuilder struct { + Ops []Op +} + +func (b *DriverPlanBuilder) AddOp(op Op) { + b.Ops = append(b.Ops, op) +} + +func (b *DriverPlanBuilder) RemoveOp(op Op) { + b.Ops = lo2.Remove(b.Ops, op) +} diff --git a/common/pkgs/ioswitch/exec/utils.go b/common/pkgs/ioswitch/exec/utils.go new file mode 100644 index 0000000..5186943 --- /dev/null +++ b/common/pkgs/ioswitch/exec/utils.go @@ -0,0 +1,9 @@ +package exec + +import ( + "github.com/google/uuid" +) + +func genRandomPlanID() PlanID { + return PlanID(uuid.NewString()) +} diff --git a/common/pkgs/ioswitch/exec/var.go b/common/pkgs/ioswitch/exec/var.go new file mode 100644 index 0000000..179d0a2 --- /dev/null +++ b/common/pkgs/ioswitch/exec/var.go @@ -0,0 +1,81 @@ +package exec + +import ( + "io" + + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type VarID int + +type Var[T VarValue] struct { + ID VarID `json:"id"` + Value T `json:"value"` +} + +// 变量的值 +type VarValue interface { + Clone() VarValue +} + +var valueUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[VarValue]( + (*StreamValue)(nil), + (*SignalValue)(nil), + (*StringValue)(nil), +))) + +func UseVarValue[T VarValue]() { + valueUnion.Add(reflect2.TypeOf[T]()) +} + +type StreamValue struct { + Stream io.ReadCloser `json:"-"` +} + +// 不应该被调用 +func (v *StreamValue) Clone() VarValue { + panic("StreamValue should not be cloned") +} + +type StreamVar = Var[*StreamValue] + +func NewStreamVar(id VarID, stream io.ReadCloser) StreamVar { + return StreamVar{ + ID: id, + Value: &StreamValue{Stream: stream}, + } +} + +type SignalValue struct{} + +func (o *SignalValue) Clone() VarValue { + return &SignalValue{} +} + +type SignalVar = Var[*SignalValue] + +func NewSignalVar(id VarID) SignalVar { + return SignalVar{ + ID: id, + Value: &SignalValue{}, + } +} + +type StringValue struct { + Value string `json:"value"` +} + +func (o *StringValue) Clone() VarValue { + return &StringValue{Value: o.Value} +} + +type StringVar = Var[*StringValue] + +func NewStringVar(id VarID, value string) StringVar { + return StringVar{ + ID: id, + Value: &StringValue{Value: value}, + } +} diff --git a/common/pkgs/ioswitch/exec/worker.go b/common/pkgs/ioswitch/exec/worker.go new file mode 100644 index 0000000..d7fb915 --- /dev/null +++ b/common/pkgs/ioswitch/exec/worker.go @@ -0,0 +1,106 @@ +package exec + +import ( + "context" + "io" + "sync" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type finding struct { + PlanID PlanID + Callback *future.SetValueFuture[*Executor] +} + +type Worker struct { + lock sync.Mutex + executors map[PlanID]*Executor + findings []*finding +} + +func NewWorker() Worker { + return Worker{ + executors: make(map[PlanID]*Executor), + } +} + +func (s *Worker) Add(exe *Executor) { + s.lock.Lock() + defer s.lock.Unlock() + + s.executors[exe.Plan().ID] = exe + + s.findings = lo.Reject(s.findings, func(f *finding, idx int) bool { + if f.PlanID != exe.Plan().ID { + return false + } + + f.Callback.SetValue(exe) + return true + }) +} + +func (s *Worker) Remove(sw *Executor) { + s.lock.Lock() + defer s.lock.Unlock() + + delete(s.executors, sw.Plan().ID) +} + +func (s *Worker) FindByID(id PlanID) *Executor { + s.lock.Lock() + defer s.lock.Unlock() + + return s.executors[id] +} + +func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor { + s.lock.Lock() + + sw := s.executors[id] + if sw != nil { + s.lock.Unlock() + return sw + } + + cb := future.NewSetValue[*Executor]() + f := &finding{ + PlanID: id, + Callback: cb, + } + s.findings = append(s.findings, f) + + s.lock.Unlock() + + sw, _ = cb.Wait(ctx) + + s.lock.Lock() + defer s.lock.Unlock() + + s.findings = lo2.Remove(s.findings, f) + + return sw +} + +type WorkerInfo interface { + NewClient() (WorkerClient, error) + // 判断两个worker是否相同 + Equals(worker WorkerInfo) bool + // Worker信息,比如ID、地址等 + String() string +} + +type WorkerClient interface { + ExecutePlan(ctx context.Context, plan Plan) error + + SendStream(ctx context.Context, planID PlanID, id VarID, stream io.ReadCloser) error + SendVar(ctx context.Context, planID PlanID, id VarID, value VarValue) error + + GetStream(ctx context.Context, planID PlanID, streamID VarID, signalID VarID, signal VarValue) (io.ReadCloser, error) + GetVar(ctx context.Context, planID PlanID, varID VarID, signalID VarID, signal VarValue) (VarValue, error) + + Close() error +} diff --git a/common/pkgs/ioswitch/plan/compile.go b/common/pkgs/ioswitch/plan/compile.go new file mode 100644 index 0000000..7e5fb2a --- /dev/null +++ b/common/pkgs/ioswitch/plan/compile.go @@ -0,0 +1,199 @@ +package plan + +import ( + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" +) + +func Compile(graph *dag.Graph, planBld *exec.PlanBuilder) error { + myGraph := &ops.GraphNodeBuilder{graph} + generateSend(myGraph) + return buildPlan(graph, planBld) +} + +// 生成Send指令 +func generateSend(graph *ops.GraphNodeBuilder) { + graph.Walk(func(node dag.Node) bool { + switch node.(type) { + case *ops.SendStreamNode: + return true + case *ops.SendValueNode: + return true + case *ops.GetStreamNode: + return true + case *ops.GetValueNode: + return true + case *ops.HoldUntilNode: + return true + } + + for i := 0; i < node.OutputStreams().Len(); i++ { + out := node.OutputStreams().Get(i) + to := out.Dst.Get(0) + if to.Env().Equals(node.Env()) { + continue + } + + switch to.Env().Type { + case dag.EnvDriver: + + // // 如果是要送到Driver,则只能由Driver主动去拉取 + dstNode := out.Dst.Get(0) + + getNode := graph.NewGetStream(node.Env().Worker) + getNode.Env().ToEnvDriver(true) + + // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdNode := graph.NewHoldUntil() + *holdNode.Env() = *node.Env() + + // 将Get指令的信号送到Hold指令 + holdNode.SetSignal(getNode.SignalVar()) + + out.Dst.RemoveAt(0) + + // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 + getNode.Get(holdNode.HoldStream(out)). + // 将Get指令的输出送到目的地 + To(to, dstNode.InputStreams().IndexOf(out)) + + case dag.EnvWorker: + // 如果是要送到Agent,则可以直接发送 + dstNode := out.Dst.Get(0) + n := graph.NewSendStream(to.Env().Worker) + *n.Env() = *node.Env() + + out.Dst.RemoveAt(0) + n.Send(out).To(to, dstNode.InputStreams().IndexOf(out)) + } + } + + for i := 0; i < node.OutputValues().Len(); i++ { + out := node.OutputValues().Get(i) + // 允许Value变量不被使用 + if out.Dst.Len() == 0 { + continue + } + + to := out.Dst.Get(0) + if to.Env().Equals(node.Env()) { + continue + } + + switch to.Env().Type { + case dag.EnvDriver: + // // 如果是要送到Driver,则只能由Driver主动去拉取 + dstNode := out.Dst.Get(0) + getNode := graph.NewGetValue(node.Env().Worker) + getNode.Env().ToEnvDriver(true) + + // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdNode := graph.NewHoldUntil() + *holdNode.Env() = *node.Env() + + // 将Get指令的信号送到Hold指令 + holdNode.SetSignal(getNode.SignalVar()) + + out.Dst.RemoveAt(0) + + // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 + getNode.Get(holdNode.HoldVar(out)). + // 将Get指令的输出送到目的地 + To(to, dstNode.InputValues().IndexOf(out)) + + case dag.EnvWorker: + // 如果是要送到Agent,则可以直接发送 + dstNode := out.Dst.Get(0) + t := graph.NewSendValue(to.Env().Worker) + *t.Env() = *node.Env() + + out.Dst.RemoveAt(0) + + t.Send(out).To(to, dstNode.InputValues().IndexOf(out)) + } + } + + return true + }) +} + +// 生成Plan +func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { + var retErr error + graph.Walk(func(node dag.Node) bool { + for i := 0; i < node.OutputStreams().Len(); i++ { + out := node.OutputStreams().Get(i) + if out == nil { + continue + } + + if out.VarID > 0 { + continue + } + + out.VarID = blder.NewVar() + } + + for i := 0; i < node.InputStreams().Len(); i++ { + in := node.InputStreams().Get(i) + if in == nil { + continue + } + + if in.VarID > 0 { + continue + } + + in.VarID = blder.NewVar() + } + + for i := 0; i < node.OutputValues().Len(); i++ { + out := node.OutputValues().Get(i) + if out == nil { + continue + } + + if out.VarID > 0 { + continue + } + + out.VarID = blder.NewVar() + } + + for i := 0; i < node.InputValues().Len(); i++ { + in := node.InputValues().Get(i) + if in == nil { + continue + } + + if in.VarID > 0 { + continue + } + + in.VarID = blder.NewVar() + } + + op, err := node.GenerateOp() + if err != nil { + retErr = err + return false + } + + // TODO 当前ToDriver,FromDriver不会生成Op,所以这里需要判断一下 + if op == nil { + return true + } + + switch node.Env().Type { + case dag.EnvDriver: + blder.AtDriver().AddOp(op) + case dag.EnvWorker: + blder.AtWorker(node.Env().Worker).AddOp(op) + } + + return true + }) + + return retErr +} diff --git a/common/pkgs/ioswitch/plan/ops/driver.go b/common/pkgs/ioswitch/plan/ops/driver.go new file mode 100644 index 0000000..ae03ad0 --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/driver.go @@ -0,0 +1,75 @@ +package ops + +import ( + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +type FromDriverNode struct { + dag.NodeBase + Handle *exec.DriverWriteStream +} + +func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDriverNode { + node := &FromDriverNode{ + Handle: handle, + } + b.AddNode(node) + + node.OutputStreams().Init(node, 1) + + return node +} + +func (t *FromDriverNode) Output() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, + Index: 0, + } +} + +func (t *FromDriverNode) GenerateOp() (exec.Op, error) { + t.Handle.ID = t.OutputStreams().Get(0).VarID + return nil, nil +} + +// func (t *FromDriverType) String() string { +// return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type ToDriverNode struct { + dag.NodeBase + Handle *exec.DriverReadStream + Range math2.Range +} + +func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverNode { + node := &ToDriverNode{ + Handle: handle, + } + b.AddNode(node) + + node.InputStreams().Init(1) + return node +} + +func (t *ToDriverNode) SetInput(v *dag.StreamVar) { + v.To(t, 0) +} + +func (t *ToDriverNode) Input() dag.StreamInputSlot { + return dag.StreamInputSlot{ + Node: t, + Index: 0, + } +} + +func (t *ToDriverNode) GenerateOp() (exec.Op, error) { + t.Handle.ID = t.InputStreams().Get(0).VarID + return nil, nil +} + +// func (t *ToDriverType) String() string { +// return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch/plan/ops/drop.go b/common/pkgs/ioswitch/plan/ops/drop.go new file mode 100644 index 0000000..eb661ad --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/drop.go @@ -0,0 +1,66 @@ +package ops + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +func init() { + exec.UseOp[*DropStream]() +} + +type DropStream struct { + Input exec.VarID `json:"input"` +} + +func (o *DropStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + str, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) + if err != nil { + return err + } + defer str.Stream.Close() + + for { + buf := make([]byte, 1024*8) + _, err = str.Stream.Read(buf) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + } +} + +func (o *DropStream) String() string { + return fmt.Sprintf("DropStream %v", o.Input) +} + +type DropNode struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewDropStream() *DropNode { + node := &DropNode{} + b.AddNode(node) + + node.InputStreams().Init(1) + return node +} + +func (t *DropNode) SetInput(v *dag.StreamVar) { + v.To(t, 0) +} + +func (t *DropNode) GenerateOp() (exec.Op, error) { + return &DropStream{ + Input: t.InputStreams().Get(0).VarID, + }, nil +} + +// func (t *DropType) String(node *dag.Node) string { +// return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch/plan/ops/ops.go b/common/pkgs/ioswitch/plan/ops/ops.go new file mode 100644 index 0000000..064370d --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/ops.go @@ -0,0 +1,13 @@ +package ops + +import "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + +type GraphNodeBuilder struct { + *dag.Graph +} + +func NewGraphNodeBuilder() *GraphNodeBuilder { + return &GraphNodeBuilder{ + Graph: dag.NewGraph(), + } +} diff --git a/common/pkgs/ioswitch/plan/ops/send.go b/common/pkgs/ioswitch/plan/ops/send.go new file mode 100644 index 0000000..28c05b0 --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/send.go @@ -0,0 +1,284 @@ +package ops + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +func init() { + exec.UseOp[*SendStream]() + exec.UseOp[*GetStream]() + exec.UseOp[*SendVar]() + exec.UseOp[*GetVar]() +} + +type SendStream struct { + Input exec.VarID `json:"input"` + Send exec.VarID `json:"send"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + inputStr, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) + if err != nil { + return err + } + defer inputStr.Stream.Close() + + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + // 发送后流的ID不同 + err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, inputStr.Stream) + if err != nil { + return fmt.Errorf("sending stream: %w", err) + } + + return nil +} + +func (o *SendStream) String() string { + return fmt.Sprintf("SendStream %v->%v@%v", o.Input, o.Send, o.Worker) +} + +type GetStream struct { + Signal exec.SignalVar `json:"signal"` + Target exec.VarID `json:"target"` + Output exec.VarID `json:"output"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + str, err := cli.GetStream(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value) + if err != nil { + return fmt.Errorf("getting stream: %w", err) + } + + fut := future.NewSetVoid() + // 获取后送到本地的流ID是不同的 + str = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + fut.SetVoid() + }) + e.PutVar(o.Output, &exec.StreamValue{Stream: str}) + + return fut.Wait(ctx.Context) +} + +func (o *GetStream) String() string { + return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker) +} + +type SendVar struct { + Input exec.VarID `json:"input"` + Send exec.VarID `json:"send"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + input, err := e.BindVar(ctx.Context, o.Input) + if err != nil { + return err + } + + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + err = cli.SendVar(ctx.Context, e.Plan().ID, o.Send, input) + if err != nil { + return fmt.Errorf("sending var: %w", err) + } + + return nil +} + +func (o *SendVar) String() string { + return fmt.Sprintf("SendVar %v->%v@%v", o.Input, o.Send, o.Worker) +} + +type GetVar struct { + Signal exec.SignalVar `json:"signal"` + Target exec.VarID `json:"target"` + Output exec.VarID `json:"output"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + get, err := cli.GetVar(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value) + if err != nil { + return fmt.Errorf("getting var: %w", err) + } + + e.PutVar(o.Output, get) + + return nil +} + +func (o *GetVar) String() string { + return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker) +} + +type SendStreamNode struct { + dag.NodeBase + ToWorker exec.WorkerInfo +} + +func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode { + node := &SendStreamNode{ + ToWorker: to, + } + b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, 1) + return node +} + +func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar { + v.To(t, 0) + return t.OutputStreams().Get(0) +} + +func (t *SendStreamNode) GenerateOp() (exec.Op, error) { + return &SendStream{ + Input: t.InputStreams().Get(0).VarID, + Send: t.OutputStreams().Get(0).VarID, + Worker: t.ToWorker, + }, nil +} + +// func (t *SendStreamType) String() string { +// return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type SendValueNode struct { + dag.NodeBase + ToWorker exec.WorkerInfo +} + +func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode { + node := &SendValueNode{ + ToWorker: to, + } + b.AddNode(node) + + node.InputValues().Init(1) + node.OutputValues().Init(node, 1) + return node +} + +func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar { + v.To(t, 0) + return t.OutputValues().Get(0) +} + +func (t *SendValueNode) GenerateOp() (exec.Op, error) { + return &SendVar{ + Input: t.InputValues().Get(0).VarID, + Send: t.OutputValues().Get(0).VarID, + Worker: t.ToWorker, + }, nil +} + +// func (t *SendVarType) String() string { +// return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type GetStreamNode struct { + dag.NodeBase + FromWorker exec.WorkerInfo +} + +func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode { + node := &GetStreamNode{ + FromWorker: from, + } + b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputValues().Init(node, 1) + node.OutputStreams().Init(node, 1) + return node +} + +func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar { + v.To(t, 0) + return t.OutputStreams().Get(0) +} + +func (t *GetStreamNode) SignalVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *GetStreamNode) GenerateOp() (exec.Op, error) { + return &GetStream{ + Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID), + Output: t.OutputStreams().Get(0).VarID, + Target: t.InputStreams().Get(0).VarID, + Worker: t.FromWorker, + }, nil +} + +// func (t *GetStreamType) String() string { +// return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type GetValueNode struct { + dag.NodeBase + FromWorker exec.WorkerInfo +} + +func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode { + node := &GetValueNode{ + FromWorker: from, + } + b.AddNode(node) + + node.InputValues().Init(1) + node.OutputValues().Init(node, 2) + return node +} + +func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar { + v.To(t, 0) + return t.OutputValues().Get(1) +} + +func (t *GetValueNode) SignalVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *GetValueNode) GenerateOp() (exec.Op, error) { + return &GetVar{ + Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID), + Output: t.OutputValues().Get(1).VarID, + Target: t.InputValues().Get(0).VarID, + Worker: t.FromWorker, + }, nil +} + +// func (t *GetVaType) String() string { +// return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch/plan/ops/store.go b/common/pkgs/ioswitch/plan/ops/store.go new file mode 100644 index 0000000..c7fd38d --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/store.go @@ -0,0 +1,94 @@ +package ops + +import ( + "fmt" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +type Store struct { + Var exec.VarID + Key string +} + +func (o *Store) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + v, err := e.BindVar(ctx.Context, o.Var) + if err != nil { + return err + } + + e.Store(o.Key, v) + return nil +} + +func (o *Store) String() string { + return fmt.Sprintf("Store %v as \"%v\"", o.Var, o.Key) +} + +type StoreConst struct { + Key string + Value exec.VarValue +} + +func (o *StoreConst) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + e.Store(o.Key, o.Value) + return nil +} + +func (o *StoreConst) String() string { + return fmt.Sprintf("StoreConst %v: %v", o.Key, o.Value) +} + +type StoreNode struct { + dag.NodeBase + Key string +} + +func (b *GraphNodeBuilder) NewStore() *StoreNode { + node := &StoreNode{} + b.AddNode(node) + return node +} + +func (t *StoreNode) Store(key string, v *dag.ValueVar) { + t.Key = key + t.InputValues().Init(1) + v.To(t, 0) +} + +func (t *StoreNode) GenerateOp() (exec.Op, error) { + return &Store{ + Var: t.InputValues().Get(0).VarID, + Key: t.Key, + }, nil +} + +// func (t *StoreType) String() string { +// return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) +// } + +type StoreConstNode struct { + dag.NodeBase + Key string + Value exec.VarValue +} + +func (b *GraphNodeBuilder) NewStoreConst(key string, value exec.VarValue) *StoreConstNode { + node := &StoreConstNode{ + Key: key, + Value: value, + } + b.AddNode(node) + return node +} + +func (t *StoreConstNode) GenerateOp() (exec.Op, error) { + return &StoreConst{ + Key: t.Key, + Value: t.Value, + }, nil +} + +// func (t *StoreConstType) String() string { +// return fmt.Sprintf("StoreConst[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) diff --git a/common/pkgs/ioswitch/plan/ops/sync.go b/common/pkgs/ioswitch/plan/ops/sync.go new file mode 100644 index 0000000..99e91a8 --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/sync.go @@ -0,0 +1,206 @@ +package ops + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" +) + +func init() { + exec.UseOp[*OnStreamBegin]() + exec.UseOp[*OnStreamEnd]() + exec.UseOp[*HoldUntil]() + exec.UseOp[*HangUntil]() + exec.UseOp[*Broadcast]() +} + +type OnStreamBegin struct { + Raw exec.VarID `json:"raw"` + New exec.VarID `json:"new"` + Signal exec.SignalVar `json:"signal"` +} + +func (o *OnStreamBegin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw) + if err != nil { + return err + } + + e.PutVar(o.New, &exec.StreamValue{Stream: raw.Stream}). + PutVar(o.Signal.ID, o.Signal.Value) + return nil +} + +func (o *OnStreamBegin) String() string { + return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw, o.New, o.Signal.ID) +} + +type OnStreamEnd struct { + Raw exec.VarID `json:"raw"` + New exec.VarID `json:"new"` + Signal *exec.SignalVar `json:"signal"` +} + +type onStreamEnd struct { + inner io.ReadCloser + callback *future.SetVoidFuture +} + +func (o *onStreamEnd) Read(p []byte) (n int, err error) { + n, err = o.inner.Read(p) + if err == io.EOF { + o.callback.SetVoid() + } else if err != nil { + o.callback.SetError(err) + } + return n, err +} + +func (o *onStreamEnd) Close() error { + o.callback.SetError(fmt.Errorf("stream closed early")) + return o.inner.Close() +} + +func (o *OnStreamEnd) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw) + if err != nil { + return err + } + + cb := future.NewSetVoid() + + e.PutVar(o.New, &exec.StreamValue{Stream: &onStreamEnd{ + inner: raw.Stream, + callback: cb, + }}) + + err = cb.Wait(ctx.Context) + if err != nil { + return err + } + + e.PutVar(o.Signal.ID, o.Signal.Value) + return nil +} + +func (o *OnStreamEnd) String() string { + return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw, o.New, o.Signal.ID) +} + +type HoldUntil struct { + Waits []exec.VarID `json:"waits"` + Holds []exec.VarID `json:"holds"` + Emits []exec.VarID `json:"emits"` +} + +func (w *HoldUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + holds, err := exec.BindArray[exec.VarValue](e, ctx.Context, w.Holds) + if err != nil { + return err + } + + _, err = exec.BindArray[exec.VarValue](e, ctx.Context, w.Waits) + if err != nil { + return err + } + + exec.PutArray(e, w.Emits, holds) + return nil +} + +func (w *HoldUntil) String() string { + return fmt.Sprintf("HoldUntil(waits=%v): %v -> %v", utils.FormatVarIDs(w.Waits), utils.FormatVarIDs(w.Holds), utils.FormatVarIDs(w.Emits)) +} + +type HangUntil struct { + Waits []exec.VarID `json:"waits"` + Op exec.Op `json:"op"` +} + +func (h *HangUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + _, err := exec.BindArray[exec.VarValue](e, ctx.Context, h.Waits) + if err != nil { + return err + } + + return h.Op.Execute(ctx, e) +} + +func (h *HangUntil) String() string { + return "HangUntil" +} + +type Broadcast struct { + Source exec.VarID `json:"source"` + Targets []exec.VarID `json:"targets"` +} + +func (b *Broadcast) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + src, err := exec.BindVar[*exec.SignalValue](e, ctx.Context, b.Source) + if err != nil { + return err + } + + targets := make([]exec.VarValue, len(b.Targets)) + for i := 0; i < len(b.Targets); i++ { + targets[i] = src.Clone() + } + + exec.PutArray(e, b.Targets, targets) + return nil +} + +func (b *Broadcast) String() string { + return "Broadcast" +} + +type HoldUntilNode struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode { + node := &HoldUntilNode{} + b.AddNode(node) + node.InputValues().Init(1) + return node +} + +func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) { + s.To(t, 0) +} + +func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar { + str.To(t, t.InputStreams().EnlargeOne()) + return t.OutputStreams().AppendNew(t).Var() +} + +func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar { + v.To(t, t.InputValues().EnlargeOne()) + return t.OutputValues().AppendNew(t).Var() +} + +func (t *HoldUntilNode) GenerateOp() (exec.Op, error) { + o := &HoldUntil{ + Waits: []exec.VarID{t.InputValues().Get(0).VarID}, + } + + for i := 0; i < t.OutputValues().Len(); i++ { + o.Holds = append(o.Holds, t.InputValues().Get(i+1).VarID) + o.Emits = append(o.Emits, t.OutputValues().Get(i).VarID) + } + + for i := 0; i < t.OutputStreams().Len(); i++ { + o.Holds = append(o.Holds, t.InputStreams().Get(i).VarID) + o.Emits = append(o.Emits, t.OutputStreams().Get(i).VarID) + } + + return o, nil +} + +// func (t *HoldUntilType) String() string { +// return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch/plan/ops/utils.go b/common/pkgs/ioswitch/plan/ops/utils.go new file mode 100644 index 0000000..ca74824 --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/utils.go @@ -0,0 +1,75 @@ +package ops + +// import ( +// "fmt" + +// "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" +// ) + +// func formatStreamIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputStreams { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputStreams { +// if i > 0 { +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("S{%s>%s}", is, os) +// } + +// func formatValueIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputValues { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputValues { +// if i > 0 { +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("V{%s>%s}", is, os) +// } diff --git a/common/pkgs/ioswitch/plan/ops/var.go b/common/pkgs/ioswitch/plan/ops/var.go new file mode 100644 index 0000000..52a1010 --- /dev/null +++ b/common/pkgs/ioswitch/plan/ops/var.go @@ -0,0 +1,52 @@ +package ops + +import ( + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +func init() { + exec.UseOp[*ConstVar]() +} + +type ConstVar struct { + ID exec.VarID `json:"id"` + Value exec.VarValue `json:"value"` +} + +func (o *ConstVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + e.PutVar(o.ID, o.Value) + return nil +} + +func (o *ConstVar) String() string { + return "ConstVar" +} + +type ConstNode struct { + dag.NodeBase + Value exec.VarValue +} + +func (b *GraphNodeBuilder) NewConst(val exec.VarValue) *ConstNode { + node := &ConstNode{ + Value: val, + } + b.AddNode(node) + + node.OutputValues().Init(node, 1) + return node +} + +func (t *ConstNode) Output() dag.ValueOutputSlot { + return dag.ValueOutputSlot{ + Node: t, + Index: 0, + } +} + +func (t *ConstNode) GenerateOp() (exec.Op, error) { + return &DropStream{ + Input: t.Output().Var().VarID, + }, nil +} diff --git a/common/pkgs/ioswitch/utils/utils.go b/common/pkgs/ioswitch/utils/utils.go new file mode 100644 index 0000000..90afc29 --- /dev/null +++ b/common/pkgs/ioswitch/utils/utils.go @@ -0,0 +1,19 @@ +package utils + +import ( + "fmt" + "strings" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" +) + +func FormatVarIDs(arr []exec.VarID) string { + sb := strings.Builder{} + for i, v := range arr { + sb.WriteString(fmt.Sprintf("%v", v)) + if i < len(arr)-1 { + sb.WriteString(",") + } + } + return sb.String() +} diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 1820af5..c37da0c 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -1,9 +1,9 @@ package ioswitch2 import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index 9b02aca..31f6511 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -5,11 +5,11 @@ import ( "io" "strconv" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" + hubapi "gitlink.org.cn/cloudream/jcs-pub/hub/sdk/api" ) type HttpHubWorker struct { @@ -19,10 +19,10 @@ type HttpHubWorker struct { func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { addressInfo := w.Hub.Address.(*cortypes.HttpAddressInfo) baseUrl := "http://" + addressInfo.ExternalIP + ":" + strconv.Itoa(addressInfo.Port) - config := cdsapi.Config{ + config := hubapi.Config{ URL: baseUrl, } - pool := cdsapi.NewPool(&config) + pool := hubapi.NewPool(&config) cli, err := pool.Acquire() defer pool.Release(cli) if err != nil { @@ -47,17 +47,17 @@ func (w *HttpHubWorker) Equals(worker exec.WorkerInfo) bool { type HttpHubWorkerClient struct { hubID cortypes.HubID - cli *cdsapi.Client + cli *hubapi.Client } func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { - return c.cli.ExecuteIOPlan(cdsapi.ExecuteIOPlanReq{ + return c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{ Plan: plan, }) } func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { - return c.cli.SendStream(cdsapi.SendStreamReq{ - SendStreamInfo: cdsapi.SendStreamInfo{ + return c.cli.SendStream(hubapi.SendStreamReq{ + SendStreamInfo: hubapi.SendStreamInfo{ PlanID: planID, VarID: id, }, @@ -69,14 +69,14 @@ func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID }) } func (c *HttpHubWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, id exec.VarID, value exec.VarValue) error { - return c.cli.SendVar(cdsapi.SendVarReq{ + return c.cli.SendVar(hubapi.SendVarReq{ PlanID: planID, VarID: id, VarValue: value, }) } func (c *HttpHubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, streamID exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { - str, err := c.cli.GetStream(cdsapi.GetStreamReq{ + str, err := c.cli.GetStream(hubapi.GetStreamReq{ PlanID: planID, VarID: streamID, SignalID: signalID, @@ -93,7 +93,7 @@ func (c *HttpHubWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, }), nil } func (c *HttpHubWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, varID exec.VarID, signalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { - resp, err := c.cli.GetVar(cdsapi.GetVarReq{ + resp, err := c.cli.GetVar(hubapi.GetVarReq{ PlanID: planID, VarID: varID, SignalID: signalID, diff --git a/common/pkgs/ioswitch2/hub_worker.go b/common/pkgs/ioswitch2/hub_worker.go index 1370b97..35a17fa 100644 --- a/common/pkgs/ioswitch2/hub_worker.go +++ b/common/pkgs/ioswitch2/hub_worker.go @@ -4,11 +4,11 @@ import ( "context" "io" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) diff --git a/common/pkgs/ioswitch2/ops2/base_store.go b/common/pkgs/ioswitch2/ops2/base_store.go index b7842d6..669bb0a 100644 --- a/common/pkgs/ioswitch2/ops2/base_store.go +++ b/common/pkgs/ioswitch2/ops2/base_store.go @@ -5,11 +5,11 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go index d2b5695..12d9d74 100644 --- a/common/pkgs/ioswitch2/ops2/bypass.go +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -3,9 +3,9 @@ package ops2 import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index 6f45793..f0365f9 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -5,10 +5,10 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" "golang.org/x/sync/semaphore" ) diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index ddf085f..7de3dac 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -4,10 +4,10 @@ import ( "fmt" "io" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" "golang.org/x/sync/semaphore" ) diff --git a/common/pkgs/ioswitch2/ops2/driver.go b/common/pkgs/ioswitch2/ops2/driver.go index 92a8b98..e657ecd 100644 --- a/common/pkgs/ioswitch2/ops2/driver.go +++ b/common/pkgs/ioswitch2/ops2/driver.go @@ -1,9 +1,9 @@ package ops2 import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" ) diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index d98daa7..60bb240 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -5,13 +5,13 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitch2/ops2/join.go b/common/pkgs/ioswitch2/ops2/join.go index 1f30dcc..abf9b0c 100644 --- a/common/pkgs/ioswitch2/ops2/join.go +++ b/common/pkgs/ioswitch2/ops2/join.go @@ -5,9 +5,9 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" ) func init() { diff --git a/common/pkgs/ioswitch2/ops2/length.go b/common/pkgs/ioswitch2/ops2/length.go index 6ec5403..ec8962e 100644 --- a/common/pkgs/ioswitch2/ops2/length.go +++ b/common/pkgs/ioswitch2/ops2/length.go @@ -5,8 +5,8 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" ) func init() { diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index 489e56a..f8c8171 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -4,10 +4,10 @@ import ( "fmt" "time" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" log "gitlink.org.cn/cloudream/common/pkgs/logger" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index c402194..851b63a 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -1,9 +1,9 @@ package ops2 import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 78e99dd..32ae937 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -5,10 +5,10 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" ) func init() { diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go index 92c63e3..5dba5be 100644 --- a/common/pkgs/ioswitch2/ops2/s2s.go +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -3,9 +3,9 @@ package ops2 import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" ) diff --git a/common/pkgs/ioswitch2/ops2/segment.go b/common/pkgs/ioswitch2/ops2/segment.go index 525e330..70a4502 100644 --- a/common/pkgs/ioswitch2/ops2/segment.go +++ b/common/pkgs/ioswitch2/ops2/segment.go @@ -6,10 +6,10 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" ) func init() { diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 75779b3..d07fe62 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -3,9 +3,9 @@ package ops2 import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" ) diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index a536315..1cfdc0f 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -4,11 +4,11 @@ import ( "fmt" "math" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/os2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" diff --git a/common/pkgs/ioswitch2/parser/opt/chunked.go b/common/pkgs/ioswitch2/parser/opt/chunked.go index def199a..1823d1c 100644 --- a/common/pkgs/ioswitch2/parser/opt/chunked.go +++ b/common/pkgs/ioswitch2/parser/opt/chunked.go @@ -1,7 +1,7 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" ) diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 9c91f85..1589029 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -1,8 +1,8 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" diff --git a/common/pkgs/ioswitch2/parser/opt/misc.go b/common/pkgs/ioswitch2/parser/opt/misc.go index 5be5581..0cf54fb 100644 --- a/common/pkgs/ioswitch2/parser/opt/misc.go +++ b/common/pkgs/ioswitch2/parser/opt/misc.go @@ -1,8 +1,8 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" ) diff --git a/common/pkgs/ioswitch2/parser/opt/multipart.go b/common/pkgs/ioswitch2/parser/opt/multipart.go index 0a8b9a6..511e838 100644 --- a/common/pkgs/ioswitch2/parser/opt/multipart.go +++ b/common/pkgs/ioswitch2/parser/opt/multipart.go @@ -1,8 +1,8 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory" diff --git a/common/pkgs/ioswitch2/parser/opt/pin.go b/common/pkgs/ioswitch2/parser/opt/pin.go index e12c745..e3471dd 100644 --- a/common/pkgs/ioswitch2/parser/opt/pin.go +++ b/common/pkgs/ioswitch2/parser/opt/pin.go @@ -1,7 +1,7 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" ) diff --git a/common/pkgs/ioswitch2/parser/opt/s2s.go b/common/pkgs/ioswitch2/parser/opt/s2s.go index a279c86..d588984 100644 --- a/common/pkgs/ioswitch2/parser/opt/s2s.go +++ b/common/pkgs/ioswitch2/parser/opt/s2s.go @@ -1,7 +1,7 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory" diff --git a/common/pkgs/ioswitch2/parser/opt/segment.go b/common/pkgs/ioswitch2/parser/opt/segment.go index b33fd7e..d3ca7bb 100644 --- a/common/pkgs/ioswitch2/parser/opt/segment.go +++ b/common/pkgs/ioswitch2/parser/opt/segment.go @@ -1,7 +1,7 @@ package opt import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state" ) diff --git a/common/pkgs/ioswitch2/parser/opt/utils.go b/common/pkgs/ioswitch2/parser/opt/utils.go index e36dee4..b31b5ce 100644 --- a/common/pkgs/ioswitch2/parser/opt/utils.go +++ b/common/pkgs/ioswitch2/parser/opt/utils.go @@ -3,8 +3,8 @@ package opt import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index fbc985f..9a6485e 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -1,8 +1,8 @@ package parser import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/gen" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/opt" diff --git a/common/pkgs/ioswitch2/parser/state/state.go b/common/pkgs/ioswitch2/parser/state/state.go index 725ee29..58fc1de 100644 --- a/common/pkgs/ioswitch2/parser/state/state.go +++ b/common/pkgs/ioswitch2/parser/state/state.go @@ -1,8 +1,8 @@ package state import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" ) diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go index 8ca52c2..54d508e 100644 --- a/common/pkgs/ioswitch2/plans/complete_multipart.go +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -3,10 +3,10 @@ package plans import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" "gitlink.org.cn/cloudream/common/utils/os2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitch2/plans/utils.go b/common/pkgs/ioswitch2/plans/utils.go index e160b77..7547394 100644 --- a/common/pkgs/ioswitch2/plans/utils.go +++ b/common/pkgs/ioswitch2/plans/utils.go @@ -3,8 +3,8 @@ package plans import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) diff --git a/common/pkgs/ioswitchlrc/fromto.go b/common/pkgs/ioswitchlrc/fromto.go index bf1e298..6402ef2 100644 --- a/common/pkgs/ioswitchlrc/fromto.go +++ b/common/pkgs/ioswitchlrc/fromto.go @@ -1,9 +1,9 @@ package ioswitchlrc import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitchlrc/hub_worker.go b/common/pkgs/ioswitchlrc/hub_worker.go index febfeab..e9b6940 100644 --- a/common/pkgs/ioswitchlrc/hub_worker.go +++ b/common/pkgs/ioswitchlrc/hub_worker.go @@ -4,8 +4,8 @@ import ( "context" "io" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) diff --git a/common/pkgs/ioswitchlrc/ops2/base_store.go b/common/pkgs/ioswitchlrc/ops2/base_store.go index f358594..2b721ae 100644 --- a/common/pkgs/ioswitchlrc/ops2/base_store.go +++ b/common/pkgs/ioswitchlrc/ops2/base_store.go @@ -5,11 +5,11 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index a9da6c8..ccf818b 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -5,10 +5,10 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" "golang.org/x/sync/semaphore" ) diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 88f3f4b..d451861 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -4,10 +4,10 @@ import ( "fmt" "io" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" "golang.org/x/sync/semaphore" ) diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index ebf66a0..6e85bb2 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -5,14 +5,14 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ec/lrc" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/utils" ) func init() { diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index e642646..41a5b04 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -1,9 +1,9 @@ package ops2 import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 78e99dd..32ae937 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -5,10 +5,10 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" ) func init() { diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index 92200c8..c68f7dd 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -3,9 +3,9 @@ package ops2 import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" ) diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index fb43467..c8cf9dc 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -3,11 +3,11 @@ package parser import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/ops2" ) diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 6228467..e2b12af 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -4,9 +4,9 @@ import ( "fmt" "math" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/os2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" diff --git a/common/pkgs/ioswitchlrc/parser/utils.go b/common/pkgs/ioswitchlrc/parser/utils.go index 5bb2c38..9707041 100644 --- a/common/pkgs/ioswitchlrc/parser/utils.go +++ b/common/pkgs/ioswitchlrc/parser/utils.go @@ -3,8 +3,8 @@ package parser import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) diff --git a/common/pkgs/rpc/hub/ioswitch.go b/common/pkgs/rpc/hub/ioswitch.go index a6f5bbf..820dc02 100644 --- a/common/pkgs/rpc/hub/ioswitch.go +++ b/common/pkgs/rpc/hub/ioswitch.go @@ -4,7 +4,7 @@ import ( context "context" "io" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" rpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" ) diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index a46b62f..1301cf1 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -13,10 +13,10 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/hub/internal/http" myrpc "gitlink.org.cn/cloudream/jcs-pub/hub/internal/rpc" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" diff --git a/hub/internal/http/hub_io.go b/hub/internal/http/hub_io.go index 813737e..2b08ce2 100644 --- a/hub/internal/http/hub_io.go +++ b/hub/internal/http/hub_io.go @@ -11,11 +11,11 @@ import ( "github.com/inhies/go-bytesize" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" hubapi "gitlink.org.cn/cloudream/jcs-pub/hub/sdk/api" ) diff --git a/hub/internal/http/service.go b/hub/internal/http/service.go index b5b5bed..c45299f 100644 --- a/hub/internal/http/service.go +++ b/hub/internal/http/service.go @@ -1,7 +1,7 @@ package http import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" ) diff --git a/hub/internal/rpc/ioswitch.go b/hub/internal/rpc/ioswitch.go index dd5375d..83257d1 100644 --- a/hub/internal/rpc/ioswitch.go +++ b/hub/internal/rpc/ioswitch.go @@ -6,9 +6,9 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" ) diff --git a/hub/internal/rpc/rpc.go b/hub/internal/rpc/rpc.go index 26ab330..aaf4616 100644 --- a/hub/internal/rpc/rpc.go +++ b/hub/internal/rpc/rpc.go @@ -1,7 +1,7 @@ package rpc import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/accesstoken" diff --git a/hub/sdk/api/hub_io.go b/hub/sdk/api/hub_io.go index 5b9ceeb..1830ec5 100644 --- a/hub/sdk/api/hub_io.go +++ b/hub/sdk/api/hub_io.go @@ -6,10 +6,10 @@ import ( "net/url" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" ) const GetStreamPath = "/hubIO/getStream"