diff --git a/client/internal/services/object.go b/client/internal/services/object.go
index e4d13da..1af247a 100644
--- a/client/internal/services/object.go
+++ b/client/internal/services/object.go
@@ -725,7 +725,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index
 		return types.Object{}, err
 	}
 
-	shardInfo := ret["shard"].(*ops2.FileInfoValue)
+	shardInfo := ret.Get("shard").(*ops2.FileInfoValue)
 
 	err = db.DoTx10(svc.DB, svc.DB.Object().BatchUpdateRedundancy, []db.UpdatingObjectRedundancy{
 		{
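Note on the pattern above: plan results change from a plain `map[string]VarValue` to an `exec.PlanResult`, whose `Get` returns the first value stored under a key, or nil when nothing was stored (see the `PlanResult` definition in `driver.go` below). A type assertion at a call site therefore still panics if the key is absent. A defensive variant of such a call site could look like the following sketch; it is an illustration only, not part of this change:

```go
// Hypothetical defensive call site: PlanResult.Get returns nil when no
// value was stored under the key, so check the assertion before using it.
v := ret.Get("shard")
info, ok := v.(*ops2.FileInfoValue)
if !ok {
	return types.Object{}, fmt.Errorf("plan result has no shard info")
}
```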
diff --git a/client/internal/ticktock/redundancy_recover.go b/client/internal/ticktock/redundancy_recover.go
index f98fbc5..2415fbb 100644
--- a/client/internal/ticktock/redundancy_recover.go
+++ b/client/internal/ticktock/redundancy_recover.go
@@ -377,7 +377,7 @@ func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.
 	var blocks []clitypes.ObjectBlock
 	var blockChgs []datamap.BlockChange
 	for i, stg := range uploadStgs {
-		r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
+		r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
 		blocks = append(blocks, clitypes.ObjectBlock{
 			ObjectID: obj.Object.ObjectID,
 			Index:    0,
@@ -445,7 +445,7 @@ func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.O
 	var evtTargetBlocks []datamap.Block
 	var evtBlockTrans []datamap.DataTransfer
 	for i := 0; i < red.N; i++ {
-		r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
+		r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
 		blocks = append(blocks, clitypes.ObjectBlock{
 			ObjectID: obj.Object.ObjectID,
 			Index:    i,
@@ -526,7 +526,7 @@ func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.
 	var evtTargetBlocks []datamap.Block
 	var evtBlockTrans []datamap.DataTransfer
 	for i := 0; i < red.N; i++ {
-		r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
+		r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
 		blocks = append(blocks, clitypes.ObjectBlock{
 			ObjectID: obj.Object.ObjectID,
 			Index:    i,
@@ -614,7 +614,7 @@ func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.
 	var evtTargetBlocks []datamap.Block
 	var evtBlockTrans []datamap.DataTransfer
 	for i, stg := range uploadStgs {
-		r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
+		r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
 		blocks = append(blocks, clitypes.ObjectBlock{
 			ObjectID: obj.Object.ObjectID,
 			Index:    i,
@@ -700,7 +700,7 @@ func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.O
 	var blocks []clitypes.ObjectBlock
 	var blockChgs []datamap.BlockChange
 	for i, stg := range uploadStgs {
-		r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
+		r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
 		blocks = append(blocks, clitypes.ObjectBlock{
 			ObjectID: obj.Object.ObjectID,
 			Index:    0,
@@ -798,7 +798,7 @@ func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.Ob
 	var blocks []clitypes.ObjectBlock
 
 	for i := range uploadStgs {
-		r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
+		r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
 		blocks = append(blocks, clitypes.ObjectBlock{
 			ObjectID: obj.Object.ObjectID,
 			Index:    0,
@@ -960,11 +960,15 @@ func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.Obj
 		return nil, nil, nil
 	}
 
-	for k, v := range ret {
+	for k, vs := range ret.Stored {
 		idx, err := strconv.ParseInt(k, 10, 64)
 		if err != nil {
 			return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
 		}
+		if len(vs) == 0 {
+			continue
+		}
+		v := vs[0]
 
 		r := v.(*ops2.FileInfoValue)
 		newBlocks[idx].FileHash = r.Hash
@@ -1202,11 +1206,15 @@ func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clit
 		return nil, nil, nil
 	}
 
-	for k, v := range ret {
+	for k, vs := range ret.Stored {
 		idx, err := strconv.ParseInt(k, 10, 64)
 		if err != nil {
 			return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
 		}
+		if len(vs) == 0 {
+			continue
+		}
+		v := vs[0]
 
 		r := v.(*ops2.FileInfoValue)
 		newBlocks[idx].FileHash = r.Hash
diff --git a/client/internal/ticktock/redundancy_shrink.go b/client/internal/ticktock/redundancy_shrink.go
index b33139f..b032d2c 100644
--- a/client/internal/ticktock/redundancy_shrink.go
+++ b/client/internal/ticktock/redundancy_shrink.go
@@ -906,20 +906,20 @@ func (t *ChangeRedundancy) generateSysEventForECObject(solu annealingSolution, o
 	return []datamap.SysEventBody{transEvt, distEvt}
 }
 
-func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[clitypes.UserSpaceID]bool, reen *publock.Reentrant) (map[string]exec.VarValue, error) {
+func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[clitypes.UserSpaceID]bool, reen *publock.Reentrant) (exec.PlanResult, error) {
 	reqBlder := reqbuilder.NewBuilder()
 	for id, _ := range planningSpaceIDs {
 		reqBlder.UserSpace().Buzy(id)
 	}
 	err := reen.Lock(reqBlder.Build())
 	if err != nil {
-		return nil, fmt.Errorf("locking shard resources: %w", err)
+		return exec.PlanResult{}, fmt.Errorf("locking shard resources: %w", err)
 	}
 
 	wg := sync.WaitGroup{}
 
 	// Execute the IO plans
-	var ioSwRets map[string]exec.VarValue
+	var ioSwRets exec.PlanResult
 	var ioSwErr error
 	wg.Add(1)
 	go func() {
@@ -938,13 +938,13 @@ func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *e
 	wg.Wait()
 
 	if ioSwErr != nil {
-		return nil, ioSwErr
+		return exec.PlanResult{}, ioSwErr
 	}
 
 	return ioSwRets, nil
 }
 
-func (t *ChangeRedundancy) populateECObjectEntry(entry *db.UpdatingObjectRedundancy, obj clitypes.ObjectDetail, ioRets map[string]exec.VarValue) {
+func (t *ChangeRedundancy) populateECObjectEntry(entry *db.UpdatingObjectRedundancy, obj clitypes.ObjectDetail, ioRets exec.PlanResult) {
 	for i := range entry.Blocks {
 		if entry.Blocks[i].FileHash != "" {
 			continue
@@ -952,7 +952,7 @@ func (t *ChangeRedundancy) populateECObjectEntry(entry *db.UpdatingObjectRedunda
 
 		key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index)
 		// The key is not expected to be missing
-		r := ioRets[key].(*ops2.FileInfoValue)
+		r := ioRets.Get(key).(*ops2.FileInfoValue)
 		entry.Blocks[i].FileHash = r.Hash
 		entry.Blocks[i].Size = r.Size
 	}
diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go
index c4973ff..27648fc 100644
--- a/client/internal/uploader/create_load.go
+++ b/client/internal/uploader/create_load.go
@@ -72,7 +72,7 @@ func (u *CreateUploader) Upload(pa types.JPath, stream io.Reader, opts ...Upload
 	defer u.lock.Unlock()
 
 	// Record the upload result
-	shardInfo := ret["fileHash"].(*ops2.FileInfoValue)
+	shardInfo := ret.Get("fileHash").(*ops2.FileInfoValue)
 	u.successes = append(u.successes, db.AddObjectEntry{
 		Path: pa.String(),
 		Size: shardInfo.Size,
diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go
index d0b2627..ed68df1 100644
--- a/client/internal/uploader/update.go
+++ b/client/internal/uploader/update.go
@@ -82,7 +82,7 @@ func (w *UpdateUploader) Upload(pat types.JPath, stream io.Reader, opts ...Uploa
 	defer w.lock.Unlock()
 
 	// Record the upload result
-	shardInfo := ret["shardInfo"].(*ops2.FileInfoValue)
+	shardInfo := ret.Get("shardInfo").(*ops2.FileInfoValue)
 	w.successes = append(w.successes, db.AddObjectEntry{
 		Path: pat.String(),
 		Size: shardInfo.Size,
diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go
index 19676b3..6e8e35a 100644
--- a/client/internal/uploader/uploader.go
+++ b/client/internal/uploader/uploader.go
@@ -270,7 +270,7 @@ func (u *Uploader) UploadPart(objID clitypes.ObjectID, index int, stream io.Read
 		return fmt.Errorf("executing plan: %w", err)
 	}
 
-	shardInfo := ret["shard"].(*ops2.FileInfoValue)
+	shardInfo := ret.Get("shard").(*ops2.FileInfoValue)
 	err = u.db.DoTx(func(tx db.SQLContext) error {
 		return u.db.Object().AppendPart(tx, clitypes.ObjectBlock{
 			ObjectID: objID,
diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go
index 3cdb7b5..ff8c4f0 100644
--- a/client/internal/uploader/user_space_upload.go
+++ b/client/internal/uploader/user_space_upload.go
@@ -179,7 +179,7 @@ func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targe
 		return nil, fmt.Errorf("executing plan: %w", err)
 	}
 
-	adds := make([]db.AddObjectEntry, 0, len(ret))
+	adds := make([]db.AddObjectEntry, 0, len(ret.Stored))
 	for _, e := range entries {
 		if e.IsDir {
 			continue
@@ -192,7 +192,7 @@ func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targe
 			pat.DropFrontN(1)
 		}
 
-		info := ret[e.Path.String()].(*ops2.FileInfoValue)
+		info := ret.Get(e.Path.String()).(*ops2.FileInfoValue)
 		adds = append(adds, db.AddObjectEntry{
 			Path: pat.String(),
 			Size: info.Size,
diff --git a/common/pkgs/ioswitch/exec/driver.go b/common/pkgs/ioswitch/exec/driver.go
index 433b7c3..c0b6bbe 100644
--- a/common/pkgs/ioswitch/exec/driver.go
+++ b/common/pkgs/ioswitch/exec/driver.go
@@ -15,7 +15,7 @@ import (
 type Driver struct {
 	planID     PlanID
 	planBlder  *PlanBuilder
-	callback   *future.SetValueFuture[map[string]VarValue]
+	callback   *future.SetValueFuture[PlanResult]
 	ctx        *ExecContext
 	cancel     context.CancelFunc
 	driverExec *Executor
@@ -44,20 +44,22 @@ func (e *Driver) Signal(signal *DriverSignalVar) {
 	e.driverExec.PutVar(signal.ID, &SignalValue{})
 }
 
-func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) {
-	stored, err := e.callback.Wait(ctx)
+func (e *Driver) Wait(ctx context.Context) (PlanResult, error) {
+	ret, err := e.callback.Wait(ctx)
 	if err != nil {
-		return nil, err
+		return PlanResult{}, err
 	}
 
-	return stored, nil
+	return ret, nil
 }
 
 func (e *Driver) execute() {
 	wg := sync.WaitGroup{}
-	errLock := sync.Mutex{}
+	retLock := sync.Mutex{}
 	var execErr error
 
+	stored := make(map[string][]VarValue)
+
 	for _, p := range e.planBlder.WorkerPlans {
 		wg.Add(1)
 
@@ -71,36 +73,49 @@ func (e *Driver) execute() {
 
 			cli, err := p.Worker.NewClient()
 			if err != nil {
-				errLock.Lock()
+				retLock.Lock()
 				execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err))
-				errLock.Unlock()
+				retLock.Unlock()
 				cancel()
 				return
 			}
 			defer cli.Close()
 
-			err = cli.ExecutePlan(ctx, plan)
+			ret, err := cli.ExecutePlan(ctx, plan)
 			if err != nil {
-				errLock.Lock()
+				retLock.Lock()
 				execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err))
-				errLock.Unlock()
+				retLock.Unlock()
 				cancel()
 				return
 			}
+
+			retLock.Lock()
+			for k, v := range ret.Stored {
+				stored[k] = append(stored[k], v...)
+			}
+			retLock.Unlock()
+
 		}(p, e.ctx.Context, e.cancel)
 	}
 
-	stored, err := e.driverExec.Run(e.ctx)
+	ret, err := e.driverExec.Run(e.ctx)
 	if err != nil {
-		errLock.Lock()
+		retLock.Lock()
 		execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err))
-		errLock.Unlock()
+		retLock.Unlock()
 		e.cancel()
 	}
 
 	wg.Wait()
 
-	e.callback.SetComplete(stored, execErr)
+	for k, v := range ret.Stored {
+		stored[k] = append(stored[k], v...)
+	}
+
+	e.callback.SetComplete(PlanResult{
+		Stored: stored,
+	}, execErr)
 }
 
 type DriverWriteStream struct {
@@ -116,3 +131,25 @@ type DriverSignalVar struct {
 	ID     VarID
 	Signal SignalValue
 }
+
+type PlanResult struct {
+	Stored map[string][]VarValue
+}
+
+func (r *PlanResult) Get(key string) VarValue {
+	v, ok := r.Stored[key]
+	if !ok || len(v) == 0 {
+		return nil
+	}
+
+	return v[0]
+}
+
+func (r *PlanResult) GetArray(key string) []VarValue {
+	v, ok := r.Stored[key]
+	if !ok {
+		return nil
+	}
+
+	return v
+}
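For reference, the new result type behaves as follows. This is a self-contained sketch using stand-in types: the real `VarValue` is the `exec` package's interface, and plain strings stand in for `*ops2.FileInfoValue`.

```go
package main

import "fmt"

// Stand-in for exec.VarValue, which is an interface in the real package.
type VarValue interface{}

// Local mirror of the PlanResult introduced above: values stored under the
// same key by different workers accumulate into a slice.
type PlanResult struct {
	Stored map[string][]VarValue
}

// Get returns the first value stored under key, or nil if there is none.
func (r *PlanResult) Get(key string) VarValue {
	v, ok := r.Stored[key]
	if !ok || len(v) == 0 {
		return nil
	}
	return v[0]
}

// GetArray returns every value stored under key, in aggregation order.
func (r *PlanResult) GetArray(key string) []VarValue {
	return r.Stored[key]
}

func main() {
	ret := PlanResult{Stored: map[string][]VarValue{
		"shard": {"hash-from-worker-a", "hash-from-worker-b"},
	}}

	fmt.Println(ret.Get("shard"))      // hash-from-worker-a (first stored value)
	fmt.Println(ret.GetArray("shard")) // [hash-from-worker-a hash-from-worker-b]
	fmt.Println(ret.Get("missing"))    // <nil>: single-value call sites assume the key exists
}
```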
diff --git a/common/pkgs/ioswitch/exec/executor.go b/common/pkgs/ioswitch/exec/executor.go
index 50ed2d3..42cae9c 100644
--- a/common/pkgs/ioswitch/exec/executor.go
+++ b/common/pkgs/ioswitch/exec/executor.go
@@ -26,14 +26,14 @@ type Executor struct {
 	vars     map[VarID]freeVar
 	bindings []*binding
 	lock     sync.Mutex
-	store    map[string]VarValue
+	store    map[string][]VarValue
 }
 
 func NewExecutor(plan Plan) *Executor {
 	planning := Executor{
 		plan:  plan,
 		vars:  make(map[VarID]freeVar),
-		store: make(map[string]VarValue),
+		store: make(map[string][]VarValue),
 	}
 
 	return &planning
@@ -43,7 +43,7 @@ func (s *Executor) Plan() *Plan {
 	return &s.plan
 }
 
-func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) {
+func (s *Executor) Run(ctx *ExecContext) (ExecutorResult, error) {
 	c, cancel := context.WithCancel(ctx.Context)
 	ctx = &ExecContext{
 		Context: c,
@@ -54,10 +54,10 @@ func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) {
 
 	err := s.runOps(s.plan.Ops, ctx, cancel)
 	if err != nil {
-		return nil, err
+		return ExecutorResult{}, err
 	}
 
-	return s.store, nil
+	return ExecutorResult{Stored: s.store}, nil
 }
 
 func (s *Executor) runOps(ops []Op, ctx *ExecContext, cancel context.CancelFunc) error {
@@ -136,7 +136,11 @@ func (s *Executor) Store(key string, val VarValue) {
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	s.store[key] = val
+	s.store[key] = append(s.store[key], val)
+}
+
+type ExecutorResult struct {
+	Stored map[string][]VarValue
 }
 
 func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) {
diff --git a/common/pkgs/ioswitch/exec/plan_builder.go b/common/pkgs/ioswitch/exec/plan_builder.go
index ac7a263..21eae7d 100644
--- a/common/pkgs/ioswitch/exec/plan_builder.go
+++ b/common/pkgs/ioswitch/exec/plan_builder.go
@@ -63,7 +63,7 @@ func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver {
 	exec := Driver{
 		planID:     planID,
 		planBlder:  b,
-		callback:   future.NewSetValue[map[string]VarValue](),
+		callback:   future.NewSetValue[PlanResult](),
 		ctx:        ctx,
 		cancel:     cancel,
 		driverExec: NewExecutor(execPlan),
diff --git a/common/pkgs/ioswitch/exec/worker.go b/common/pkgs/ioswitch/exec/worker.go
index d7fb915..2091563 100644
--- a/common/pkgs/ioswitch/exec/worker.go
+++ b/common/pkgs/ioswitch/exec/worker.go
@@ -94,7 +94,7 @@ type WorkerInfo interface {
 }
 
 type WorkerClient interface {
-	ExecutePlan(ctx context.Context, plan Plan) error
+	ExecutePlan(ctx context.Context, plan Plan) (ExecutorResult, error)
 
 	SendStream(ctx context.Context, planID PlanID, id VarID, stream io.ReadCloser) error
 	SendVar(ctx context.Context, planID PlanID, id VarID, value VarValue) error
diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go
index 31f6511..691ca67 100644
--- a/common/pkgs/ioswitch2/http_hub_worker.go
+++ b/common/pkgs/ioswitch2/http_hub_worker.go
@@ -50,10 +50,15 @@ type HttpHubWorkerClient struct {
 	cli *hubapi.Client
 }
 
-func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
-	return c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{
+func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) {
+	resp, err := c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{
 		Plan: plan,
 	})
+	if err != nil {
+		return exec.ExecutorResult{}, err
+	}
+
+	return resp.Result, nil
 }
 
 func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
 	return c.cli.SendStream(hubapi.SendStreamReq{
diff --git a/common/pkgs/ioswitch2/hub_worker.go b/common/pkgs/ioswitch2/hub_worker.go
index 35a17fa..ee35b04 100644
--- a/common/pkgs/ioswitch2/hub_worker.go
+++ b/common/pkgs/ioswitch2/hub_worker.go
@@ -46,9 +46,13 @@ type HubWorkerClient struct {
 	cli *hubrpc.Client
 }
 
-func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
-	_, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
-	return err.ToError()
+func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) {
+	resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
+	if err != nil {
+		return exec.ExecutorResult{}, err.ToError()
+	}
+
+	return resp.Result, nil
 }
 
 func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
 	_, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{
diff --git a/common/pkgs/ioswitch2/parser/opt/misc.go b/common/pkgs/ioswitch2/parser/opt/misc.go
index 0cf54fb..c444cc2 100644
--- a/common/pkgs/ioswitch2/parser/opt/misc.go
+++ b/common/pkgs/ioswitch2/parser/opt/misc.go
@@ -65,7 +65,7 @@ func StoreShardWriteResult(ctx *state.GenerateState) {
 		}
 
 		storeNode := ctx.DAG.NewStore()
-		storeNode.Env().ToEnvDriver(true)
+		storeNode.Env().CopyFrom(n.Env())
 		storeNode.Store(n.ShardInfoKey, n.ShardInfoVar().Var())
 
 		return true
diff --git a/common/pkgs/ioswitchlrc/hub_worker.go b/common/pkgs/ioswitchlrc/hub_worker.go
index e9b6940..c557dd1 100644
--- a/common/pkgs/ioswitchlrc/hub_worker.go
+++ b/common/pkgs/ioswitchlrc/hub_worker.go
@@ -41,9 +41,13 @@ type HubWorkerClient struct {
 	cli *hubrpc.Client
 }
 
-func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
-	_, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
-	return err.ToError()
+func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) {
+	resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
+	if err != nil {
+		return exec.ExecutorResult{}, err.ToError()
+	}
+
+	return resp.Result, nil
 }
 
 func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
 	_, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{
diff --git a/common/pkgs/rpc/hub/ioswitch.go b/common/pkgs/rpc/hub/ioswitch.go
index 820dc02..b761acc 100644
--- a/common/pkgs/rpc/hub/ioswitch.go
+++ b/common/pkgs/rpc/hub/ioswitch.go
@@ -20,7 +20,9 @@ type IOSwitchSvc interface {
 
 type ExecuteIOPlan struct {
 	Plan exec.Plan
 }
-type ExecuteIOPlanResp struct{}
+type ExecuteIOPlanResp struct {
+	Result exec.ExecutorResult
+}
 
 var _ = TokenAuth(Hub_ExecuteIOPlan_FullMethodName)
diff --git a/hub/sdk/api/hub_io.go b/hub/sdk/api/hub_io.go
index 1830ec5..200f8b4 100644
--- a/hub/sdk/api/hub_io.go
+++ b/hub/sdk/api/hub_io.go
@@ -129,35 +129,38 @@ const ExecuteIOPlanPath = "/hubIO/executeIOPlan"
 type ExecuteIOPlanReq struct {
 	Plan exec.Plan `json:"plan"`
 }
+type ExecuteIOPlanResp struct {
+	Result exec.ExecutorResult `json:"result"`
+}
 
-func (c *Client) ExecuteIOPlan(req ExecuteIOPlanReq) error {
+func (c *Client) ExecuteIOPlan(req ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) {
 	targetUrl, err := url.JoinPath(c.cfg.URL, ExecuteIOPlanPath)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	body, err := serder.ObjectToJSONEx(req)
 	if err != nil {
-		return fmt.Errorf("request to json: %w", err)
+		return nil, fmt.Errorf("request to json: %w", err)
 	}
 
 	resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
 		Body: body,
 	})
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	codeResp, err := ParseJSONResponse[response[any]](resp)
+	codeResp, err := ParseJSONResponse[response[ExecuteIOPlanResp]](resp)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if codeResp.Code == errorcode.OK {
-		return nil
+		return &codeResp.Data, nil
 	}
 
-	return codeResp.ToError()
+	return nil, codeResp.ToError()
 }
 
 const SendVarPath = "/hubIO/sendVar"
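The driver-side half of the change, shown in `driver.go` above, merges each worker's `ExecutorResult` into one map before completing the callback with a `PlanResult`. The following is a minimal, self-contained sketch of that merge pattern, using a stand-in `VarValue` rather than the real package types:

```go
package main

import (
	"fmt"
	"sync"
)

// VarValue stands in for the exec package's interface type.
type VarValue interface{}

// Sketch of the aggregation the updated Driver.execute performs: each
// worker's ExecutePlan now returns its stored values, and the driver
// merges them under one lock into a single map[string][]VarValue.
func main() {
	var (
		mu     sync.Mutex
		stored = make(map[string][]VarValue)
		wg     sync.WaitGroup
	)

	// Pretend results from two workers; keys may repeat across workers.
	workerResults := []map[string][]VarValue{
		{"0": {"hash-a"}},
		{"0": {"hash-a2"}, "1": {"hash-b"}},
	}

	for _, ret := range workerResults {
		wg.Add(1)
		go func(ret map[string][]VarValue) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			for k, v := range ret {
				stored[k] = append(stored[k], v...)
			}
		}(ret)
	}

	wg.Wait()
	fmt.Println(len(stored["0"]), len(stored["1"])) // 2 1
}
```

This is why `Executor.Store` now appends instead of overwriting, and why `PlanResult.Get` is defined as "first value under the key": single-writer keys keep their old one-value semantics, while keys written by several workers become retrievable in full through `GetArray`.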