| @@ -19,6 +19,7 @@ var opUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[O | |||||
| type Op interface { | type Op interface { | ||||
| Execute(ctx context.Context, e *Executor) error | Execute(ctx context.Context, e *Executor) error | ||||
| String() string | |||||
| } | } | ||||
| func UseOp[T Op]() { | func UseOp[T Op]() { | ||||
| @@ -2,6 +2,7 @@ package exec | |||||
| import ( | import ( | ||||
| "context" | "context" | ||||
| "strings" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | "gitlink.org.cn/cloudream/common/pkgs/future" | ||||
| "gitlink.org.cn/cloudream/common/utils/lo2" | "gitlink.org.cn/cloudream/common/utils/lo2" | ||||
| @@ -97,6 +98,29 @@ func (b *PlanBuilder) Execute() *Driver { | |||||
| return &exec | return &exec | ||||
| } | } | ||||
| func (b *PlanBuilder) String() string { | |||||
| sb := strings.Builder{} | |||||
| sb.WriteString("Driver:\n") | |||||
| for _, op := range b.DriverPlan.Ops { | |||||
| sb.WriteString(op.String()) | |||||
| sb.WriteRune('\n') | |||||
| } | |||||
| sb.WriteRune('\n') | |||||
| for _, w := range b.WorkerPlans { | |||||
| sb.WriteString("Worker(") | |||||
| sb.WriteString(w.Worker.String()) | |||||
| sb.WriteString("):\n") | |||||
| for _, op := range w.Ops { | |||||
| sb.WriteString(op.String()) | |||||
| sb.WriteRune('\n') | |||||
| } | |||||
| sb.WriteRune('\n') | |||||
| } | |||||
| return sb.String() | |||||
| } | |||||
| type WorkerPlanBuilder struct { | type WorkerPlanBuilder struct { | ||||
| Worker WorkerInfo | Worker WorkerInfo | ||||
| Ops []Op | Ops []Op | ||||
| @@ -5,6 +5,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" | |||||
| ) | ) | ||||
| type FromDriverType struct { | type FromDriverType struct { | ||||
| @@ -12,7 +13,7 @@ type FromDriverType struct { | |||||
| } | } | ||||
| func (t *FromDriverType) InitNode(node *dag.Node) { | func (t *FromDriverType) InitNode(node *dag.Node) { | ||||
| dag.NodeNewOutputStream(node, nil) | |||||
| dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) | |||||
| } | } | ||||
| func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { | func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { | ||||
| @@ -22,6 +22,7 @@ func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| defer o.Input.Stream.Close() | |||||
| for { | for { | ||||
| buf := make([]byte, 1024*8) | buf := make([]byte, 1024*8) | ||||
| @@ -35,6 +36,10 @@ func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| } | } | ||||
| } | } | ||||
| func (o *DropStream) String() string { | |||||
| return fmt.Sprintf("DropStream %v", o.Input.ID) | |||||
| } | |||||
| type DropType struct{} | type DropType struct{} | ||||
| func (t *DropType) InitNode(node *dag.Node) { | func (t *DropType) InitNode(node *dag.Node) { | ||||
| @@ -49,6 +49,10 @@ func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *SendStream) String() string { | |||||
| return fmt.Sprintf("SendStream %v->%v@%v", o.Input.ID, o.Send.ID, o.Worker) | |||||
| } | |||||
| type GetStream struct { | type GetStream struct { | ||||
| Signal *exec.SignalVar `json:"signal"` | Signal *exec.SignalVar `json:"signal"` | ||||
| Target *exec.StreamVar `json:"target"` | Target *exec.StreamVar `json:"target"` | ||||
| @@ -80,6 +84,10 @@ func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return fut.Wait(ctx) | return fut.Wait(ctx) | ||||
| } | } | ||||
| func (o *GetStream) String() string { | |||||
| return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output.ID, o.Signal.ID, o.Target.ID, o.Worker) | |||||
| } | |||||
| type SendVar struct { | type SendVar struct { | ||||
| Input exec.Var `json:"input"` | Input exec.Var `json:"input"` | ||||
| Send exec.Var `json:"send"` | Send exec.Var `json:"send"` | ||||
| @@ -109,6 +117,10 @@ func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *SendVar) String() string { | |||||
| return fmt.Sprintf("SendVar %v->%v@%v", o.Input.GetID(), o.Send.GetID(), o.Worker) | |||||
| } | |||||
| type GetVar struct { | type GetVar struct { | ||||
| Signal *exec.SignalVar `json:"signal"` | Signal *exec.SignalVar `json:"signal"` | ||||
| Target exec.Var `json:"target"` | Target exec.Var `json:"target"` | ||||
| @@ -135,6 +147,10 @@ func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *GetVar) String() string { | |||||
| return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker) | |||||
| } | |||||
| type SendStreamType struct { | type SendStreamType struct { | ||||
| ToWorker exec.WorkerInfo | ToWorker exec.WorkerInfo | ||||
| } | } | ||||
| @@ -29,6 +29,10 @@ func (o *Store) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *Store) String() string { | |||||
| return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID()) | |||||
| } | |||||
| type StoreType struct { | type StoreType struct { | ||||
| StoreKey string | StoreKey string | ||||
| } | } | ||||
| @@ -8,6 +8,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | "gitlink.org.cn/cloudream/common/pkgs/future" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" | |||||
| ) | ) | ||||
| func init() { | func init() { | ||||
| @@ -36,6 +37,10 @@ func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *OnStreamBegin) String() string { | |||||
| return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID) | |||||
| } | |||||
| type OnStreamEnd struct { | type OnStreamEnd struct { | ||||
| Raw *exec.StreamVar `json:"raw"` | Raw *exec.StreamVar `json:"raw"` | ||||
| New *exec.StreamVar `json:"new"` | New *exec.StreamVar `json:"new"` | ||||
| @@ -85,6 +90,10 @@ func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *OnStreamEnd) String() string { | |||||
| return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID) | |||||
| } | |||||
| type HoldUntil struct { | type HoldUntil struct { | ||||
| Waits []*exec.SignalVar `json:"waits"` | Waits []*exec.SignalVar `json:"waits"` | ||||
| Holds []exec.Var `json:"holds"` | Holds []exec.Var `json:"holds"` | ||||
| @@ -113,6 +122,10 @@ func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (w *HoldUntil) String() string { | |||||
| return fmt.Sprintf("HoldUntil Waits: %v, (%v) -> (%v)", utils.FormatVarIDs(w.Waits), utils.FormatVarIDs(w.Holds), utils.FormatVarIDs(w.Emits)) | |||||
| } | |||||
| type HangUntil struct { | type HangUntil struct { | ||||
| Waits []*exec.SignalVar `json:"waits"` | Waits []*exec.SignalVar `json:"waits"` | ||||
| Op exec.Op `json:"op"` | Op exec.Op `json:"op"` | ||||
| @@ -127,6 +140,10 @@ func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return h.Op.Execute(ctx, e) | return h.Op.Execute(ctx, e) | ||||
| } | } | ||||
| func (h *HangUntil) String() string { | |||||
| return "HangUntil" | |||||
| } | |||||
| type Broadcast struct { | type Broadcast struct { | ||||
| Source *exec.SignalVar `json:"source"` | Source *exec.SignalVar `json:"source"` | ||||
| Targets []*exec.SignalVar `json:"targets"` | Targets []*exec.SignalVar `json:"targets"` | ||||
| @@ -142,6 +159,10 @@ func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| func (b *Broadcast) String() string { | |||||
| return "Broadcast" | |||||
| } | |||||
| type HoldUntilType struct { | type HoldUntilType struct { | ||||
| } | } | ||||
| @@ -18,3 +18,7 @@ func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| e.PutVars(o.Var) | e.PutVars(o.Var) | ||||
| return nil | return nil | ||||
| } | } | ||||
| func (o *ConstVar) String() string { | |||||
| return "ConstVar" | |||||
| } | |||||
| @@ -0,0 +1,19 @@ | |||||
| package utils | |||||
| import ( | |||||
| "fmt" | |||||
| "strings" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| func FormatVarIDs[T exec.Var](arr []T) string { | |||||
| sb := strings.Builder{} | |||||
| for i, v := range arr { | |||||
| sb.WriteString(fmt.Sprintf("%v", v.GetID())) | |||||
| if i < len(arr)-1 { | |||||
| sb.WriteString(",") | |||||
| } | |||||
| } | |||||
| return sb.String() | |||||
| } | |||||
| @@ -94,7 +94,7 @@ func (b *ECRedundancy) Value() (driver.Value, error) { | |||||
| return serder.ObjectToJSONEx[Redundancy](b) | return serder.ObjectToJSONEx[Redundancy](b) | ||||
| } | } | ||||
| var DefaultLRCRedundancy = *NewLRCRedundancy(3, 2, []int{2}, 1024*1024*5) | |||||
| var DefaultLRCRedundancy = *NewLRCRedundancy(2, 4, []int{2}, 1024*1024*5) | |||||
| type LRCRedundancy struct { | type LRCRedundancy struct { | ||||
| serder.Metadata `union:"lrc"` | serder.Metadata `union:"lrc"` | ||||
| @@ -120,6 +120,10 @@ func (b *LRCRedundancy) Value() (driver.Value, error) { | |||||
| // 判断指定块属于哪个组。如果都不属于,则返回-1。 | // 判断指定块属于哪个组。如果都不属于,则返回-1。 | ||||
| func (b *LRCRedundancy) FindGroup(idx int) int { | func (b *LRCRedundancy) FindGroup(idx int) int { | ||||
| if idx >= b.N-len(b.Groups) { | |||||
| return idx - (b.N - len(b.Groups)) | |||||
| } | |||||
| for i, group := range b.Groups { | for i, group := range b.Groups { | ||||
| if idx < group { | if idx < group { | ||||
| return i | return i | ||||
| @@ -130,6 +134,27 @@ func (b *LRCRedundancy) FindGroup(idx int) int { | |||||
| return -1 | return -1 | ||||
| } | } | ||||
| // M = N - len(Groups),即数据块+校验块的总数,不包括组校验块。 | |||||
| func (b *LRCRedundancy) M() int { | |||||
| return b.N - len(b.Groups) | |||||
| } | |||||
| func (b *LRCRedundancy) GetGroupElements(grp int) []int { | |||||
| var idxes []int | |||||
| grpStart := 0 | |||||
| for i := 0; i < grp; i++ { | |||||
| grpStart += b.Groups[i] | |||||
| } | |||||
| for i := 0; i < b.Groups[grp]; i++ { | |||||
| idxes = append(idxes, grpStart+i) | |||||
| } | |||||
| idxes = append(idxes, b.N-len(b.Groups)+grp) | |||||
| return idxes | |||||
| } | |||||
| const ( | const ( | ||||
| PackageStateNormal = "Normal" | PackageStateNormal = "Normal" | ||||
| PackageStateDeleted = "Deleted" | PackageStateDeleted = "Deleted" | ||||