From bf0955d5698aa7ffb7cf3483bc3ef2275d6a139d Mon Sep 17 00:00:00 2001
From: Sydonian <794346190@qq.com>
Date: Thu, 3 Apr 2025 11:13:01 +0800
Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BB=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Reviewed copy: the hunk sizes and the diffstat below match the edited
contents; the blob ids on the index lines are still those of the original
commit.

 client/internal/uploader/create_load.go | 152 +++++++++++++
 client/internal/uploader/update.go      | 177 +++++++++++++++
 client/internal/uploader/uploader.go    | 322 +++++++++++++++++++++++++++
 3 files changed, 651 insertions(+)
 create mode 100644 client/internal/uploader/create_load.go
 create mode 100644 client/internal/uploader/update.go
 create mode 100644 client/internal/uploader/uploader.go

diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go
new file mode 100644
index 0000000..48df400
--- /dev/null
+++ b/client/internal/uploader/create_load.go
@@ -0,0 +1,152 @@
+package uploader
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"path"
+	"sync"
+	"time"
+
+	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
+	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+	"gitlink.org.cn/cloudream/storage2/client/types"
+	stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
+	coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
+)
+
+// CreateLoadUploader uploads objects into a newly created package and, for
+// every target space, also writes each object under that space's load root.
+// It is finished by exactly one call to Commit or Abort.
+type CreateLoadUploader struct {
+	pkg          cdssdk.Package
+	userID       cdssdk.UserID
+	targetSpaces []types.UserSpaceDetail
+	loadRoots    []string // loadRoots[i] is the load root used for targetSpaces[i]
+	uploader     *Uploader
+	distlock     *distlock.Mutex
+	successes    []coormq.AddObjectEntry // guarded by lock
+	lock         sync.Mutex
+	commited     bool // set by Commit and Abort; guarded by lock
+}
+
+// CreateLoadResult is returned by CreateLoadUploader.Commit.
+type CreateLoadResult struct {
+	Package cdssdk.Package
+	// Objects holds the entries the coordinator reported as added, keyed by path.
+	Objects map[string]cdssdk.Object
+}
+
+// Upload streams one object to every target space (its shard store plus the
+// space's load root joined with pa) and records it for the next Commit.
+// size is recorded as given; it is not checked against the stream.
+func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) error {
+	uploadTime := time.Now()
+	spaceIDs := make([]cdssdk.StorageID, 0, len(u.targetSpaces))
+
+	ft := ioswitch2.FromTo{}
+	fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
+	ft.AddFrom(fromExec)
+	for i, space := range u.targetSpaces {
+		ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "fileHash"))
+		ft.AddTo(ioswitch2.NewLoadToPublic(*space.MasterHub, space, path.Join(u.loadRoots[i], pa)))
+		spaceIDs = append(spaceIDs, space.Storage.StorageID)
+	}
+
+	plans := exec.NewPlanBuilder()
+	err := parser.Parse(ft, plans)
+	if err != nil {
+		return fmt.Errorf("parsing plan: %w", err)
+	}
+
+	exeCtx := exec.NewExecContext()
+	exec.SetValueByType(exeCtx, u.uploader.stgAgts)
+	exe := plans.Execute(exeCtx)
+	exe.BeginWrite(io.NopCloser(stream), hd)
+	ret, err := exe.Wait(context.TODO())
+	if err != nil {
+		return fmt.Errorf("executing plan: %w", err)
+	}
+
+	// "fileHash" is the name passed to NewToShardStore above.
+	shardInfo, ok := ret["fileHash"].(*ops2.ShardInfoValue)
+	if !ok || shardInfo == nil {
+		return fmt.Errorf("executing plan: missing result %q", "fileHash")
+	}
+
+	u.lock.Lock()
+	defer u.lock.Unlock()
+
+	// Record the upload result.
+	u.successes = append(u.successes, coormq.AddObjectEntry{
+		Path:       pa,
+		Size:       size,
+		FileHash:   shardInfo.Hash,
+		UploadTime: uploadTime,
+		StorageIDs: spaceIDs,
+	})
+	return nil
+}
+
+// Commit registers every recorded upload with the coordinator, notifies it
+// that the package was loaded into each target space, and releases the
+// distributed lock. The uploader is finished afterwards even if Commit
+// fails: a second Commit returns an error and Abort becomes a no-op.
+func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) {
+	u.lock.Lock()
+	defer u.lock.Unlock()
+
+	if u.commited {
+		return CreateLoadResult{}, fmt.Errorf("package already commited")
+	}
+	u.commited = true
+
+	defer u.distlock.Unlock()
+
+	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
+	if err != nil {
+		return CreateLoadResult{}, fmt.Errorf("new coordinator client: %w", err)
+	}
+	defer stgglb.CoordinatorMQPool.Release(coorCli)
+
+	updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(u.pkg.PackageID, u.successes))
+	if err != nil {
+		return CreateLoadResult{}, fmt.Errorf("updating package: %w", err)
+	}
+
+	ret := CreateLoadResult{
+		Package: u.pkg,
+		Objects: make(map[string]cdssdk.Object),
+	}
+
+	for _, entry := range updateResp.Added {
+		ret.Objects[entry.Path] = entry
+	}
+
+	for i, stg := range u.targetSpaces {
+		// Best effort: the outcome of this notification is deliberately ignored.
+		coorCli.StoragePackageLoaded(coormq.ReqStoragePackageLoaded(u.userID, stg.Storage.StorageID, u.pkg.PackageID, u.loadRoots[i], nil))
+	}
+
+	return ret, nil
+}
+
+// Abort releases the distributed lock without registering any upload.
+// It does nothing once Commit or Abort has already been called.
+func (u *CreateLoadUploader) Abort() {
+	u.lock.Lock()
+	defer u.lock.Unlock()
+
+	if u.commited {
+		return
+	}
+	u.commited = true
+
+	u.distlock.Unlock()
+
+	// TODO: consider deleting the created package here.
+}
diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go
new file mode 100644
index 0000000..4d628b7
--- /dev/null
+++ b/client/internal/uploader/update.go
@@ -0,0 +1,177 @@
+package uploader
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"path"
+	"sync"
+	"time"
+
+	"github.com/samber/lo"
+	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
+	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+	stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
+	stgmod "gitlink.org.cn/cloudream/storage2/common/models"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
+	coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
+)
+
+// UpdateUploader uploads objects into an existing package. Every object goes
+// to the shard store of targetStg and is additionally written under
+// loadToPath[i] on loadToStgs[i]. It is finished by exactly one call to
+// Commit or Abort.
+type UpdateUploader struct {
+	uploader   *Uploader
+	pkgID      cdssdk.PackageID
+	targetStg  stgmod.StorageDetail
+	distMutex  *distlock.Mutex
+	loadToStgs []stgmod.StorageDetail
+	loadToPath []string
+	successes  []coormq.AddObjectEntry
+	lock       sync.Mutex
+	commited   bool
+}
+
+// UploadStorageInfo describes one candidate storage for chooseUploadStorage.
+type UploadStorageInfo struct {
+	Storage        stgmod.StorageDetail
+	Delay          time.Duration
+	IsSameLocation bool
+}
+
+// UpdateResult is returned by UpdateUploader.Commit.
+type UpdateResult struct {
+	// Objects lists the successfully uploaded objects, keyed by path.
+	Objects map[string]cdssdk.Object
+}
+
+// Upload streams one object to the target storage and to every load-to
+// storage, then records it for the next Commit. The recorded size and hash
+// are taken from the plan's "shardInfo" result.
+func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
+	uploadTime := time.Now()
+
+	ft := ioswitch2.NewFromTo()
+	fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
+	ft.AddFrom(fromExec).
+		AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "shardInfo"))
+
+	for i, stg := range w.loadToStgs {
+		ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
+	}
+
+	plans := exec.NewPlanBuilder()
+	err := parser.Parse(ft, plans)
+	if err != nil {
+		return fmt.Errorf("parsing plan: %w", err)
+	}
+
+	exeCtx := exec.NewExecContext()
+	exec.SetValueByType(exeCtx, w.uploader.stgAgts)
+	exe := plans.Execute(exeCtx)
+	exe.BeginWrite(io.NopCloser(stream), hd)
+	ret, err := exe.Wait(context.TODO())
+	if err != nil {
+		return fmt.Errorf("executing plan: %w", err)
+	}
+
+	// "shardInfo" is the name passed to NewToShardStore above.
+	shardInfo, ok := ret["shardInfo"].(*ops2.ShardInfoValue)
+	if !ok || shardInfo == nil {
+		return fmt.Errorf("executing plan: missing result %q", "shardInfo")
+	}
+
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	// Record the upload result.
+	w.successes = append(w.successes, coormq.AddObjectEntry{
+		Path:       pat,
+		Size:       shardInfo.Size,
+		FileHash:   shardInfo.Hash,
+		UploadTime: uploadTime,
+		StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID},
+	})
+	return nil
+}
+
+// CancelObject drops every recorded upload whose path equals path. It must
+// be called after Upload has been called for that object.
+func (w *UpdateUploader) CancelObject(path string) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	w.successes = lo.Reject(w.successes, func(e coormq.AddObjectEntry, i int) bool {
+		return e.Path == path
+	})
+}
+
+// RenameObject changes the path of the first recorded upload whose path
+// equals path. It must be called after Upload has been called for that
+// object. It does not check whether newPath is already taken; the caller
+// has to guarantee that.
+func (w *UpdateUploader) RenameObject(path string, newPath string) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	for i := range w.successes {
+		if w.successes[i].Path == path {
+			w.successes[i].Path = newPath
+			break
+		}
+	}
+}
+
+// Commit registers every recorded upload with the coordinator and releases
+// the distributed lock. The uploader is finished afterwards even if Commit
+// fails: a second Commit returns an error and Abort becomes a no-op.
+func (w *UpdateUploader) Commit() (UpdateResult, error) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	if w.commited {
+		return UpdateResult{}, fmt.Errorf("package already commited")
+	}
+	w.commited = true
+
+	defer w.distMutex.Unlock()
+
+	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
+	if err != nil {
+		return UpdateResult{}, fmt.Errorf("new coordinator client: %w", err)
+	}
+	defer stgglb.CoordinatorMQPool.Release(coorCli)
+
+	updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(w.pkgID, w.successes))
+	if err != nil {
+		return UpdateResult{}, fmt.Errorf("updating package: %w", err)
+	}
+
+	ret := UpdateResult{
+		Objects: make(map[string]cdssdk.Object),
+	}
+
+	for _, entry := range updateResp.Added {
+		ret.Objects[entry.Path] = entry
+	}
+
+	return ret, nil
+}
+
+// Abort releases the distributed lock without registering any upload.
+// It does nothing once Commit or Abort has already been called.
+func (w *UpdateUploader) Abort() {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	if w.commited {
+		return
+	}
+
+	w.commited = true
+	w.distMutex.Unlock()
+}
diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go
new file mode 100644
index 0000000..239bcc5
--- /dev/null
+++ b/client/internal/uploader/uploader.go
@@ -0,0 +1,322 @@
+package uploader
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"math"
+	"math/rand"
+	"time"
+
+	"github.com/samber/lo"
+	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
+	"gitlink.org.cn/cloudream/common/utils/sort2"
+	"gitlink.org.cn/cloudream/scheduler/common/pkgs/db"
+	cdssdk "gitlink.org.cn/cloudream/storage2/client/types"
+	stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
+	stgmod "gitlink.org.cn/cloudream/storage2/common/models"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/metacache"
+	coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
+	"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory"
+)
+
+// Uploader creates upload sessions (BeginUpdate, BeginCreateLoad) and uploads
+// single parts of multipart objects (UploadPart).
+type Uploader struct {
+	distlock     *distlock.Service
+	connectivity *connectivity.Collector
+	stgAgts      *agtpool.AgentPool
+	stgMeta      *metacache.UserSpaceMeta
+	db           *db.DB
+}
+
+// NewUploader returns an Uploader that uses the given dependencies.
+func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
+	return &Uploader{
+		distlock:     distlock,
+		connectivity: connectivity,
+		stgAgts:      stgAgts,
+		stgMeta:      stgMeta,
+		db:           db,
+	}
+}
+
+// BeginUpdate starts an upload session that adds objects to the package
+// pkgID. Objects go to one storage picked by chooseUploadStorage (affinity
+// is preferred when it is usable) and are also written under loadToPath[i]
+// on storage loadTo[i]. The returned uploader holds a distributed lock until
+// its Commit or Abort is called.
+func (u *Uploader) BeginUpdate(pkgID cdssdk.PackageID, affinity cdssdk.UserSpaceID, loadTo []cdssdk.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
+	if len(loadTo) != len(loadToPath) {
+		return nil, fmt.Errorf("load to count %d does not match load to path count %d", len(loadTo), len(loadToPath))
+	}
+
+	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
+	if err != nil {
+		return nil, fmt.Errorf("new coordinator client: %w", err)
+	}
+	defer stgglb.CoordinatorMQPool.Release(coorCli)
+
+	// NOTE(review): userID is not a parameter and is not declared anywhere in
+	// this patch; confirm where the user should come from before merging.
+	getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
+	if err != nil {
+		return nil, fmt.Errorf("getting user storages: %w", err)
+	}
+
+	userStgs := u.collectUploadStorages(getUserStgsResp.Storages)
+	if len(userStgs) == 0 {
+		return nil, fmt.Errorf("user no available storages")
+	}
+
+	loadToStgs := make([]stgmod.StorageDetail, len(loadTo))
+	for i, stgID := range loadTo {
+		stg, ok := lo.Find(getUserStgsResp.Storages, func(stg stgmod.StorageDetail) bool {
+			return stg.Storage.StorageID == stgID
+		})
+		if !ok {
+			return nil, fmt.Errorf("load to storage %v not found", stgID)
+		}
+		if stg.MasterHub == nil {
+			return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
+		}
+		if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() {
+			return nil, fmt.Errorf("load to storage %v has no public store", stgID)
+		}
+
+		loadToStgs[i] = stg
+	}
+
+	target := u.chooseUploadStorage(userStgs, affinity)
+
+	// Lock the IPFS of the upload node so that the uploaded replica is not
+	// cleaned up.
+	// TODO: consider also taking a Create lock on the objects.
+	distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Storage.Storage.StorageID).MutexLock(u.distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire distlock: %w", err)
+	}
+
+	return &UpdateUploader{
+		uploader:   u,
+		pkgID:      pkgID,
+		targetStg:  target.Storage,
+		distMutex:  distMutex,
+		loadToStgs: loadToStgs,
+		loadToPath: loadToPath,
+	}, nil
+}
+
+// collectUploadStorages converts the storages that have a master hub into
+// upload candidates. Delay is the latency the connectivity collector has for
+// the master hub, or the maximum duration when none is known.
+func (u *Uploader) collectUploadStorages(stgs []stgmod.StorageDetail) []UploadStorageInfo {
+	cons := u.connectivity.GetAll()
+
+	var infos []UploadStorageInfo
+	for _, stg := range stgs {
+		if stg.MasterHub == nil {
+			continue
+		}
+
+		delay := time.Duration(math.MaxInt64)
+		if con, ok := cons[stg.MasterHub.HubID]; ok && con.Latency != nil {
+			delay = *con.Latency
+		}
+
+		infos = append(infos, UploadStorageInfo{
+			Storage:        stg,
+			Delay:          delay,
+			IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID,
+		})
+	}
+
+	return infos
+}
+
+// chooseUploadStorage picks the storage to upload to:
+//  1. the storage whose ID equals stgAffinity, if it is among storages;
+//  2. otherwise a random storage in the same location as this client;
+//  3. otherwise the storage with the lowest delay.
+//
+// storages must not be empty.
+func (u *Uploader) chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo {
+	if stgAffinity > 0 {
+		aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity })
+		if ok {
+			return aff
+		}
+	}
+
+	sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation })
+	if len(sameLocationStorages) > 0 {
+		return sameLocationStorages[rand.Intn(len(sameLocationStorages))]
+	}
+
+	// Pick the storage with the lowest delay.
+	storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
+
+	return storages[0]
+}
+
+// BeginCreateLoad creates the package pkgName in bucket bktID and starts an
+// upload session whose objects are stored on every storage in loadTo and
+// written under loadToPath[i] on loadTo[i]. The returned uploader holds a
+// distributed lock until its Commit or Abort is called.
+func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, pkgName string, loadTo []cdssdk.StorageID, loadToPath []string) (*CreateLoadUploader, error) {
+	if len(loadTo) != len(loadToPath) {
+		return nil, fmt.Errorf("load to count %d does not match load to path count %d", len(loadTo), len(loadToPath))
+	}
+
+	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
+	if err != nil {
+		return nil, fmt.Errorf("new coordinator client: %w", err)
+	}
+	defer stgglb.CoordinatorMQPool.Release(coorCli)
+
+	getStgs := u.stgMeta.GetMany(loadTo)
+
+	targetStgs := make([]stgmod.StorageDetail, len(loadTo))
+	for i, stg := range getStgs {
+		if stg == nil {
+			return nil, fmt.Errorf("storage %v not found", loadTo[i])
+		}
+		// Upload dereferences MasterHub, so reject storages without one here.
+		if stg.MasterHub == nil {
+			return nil, fmt.Errorf("storage %v has no master hub", loadTo[i])
+		}
+		targetStgs[i] = *stg
+	}
+
+	// Take the lock before creating the package so that a failed lock does
+	// not leave an empty package behind.
+	reqBld := reqbuilder.NewBuilder()
+	for _, stg := range targetStgs {
+		reqBld.Shard().Buzy(stg.Storage.StorageID)
+		reqBld.Storage().Buzy(stg.Storage.StorageID)
+	}
+	lock, err := reqBld.MutexLock(u.distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire distlock: %w", err)
+	}
+
+	createPkg, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bktID, pkgName))
+	if err != nil {
+		lock.Unlock()
+		return nil, fmt.Errorf("create package: %w", err)
+	}
+
+	return &CreateLoadUploader{
+		pkg:          createPkg.Package,
+		userID:       userID,
+		targetSpaces: targetStgs,
+		loadRoots:    loadToPath,
+		uploader:     u,
+		distlock:     lock,
+	}, nil
+}
+
+// UploadPart uploads one part of the multipart object objID and registers it
+// with the coordinator as the block at position index. The part is stored on
+// the storage of the object's first existing block, or, when the object has
+// no block yet, on a storage picked by chooseUploadStorage.
+func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index int, stream io.Reader) error {
+	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
+	if err != nil {
+		return fmt.Errorf("new coordinator client: %w", err)
+	}
+	defer stgglb.CoordinatorMQPool.Release(coorCli)
+
+	details, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objID}))
+	if err != nil {
+		return fmt.Errorf("getting object details: %w", err)
+	}
+
+	// An empty reply must not panic on the index below.
+	if len(details.Objects) == 0 || details.Objects[0] == nil {
+		return fmt.Errorf("object %v not found", objID)
+	}
+
+	objDe := details.Objects[0]
+	_, ok := objDe.Object.Redundancy.(*cdssdk.MultipartUploadRedundancy)
+	if !ok {
+		return fmt.Errorf("object %v is not a multipart upload", objID)
+	}
+
+	var stg stgmod.StorageDetail
+	if len(objDe.Blocks) > 0 {
+		cstg := u.stgMeta.Get(objDe.Blocks[0].StorageID)
+		if cstg == nil {
+			return fmt.Errorf("storage %v not found", objDe.Blocks[0].StorageID)
+		}
+		// The upload plan below dereferences MasterHub.
+		if cstg.MasterHub == nil {
+			return fmt.Errorf("storage %v has no master hub", objDe.Blocks[0].StorageID)
+		}
+
+		stg = *cstg
+	} else {
+		getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID))
+		if err != nil {
+			return fmt.Errorf("getting user storages: %w", err)
+		}
+
+		userStgs := u.collectUploadStorages(getUserStgsResp.Storages)
+		if len(userStgs) == 0 {
+			return fmt.Errorf("user no available storages")
+		}
+
+		stg = u.chooseUploadStorage(userStgs, 0).Storage
+	}
+
+	lock, err := reqbuilder.NewBuilder().Shard().Buzy(stg.Storage.StorageID).MutexLock(u.distlock)
+	if err != nil {
+		return fmt.Errorf("acquire distlock: %w", err)
+	}
+	defer lock.Unlock()
+
+	ft := ioswitch2.NewFromTo()
+	fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
+	ft.AddFrom(fromDrv).
+		AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "shard"))
+
+	plans := exec.NewPlanBuilder()
+	err = parser.Parse(ft, plans)
+	if err != nil {
+		return fmt.Errorf("parse fromto: %w", err)
+	}
+
+	exeCtx := exec.NewExecContext()
+	exec.SetValueByType(exeCtx, u.stgAgts)
+	exe := plans.Execute(exeCtx)
+	exe.BeginWrite(io.NopCloser(stream), hd)
+	ret, err := exe.Wait(context.TODO())
+	if err != nil {
+		return fmt.Errorf("executing plan: %w", err)
+	}
+
+	// "shard" is the name passed to NewToShardStore above.
+	shardInfo, ok := ret["shard"].(*ops2.ShardInfoValue)
+	if !ok || shardInfo == nil {
+		return fmt.Errorf("executing plan: missing result %q", "shard")
+	}
+
+	_, err = coorCli.AddMultipartUploadPart(coormq.ReqAddMultipartUploadPart(userID, objID, stgmod.ObjectBlock{
+		ObjectID:  objID,
+		Index:     index,
+		StorageID: stg.Storage.StorageID,
+		FileHash:  shardInfo.Hash,
+		Size:      shardInfo.Size,
+	}))
+	if err != nil {
+		return fmt.Errorf("adding multipart upload part: %w", err)
+	}
+	return nil
+}