From 3d1047c55be7d4ca172a2ff6fb662028aa668c3c Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 26 Aug 2025 14:52:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=B0=83=E5=BA=A6=E6=97=B6?= =?UTF-8?q?=E7=9A=84=E4=B8=8B=E8=BD=BD=E9=87=8F=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/services/user_space.go | 26 +++++++ common/pkgs/ioswitch/plan/compile.go | 18 +++-- common/pkgs/ioswitch/plan/ops/send.go | 67 +++++++++++-------- common/pkgs/ioswitch2/fromto.go | 2 + common/pkgs/ioswitch2/ops2/base_store.go | 16 ++++- common/pkgs/ioswitch2/parser/gen/generator.go | 4 +- common/pkgs/ioswitch2/parser/parser.go | 4 +- .../ioswitch2/plans/complete_multipart.go | 4 +- common/pkgs/ioswitchlrc/parser/generator.go | 7 +- 9 files changed, 103 insertions(+), 45 deletions(-) diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index a188fff..60f5b04 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -3,13 +3,16 @@ package services import ( "context" "fmt" + "strconv" "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" + "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap" "gorm.io/gorm" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" @@ -277,6 +280,9 @@ func (svc *UserSpaceService) DownloadPackage(req cliapi.UserSpaceDownloadPackage }) } + // 单独统计每一个对象的下载速度信息 + ft.StatsCtx = fmt.Sprintf("%v", dIndex) + err = parser.Parse(ft, plans) if err != nil { return fmt.Errorf("parse plan: %w", err) @@ -288,6 +294,7 @@ func (svc *UserSpaceService) DownloadPackage(req cliapi.UserSpaceDownloadPackage for _, obj := range details { svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, req.PackageID, req.UserSpaceID, 1) } + exeCtx := exec.NewExecContext() exec.SetValueByType(exeCtx, svc.StgPool) drv := plans.Execute(exeCtx) @@ -296,9 +303,28 @@ func (svc *UserSpaceService) DownloadPackage(req cliapi.UserSpaceDownloadPackage return err } + // 统计下载速度 + trans := make(map[int]int64) + elapseds := make(map[int]time.Duration) for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { v2 := v.(*ops2.BaseReadStatsValue) svc.SpeedStats.Record(v2.Length, v2.ElapsedTime, v2.Location.IsDriver) + idx, _ := strconv.Atoi(v2.StatsCtx) + trans[idx] += v2.Length + elapseds[idx] = v2.ElapsedTime + } + for _, v := range ret.GetArray(ops.SendStreamStatsStoreKey) { + v2 := v.(*ops.SendStreamStatsValue) + idx, _ := strconv.Atoi(v2.StatsCtx) + trans[idx] += v2.Length + } + for idx, len := range trans { + svc.EvtPub.Publish(&datamap.BodyObjectAccessStats{ + ObjectID: details[idx].Object.ObjectID, + RequestSize: details[idx].Object.Size, + TransferAmount: len, + ElapsedTime: elapseds[idx], + }) } err = svc.DB.DoTx(func(tx db.SQLContext) error { diff --git a/common/pkgs/ioswitch/plan/compile.go b/common/pkgs/ioswitch/plan/compile.go index 7e5fb2a..4529ed4 100644 --- a/common/pkgs/ioswitch/plan/compile.go +++ b/common/pkgs/ioswitch/plan/compile.go @@ -6,14 +6,18 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" ) -func Compile(graph *dag.Graph, planBld *exec.PlanBuilder) error { +type CompileOption struct { + StatsCtx string +} + +func Compile(graph *dag.Graph, planBld *exec.PlanBuilder, opt CompileOption) error { myGraph := &ops.GraphNodeBuilder{graph} - generateSend(myGraph) - return buildPlan(graph, planBld) + generateSend(myGraph, opt) + return buildPlan(graph, planBld, opt) } // 生成Send指令 -func generateSend(graph *ops.GraphNodeBuilder) { +func generateSend(graph *ops.GraphNodeBuilder, opt CompileOption) { graph.Walk(func(node dag.Node) bool { switch node.(type) { case *ops.SendStreamNode: @@ -41,7 +45,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { // // 如果是要送到Driver,则只能由Driver主动去拉取 dstNode := out.Dst.Get(0) - getNode := graph.NewGetStream(node.Env().Worker) + getNode := graph.NewGetStream(node.Env().Worker, opt.StatsCtx) getNode.Env().ToEnvDriver(true) // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 @@ -61,7 +65,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 dstNode := out.Dst.Get(0) - n := graph.NewSendStream(to.Env().Worker) + n := graph.NewSendStream(to.Env().Worker, opt.StatsCtx) *n.Env() = *node.Env() out.Dst.RemoveAt(0) @@ -119,7 +123,7 @@ func generateSend(graph *ops.GraphNodeBuilder) { } // 生成Plan -func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { +func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder, opt CompileOption) error { var retErr error graph.Walk(func(node dag.Node) bool { for i := 0; i < node.OutputStreams().Len(); i++ { diff --git a/common/pkgs/ioswitch/plan/ops/send.go b/common/pkgs/ioswitch/plan/ops/send.go index 209f688..de7f712 100644 --- a/common/pkgs/ioswitch/plan/ops/send.go +++ b/common/pkgs/ioswitch/plan/ops/send.go @@ -23,11 +23,12 @@ func init() { } type SendStreamStatsValue struct { - IsSend bool - Length int64 - Time time.Duration - Src exec.Location - Dst exec.Location + StatsCtx string + IsSend bool + Length int64 + Time time.Duration + Src exec.Location + Dst exec.Location } func (v *SendStreamStatsValue) Clone() exec.VarValue { @@ -36,9 +37,10 @@ func (v *SendStreamStatsValue) Clone() exec.VarValue { } type SendStream struct { - Input exec.VarID - Send exec.VarID - Worker exec.WorkerInfo + Input exec.VarID + Send exec.VarID + Worker exec.WorkerInfo + StatsCtx string } func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -65,10 +67,11 @@ func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } e.Store(SendStreamStatsStoreKey, &SendStreamStatsValue{ - IsSend: true, - Length: counter.Count(), - Time: time.Since(startTime), - Src: e.Location(), + StatsCtx: o.StatsCtx, + IsSend: true, + Length: counter.Count(), + Time: time.Since(startTime), + Src: e.Location(), Dst: exec.Location{ IsDriver: false, WorkerName: o.Worker.Name(), @@ -83,10 +86,11 @@ func (o *SendStream) String() string { } type GetStream struct { - Signal exec.SignalVar `json:"signal"` - Target exec.VarID `json:"target"` - Output exec.VarID `json:"output"` - Worker exec.WorkerInfo `json:"worker"` + Signal exec.SignalVar + Target exec.VarID + Output exec.VarID + Worker exec.WorkerInfo + StatsCtx string } func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -113,9 +117,10 @@ func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { err = fut.Wait(ctx.Context) e.Store(SendStreamStatsStoreKey, &SendStreamStatsValue{ - IsSend: false, - Length: counter.Count(), - Time: time.Since(startTime), + StatsCtx: o.StatsCtx, + IsSend: false, + Length: counter.Count(), + Time: time.Since(startTime), Src: exec.Location{ IsDriver: false, WorkerName: o.Worker.Name(), @@ -191,11 +196,13 @@ func (o *GetVar) String() string { type SendStreamNode struct { dag.NodeBase ToWorker exec.WorkerInfo + StatsCtx string } -func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode { +func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo, statsCtx string) *SendStreamNode { node := &SendStreamNode{ ToWorker: to, + StatsCtx: statsCtx, } b.AddNode(node) @@ -211,9 +218,10 @@ func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar { func (t *SendStreamNode) GenerateOp() (exec.Op, error) { return &SendStream{ - Input: t.InputStreams().Get(0).VarID, - Send: t.OutputStreams().Get(0).VarID, - Worker: t.ToWorker, + Input: t.InputStreams().Get(0).VarID, + Send: t.OutputStreams().Get(0).VarID, + Worker: t.ToWorker, + StatsCtx: t.StatsCtx, }, nil } @@ -257,11 +265,13 @@ func (t *SendValueNode) GenerateOp() (exec.Op, error) { type GetStreamNode struct { dag.NodeBase FromWorker exec.WorkerInfo + StatsCtx string } -func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode { +func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo, statsCtx string) *GetStreamNode { node := &GetStreamNode{ FromWorker: from, + StatsCtx: statsCtx, } b.AddNode(node) @@ -282,10 +292,11 @@ func (t *GetStreamNode) SignalVar() *dag.ValueVar { func (t *GetStreamNode) GenerateOp() (exec.Op, error) { return &GetStream{ - Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID), - Output: t.OutputStreams().Get(0).VarID, - Target: t.InputStreams().Get(0).VarID, - Worker: t.FromWorker, + Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID), + Output: t.OutputStreams().Get(0).VarID, + Target: t.InputStreams().Get(0).VarID, + Worker: t.FromWorker, + StatsCtx: t.StatsCtx, }, nil } diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 3cd0e0e..0f13b26 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -74,6 +74,8 @@ type FromTo struct { SegmentParam *jcstypes.SegmentRedundancy Froms []From Toes []To + // 用于识别不同的统计信息对应的上下文 + StatsCtx string } func NewFromTo() FromTo { diff --git a/common/pkgs/ioswitch2/ops2/base_store.go b/common/pkgs/ioswitch2/ops2/base_store.go index 4d01210..a9088d8 100644 --- a/common/pkgs/ioswitch2/ops2/base_store.go +++ b/common/pkgs/ioswitch2/ops2/base_store.go @@ -28,6 +28,7 @@ func init() { } type BaseReadStatsValue struct { + StatsCtx string Length int64 ElapsedTime time.Duration Location exec.Location @@ -35,6 +36,7 @@ type BaseReadStatsValue struct { func (v *BaseReadStatsValue) Clone() exec.VarValue { return &BaseReadStatsValue{ + StatsCtx: v.StatsCtx, Length: v.Length, ElapsedTime: v.ElapsedTime, Location: v.Location, @@ -46,6 +48,7 @@ type BaseRead struct { UserSpace jcstypes.UserSpaceDetail Path jcstypes.JPath Option stgtypes.OpenOption + StatsCtx string } func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -84,6 +87,7 @@ func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { e.PutVar(o.Output, output) err = fut.Wait(ctx.Context) e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ + StatsCtx: o.StatsCtx, Length: counter.Count(), ElapsedTime: time.Since(startTime), Location: e.Location(), @@ -100,6 +104,7 @@ type BaseReadDyn struct { Output exec.VarID FileInfo exec.VarID Option stgtypes.OpenOption + StatsCtx string } func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -147,6 +152,7 @@ func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { Length: counter.Count(), ElapsedTime: time.Since(startTime), Location: e.Location(), + StatsCtx: o.StatsCtx, }) return err } @@ -206,14 +212,16 @@ type BaseReadNode struct { UserSpace jcstypes.UserSpaceDetail Path jcstypes.JPath Option stgtypes.OpenOption + StatsCtx string } -func (b *GraphNodeBuilder) NewBaseRead(from ioswitch2.From, userSpace jcstypes.UserSpaceDetail, path jcstypes.JPath, opt stgtypes.OpenOption) *BaseReadNode { +func (b *GraphNodeBuilder) NewBaseRead(from ioswitch2.From, userSpace jcstypes.UserSpaceDetail, path jcstypes.JPath, opt stgtypes.OpenOption, statsCtx string) *BaseReadNode { node := &BaseReadNode{ From: from, UserSpace: userSpace, Path: path, Option: opt, + StatsCtx: statsCtx, } b.AddNode(node) @@ -238,6 +246,7 @@ func (t *BaseReadNode) GenerateOp() (exec.Op, error) { UserSpace: t.UserSpace, Path: t.Path, Option: t.Option, + StatsCtx: t.StatsCtx, }, nil } @@ -246,13 +255,15 @@ type BaseReadDynNode struct { From ioswitch2.From UserSpace jcstypes.UserSpaceDetail Option stgtypes.OpenOption + StatsCtx string } -func (b *GraphNodeBuilder) NewBaseReadDyn(from ioswitch2.From, userSpace jcstypes.UserSpaceDetail, opt stgtypes.OpenOption) *BaseReadDynNode { +func (b *GraphNodeBuilder) NewBaseReadDyn(from ioswitch2.From, userSpace jcstypes.UserSpaceDetail, opt stgtypes.OpenOption, statsCtx string) *BaseReadDynNode { node := &BaseReadDynNode{ From: from, UserSpace: userSpace, Option: opt, + StatsCtx: statsCtx, } b.AddNode(node) @@ -285,6 +296,7 @@ func (t *BaseReadDynNode) GenerateOp() (exec.Op, error) { Output: t.Output().Var().VarID, FileInfo: t.FileInfoSlot().Var().VarID, Option: t.Option, + StatsCtx: t.StatsCtx, }, nil } diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index c2069d7..7977379 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -261,7 +261,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e getShard := ctx.DAG.NewGetShardInfo(f.UserSpace, f.FileHash) getShard.Env().ToEnvDriver(true) - read := ctx.DAG.NewBaseReadDyn(f, f.UserSpace, stgtypes.DefaultOpen()) + read := ctx.DAG.NewBaseReadDyn(f, f.UserSpace, stgtypes.DefaultOpen(), ctx.Ft.StatsCtx) getShard.FileInfoVar().ToSlot(read.FileInfoSlot()) @@ -332,7 +332,7 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e case *ioswitch2.FromBaseStore: // TODO 可以考虑支持设置读取范围 - n := ctx.DAG.NewBaseRead(f, f.UserSpace, f.Path, stgtypes.DefaultOpen()) + n := ctx.DAG.NewBaseRead(f, f.UserSpace, f.Path, stgtypes.DefaultOpen(), ctx.Ft.StatsCtx) if err := setEnvBySpace(n, &f.UserSpace); err != nil { return nil, fmt.Errorf("set node env by user space: %w", err) } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 9a6485e..a396a4a 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -85,5 +85,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { opt.GenerateRange(state) opt.GenerateClone(state) - return plan.Compile(state.DAG.Graph, blder) + return plan.Compile(state.DAG.Graph, blder, plan.CompileOption{ + StatsCtx: ft.StatsCtx, + }) } diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go index 84f5f33..de596e9 100644 --- a/common/pkgs/ioswitch2/plans/complete_multipart.go +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -27,7 +27,7 @@ func CompleteMultipart(blocks []jcstypes.ObjectBlock, blockSpaces []jcstypes.Use gs := da.NewGetShardInfo(blockSpaces[i], blk.FileHash) gs.Env().ToEnvDriver(true) - br := da.NewBaseReadDyn(nil, blockSpaces[i], stgtypes.DefaultOpen()) + br := da.NewBaseReadDyn(nil, blockSpaces[i], stgtypes.DefaultOpen(), "") if err := setEnvBySpace(br, &blockSpaces[i]); err != nil { return fmt.Errorf("set node env by user space: %w", err) } @@ -54,7 +54,7 @@ func CompleteMultipart(blocks []jcstypes.ObjectBlock, blockSpaces []jcstypes.Use store.Store(shardInfoKey, as.ShardInfoVar().Var()) } - err := plan.Compile(da.Graph, blder) + err := plan.Compile(da.Graph, blder, plan.CompileOption{}) if err != nil { return err } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index 0802bff..d25f597 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -49,7 +49,8 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) generateClone(&ctx) generateRange(&ctx) - return plan.Compile(ctx.DAG.Graph, blder) + // TODO 设置StatsCtx + return plan.Compile(ctx.DAG.Graph, blder, plan.CompileOption{}) } func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error { @@ -146,7 +147,7 @@ func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.P generateClone(&ctx) generateRange(&ctx) - return plan.Compile(ctx.DAG.Graph, blder) + return plan.Compile(ctx.DAG.Graph, blder, plan.CompileOption{}) } func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { @@ -267,7 +268,7 @@ func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec generateClone(&ctx) generateRange(&ctx) - return plan.Compile(ctx.DAG.Graph, blder) + return plan.Compile(ctx.DAG.Graph, blder, plan.CompileOption{}) } func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {