package http import ( "compress/gzip" "fmt" "io" "mime" "mime/multipart" "net/http" "net/url" "github.com/gin-gonic/gin" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" "gorm.io/gorm" ) type PubShardsService struct { *Server } func (s *Server) PubShards() *PubShardsService { return &PubShardsService{s} } func (s *PubShardsService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "PubShards.Create") req, err := types.ShouldBindJSONEx[cliapi.PubShardsCreate](ctx) if err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) return } if stgglb.StandaloneMode { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) return } corCli := stgglb.CoordinatorRPCPool.Get() defer corCli.Release() resp, cerr := corCli.CreatePubShards(ctx.Request.Context(), &corrpc.CreatePubShards{ Password: req.Password, MasterHub: req.MasterHub, Name: req.Name, Storage: req.Storage, Credential: req.Credential, ShardStore: req.ShardStore, Features: req.Features, WorkingDir: jcstypes.PathFromJcsPathString(req.WorkingDir), }) if cerr != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), cerr.Message)) return } ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsCreateResp{ PubShards: resp.PubShardStore, })) } func (s *PubShardsService) Join(ctx *gin.Context) { log := logger.WithField("HTTP", "PubShards.Join") var req cliapi.PubShardsJoin if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) return } if stgglb.StandaloneMode { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) return } corCli := stgglb.CoordinatorRPCPool.Get() defer corCli.Release() resp, cerr := corCli.UserGetPubShards(ctx.Request.Context(), &corrpc.UserGetPubShards{ PubShardsID: req.PubShardsID, Password: req.Password, }) if cerr != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), cerr.Message)) return } resp2, cerr2 := s.svc.UserSpaceSvc().Create(cliapi.UserSpaceCreate{ Name: req.Name, Storage: &jcstypes.PubShardsType{ Type: "PubShards", Base: resp.PubShards.Storage, PubShardsID: req.PubShardsID, Password: req.Password, MasterHub: resp.MasterHub.HubID, }, Credential: resp.PubShards.Credential, ShardStore: &resp.PubShards.ShardStore, Features: resp.PubShards.Features, WorkingDir: resp.PubShards.WorkingDir.PushNew("parts", fmt.Sprintf("%v", s.svc.AccToken.GetToken().UserID)).String(), }) if cerr2 != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr2.Code), cerr2.Message)) return } ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsJoinResp{ PubShards: resp.PubShards, UserSpace: resp2.UserSpace, })) } func (s *PubShardsService) List(ctx *gin.Context) { log := logger.WithField("HTTP", "PubShards.List") var req cliapi.PubShardsList if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) return } if stgglb.StandaloneMode { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) return } pubUss, err := s.svc.DB.UserSpace().GetAllPubShards(s.svc.DB.DefCtx()) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards: %v", err)) return } corCli := stgglb.CoordinatorRPCPool.Get() defer corCli.Release() var pubs []jcstypes.PubShards for _, us := range pubUss { pubType := us.Storage.(*jcstypes.PubShardsType) resp, cerr := corCli.UserGetPubShards(ctx.Request.Context(), &corrpc.UserGetPubShards{ PubShardsID: pubType.PubShardsID, Password: pubType.Password, }) if cerr != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), cerr.Message)) return } pubs = append(pubs, resp.PubShards) } ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsListResp{ PubShards: pubs, UserSpaces: pubUss, })) } func (s *PubShardsService) Get(ctx *gin.Context) { log := logger.WithField("HTTP", "PubShards.Get") var req cliapi.PubShardsGet if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) return } if stgglb.StandaloneMode { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) return } pubUss, err := s.svc.DB.UserSpace().GetAllPubShards(s.svc.DB.DefCtx()) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards: %v", err)) return } corCli := stgglb.CoordinatorRPCPool.Get() defer corCli.Release() for i, us := range pubUss { pubType := us.Storage.(*jcstypes.PubShardsType) resp, cerr := corCli.UserGetPubShards(ctx.Request.Context(), &corrpc.UserGetPubShards{ PubShardsID: pubType.PubShardsID, Password: pubType.Password, }) if cerr != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), cerr.Message)) return } if resp.PubShards.Name == req.Name { ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsGetResp{ PubShards: resp.PubShards, UserSpace: pubUss[i], })) return } } ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "pub shards %v not found", req.Name)) } func (s *PubShardsService) ExportPackage(ctx *gin.Context) { log := logger.WithField("HTTP", "PubShards.ExportPackage") var req cliapi.PubShardsExportPackage if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) return } if stgglb.StandaloneMode { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) return } lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err)) return } defer lock.Unlock() pubs := make(map[jcstypes.UserSpaceID]jcstypes.PubShardsID) for _, id := range req.AvailablePubShards { us, err := s.svc.DB.UserSpace().GetByPubShardsID(s.svc.DB.DefCtx(), id) if err != nil { if err == gorm.ErrRecordNotFound { ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "pub shards %v not found", id)) } else { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards %v: %v", id, err)) } return } pubs[us.UserSpaceID] = id } pkg, err := s.svc.DB.Package().GetByID(s.svc.DB.DefCtx(), req.PackageID) if err != nil { if err == gorm.ErrRecordNotFound { ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "package %v not found", req.PackageID)) } else { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get package %v: %v", req.PackageID, err)) } return } objs, err := db.DoTx11(s.svc.DB, s.svc.DB.Object().GetPackageObjectDetails, req.PackageID) if err != nil { if err == gorm.ErrRecordNotFound { ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "package %v not found", req.PackageID)) } else { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get package %v: %v", req.PackageID, err)) } return } var pack jcstypes.PubShardsPackFile for _, o := range objs { po := jcstypes.PackObject{ Object: o.Object, } for _, b := range o.Blocks { pub := pubs[b.UserSpaceID] if pub != "" { po.Blocks = append(po.Blocks, jcstypes.PackObjectBlock{ Index: b.Index, PubShardsID: pub, FileHash: b.FileHash, Size: b.Size, }) } } pack.Objects = append(pack.Objects, po) } zw := gzip.NewWriter(ctx.Writer) defer zw.Close() ctx.Header("Content-Disposition", "attachment; filename="+url.PathEscape(pkg.Name)+".pack") ctx.Header("Content-Type", "application/octet-stream") ctx.Header("Content-Transfer-Encoding", "binary") ps := serder.ObjectToJSONStream(pack) defer ps.Close() io.Copy(zw, ps) } func (s *PubShardsService) ImportPackage(ctx *gin.Context) { // log := logger.WithField("HTTP", "PubShards.ImportPackage") if stgglb.StandaloneMode { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) return } contType := ctx.GetHeader("Content-Type") mtype, params, err := mime.ParseMediaType(contType) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "parse content-type: %v", err)) return } if mtype != http2.ContentTypeMultiPart { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "content-type %v not supported", mtype)) return } boundary := params["boundary"] if boundary == "" { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "missing boundary in content-type")) return } mr := multipart.NewReader(ctx.Request.Body, boundary) p, err := mr.NextPart() if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "read info part: %v", err)) return } var info cliapi.PubShardsImportPackage err = serder.JSONToObjectStream(p, &info) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "parse info: %v", err)) return } if info.PackageID == 0 { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "missing packageID")) return } fr, err := mr.NextPart() if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "read file part: %v", err)) return } gr, err := gzip.NewReader(fr) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "read gzip: %v", err)) return } defer gr.Close() pack, err := serder.JSONToObjectStreamEx[jcstypes.PubShardsPackFile](gr) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "parse pack: %v", err)) return } pubShardsUs, err := s.svc.DB.UserSpace().GetAllPubShards(s.svc.DB.DefCtx()) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards: %v", err)) return } uss := make(map[jcstypes.PubShardsID]jcstypes.UserSpace) for _, us := range pubShardsUs { pub, ok := us.Storage.(*jcstypes.PubShardsType) if !ok { continue } uss[pub.PubShardsID] = us } var willAdds []jcstypes.ObjectDetail var invalidObjs []jcstypes.Object // 只添加本客户端加入的PubShards中的块 pubShardsRefs := make(map[jcstypes.PubShardsID][]jcstypes.FileHash) for _, po := range pack.Objects { blkIdx := make(map[int]bool) var addBlks []jcstypes.ObjectBlock for _, b := range po.Blocks { u, ok := uss[b.PubShardsID] if !ok { continue } pubShardsRefs[b.PubShardsID] = append(pubShardsRefs[b.PubShardsID], b.FileHash) addBlks = append(addBlks, jcstypes.ObjectBlock{ Index: b.Index, UserSpaceID: u.UserSpaceID, FileHash: b.FileHash, Size: b.Size, }) blkIdx[b.Index] = true } switch red := po.Object.Redundancy.(type) { case *jcstypes.NoneRedundancy: if len(addBlks) < 1 { invalidObjs = append(invalidObjs, po.Object) continue } case *jcstypes.RepRedundancy: if len(addBlks) < 1 { invalidObjs = append(invalidObjs, po.Object) continue } case *jcstypes.ECRedundancy: if len(blkIdx) < red.K { invalidObjs = append(invalidObjs, po.Object) continue } } willAdds = append(willAdds, jcstypes.ObjectDetail{ Object: po.Object, Blocks: addBlks, }) } // 在每一个PubShards创建FileHash引用,并且根据反馈的不存在的FileHash列表筛选导入列表 for pub, refs := range pubShardsRefs { us := uss[pub] pubType := us.Storage.(*jcstypes.PubShardsType) hubCli := stgglb.HubRPCPool.GetByID(pubType.MasterHub) resp, cerr := hubCli.PubShardsCreateRefs(ctx.Request.Context(), &hubrpc.PubShardsCreateRefs{ PubShardsID: pubType.PubShardsID, Password: pubType.Password, FileHashes: refs, }) if cerr != nil { hubCli.Release() ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "create pub shards refs at %v: %v", pubType.PubShardsID, cerr)) return } invalidHashes := make(map[jcstypes.FileHash]bool) for _, h := range resp.InvalidFileHashes { invalidHashes[h] = true } for i := range willAdds { willAdds[i].Blocks = lo.Reject(willAdds[i].Blocks, func(blk jcstypes.ObjectBlock, idx int) bool { return blk.UserSpaceID == us.UserSpaceID && invalidHashes[blk.FileHash] }) } hubCli.Release() } // 再次检查每个对象是否拥有完整块 willAdds = lo.Filter(willAdds, func(obj jcstypes.ObjectDetail, idx int) bool { blkIdx := make(map[int]bool) for _, blk := range obj.Blocks { blkIdx[blk.Index] = true } switch red := obj.Object.Redundancy.(type) { case *jcstypes.NoneRedundancy: if len(blkIdx) < 1 { invalidObjs = append(invalidObjs, obj.Object) return false } case *jcstypes.RepRedundancy: if len(blkIdx) < 1 { invalidObjs = append(invalidObjs, obj.Object) return false } case *jcstypes.ECRedundancy: if len(blkIdx) < red.K { invalidObjs = append(invalidObjs, obj.Object) return false } } return true }) _, err = db.DoTx21(s.svc.DB, s.svc.DB.Object().BatchCreateByDetails, info.PackageID, willAdds) if err != nil { ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "create objects: %v", err)) return } ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsImportPackageResp{ InvalidObjects: invalidObjs, })) }