From 2507069cf2b718a5b0f5ef57eea7e5403892d225 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 28 Aug 2024 17:26:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B0=83=E8=AF=95=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/ioswitch/exec/exec.go | 1 + pkgs/ioswitch/exec/plan_builder.go | 24 ++++++++++++++++++++++++ pkgs/ioswitch/plan/ops/driver.go | 3 ++- pkgs/ioswitch/plan/ops/drop.go | 5 +++++ pkgs/ioswitch/plan/ops/send.go | 16 ++++++++++++++++ pkgs/ioswitch/plan/ops/store.go | 4 ++++ pkgs/ioswitch/plan/ops/sync.go | 21 +++++++++++++++++++++ pkgs/ioswitch/plan/ops/var.go | 4 ++++ pkgs/ioswitch/utils/utils.go | 19 +++++++++++++++++++ sdks/storage/models.go | 27 ++++++++++++++++++++++++++- 10 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 pkgs/ioswitch/utils/utils.go diff --git a/pkgs/ioswitch/exec/exec.go b/pkgs/ioswitch/exec/exec.go index ec16299..873ad59 100644 --- a/pkgs/ioswitch/exec/exec.go +++ b/pkgs/ioswitch/exec/exec.go @@ -19,6 +19,7 @@ var opUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[O type Op interface { Execute(ctx context.Context, e *Executor) error + String() string } func UseOp[T Op]() { diff --git a/pkgs/ioswitch/exec/plan_builder.go b/pkgs/ioswitch/exec/plan_builder.go index 24b0525..0e1e44a 100644 --- a/pkgs/ioswitch/exec/plan_builder.go +++ b/pkgs/ioswitch/exec/plan_builder.go @@ -2,6 +2,7 @@ package exec import ( "context" + "strings" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/lo2" @@ -97,6 +98,29 @@ func (b *PlanBuilder) Execute() *Driver { return &exec } +func (b *PlanBuilder) String() string { + sb := strings.Builder{} + sb.WriteString("Driver:\n") + for _, op := range b.DriverPlan.Ops { + sb.WriteString(op.String()) + sb.WriteRune('\n') + } + sb.WriteRune('\n') + + for _, w := range b.WorkerPlans { + sb.WriteString("Worker(") + sb.WriteString(w.Worker.String()) + sb.WriteString("):\n") + for _, op := range w.Ops { + sb.WriteString(op.String()) + sb.WriteRune('\n') + } + sb.WriteRune('\n') + } + + return sb.String() +} + type WorkerPlanBuilder struct { Worker WorkerInfo Ops []Op diff --git a/pkgs/ioswitch/plan/ops/driver.go b/pkgs/ioswitch/plan/ops/driver.go index 214f333..7de285e 100644 --- a/pkgs/ioswitch/plan/ops/driver.go +++ b/pkgs/ioswitch/plan/ops/driver.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) type FromDriverType struct { @@ -12,7 +13,7 @@ type FromDriverType struct { } func (t *FromDriverType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, nil) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) } func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/pkgs/ioswitch/plan/ops/drop.go b/pkgs/ioswitch/plan/ops/drop.go index 22f2d5d..b1cbf2e 100644 --- a/pkgs/ioswitch/plan/ops/drop.go +++ b/pkgs/ioswitch/plan/ops/drop.go @@ -22,6 +22,7 @@ func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { if err != nil { return err } + defer o.Input.Stream.Close() for { buf := make([]byte, 1024*8) @@ -35,6 +36,10 @@ func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { } } +func (o *DropStream) String() string { + return fmt.Sprintf("DropStream %v", o.Input.ID) +} + type DropType struct{} func (t *DropType) InitNode(node *dag.Node) { diff --git a/pkgs/ioswitch/plan/ops/send.go b/pkgs/ioswitch/plan/ops/send.go index 659b5e2..08f44fa 100644 --- a/pkgs/ioswitch/plan/ops/send.go +++ b/pkgs/ioswitch/plan/ops/send.go @@ -49,6 +49,10 @@ func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *SendStream) String() string { + return fmt.Sprintf("SendStream %v->%v@%v", o.Input.ID, o.Send.ID, o.Worker) +} + type GetStream struct { Signal *exec.SignalVar `json:"signal"` Target *exec.StreamVar `json:"target"` @@ -80,6 +84,10 @@ func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error { return fut.Wait(ctx) } +func (o *GetStream) String() string { + return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output.ID, o.Signal.ID, o.Target.ID, o.Worker) +} + type SendVar struct { Input exec.Var `json:"input"` Send exec.Var `json:"send"` @@ -109,6 +117,10 @@ func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *SendVar) String() string { + return fmt.Sprintf("SendVar %v->%v@%v", o.Input.GetID(), o.Send.GetID(), o.Worker) +} + type GetVar struct { Signal *exec.SignalVar `json:"signal"` Target exec.Var `json:"target"` @@ -135,6 +147,10 @@ func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *GetVar) String() string { + return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker) +} + type SendStreamType struct { ToWorker exec.WorkerInfo } diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go index 47ae9b5..ad48ab5 100644 --- a/pkgs/ioswitch/plan/ops/store.go +++ b/pkgs/ioswitch/plan/ops/store.go @@ -29,6 +29,10 @@ func (o *Store) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *Store) String() string { + return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID()) +} + type StoreType struct { StoreKey string } diff --git a/pkgs/ioswitch/plan/ops/sync.go b/pkgs/ioswitch/plan/ops/sync.go index f30db3e..4689e70 100644 --- a/pkgs/ioswitch/plan/ops/sync.go +++ b/pkgs/ioswitch/plan/ops/sync.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" ) func init() { @@ -36,6 +37,10 @@ func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *OnStreamBegin) String() string { + return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID) +} + type OnStreamEnd struct { Raw *exec.StreamVar `json:"raw"` New *exec.StreamVar `json:"new"` @@ -85,6 +90,10 @@ func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *OnStreamEnd) String() string { + return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID) +} + type HoldUntil struct { Waits []*exec.SignalVar `json:"waits"` Holds []exec.Var `json:"holds"` @@ -113,6 +122,10 @@ func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (w *HoldUntil) String() string { + return fmt.Sprintf("HoldUntil Waits: %v, (%v) -> (%v)", utils.FormatVarIDs(w.Waits), utils.FormatVarIDs(w.Holds), utils.FormatVarIDs(w.Emits)) +} + type HangUntil struct { Waits []*exec.SignalVar `json:"waits"` Op exec.Op `json:"op"` @@ -127,6 +140,10 @@ func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error { return h.Op.Execute(ctx, e) } +func (h *HangUntil) String() string { + return "HangUntil" +} + type Broadcast struct { Source *exec.SignalVar `json:"source"` Targets []*exec.SignalVar `json:"targets"` @@ -142,6 +159,10 @@ func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (b *Broadcast) String() string { + return "Broadcast" +} + type HoldUntilType struct { } diff --git a/pkgs/ioswitch/plan/ops/var.go b/pkgs/ioswitch/plan/ops/var.go index d2a7fe6..cb6aec0 100644 --- a/pkgs/ioswitch/plan/ops/var.go +++ b/pkgs/ioswitch/plan/ops/var.go @@ -18,3 +18,7 @@ func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error { e.PutVars(o.Var) return nil } + +func (o *ConstVar) String() string { + return "ConstVar" +} diff --git a/pkgs/ioswitch/utils/utils.go b/pkgs/ioswitch/utils/utils.go new file mode 100644 index 0000000..2db1ff6 --- /dev/null +++ b/pkgs/ioswitch/utils/utils.go @@ -0,0 +1,19 @@ +package utils + +import ( + "fmt" + "strings" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +func FormatVarIDs[T exec.Var](arr []T) string { + sb := strings.Builder{} + for i, v := range arr { + sb.WriteString(fmt.Sprintf("%v", v.GetID())) + if i < len(arr)-1 { + sb.WriteString(",") + } + } + return sb.String() +} diff --git a/sdks/storage/models.go b/sdks/storage/models.go index c07d95f..7645a59 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -94,7 +94,7 @@ func (b *ECRedundancy) Value() (driver.Value, error) { return serder.ObjectToJSONEx[Redundancy](b) } -var DefaultLRCRedundancy = *NewLRCRedundancy(3, 2, []int{2}, 1024*1024*5) +var DefaultLRCRedundancy = *NewLRCRedundancy(2, 4, []int{2}, 1024*1024*5) type LRCRedundancy struct { serder.Metadata `union:"lrc"` @@ -120,6 +120,10 @@ func (b *LRCRedundancy) Value() (driver.Value, error) { // 判断指定块属于哪个组。如果都不属于,则返回-1。 func (b *LRCRedundancy) FindGroup(idx int) int { + if idx >= b.N-len(b.Groups) { + return idx - (b.N - len(b.Groups)) + } + for i, group := range b.Groups { if idx < group { return i @@ -130,6 +134,27 @@ func (b *LRCRedundancy) FindGroup(idx int) int { return -1 } +// M = N - len(Groups),即数据块+校验块的总数,不包括组校验块。 +func (b *LRCRedundancy) M() int { + return b.N - len(b.Groups) +} + +func (b *LRCRedundancy) GetGroupElements(grp int) []int { + var idxes []int + + grpStart := 0 + for i := 0; i < grp; i++ { + grpStart += b.Groups[i] + } + + for i := 0; i < b.Groups[grp]; i++ { + idxes = append(idxes, grpStart+i) + } + + idxes = append(idxes, b.N-len(b.Groups)+grp) + return idxes +} + const ( PackageStateNormal = "Normal" PackageStateDeleted = "Deleted"