package uploader

import (
	"context"
	"fmt"
	"io"
	"math"
	"math/rand"
	"time"

	"github.com/samber/lo"
	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
	"gitlink.org.cn/cloudream/common/utils/lo2"
	"gitlink.org.cn/cloudream/common/utils/sort2"
	"gitlink.org.cn/cloudream/storage2/client/internal/db"
	"gitlink.org.cn/cloudream/storage2/client/internal/metacache"
	clitypes "gitlink.org.cn/cloudream/storage2/client/types"
	stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
	"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
	"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
	"gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
	"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
)

// Uploader coordinates uploads of object data into user spaces. It selects a
// target space based on affinity, location and measured latency, and builds
// the session objects (UpdateUploader / CreateLoadUploader) that perform the
// actual transfer.
type Uploader struct {
	distlock     *distlock.Service        // distributed lock service (locking currently disabled, see TODO2 comments)
	connectivity *connectivity.Collector  // latency measurements towards master hubs
	stgPool      *pool.Pool               // storage driver pool injected into plan execution
	spaceMeta    *metacache.UserSpaceMeta // cache of user space details
	db           *db.DB                   // metadata database
}

// NewUploader creates an Uploader wired to the given services.
func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
	return &Uploader{
		distlock:     distlock,
		connectivity: connectivity,
		stgPool:      stgPool,
		spaceMeta:    spaceMeta,
		db:           db,
	}
}

// BeginUpdate starts an upload session that updates the package pkgID.
//
// affinity optionally pins the upload to one user space (values > 0 are
// honored when that space is available); loadTo and loadToPath name the user
// spaces, and the paths inside them, that the uploaded data should also be
// loaded into. Returns an UpdateUploader that carries out the transfer.
func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, loadTo []clitypes.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
	spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
	if err != nil {
		return nil, fmt.Errorf("getting user space ids: %w", err)
	}

	spaceDetails := u.spaceMeta.GetMany(spaceIDs)
	spaceDetails = lo2.RemoveAllDefault(spaceDetails)

	cons := u.connectivity.GetAll()
	var uploadSpaces []UploadSpaceInfo
	for _, space := range spaceDetails {
		// Spaces without a master hub cannot receive uploads.
		if space.MasterHub == nil {
			continue
		}

		// Unknown latency sorts last when choosing by delay.
		latency := time.Duration(math.MaxInt64)
		con, ok := cons[space.MasterHub.HubID]
		if ok && con.Latency != nil {
			latency = *con.Latency
		}

		uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
			Space:          *space,
			Delay:          latency,
			IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID,
		})
	}

	if len(uploadSpaces) == 0 {
		// BUGFIX: was the garbled message "user no available storages".
		return nil, fmt.Errorf("no available storages")
	}

	loadToSpaces := make([]clitypes.UserSpaceDetail, len(loadTo))
	for i, spaceID := range loadTo {
		space, ok := lo.Find(spaceDetails, func(space *clitypes.UserSpaceDetail) bool {
			return space.UserSpace.UserSpaceID == spaceID
		})
		if !ok {
			return nil, fmt.Errorf("load to storage %v not found", spaceID)
		}
		if space.MasterHub == nil {
			return nil, fmt.Errorf("load to storage %v has no master hub", spaceID)
		}

		loadToSpaces[i] = *space
	}

	target := u.chooseUploadStorage(uploadSpaces, affinity)

	// TODO2: locking.
	// Lock the upload node's IPFS, and consider also taking the Object Create
	// lock, to keep the uploaded replica from being cleaned up mid-upload.
	// distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.Storage.StorageID).MutexLock(u.distlock)
	// if err != nil {
	//     return nil, fmt.Errorf("acquire distlock: %w", err)
	// }

	return &UpdateUploader{
		uploader:    u,
		pkgID:       pkgID,
		targetSpace: target.Space,
		// distMutex: distMutex,
		loadToSpaces: loadToSpaces,
		loadToPath:   loadToPath,
	}, nil
}
没有的话从所有节点选择延迟最低的节点 func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity clitypes.UserSpaceID) UploadSpaceInfo { if spaceAffinity > 0 { aff, ok := lo.Find(spaces, func(space UploadSpaceInfo) bool { return space.Space.UserSpace.UserSpaceID == spaceAffinity }) if ok { return aff } } sameLocationStorages := lo.Filter(spaces, func(e UploadSpaceInfo, i int) bool { return e.IsSameLocation }) if len(sameLocationStorages) > 0 { return sameLocationStorages[rand.Intn(len(sameLocationStorages))] } // 选择延迟最低的节点 spaces = sort2.Sort(spaces, func(e1, e2 UploadSpaceInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) return spaces[0] } func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, loadTo []clitypes.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) { getSpaces := u.spaceMeta.GetMany(loadTo) spacesStgs := make([]clitypes.UserSpaceDetail, len(loadTo)) for i, stg := range getSpaces { if stg == nil { return nil, fmt.Errorf("storage %v not found", loadTo[i]) } spacesStgs[i] = *stg } pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) { _, err := u.db.Bucket().GetByID(tx, bktID) if err != nil { return clitypes.Package{}, err } return u.db.Package().Create(u.db.DefCtx(), bktID, pkgName) }) if err != nil { return nil, fmt.Errorf("create package: %w", err) } // TODO2 加锁 // reqBld := reqbuilder.NewBuilder() // for _, stg := range spacesStgs { // reqBld.Shard().Buzy(stg.Storage.StorageID) // reqBld.Storage().Buzy(stg.Storage.StorageID) // } // lock, err := reqBld.MutexLock(u.distlock) // if err != nil { // return nil, fmt.Errorf("acquire distlock: %w", err) // } return &CreateLoadUploader{ pkg: pkg, targetSpaces: spacesStgs, loadRoots: loadToPath, uploader: u, // distlock: lock, }, nil } func (u *Uploader) UploadPart(objID clitypes.ObjectID, index int, stream io.Reader) error { detail, err := u.db.Object().GetDetail(u.db.DefCtx(), objID) if err != nil { return fmt.Errorf("getting object detail: %w", 
err) } objDe := detail _, ok := objDe.Object.Redundancy.(*clitypes.MultipartUploadRedundancy) if !ok { return fmt.Errorf("object %v is not a multipart upload", objID) } var space clitypes.UserSpaceDetail if len(objDe.Blocks) > 0 { cstg := u.spaceMeta.Get(objDe.Blocks[0].UserSpaceID) if cstg == nil { return fmt.Errorf("space %v not found", objDe.Blocks[0].UserSpaceID) } space = *cstg } else { spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx()) if err != nil { return fmt.Errorf("getting user space ids: %w", err) } spaces := u.spaceMeta.GetMany(spaceIDs) spaces = lo2.RemoveAllDefault(spaces) cons := u.connectivity.GetAll() var userStgs []UploadSpaceInfo for _, space := range spaces { if space.MasterHub == nil { continue } delay := time.Duration(math.MaxInt64) con, ok := cons[space.MasterHub.HubID] if ok && con.Latency != nil { delay = *con.Latency } userStgs = append(userStgs, UploadSpaceInfo{ Space: *space, Delay: delay, IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID, }) } if len(userStgs) == 0 { return fmt.Errorf("user no available storages") } space = u.chooseUploadStorage(userStgs, 0).Space } // TODO2 加锁 // lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.Storage.StorageID).MutexLock(u.distlock) // if err != nil { // return fmt.Errorf("acquire distlock: %w", err) // } // defer lock.Unlock() ft := ioswitch2.NewFromTo() fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromDrv). 
AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shard")) plans := exec.NewPlanBuilder() err = parser.Parse(ft, plans) if err != nil { return fmt.Errorf("parse fromto: %w", err) } exeCtx := exec.NewExecContext() exec.SetValueByType(exeCtx, u.stgPool) exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(stream), hd) ret, err := exec.Wait(context.TODO()) if err != nil { return fmt.Errorf("executing plan: %w", err) } shardInfo := ret["shard"].(*ops2.ShardInfoValue) err = u.db.DoTx(func(tx db.SQLContext) error { return u.db.Object().AppendPart(tx, clitypes.ObjectBlock{ ObjectID: objID, Index: index, UserSpaceID: space.UserSpace.UserSpaceID, FileHash: shardInfo.Hash, Size: shardInfo.Size, }) }) return err }