package uploader

import (
	"context"
	"fmt"
	"io"
	"math"
	"time"

	"github.com/samber/lo"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
	stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder"
	corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
	stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
	jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
)

// UserSpaceUpload creates a package named newPkgName in bucket targetBktID and
// fills it with the files found under rootPath of the user space userSpaceID.
//
// The steps are:
//  1. Create the package (after checking that the bucket can be loaded).
//  2. Collect every user space that has a shard store as an upload candidate,
//     annotating each with its latency (when hub connectivities are available)
//     and whether it shares the source space's location.
//  3. Let chooseUploadStorage pick the target among the candidates, taking
//     uploadAffinity into account.
//  4. Lock both spaces, read the source directory in batches, and transfer each
//     batch into the target space's shard store.
//  5. Register all transferred objects in the package.
//
// On any failure after the package has been created, the package is deleted
// again (best effort) and an error is returned.
func (u *Uploader) UserSpaceUpload(userSpaceID jcstypes.UserSpaceID, rootPath jcstypes.JPath, targetBktID jcstypes.BucketID, newPkgName string, uploadAffinity jcstypes.UserSpaceID) (*jcstypes.Package, error) {
	// Number of directory entries read and uploaded per batch.
	const batchSize = 50

	srcSpace := u.spaceMeta.Get(userSpaceID)
	if srcSpace == nil {
		return nil, fmt.Errorf("user space %d not found", userSpaceID)
	}

	pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (jcstypes.Package, error) {
		// Abort the transaction if the target bucket cannot be loaded.
		if _, err := u.db.Bucket().GetByID(tx, targetBktID); err != nil {
			return jcstypes.Package{}, err
		}

		return u.db.Package().Create(tx, targetBktID, newPkgName, time.Now())
	})
	if err != nil {
		return nil, fmt.Errorf("creating package: %w", err)
	}

	// delPkg rolls back the package created above. It is best effort: the
	// result of the delete is deliberately ignored because the caller is
	// already on an error path and reports the original failure.
	delPkg := func() {
		u.db.Package().Delete(u.db.DefCtx(), pkg.PackageID)
	}

	spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("getting user space ids: %w", err)
	}

	// Only spaces whose metadata is available and that have a shard store can
	// receive the upload.
	spaceDetails := u.spaceMeta.GetMany(spaceIDs)
	spaceDetails = lo.Filter(spaceDetails, func(e *jcstypes.UserSpaceDetail, i int) bool {
		return e != nil && e.UserSpace.ShardStore != nil
	})

	var uploadSpaces []UploadSpaceInfo
	// TODO: the Standalone mode and the nil-RecommendHub case should be handled separately.
	if !stgglb.StandaloneMode && srcSpace.RecommendHub != nil {
		coorCli := stgglb.CoordinatorRPCPool.Get()
		defer coorCli.Release()

		resp, cerr := coorCli.GetHubConnectivities(context.Background(), corrpc.ReqGetHubConnectivities([]jcstypes.HubID{srcSpace.RecommendHub.HubID}))
		if cerr != nil {
			delPkg()
			return nil, fmt.Errorf("getting hub connectivities: %w", cerr.ToError())
		}

		// Index the connectivities by destination hub for quick lookup.
		cons := make(map[jcstypes.HubID]jcstypes.HubConnectivity)
		for _, c := range resp.Connectivities {
			cons[c.ToHubID] = c
		}

		for _, space := range spaceDetails {
			// Unknown latency is treated as the maximum possible delay.
			latency := time.Duration(math.MaxInt64)

			// A candidate space may have no recommended hub; in that case no
			// connectivity can be looked up and the latency stays unknown.
			if space.RecommendHub != nil {
				con, ok := cons[space.RecommendHub.HubID]
				if ok && con.Latency != nil {
					// con.Latency is converted as a millisecond count.
					latency = time.Duration(*con.Latency * float32(time.Millisecond))
				}
			}

			uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
				Space:          *space,
				Delay:          latency,
				IsSameLocation: space.UserSpace.Storage.GetLocation() == srcSpace.UserSpace.Storage.GetLocation(),
			})
		}
	} else {
		// No connectivity information is queried here, so Delay keeps its zero value.
		for _, space := range spaceDetails {
			uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
				Space:          *space,
				IsSameLocation: space.UserSpace.Storage.GetLocation() == srcSpace.UserSpace.Storage.GetLocation(),
			})
		}
	}

	if len(uploadSpaces) == 0 {
		delPkg()
		return nil, fmt.Errorf("user no available userspaces")
	}

	targetSpace := u.chooseUploadStorage(uploadSpaces, uploadAffinity)

	store, err := u.stgPool.GetBaseStore(srcSpace)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("getting base store: %w", err)
	}

	// Hold a lock on both the source and the target space for the whole transfer.
	mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSpace.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("acquire lock: %w", err)
	}
	defer mutex.Unlock()

	dirReader := store.ReadDir(rootPath)
	var adds []db.AddObjectEntry
	entries := make([]stgtypes.DirEntry, 0, batchSize)
	for {
		// Fill the batch until it is full or the directory is exhausted.
		eof := false
		for len(entries) < batchSize {
			entry, err := dirReader.Next()
			if err == io.EOF {
				eof = true
				break
			}
			if err != nil {
				delPkg()
				return nil, fmt.Errorf("reading dir: %w", err)
			}

			entries = append(entries, entry)
		}

		as, err := u.uploadFromBaseStore(srcSpace, &targetSpace.Space, entries, rootPath)
		if err != nil {
			delPkg()
			return nil, fmt.Errorf("uploading from base store: %w", err)
		}
		adds = append(adds, as...)

		// Reuse the batch buffer for the next round.
		entries = entries[:0]
		if eof {
			break
		}
	}

	_, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds)
	if err != nil {
		delPkg()
		return nil, fmt.Errorf("adding objects: %w", err)
	}

	return &pkg, nil
}

// uploadFromBaseStore transfers the file entries of one batch from the base
// store of srcSpace into the shard store of targetSpace and returns one
// db.AddObjectEntry per transferred file. Directory entries are skipped.
//
// The object path of each entry is derived from the entry path relative to
// rootPath (see the comment in the loop below). If the batch contains no files,
// no plan is built or executed and (nil, nil) is returned.
func (u *Uploader) uploadFromBaseStore(srcSpace *jcstypes.UserSpaceDetail, targetSpace *jcstypes.UserSpaceDetail, entries []stgtypes.DirEntry, rootPath jcstypes.JPath) ([]db.AddObjectEntry, error) {
	ft := ioswitch2.FromTo{}
	fileCnt := 0
	for _, e := range entries {
		// Consider adding a config option to control whether empty directories are uploaded.
		if e.IsDir {
			continue
		}

		ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, e.Path))
		// The full entry path is used as the key under which the stored file
		// info is looked up in the plan result below.
		ft.AddTo(ioswitch2.NewToShardStore(*targetSpace, ioswitch2.RawStream(), e.Path.String()))
		fileCnt++
	}

	// Nothing to transfer (empty batch or directories only): skip plan
	// construction and execution entirely.
	if fileCnt == 0 {
		return nil, nil
	}

	plans := exec.NewPlanBuilder()
	err := parser.Parse(ft, plans)
	if err != nil {
		return nil, fmt.Errorf("parsing plan: %w", err)
	}

	exeCtx := exec.NewExecContext()
	exec.SetValueByType(exeCtx, u.stgPool)
	ret, err := plans.Execute(exeCtx).Wait(context.Background())
	if err != nil {
		return nil, fmt.Errorf("executing plan: %w", err)
	}

	adds := make([]db.AddObjectEntry, 0, fileCnt)
	for _, e := range entries {
		if e.IsDir {
			continue
		}

		pat := e.Path.Clone()
		pat.PopFrontN(rootPath.Len() - 1)
		// If the object path equals rootPath (i.e. rootPath is a file), the
		// file name is used as the object path; otherwise the remaining
		// leading component is dropped as well.
		if pat.Len() > 1 {
			pat.PopFrontN(1)
		}

		// Use the checked assertion so that a missing or unexpected result
		// yields an error instead of a panic.
		info, ok := ret.Get(e.Path.String()).(*ops2.FileInfoValue)
		if !ok || info == nil {
			return nil, fmt.Errorf("file info of %s not found in plan result", e.Path.String())
		}

		adds = append(adds, db.AddObjectEntry{
			Path:         pat.String(),
			Size:         info.Size,
			FileHash:     info.Hash,
			CreateTime:   time.Now(),
			UserSpaceIDs: []jcstypes.UserSpaceID{targetSpace.UserSpace.UserSpaceID},
		})
	}

	return adds, nil
}