diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 34acdfd..0fc8593 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -403,3 +403,76 @@ func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(cdsapi.ObjectGetPackageObjectsResp{Objects: objs})) } + +func (s *ObjectService) NewMultipartUpload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.NewMultipartUpload") + + var req cdsapi.ObjectNewMultipartUpload + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().NewMultipartUploadObject(req.UserID, req.PackageID, req.Path) + if err != nil { + log.Warnf("new multipart upload object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "new multipart upload object failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectNewMultipartUploadResp{Object: obj})) +} + +type ObjectUploadPartReq struct { + Info cdsapi.ObjectUploadPartInfo `form:"info" binding:"required"` + File *multipart.FileHeader `form:"file"` +} + +func (s *ObjectService) UploadPart(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.UploadPart") + + var req ObjectUploadPartReq + if err := ctx.ShouldBind(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + file, err := req.File.Open() + if err != nil { + log.Warnf("open file: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "open file failed")) + return + } + defer file.Close() + + err = s.svc.Uploader.UploadPart(req.Info.UserID, req.Info.ObjectID, req.Info.Index, file) + if err != nil { + log.Warnf("uploading part: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("upload part: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadPartResp{})) +} + +func (s *ObjectService) CompleteMultipartUpload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.CompleteMultipartUpload") + + var req cdsapi.ObjectCompleteMultipartUpload + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().CompleteMultipartUpload(req.UserID, req.ObjectID, req.Indexes) + if err != nil { + log.Warnf("completing multipart upload: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("complete multipart upload: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCompleteMultipartUploadResp{Object: obj})) +} diff --git a/client/internal/http/server.go b/client/internal/http/server.go index aca1faf..4597dca 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -119,4 +119,8 @@ func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) { rt.POST(cdsapi.UserCreatePath, s.User().Create) rt.POST(cdsapi.UserDeletePath, s.User().Delete) + + rt.POST(cdsapi.ObjectNewMultipartUploadPath, s.Object().NewMultipartUpload) + rt.POST(cdsapi.ObjectUploadPartPath, s.Object().UploadPart) + rt.POST(cdsapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload) } diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 1476d35..780be64 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -1,14 +1,18 @@ package services import ( + "context" "fmt" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/plans" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -163,3 +167,120 @@ func (svc *ObjectService) GetObjectDetail(objectID cdssdk.ObjectID) (*stgmod.Obj return getResp.Objects[0], nil } + +func (svc *ObjectService) NewMultipartUploadObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string) (cdssdk.Object, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return cdssdk.Object{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.NewMultipartUploadObject(coormq.ReqNewMultipartUploadObject(userID, pkgID, path)) + if err != nil { + return cdssdk.Object{}, err + } + + return resp.Object, nil +} + +func (svc *ObjectService) CompleteMultipartUpload(userID cdssdk.UserID, objectID cdssdk.ObjectID, indexes []int) (cdssdk.Object, error) { + if len(indexes) == 0 { + return cdssdk.Object{}, fmt.Errorf("no block indexes specified") + } + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return cdssdk.Object{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + details, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID})) + if err != nil { + return cdssdk.Object{}, err + } + + if details.Objects[0] == nil { + return cdssdk.Object{}, fmt.Errorf("object %v not found", objectID) + } + + objDe := details.Objects[0] + + _, ok := objDe.Object.Redundancy.(*cdssdk.MultipartUploadRedundancy) + if !ok { + return cdssdk.Object{}, fmt.Errorf("object %v is not a multipart upload", objectID) + } + + if len(objDe.Blocks) == 0 { + return cdssdk.Object{}, fmt.Errorf("object %v has no blocks", objectID) + } + + objBlkMap := make(map[int]stgmod.ObjectBlock) + for _, blk := range objDe.Blocks { + objBlkMap[blk.Index] = blk + } + + var compBlks []stgmod.ObjectBlock + var compBlkStgs []stgmod.StorageDetail + var targetStg stgmod.StorageDetail + for i, idx := range indexes { + blk, ok := objBlkMap[idx] + if !ok { + return cdssdk.Object{}, fmt.Errorf("block %d not found in object %v", idx, objectID) + } + + stg := svc.StorageMeta.Get(blk.StorageID) + if stg == nil { + return cdssdk.Object{}, fmt.Errorf("storage %d not found", blk.StorageID) + } + + compBlks = append(compBlks, blk) + compBlkStgs = append(compBlkStgs, *stg) + if i == 0 { + targetStg = *stg + } + } + + bld := exec.NewPlanBuilder() + err = plans.CompleteMultipart(compBlks, compBlkStgs, targetStg, "shard", bld) + if err != nil { + return cdssdk.Object{}, err + } + + exeCtx := exec.NewExecContext() + ret, err := bld.Execute(exeCtx).Wait(context.Background()) + if err != nil { + return cdssdk.Object{}, err + } + + shardInfo := ret["shard"].(*ops2.ShardInfoValue) + _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy([]coormq.UpdatingObjectRedundancy{ + { + ObjectID: objectID, + FileHash: shardInfo.Hash, + Size: shardInfo.Size, + Redundancy: cdssdk.NewNoneRedundancy(), + Blocks: []stgmod.ObjectBlock{{ + ObjectID: objectID, + Index: 0, + StorageID: targetStg.Storage.StorageID, + FileHash: shardInfo.Hash, + Size: shardInfo.Size, + }}, + }, + })) + + if err != nil { + return cdssdk.Object{}, err + } + + getObj, err := coorCli.GetObjects(coormq.ReqGetObjects(userID, []cdssdk.ObjectID{objectID})) + if err != nil { + return cdssdk.Object{}, err + } + + if getObj.Objects[0] == nil { + return cdssdk.Object{}, fmt.Errorf("object %v not found", objectID) + } + + return *getObj.Objects[0], nil +} diff --git a/common/pkgs/db2/object_block.go b/common/pkgs/db2/object_block.go index 53550a6..b6e8511 100644 --- a/common/pkgs/db2/object_block.go +++ b/common/pkgs/db2/object_block.go @@ -43,8 +43,8 @@ func (*ObjectBlockDB) GetInPackageID(ctx SQLContext, packageID cdssdk.PackageID) return rets, err } -func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash) error { - block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash} +func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash, size int64) error { + block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash, Size: size} return ctx.Table("ObjectBlock").Create(&block).Error } @@ -60,6 +60,10 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object return ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Delete(&stgmod.ObjectBlock{}).Error } +func (db *ObjectBlockDB) DeleteByObjectIDIndex(ctx SQLContext, objectID cdssdk.ObjectID, index int) error { + return ctx.Table("ObjectBlock").Where("ObjectID = ? AND `Index` = ?", objectID, index).Delete(&stgmod.ObjectBlock{}).Error +} + func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { if len(objectIDs) == 0 { return nil diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go new file mode 100644 index 0000000..1644a1d --- /dev/null +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -0,0 +1,49 @@ +package plans + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +func CompleteMultipart(blocks []stgmod.ObjectBlock, blockStgs []stgmod.StorageDetail, targetStg stgmod.StorageDetail, shardInfoKey string, blder *exec.PlanBuilder) error { + da := ops2.NewGraphNodeBuilder() + + sizes := make([]int64, len(blocks)) + for i, blk := range blocks { + sizes[i] = blk.Size + } + joinNode := da.NewSegmentJoin(sizes) + joinNode.Env().ToEnvWorker(getWorkerInfo(*targetStg.MasterHub)) + joinNode.Env().Pinned = true + + for i, blk := range blocks { + rd := da.NewShardRead(nil, blk.StorageID, types.NewOpen(blk.FileHash)) + rd.Env().ToEnvWorker(getWorkerInfo(*blockStgs[i].MasterHub)) + rd.Env().Pinned = true + + rd.Output().ToSlot(joinNode.InputSlot(i)) + } + + // TODO 应该采取更合理的方式同时支持Parser和直接生成DAG + wr := da.NewShardWrite(nil, targetStg, shardInfoKey) + wr.Env().ToEnvWorker(getWorkerInfo(*targetStg.MasterHub)) + wr.Env().Pinned = true + + joinNode.Joined().ToSlot(wr.Input()) + + if shardInfoKey != "" { + store := da.NewStore() + store.Env().ToEnvDriver() + store.Store(shardInfoKey, wr.FileHashVar()) + } + + err := plan.Compile(da.Graph, blder) + if err != nil { + return err + } + + return nil +} diff --git a/common/pkgs/ioswitch2/plans/utils.go b/common/pkgs/ioswitch2/plans/utils.go new file mode 100644 index 0000000..47e9003 --- /dev/null +++ b/common/pkgs/ioswitch2/plans/utils.go @@ -0,0 +1,20 @@ +package plans + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" +) + +func getWorkerInfo(hub cdssdk.Hub) exec.WorkerInfo { + switch addr := hub.Address.(type) { + case *cdssdk.HttpAddressInfo: + return &ioswitch2.HttpHubWorker{Hub: hub} + + case *cdssdk.GRPCAddressInfo: + return &ioswitch2.AgentWorker{Hub: hub, Address: *addr} + + default: + return nil + } +} diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 4ee0005..6ac2248 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -33,6 +33,10 @@ type ObjectService interface { GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage) AddAccessStat(msg *AddAccessStat) + + NewMultipartUploadObject(msg *NewMultipartUploadObject) (*NewMultipartUploadObjectResp, *mq.CodeMessage) + + AddMultipartUploadPart(msg *AddMultipartUploadPart) (*AddMultipartUploadPartResp, *mq.CodeMessage) } var _ = Register(Service.GetObjects) @@ -190,6 +194,8 @@ type UpdateObjectRedundancyResp struct { } type UpdatingObjectRedundancy struct { ObjectID cdssdk.ObjectID `json:"objectID"` + FileHash cdssdk.FileHash `json:"fileHash"` + Size int64 `json:"size"` Redundancy cdssdk.Redundancy `json:"redundancy"` PinnedAt []cdssdk.StorageID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` @@ -346,3 +352,59 @@ func ReqAddAccessStat(entries []AddAccessStatEntry) *AddAccessStat { func (client *Client) AddAccessStat(msg *AddAccessStat) error { return mq.Send(Service.AddAccessStat, client.rabbitCli, msg) } + +var _ = Register(Service.NewMultipartUploadObject) + +type NewMultipartUploadObject struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + Path string `json:"path"` +} +type NewMultipartUploadObjectResp struct { + mq.MessageBodyBase + Object cdssdk.Object `json:"object"` +} + +func ReqNewMultipartUploadObject(userID cdssdk.UserID, packageID cdssdk.PackageID, path string) *NewMultipartUploadObject { + return &NewMultipartUploadObject{ + UserID: userID, + PackageID: packageID, + Path: path, + } +} +func RespNewMultipartUploadObject(object cdssdk.Object) *NewMultipartUploadObjectResp { + return &NewMultipartUploadObjectResp{ + Object: object, + } +} +func (client *Client) NewMultipartUploadObject(msg *NewMultipartUploadObject) (*NewMultipartUploadObjectResp, error) { + return mq.Request(Service.NewMultipartUploadObject, client.rabbitCli, msg) +} + +var _ = Register(Service.AddMultipartUploadPart) + +type AddMultipartUploadPart struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + ObjectID cdssdk.ObjectID `json:"objectID"` + Block stgmod.ObjectBlock `json:"block"` +} + +type AddMultipartUploadPartResp struct { + mq.MessageBodyBase +} + +func ReqAddMultipartUploadPart(userID cdssdk.UserID, objectID cdssdk.ObjectID, blk stgmod.ObjectBlock) *AddMultipartUploadPart { + return &AddMultipartUploadPart{ + UserID: userID, + ObjectID: objectID, + Block: blk, + } +} +func RespAddMultipartUploadPart() *AddMultipartUploadPartResp { + return &AddMultipartUploadPartResp{} +} +func (client *Client) AddMultipartUploadPart(msg *AddMultipartUploadPart) (*AddMultipartUploadPartResp, error) { + return mq.Request(Service.AddMultipartUploadPart, client.rabbitCli, msg) +} diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go index 245e36f..b98a05b 100644 --- a/common/pkgs/uploader/uploader.go +++ b/common/pkgs/uploader/uploader.go @@ -1,12 +1,15 @@ package uploader import ( + "context" "fmt" + "io" "math" "math/rand" "time" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sort2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" @@ -14,6 +17,9 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" @@ -25,8 +31,6 @@ type Uploader struct { connectivity *connectivity.Collector stgAgts *agtpool.AgentPool stgMeta *metacache.StorageMeta - loadTo []cdssdk.StorageID - loadToPath []string } func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.StorageMeta) *Uploader { @@ -177,3 +181,105 @@ func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, distlock: lock, }, nil } + +func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index int, stream io.Reader) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + details, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objID})) + if err != nil { + return err + } + + if details.Objects[0] == nil { + return fmt.Errorf("object %v not found", objID) + } + + objDe := details.Objects[0] + _, ok := objDe.Object.Redundancy.(*cdssdk.MultipartUploadRedundancy) + if !ok { + return fmt.Errorf("object %v is not a multipart upload", objID) + } + + var stg stgmod.StorageDetail + if len(objDe.Blocks) > 0 { + cstg := u.stgMeta.Get(objDe.Blocks[0].StorageID) + if cstg == nil { + return fmt.Errorf("storage %v not found", objDe.Blocks[0].StorageID) + } + + stg = *cstg + + } else { + getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID)) + if err != nil { + return fmt.Errorf("getting user storages: %w", err) + } + + cons := u.connectivity.GetAll() + var userStgs []UploadStorageInfo + for _, stg := range getUserStgsResp.Storages { + if stg.MasterHub == nil { + continue + } + + delay := time.Duration(math.MaxInt64) + + con, ok := cons[stg.MasterHub.HubID] + if ok && con.Latency != nil { + delay = *con.Latency + } + + userStgs = append(userStgs, UploadStorageInfo{ + Storage: stg, + Delay: delay, + IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID, + }) + } + + if len(userStgs) == 0 { + return fmt.Errorf("user no available storages") + } + + stg = u.chooseUploadStorage(userStgs, 0).Storage + } + + lock, err := reqbuilder.NewBuilder().Shard().Buzy(stg.Storage.StorageID).MutexLock(u.distlock) + if err != nil { + return fmt.Errorf("acquire distlock: %w", err) + } + defer lock.Unlock() + + ft := ioswitch2.NewFromTo() + fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) + ft.AddFrom(fromDrv). + AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "shard")) + + plans := exec.NewPlanBuilder() + err = parser.Parse(ft, plans) + if err != nil { + return fmt.Errorf("parse fromto: %w", err) + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, u.stgAgts) + exec := plans.Execute(exeCtx) + exec.BeginWrite(io.NopCloser(stream), hd) + ret, err := exec.Wait(context.TODO()) + if err != nil { + return fmt.Errorf("executing plan: %w", err) + } + + shardInfo := ret["shard"].(*ops2.ShardInfoValue) + _, err = coorCli.AddMultipartUploadPart(coormq.ReqAddMultipartUploadPart(userID, objID, stgmod.ObjectBlock{ + ObjectID: objID, + Index: index, + StorageID: stg.Storage.StorageID, + FileHash: shardInfo.Hash, + Size: shardInfo.Size, + })) + return err +} diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index fa5e012..9fb9ba2 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -235,13 +235,15 @@ func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) ( for _, obj := range objs { dummyObjs = append(dummyObjs, cdssdk.Object{ ObjectID: obj.ObjectID, + FileHash: obj.FileHash, + Size: obj.Size, Redundancy: obj.Redundancy, CreateTime: nowTime, // 实际不会更新,只因为不能是0值 UpdateTime: nowTime, }) } - err = db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"Redundancy", "UpdateTime"}) + err = db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"FileHash", "Size", "Redundancy", "UpdateTime"}) if err != nil { return fmt.Errorf("batch update object redundancy: %w", err) } @@ -764,3 +766,117 @@ func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjects return mq.ReplyOK(coormq.RespCloneObjects(ret)) } + +func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) { + var obj cdssdk.Object + err := svc.db2.DoTx(func(tx db2.SQLContext) error { + oldObjs, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) + if err == nil && len(oldObjs) > 0 { + obj = oldObjs[0] + err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID) + if err != nil { + return fmt.Errorf("delete object blocks: %w", err) + } + + obj.FileHash = cdssdk.EmptyHash + obj.Size = 0 + obj.Redundancy = cdssdk.NewMultipartUploadRedundancy() + obj.UpdateTime = time.Now() + + err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj}) + if err != nil { + return fmt.Errorf("update object: %w", err) + } + + return nil + } + + obj = cdssdk.Object{ + PackageID: msg.PackageID, + Path: msg.Path, + FileHash: cdssdk.EmptyHash, + Size: 0, + Redundancy: cdssdk.NewMultipartUploadRedundancy(), + CreateTime: time.Now(), + UpdateTime: time.Now(), + } + objID, err := svc.db2.Object().Create(tx, obj) + if err != nil { + return fmt.Errorf("create object: %w", err) + } + + obj.ObjectID = objID + return nil + }) + if err != nil { + logger.Warnf("new multipart upload object: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("new multipart upload object: %v", err)) + } + + return mq.ReplyOK(coormq.RespNewMultipartUploadObject(obj)) +} + +func (svc *Service) AddMultipartUploadPart(msg *coormq.AddMultipartUploadPart) (*coormq.AddMultipartUploadPartResp, *mq.CodeMessage) { + err := svc.db2.DoTx(func(tx db2.SQLContext) error { + obj, err := svc.db2.Object().GetByID(tx, msg.ObjectID) + if err != nil { + return fmt.Errorf("getting object by id: %w", err) + } + + _, ok := obj.Redundancy.(*cdssdk.MultipartUploadRedundancy) + if !ok { + return fmt.Errorf("object is not a multipart upload object") + } + + blks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, []cdssdk.ObjectID{obj.ObjectID}) + if err != nil { + return fmt.Errorf("batch getting object blocks: %w", err) + } + + blks = lo.Reject(blks, func(blk stgmod.ObjectBlock, idx int) bool { return blk.Index == msg.Block.Index }) + blks = append(blks, msg.Block) + + blks = sort2.Sort(blks, func(a, b stgmod.ObjectBlock) int { return a.Index - b.Index }) + + totalSize := int64(0) + var hashes [][]byte + for _, blk := range blks { + totalSize += blk.Size + hashes = append(hashes, blk.FileHash.GetHashBytes()) + } + + newObjHash := cdssdk.CalculateCompositeHash(hashes) + obj.Size = totalSize + obj.FileHash = newObjHash + obj.UpdateTime = time.Now() + + err = svc.db2.ObjectBlock().DeleteByObjectIDIndex(tx, msg.ObjectID, msg.Block.Index) + if err != nil { + return fmt.Errorf("delete object block: %w", err) + } + + err = svc.db2.ObjectBlock().Create(tx, msg.ObjectID, msg.Block.Index, msg.Block.StorageID, msg.Block.FileHash, msg.Block.Size) + if err != nil { + return fmt.Errorf("create object block: %w", err) + } + + err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj}) + if err != nil { + return fmt.Errorf("update object: %w", err) + } + + return nil + }) + if err != nil { + logger.Warnf("add multipart upload part: %s", err.Error()) + + code := errorcode.OperationFailed + if errors.Is(err, gorm.ErrRecordNotFound) { + code = errorcode.DataNotFound + } + + return nil, mq.Failed(code, fmt.Sprintf("add multipart upload part: %v", err)) + } + + return mq.ReplyOK(coormq.RespAddMultipartUploadPart()) +} diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 0883030..e66fabd 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -515,6 +515,8 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: red, Blocks: blocks, }, nil @@ -599,6 +601,8 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: red, Blocks: blocks, }, nil @@ -682,6 +686,8 @@ func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.Object return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: red, Blocks: blocks, }, nil @@ -771,6 +777,8 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: red, Blocks: blocks, }, nil @@ -845,6 +853,8 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: red, Blocks: blocks, }, nil @@ -975,6 +985,8 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: tarRed, Blocks: blocks, }, nil @@ -1126,6 +1138,8 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: tarRed, Blocks: newBlocks, }, nil @@ -1334,6 +1348,8 @@ func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.O return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: red, Blocks: newBlocks, }, nil diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 693549a..7d7d6fc 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -750,6 +750,8 @@ func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, solu annealingSolution, obj stgmod.ObjectDetail, planBld *exec.PlanBuilder, planningHubIDs map[cdssdk.StorageID]bool) coormq.UpdatingObjectRedundancy { entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: obj.Object.Redundancy, } @@ -850,6 +852,8 @@ func (t *CleanPinned) generateSysEventForRepObject(solu annealingSolution, obj s func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, solu annealingSolution, obj stgmod.ObjectDetail, planBld *exec.PlanBuilder, planningHubIDs map[cdssdk.StorageID]bool) coormq.UpdatingObjectRedundancy { entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, + FileHash: obj.Object.FileHash, + Size: obj.Object.Size, Redundancy: obj.Object.Redundancy, }