package uploader

import (
	"context"
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/samber/lo"
	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
	clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
	stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder"
	corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
	cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
)

// UserSpaceUpload creates a package named newPkgName in bucket targetBktID and
// fills it with the files listed under rootPath in the base store of user space
// userSpaceID. The files are written to the shard store of a user space picked
// by chooseUploadStorage, with uploadAffinity passed through to that choice.
//
// The package record is created first; if any later step fails, the record is
// deleted again (best effort) and the error is returned. On success the created
// package is returned.
func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath string, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) {
	srcSpace := u.spaceMeta.Get(userSpaceID)
	if srcSpace == nil {
		return nil, fmt.Errorf("user space %d not found", userSpaceID)
	}

	// Verify that the bucket exists and create the package in a single transaction.
	pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) {
		if _, err := u.db.Bucket().GetByID(tx, targetBktID); err != nil {
			return clitypes.Package{}, err
		}

		return u.db.Package().Create(tx, targetBktID, newPkgName, time.Now())
	})
	if err != nil {
		return nil, fmt.Errorf("creating package: %w", err)
	}

	// delPkg rolls back the package created above. It is best effort: the caller
	// is already returning the original failure, so the result of the delete is
	// deliberately not inspected.
	delPkg := func() {
		u.db.Package().Delete(u.db.DefCtx(), pkg.PackageID)
	}

	spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("getting user space ids: %w", err)
	}

	// Only user spaces that are known and have a shard store can receive the upload.
	spaceDetails := lo.Filter(u.spaceMeta.GetMany(spaceIDs), func(e *clitypes.UserSpaceDetail, i int) bool {
		return e != nil && e.UserSpace.ShardStore != nil
	})

	uploadSpaces, err := buildUploadSpaceInfos(srcSpace, spaceDetails)
	if err != nil {
		delPkg()
		return nil, err
	}

	if len(uploadSpaces) == 0 {
		delPkg()
		return nil, fmt.Errorf("user no available userspaces")
	}

	target := u.chooseUploadStorage(uploadSpaces, uploadAffinity)

	store, err := u.stgPool.GetBaseStore(srcSpace)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("getting base store: %w", err)
	}

	entries, err := store.ListAll(rootPath)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("listing base store: %w", err)
	}

	// Mark both the source and the target user space as busy for the duration of the transfer.
	mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("acquire lock: %w", err)
	}
	defer mutex.Unlock()

	adds, err := u.uploadFromBaseStore(srcSpace, &target.Space, entries, rootPath)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("uploading from base store: %w", err)
	}

	_, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("adding objects: %w", err)
	}

	return &pkg, nil
}

// buildUploadSpaceInfos turns the candidate user spaces into UploadSpaceInfo
// values relative to srcSpace.
//
// Unless running in standalone mode or srcSpace has no recommended hub, the
// coordinator is asked for the connectivities of srcSpace's recommended hub and
// each candidate's Delay is set from the reported latency to its own recommended
// hub. Candidates without a recommended hub, or without a reported latency, get
// the maximum duration. In the other case Delay is left at its zero value.
//
// The coordinator client is released when this function returns, so it is not
// held while the upload itself is running.
func buildUploadSpaceInfos(srcSpace *clitypes.UserSpaceDetail, candidates []*clitypes.UserSpaceDetail) ([]UploadSpaceInfo, error) {
	infos := make([]UploadSpaceInfo, 0, len(candidates))

	// TODO: standalone mode and a nil RecommendHub should be handled as separate cases.
	if stgglb.StandaloneMode || srcSpace.RecommendHub == nil {
		for _, space := range candidates {
			infos = append(infos, UploadSpaceInfo{
				Space:          *space,
				IsSameLocation: space.UserSpace.Storage.GetLocation() == srcSpace.UserSpace.Storage.GetLocation(),
			})
		}
		return infos, nil
	}

	coorCli := stgglb.CoordinatorRPCPool.Get()
	defer coorCli.Release()

	resp, cerr := coorCli.GetHubConnectivities(context.Background(), corrpc.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.RecommendHub.HubID}))
	if cerr != nil {
		return nil, fmt.Errorf("getting hub connectivities: %w", cerr.ToError())
	}

	cons := make(map[cortypes.HubID]cortypes.HubConnectivity, len(resp.Connectivities))
	for _, c := range resp.Connectivities {
		cons[c.ToHubID] = c
	}

	for _, space := range candidates {
		// Unknown latency is treated as the worst possible one.
		latency := time.Duration(math.MaxInt64)

		// A candidate may have no recommended hub; it then keeps the worst latency
		// instead of triggering a nil pointer dereference.
		if space.RecommendHub != nil {
			if con, ok := cons[space.RecommendHub.HubID]; ok && con.Latency != nil {
				latency = time.Duration(*con.Latency * float32(time.Millisecond))
			}
		}

		infos = append(infos, UploadSpaceInfo{
			Space:          *space,
			Delay:          latency,
			IsSameLocation: space.UserSpace.Storage.GetLocation() == srcSpace.UserSpace.Storage.GetLocation(),
		})
	}

	return infos, nil
}

// uploadFromBaseStore copies every non-directory entry from the base store of
// srcSpace into the shard store of targetSpace with a single execution plan and
// returns one db.AddObjectEntry per copied file.
//
// The object path of each entry is e.Path with the rootPath prefix removed; when
// an entry's path equals rootPath itself (ignoring a trailing separator), its
// base name is used instead. An error is returned if the plan cannot be parsed
// or executed, or if the plan result holds no file info for one of the files.
func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.ListEntry, rootPath string) ([]db.AddObjectEntry, error) {
	ft := ioswitch2.FromTo{}

	for _, e := range entries {
		// Consider adding a config option to control whether empty directories are uploaded.
		if e.IsDir {
			continue
		}

		// The entry path doubles as the key under which the file info is looked up in the plan result.
		ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, e.Path))
		ft.AddTo(ioswitch2.NewToShardStore(*targetSpace, ioswitch2.RawStream(), e.Path))
	}

	plans := exec.NewPlanBuilder()
	if err := parser.Parse(ft, plans); err != nil {
		return nil, fmt.Errorf("parsing plan: %w", err)
	}

	exeCtx := exec.NewExecContext()
	exec.SetValueByType(exeCtx, u.stgPool)
	ret, err := plans.Execute(exeCtx).Wait(context.Background())
	if err != nil {
		return nil, fmt.Errorf("executing plan: %w", err)
	}

	cleanRoot := strings.TrimSuffix(rootPath, clitypes.ObjectPathSeparator)
	adds := make([]db.AddObjectEntry, 0, len(ret))

	for _, e := range entries {
		if e.IsDir {
			continue
		}

		pat := strings.TrimPrefix(e.Path, cleanRoot+clitypes.ObjectPathSeparator)
		// rootPath pointed at the file itself: nothing was trimmed, so fall back to the file name.
		if pat == cleanRoot {
			pat = clitypes.BaseName(e.Path)
		}

		// Checked assertion: a missing or unexpected result becomes an error instead of a panic.
		info, ok := ret[e.Path].(*ops2.FileInfoValue)
		if !ok || info == nil {
			return nil, fmt.Errorf("no file info returned for %q", e.Path)
		}

		adds = append(adds, db.AddObjectEntry{
			Path:         pat,
			Size:         info.Size,
			FileHash:     info.Hash,
			CreateTime:   time.Now(),
			UserSpaceIDs: []clitypes.UserSpaceID{targetSpace.UserSpace.UserSpaceID},
		})
	}

	return adds, nil
}