From ccf8b9a462094a3668501effd57c84a252b1c995 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 28 Jul 2025 10:06:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=BC=E5=85=A5=E5=AF=BC?= =?UTF-8?q?=E5=87=BAPackage=E7=9A=84=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/db/object.go | 95 +++++ client/internal/db/package.go | 18 +- client/internal/db/user_space.go | 12 + client/internal/http/v1/package.go | 40 ++ client/internal/http/v1/pub_shards.go | 392 ++++++++++++++++++ client/internal/http/v1/server.go | 4 + client/internal/ticktock/change_redundancy.go | 32 +- client/sdk/api/v1/package.go | 21 + client/sdk/api/v1/pub_shards.go | 99 +++++ common/pkgs/publock/lockprovider/package.go | 129 ++++++ common/pkgs/publock/reqbuilder/package.go | 38 ++ common/pkgs/publock/service.go | 1 + common/pkgs/rpc/hub/hub.pb.go | 48 ++- common/pkgs/rpc/hub/hub.proto | 1 + common/pkgs/rpc/hub/hub_grpc.pb.go | 37 ++ common/pkgs/rpc/hub/pub_shards.go | 22 + common/types/client.go | 1 + common/types/coordinator.go | 2 +- common/types/object_pack.go | 17 + hub/internal/pubshards/pool.go | 8 +- hub/internal/pubshards/pub_shards.go | 92 +++- hub/internal/rpc/pub_shards.go | 21 + jcsctl/cmd/package/pin.go | 76 ++++ jcsctl/cmd/pubshards/export.go | 115 +++++ jcsctl/cmd/pubshards/import.go | 103 +++++ jcsctl/cmd/pubshards/ls.go | 51 +++ 26 files changed, 1424 insertions(+), 51 deletions(-) create mode 100644 common/pkgs/publock/lockprovider/package.go create mode 100644 common/pkgs/publock/reqbuilder/package.go create mode 100644 common/types/object_pack.go create mode 100644 jcsctl/cmd/package/pin.go create mode 100644 jcsctl/cmd/pubshards/export.go create mode 100644 jcsctl/cmd/pubshards/import.go create mode 100644 jcsctl/cmd/pubshards/ls.go diff --git a/client/internal/db/object.go b/client/internal/db/object.go index 2c7462c..2ab5a26 100644 --- a/client/internal/db/object.go +++ b/client/internal/db/object.go @@ -719,3 +719,98 @@ func (db *ObjectDB) DeleteCompleteByPath(ctx SQLContext, packageID jcstypes.Pack return db.BatchDeleteComplete(ctx, []jcstypes.ObjectID{obj.ObjectID}) } + +func (db *ObjectDB) BatchCreateByDetails(ctx SQLContext, pkgID jcstypes.PackageID, adds []jcstypes.ObjectDetail) ([]jcstypes.Object, error) { + if len(adds) == 0 { + return nil, nil + } + + // 收集所有路径 + pathes := make([]string, 0, len(adds)) + for _, add := range adds { + pathes = append(pathes, add.Object.Path) + } + + // 先查询要更新的对象,不存在也没关系 + existsObjs, err := db.BatchGetByPackagePath(ctx, pkgID, pathes) + if err != nil { + return nil, fmt.Errorf("batch get object by path: %w", err) + } + + existsObjsMap := make(map[string]jcstypes.Object) + for _, obj := range existsObjs { + existsObjsMap[obj.Path] = obj + } + + var updatingObjs []jcstypes.Object + var addingObjs []jcstypes.Object + for i := range adds { + o := adds[i].Object + o.ObjectID = 0 + o.PackageID = pkgID + + e, ok := existsObjsMap[adds[i].Object.Path] + if ok { + o.ObjectID = e.ObjectID + o.CreateTime = e.CreateTime + updatingObjs = append(updatingObjs, o) + + } else { + addingObjs = append(addingObjs, o) + } + } + + // 先进行更新 + err = db.BatchUpdate(ctx, updatingObjs) + if err != nil { + return nil, fmt.Errorf("batch update objects: %w", err) + } + + // 再执行插入,Create函数插入后会填充ObjectID + err = db.BatchCreate(ctx, &addingObjs) + if err != nil { + return nil, fmt.Errorf("batch create objects: %w", err) + } + + // 按照add参数的顺序返回结果 + affectedObjsMp := make(map[string]jcstypes.Object) + for _, o := range updatingObjs { + affectedObjsMp[o.Path] = o + } + for _, o := range addingObjs { + affectedObjsMp[o.Path] = o + } + affectedObjs := make([]jcstypes.Object, 0, len(affectedObjsMp)) + affectedObjIDs := make([]jcstypes.ObjectID, 0, len(affectedObjsMp)) + for i := range adds { + obj := affectedObjsMp[adds[i].Object.Path] + affectedObjs = append(affectedObjs, obj) + affectedObjIDs = append(affectedObjIDs, obj.ObjectID) + } + + if len(affectedObjIDs) > 0 { + // 批量删除 ObjectBlock + if err := db.ObjectBlock().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil { + return nil, fmt.Errorf("batch delete object blocks: %w", err) + } + + // 批量删除 PinnedObject + if err := db.PinnedObject().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil { + return nil, fmt.Errorf("batch delete pinned objects: %w", err) + } + } + + // 创建 ObjectBlock + objBlocks := make([]jcstypes.ObjectBlock, 0, len(adds)) + for i, add := range adds { + for i2 := range add.Blocks { + add.Blocks[i2].ObjectID = affectedObjIDs[i] + } + objBlocks = append(objBlocks, add.Blocks...) + } + if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil { + return nil, fmt.Errorf("batch create object blocks: %w", err) + } + + return affectedObjs, nil +} diff --git a/client/internal/db/package.go b/client/internal/db/package.go index 7d21dca..529f805 100644 --- a/client/internal/db/package.go +++ b/client/internal/db/package.go @@ -44,7 +44,7 @@ func (db *PackageDB) GetDetail(ctx SQLContext, packageID jcstypes.PackageID) (jc err = ctx.Table("Object"). Select("COUNT(*) as ObjectCount, SUM(Size) as TotalSize"). Where("PackageID = ?", packageID). - First(&ret). + Scan(&ret). Error if err != nil { return jcstypes.PackageDetail{}, err @@ -57,6 +57,17 @@ func (db *PackageDB) GetDetail(ctx SQLContext, packageID jcstypes.PackageID) (jc }, nil } +func (db *PackageDB) BatchGetIDPaged(ctx SQLContext, lastPkgID jcstypes.PackageID, count int) ([]jcstypes.PackageID, error) { + var ret []jcstypes.PackageID + err := ctx.Table("Package"). + Select("PackageID"). + Where("PackageID > ?", lastPkgID). + Order("PackageID ASC"). + Limit(count). + Find(&ret).Error + return ret, err +} + func (db *PackageDB) BatchGetDetailPaged(ctx SQLContext, lastPkgID jcstypes.PackageID, count int) ([]jcstypes.PackageDetail, error) { var pkgs []jcstypes.Package err := ctx.Table("Package"). @@ -309,3 +320,8 @@ func (db *PackageDB) TryCreateAll(ctx SQLContext, bktName string, pkgName string return pkg, nil } + +func (db *PackageDB) SetPinned(ctx SQLContext, packageID jcstypes.PackageID, pinned bool) error { + err := ctx.Table("Package").Where("PackageID = ?", packageID).Update("Pinned", pinned).Error + return err +} diff --git a/client/internal/db/user_space.go b/client/internal/db/user_space.go index fbde451..bb2d93a 100644 --- a/client/internal/db/user_space.go +++ b/client/internal/db/user_space.go @@ -63,3 +63,15 @@ func (*UserSpaceDB) UpdateColumns(ctx SQLContext, space jcstypes.UserSpace, colu func (*UserSpaceDB) DeleteByID(ctx SQLContext, spaceID jcstypes.UserSpaceID) error { return ctx.Table("UserSpace").Delete(jcstypes.UserSpace{}, "UserSpaceID = ?", spaceID).Error } + +func (*UserSpaceDB) GetByPubShardsID(ctx SQLContext, pubShardsID jcstypes.PubShardsID) (jcstypes.UserSpace, error) { + var us jcstypes.UserSpace + err := ctx.Table("UserSpace").Where("Storage->'$.type' = 'PubShards' and Storage->'$.pubShardsID' = ?", pubShardsID).First(&us).Error + return us, err +} + +func (*UserSpaceDB) GetAllPubShards(ctx SQLContext) ([]jcstypes.UserSpace, error) { + var stgs []jcstypes.UserSpace + err := ctx.Table("UserSpace").Where("Storage->'$.type' = 'PubShards'").Find(&stgs).Error + return stgs, err +} diff --git a/client/internal/http/v1/package.go b/client/internal/http/v1/package.go index a704442..a05537d 100644 --- a/client/internal/http/v1/package.go +++ b/client/internal/http/v1/package.go @@ -16,11 +16,13 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" "gorm.io/gorm" ) @@ -387,3 +389,41 @@ func (s *PackageService) ListBucketPackages(ctx *gin.Context) { Packages: pkgs, })) } + +func (s *PackageService) Pin(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.Pin") + + var req cliapi.PackagePin + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument")) + return + } + + lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err)) + return + } + defer lock.Unlock() + + d := s.svc.DB + err = d.DoTx(func(tx db.SQLContext) error { + _, err := d.Package().GetByID(tx, req.PackageID) + if err != nil { + return err + } + + return d.Package().SetPinned(tx, req.PackageID, req.Pin) + }) + if err != nil { + if err == gorm.ErrRecordNotFound { + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "package not found")) + } else { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "%v", err)) + } + return + } + + ctx.JSON(http.StatusOK, types.OK(cliapi.PackagePinResp{})) +} diff --git a/client/internal/http/v1/pub_shards.go b/client/internal/http/v1/pub_shards.go index 03518e0..638c899 100644 --- a/client/internal/http/v1/pub_shards.go +++ b/client/internal/http/v1/pub_shards.go @@ -1,17 +1,29 @@ package http import ( + "compress/gzip" "fmt" + "io" + "mime" + "mime/multipart" "net/http" + "net/url" "github.com/gin-gonic/gin" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" + hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" + "gorm.io/gorm" ) type PubShardsService struct { @@ -110,3 +122,383 @@ func (s *PubShardsService) Join(ctx *gin.Context) { UserSpace: resp2.UserSpace, })) } + +func (s *PubShardsService) List(ctx *gin.Context) { + log := logger.WithField("HTTP", "PubShards.List") + + var req cliapi.PubShardsList + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) + return + } + + if stgglb.StandaloneMode { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) + return + } + + pubUss, err := s.svc.DB.UserSpace().GetAllPubShards(s.svc.DB.DefCtx()) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards: %v", err)) + return + } + + corCli := stgglb.CoordinatorRPCPool.Get() + defer corCli.Release() + + var pubs []jcstypes.PubShards + for _, us := range pubUss { + pubType := us.Storage.(*jcstypes.PubShardsType) + resp, cerr := corCli.UserGetPubShards(ctx.Request.Context(), &corrpc.UserGetPubShards{ + PubShardsID: pubType.PubShardsID, + Password: pubType.Password, + }) + if cerr != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), cerr.Message)) + return + } + pubs = append(pubs, resp.PubShards) + } + + ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsListResp{ + PubShards: pubs, + UserSpaces: pubUss, + })) +} + +func (s *PubShardsService) Get(ctx *gin.Context) { + log := logger.WithField("HTTP", "PubShards.Get") + + var req cliapi.PubShardsGet + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) + return + } + + if stgglb.StandaloneMode { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) + return + } + + pubUss, err := s.svc.DB.UserSpace().GetAllPubShards(s.svc.DB.DefCtx()) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards: %v", err)) + return + } + + corCli := stgglb.CoordinatorRPCPool.Get() + defer corCli.Release() + + for i, us := range pubUss { + pubType := us.Storage.(*jcstypes.PubShardsType) + resp, cerr := corCli.UserGetPubShards(ctx.Request.Context(), &corrpc.UserGetPubShards{ + PubShardsID: pubType.PubShardsID, + Password: pubType.Password, + }) + if cerr != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), cerr.Message)) + return + } + if resp.PubShards.Name == req.Name { + ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsGetResp{ + PubShards: resp.PubShards, + UserSpace: pubUss[i], + })) + return + } + } + + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "pub shards %v not found", req.Name)) +} + +func (s *PubShardsService) ExportPackage(ctx *gin.Context) { + log := logger.WithField("HTTP", "PubShards.ExportPackage") + + var req cliapi.PubShardsExportPackage + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) + return + } + + if stgglb.StandaloneMode { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) + return + } + + lock, err := reqbuilder.NewBuilder().Package().Pin(req.PackageID).MutexLock(s.svc.PubLock) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "lock package: %v", err)) + return + } + defer lock.Unlock() + + pubs := make(map[jcstypes.UserSpaceID]jcstypes.PubShardsID) + for _, id := range req.AvailablePubShards { + us, err := s.svc.DB.UserSpace().GetByPubShardsID(s.svc.DB.DefCtx(), id) + if err != nil { + if err == gorm.ErrRecordNotFound { + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "pub shards %v not found", id)) + } else { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards %v: %v", id, err)) + } + + return + } + pubs[us.UserSpaceID] = id + } + + pkg, err := s.svc.DB.Package().GetByID(s.svc.DB.DefCtx(), req.PackageID) + if err != nil { + if err == gorm.ErrRecordNotFound { + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "package %v not found", req.PackageID)) + } else { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get package %v: %v", req.PackageID, err)) + } + return + } + + objs, err := db.DoTx11(s.svc.DB, s.svc.DB.Object().GetPackageObjectDetails, req.PackageID) + if err != nil { + if err == gorm.ErrRecordNotFound { + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "package %v not found", req.PackageID)) + } else { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get package %v: %v", req.PackageID, err)) + } + return + } + + var pack jcstypes.PubShardsPackFile + for _, o := range objs { + po := jcstypes.PackObject{ + Object: o.Object, + } + + for _, b := range o.Blocks { + pub := pubs[b.UserSpaceID] + if pub != "" { + po.Blocks = append(po.Blocks, jcstypes.PackObjectBlock{ + Index: b.Index, + PubShardsID: pub, + FileHash: b.FileHash, + Size: b.Size, + }) + } + } + + pack.Objects = append(pack.Objects, po) + } + + zw := gzip.NewWriter(ctx.Writer) + defer zw.Close() + + ctx.Header("Content-Disposition", "attachment; filename="+url.PathEscape(pkg.Name)+".pack") + ctx.Header("Content-Type", "application/octet-stream") + ctx.Header("Content-Transfer-Encoding", "binary") + + ps := serder.ObjectToJSONStream(pack) + defer ps.Close() + + io.Copy(zw, ps) +} + +func (s *PubShardsService) ImportPackage(ctx *gin.Context) { + // log := logger.WithField("HTTP", "PubShards.ImportPackage") + + if stgglb.StandaloneMode { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "client is not online")) + return + } + + contType := ctx.GetHeader("Content-Type") + + mtype, params, err := mime.ParseMediaType(contType) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "parse content-type: %v", err)) + return + } + if mtype != http2.ContentTypeMultiPart { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "content-type %v not supported", mtype)) + return + } + + boundary := params["boundary"] + if boundary == "" { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "missing boundary in content-type")) + return + } + + mr := multipart.NewReader(ctx.Request.Body, boundary) + p, err := mr.NextPart() + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "read info part: %v", err)) + return + } + + var info cliapi.PubShardsImportPackage + err = serder.JSONToObjectStream(p, &info) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "parse info: %v", err)) + return + } + + if info.PackageID == 0 { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "missing packageID")) + return + } + + fr, err := mr.NextPart() + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "read file part: %v", err)) + return + } + + gr, err := gzip.NewReader(fr) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "read gzip: %v", err)) + return + } + defer gr.Close() + + pack, err := serder.JSONToObjectStreamEx[jcstypes.PubShardsPackFile](gr) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "parse pack: %v", err)) + return + } + + pubShardsUs, err := s.svc.DB.UserSpace().GetAllPubShards(s.svc.DB.DefCtx()) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "get pub shards: %v", err)) + return + } + + uss := make(map[jcstypes.PubShardsID]jcstypes.UserSpace) + for _, us := range pubShardsUs { + pub, ok := us.Storage.(*jcstypes.PubShardsType) + if !ok { + continue + } + uss[pub.PubShardsID] = us + } + + var willAdds []jcstypes.ObjectDetail + var invalidObjs []jcstypes.Object + + // 只添加本客户端加入的PubShards中的块 + pubShardsRefs := make(map[jcstypes.PubShardsID][]jcstypes.FileHash) + for _, po := range pack.Objects { + blkIdx := make(map[int]bool) + var addBlks []jcstypes.ObjectBlock + for _, b := range po.Blocks { + u, ok := uss[b.PubShardsID] + if !ok { + continue + } + + pubShardsRefs[b.PubShardsID] = append(pubShardsRefs[b.PubShardsID], b.FileHash) + addBlks = append(addBlks, jcstypes.ObjectBlock{ + Index: b.Index, + UserSpaceID: u.UserSpaceID, + FileHash: b.FileHash, + Size: b.Size, + }) + blkIdx[b.Index] = true + } + + switch red := po.Object.Redundancy.(type) { + case *jcstypes.NoneRedundancy: + if len(addBlks) < 1 { + invalidObjs = append(invalidObjs, po.Object) + continue + } + + case *jcstypes.RepRedundancy: + if len(addBlks) < 1 { + invalidObjs = append(invalidObjs, po.Object) + continue + } + + case *jcstypes.ECRedundancy: + if len(blkIdx) < red.K { + invalidObjs = append(invalidObjs, po.Object) + continue + } + } + + willAdds = append(willAdds, jcstypes.ObjectDetail{ + Object: po.Object, + Blocks: addBlks, + }) + } + + // 在每一个PubShards创建FileHash引用,并且根据反馈的不存在的FileHash列表筛选导入列表 + for pub, refs := range pubShardsRefs { + us := uss[pub] + pubType := us.Storage.(*jcstypes.PubShardsType) + + hubCli := stgglb.HubRPCPool.GetByID(pubType.MasterHub) + resp, cerr := hubCli.PubShardsCreateRefs(ctx.Request.Context(), &hubrpc.PubShardsCreateRefs{ + PubShardsID: pubType.PubShardsID, + Password: pubType.Password, + FileHashes: refs, + }) + if cerr != nil { + hubCli.Release() + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "create pub shards refs at %v: %v", pubType.PubShardsID, cerr)) + return + } + + invalidHashes := make(map[jcstypes.FileHash]bool) + for _, h := range resp.InvalidFileHashes { + invalidHashes[h] = true + } + + for i := range willAdds { + willAdds[i].Blocks = lo.Reject(willAdds[i].Blocks, func(blk jcstypes.ObjectBlock, idx int) bool { + return blk.UserSpaceID == us.UserSpaceID && invalidHashes[blk.FileHash] + }) + } + hubCli.Release() + } + + // 再次检查每个对象是否拥有完整块 + willAdds = lo.Filter(willAdds, func(obj jcstypes.ObjectDetail, idx int) bool { + blkIdx := make(map[int]bool) + for _, blk := range obj.Blocks { + blkIdx[blk.Index] = true + } + + switch red := obj.Object.Redundancy.(type) { + case *jcstypes.NoneRedundancy: + if len(blkIdx) < 1 { + invalidObjs = append(invalidObjs, obj.Object) + return false + } + + case *jcstypes.RepRedundancy: + if len(blkIdx) < 1 { + invalidObjs = append(invalidObjs, obj.Object) + return false + } + + case *jcstypes.ECRedundancy: + if len(blkIdx) < red.K { + invalidObjs = append(invalidObjs, obj.Object) + return false + } + } + return true + }) + + _, err = db.DoTx21(s.svc.DB, s.svc.DB.Object().BatchCreateByDetails, info.PackageID, willAdds) + if err != nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "create objects: %v", err)) + return + } + + ctx.JSON(http.StatusOK, types.OK(cliapi.PubShardsImportPackageResp{ + InvalidObjects: invalidObjs, + })) +} diff --git a/client/internal/http/v1/server.go b/client/internal/http/v1/server.go index 2fdc349..5f2c81d 100644 --- a/client/internal/http/v1/server.go +++ b/client/internal/http/v1/server.go @@ -89,4 +89,8 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { rt.POST(cliapi.PubShardsCreatePath, certAuth, s.PubShards().Create) rt.POST(cliapi.PubShardsJoinPath, certAuth, s.PubShards().Join) + rt.GET(cliapi.PubShardsListPath, certAuth, s.PubShards().List) + rt.GET(cliapi.PubShardsGetPath, certAuth, s.PubShards().Get) + rt.GET(cliapi.PubShardsExportPackagePath, certAuth, s.PubShards().ExportPackage) + rt.POST(cliapi.PubShardsImportPackagePath, certAuth, s.PubShards().ImportPackage) } diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index 304510a..4600514 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap" @@ -62,23 +63,42 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { loop: for { - pkgs, err := db.DoTx21(t.db, t.db.Package().BatchGetDetailPaged, lastPkgID, BatchGetPackageDetailCount) + pkgIDs, err := db.DoTx21(t.db, t.db.Package().BatchGetIDPaged, lastPkgID, BatchGetPackageDetailCount) if err != nil { - log.Warnf("get package details: %v", err) + log.Warnf("get package ids: %v", err) return } - if len(pkgs) == 0 { + if len(pkgIDs) == 0 { break } - lastPkgID = pkgs[len(pkgs)-1].Package.PackageID + lastPkgID = pkgIDs[len(pkgIDs)-1] - for _, p := range pkgs { + for _, id := range pkgIDs { // 如果执行超过两个小时,则停止 if time.Since(startTime) > time.Hour*2 { break loop } - err := j.changeOne(ctx, p) + lock, err := reqbuilder.NewBuilder().Package().Buzy(id).MutexLock(t.pubLock, publock.WithTimeout(time.Second*10)) + if err != nil { + log.Warnf("lock package: %v", err) + continue + } + + detail, err := db.DoTx11(t.db, t.db.Package().GetDetail, id) + if err != nil { + log.Warnf("get package detail: %v", err) + lock.Unlock() + continue + } + if detail.Package.Pinned { + lock.Unlock() + continue + } + + err = j.changeOne(ctx, detail) + + lock.Unlock() if err != nil { log.Warnf("change redundancy: %v", err) return diff --git a/client/sdk/api/v1/package.go b/client/sdk/api/v1/package.go index fef593c..d0d76ff 100644 --- a/client/sdk/api/v1/package.go +++ b/client/sdk/api/v1/package.go @@ -270,3 +270,24 @@ func (r *PackageListBucketPackagesResp) ParseResponse(resp *http.Response) error func (c *PackageService) ListBucketPackages(req PackageListBucketPackages) (*PackageListBucketPackagesResp, error) { return JSONAPI(&c.cfg, c.httpCli, &req, &PackageListBucketPackagesResp{}) } + +const PackagePinPath = "/package/pin" + +type PackagePin struct { + PackageID jcstypes.PackageID `json:"packageID" binding:"required"` + Pin bool `json:"pin" binding:"required"` +} + +func (r *PackagePin) MakeParam() *sdks.RequestParam { + return sdks.MakeJSONParam(http.MethodPost, PackagePinPath, r) +} + +type PackagePinResp struct{} + +func (r *PackagePinResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *PackageService) Pin(req PackagePin) (*PackagePinResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &PackagePinResp{}) +} diff --git a/client/sdk/api/v1/pub_shards.go b/client/sdk/api/v1/pub_shards.go index fec0edc..33e97b0 100644 --- a/client/sdk/api/v1/pub_shards.go +++ b/client/sdk/api/v1/pub_shards.go @@ -1,6 +1,7 @@ package api import ( + "io" "net/http" "gitlink.org.cn/cloudream/common/sdks" @@ -70,3 +71,101 @@ func (r *PubShardsJoinResp) ParseResponse(resp *http.Response) error { func (c *PubShardsService) Join(req PubShardsJoin) (*PubShardsJoinResp, error) { return JSONAPI(&c.cfg, c.httpCli, &req, &PubShardsJoinResp{}) } + +const PubShardsListPath = "/pubShards/list" + +type PubShardsList struct{} + +func (r *PubShardsList) MakeParam() *sdks.RequestParam { + return sdks.MakeQueryParam(http.MethodGet, PubShardsListPath, r) +} + +type PubShardsListResp struct { + PubShards []jcstypes.PubShards `json:"pubShards"` + UserSpaces []jcstypes.UserSpace `json:"userSpaces"` +} + +func (r *PubShardsListResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *PubShardsService) List(req PubShardsList) (*PubShardsListResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &PubShardsListResp{}) +} + +const PubShardsGetPath = "/pubShards/get" + +type PubShardsGet struct { + Name string `form:"name" url:"name"` +} + +func (r *PubShardsGet) MakeParam() *sdks.RequestParam { + return sdks.MakeQueryParam(http.MethodGet, PubShardsGetPath, r) +} + +type PubShardsGetResp struct { + PubShards jcstypes.PubShards `json:"pubShards"` + UserSpace jcstypes.UserSpace `json:"userSpace"` +} + +func (r *PubShardsGetResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *PubShardsService) Get(req PubShardsGet) (*PubShardsGetResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &PubShardsGetResp{}) +} + +const PubShardsExportPackagePath = "/pubShards/exportPackage" + +type PubShardsExportPackage struct { + PackageID jcstypes.PackageID `form:"packageID" url:"packageID" binding:"required"` + AvailablePubShards []jcstypes.PubShardsID `form:"availablePubShards" url:"availablePubShards"` +} + +func (r *PubShardsExportPackage) MakeParam() *sdks.RequestParam { + return sdks.MakeQueryParam(http.MethodGet, PubShardsExportPackagePath, r) +} + +type PubShardsExportPackageResp struct { + FileName string + PackFile io.ReadCloser +} + +func (r *PubShardsExportPackageResp) ParseResponse(resp *http.Response) error { + fileName, file, err := sdks.ParseFileResponse(resp) + if err != nil { + return err + } + + r.FileName = fileName + r.PackFile = file + return nil +} + +func (c *PubShardsService) ExportPackage(req PubShardsExportPackage) (*PubShardsExportPackageResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &PubShardsExportPackageResp{}) +} + +const PubShardsImportPackagePath = "/pubShards/importPackage" + +type PubShardsImportPackage struct { + PackageID jcstypes.PackageID `json:"packageID"` + PackFile io.ReadCloser `json:"-"` +} + +func (r *PubShardsImportPackage) MakeParam() *sdks.RequestParam { + return sdks.MakeMultipartParam(http.MethodPost, PubShardsImportPackagePath, r, r.PackFile) +} + +type PubShardsImportPackageResp struct { + InvalidObjects []jcstypes.Object `json:"invalidObjects"` +} + +func (r *PubShardsImportPackageResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *PubShardsService) ImportPackage(req PubShardsImportPackage) (*PubShardsImportPackageResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &PubShardsImportPackageResp{}) +} diff --git a/common/pkgs/publock/lockprovider/package.go b/common/pkgs/publock/lockprovider/package.go new file mode 100644 index 0000000..6428501 --- /dev/null +++ b/common/pkgs/publock/lockprovider/package.go @@ -0,0 +1,129 @@ +package lockprovider + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/types" +) + +const ( + PackageLockPathPrefix = "Package" + PackageStorageIDPathIndex = 1 + PackageBuzyLock = "Buzy" + PackagePinLock = "Pin" +) + +type PackageLock struct { + stgLocks map[string]*PackageStorageLock + dummyLock *PackageStorageLock +} + +func NewPackageLock() *PackageLock { + return &PackageLock{ + stgLocks: make(map[string]*PackageStorageLock), + dummyLock: NewPackageStorageLock(), + } +} + +// CanLock 判断这个锁能否锁定成功 +func (l *PackageLock) CanLock(lock types.Lock) error { + nodeLock, ok := l.stgLocks[lock.Path[PackageStorageIDPathIndex]] + if !ok { + // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 + // 这里使用一个空Provider来进行检查。 + return l.dummyLock.CanLock(lock) + } + + return nodeLock.CanLock(lock) +} + +// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 +func (l *PackageLock) Lock(reqID types.RequestID, lock types.Lock) error { + stgID := lock.Path[PackageStorageIDPathIndex] + + nodeLock, ok := l.stgLocks[stgID] + if !ok { + nodeLock = NewPackageStorageLock() + l.stgLocks[stgID] = nodeLock + } + + return nodeLock.Lock(reqID, lock) +} + +// 解锁 +func (l *PackageLock) Unlock(reqID types.RequestID, lock types.Lock) error { + stgID := lock.Path[PackageStorageIDPathIndex] + + nodeLock, ok := l.stgLocks[stgID] + if !ok { + return nil + } + + return nodeLock.Unlock(reqID, lock) +} + +// Clear 清除内部所有状态 +func (l *PackageLock) Clear() { + l.stgLocks = make(map[string]*PackageStorageLock) +} + +type PackageStorageLock struct { + buzyReqIDs []types.RequestID + pinReqIDs []types.RequestID + + lockCompatibilityTable *LockCompatibilityTable +} + +func NewPackageStorageLock() *PackageStorageLock { + compTable := &LockCompatibilityTable{} + + sdLock := PackageStorageLock{ + lockCompatibilityTable: compTable, + } + + compTable. + Column(PackageBuzyLock, func() bool { return len(sdLock.buzyReqIDs) > 0 }). + Column(PackagePinLock, func() bool { return len(sdLock.pinReqIDs) > 0 }) + + comp := LockCompatible() + uncp := LockUncompatible() + + compTable.MustRow(comp, uncp) + compTable.MustRow(uncp, comp) + + return &sdLock +} + +// CanLock 判断这个锁能否锁定成功 +func (l *PackageStorageLock) CanLock(lock types.Lock) error { + return l.lockCompatibilityTable.Test(lock) +} + +// 锁定 +func (l *PackageStorageLock) Lock(reqID types.RequestID, lock types.Lock) error { + switch lock.Name { + case PackageBuzyLock: + l.buzyReqIDs = append(l.buzyReqIDs, reqID) + case PackagePinLock: + l.pinReqIDs = append(l.pinReqIDs, reqID) + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +// 解锁 +func (l *PackageStorageLock) Unlock(reqID types.RequestID, lock types.Lock) error { + switch lock.Name { + case PackageBuzyLock: + l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) + case PackagePinLock: + l.pinReqIDs = lo2.Remove(l.pinReqIDs, reqID) + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} diff --git a/common/pkgs/publock/reqbuilder/package.go b/common/pkgs/publock/reqbuilder/package.go new file mode 100644 index 0000000..9ee4681 --- /dev/null +++ b/common/pkgs/publock/reqbuilder/package.go @@ -0,0 +1,38 @@ +package reqbuilder + +import ( + "strconv" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/lockprovider" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/types" + jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" +) + +type PackageLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) Package() *PackageLockReqBuilder { + return &PackageLockReqBuilder{LockRequestBuilder: b} +} +func (b *PackageLockReqBuilder) Buzy(pkgID jcstypes.PackageID) *PackageLockReqBuilder { + b.locks = append(b.locks, types.Lock{ + Path: b.makePath(pkgID), + Name: lockprovider.PackageBuzyLock, + Target: lockprovider.NewEmptyTarget(), + }) + return b +} + +func (b *PackageLockReqBuilder) Pin(pkgID jcstypes.PackageID) *PackageLockReqBuilder { + b.locks = append(b.locks, types.Lock{ + Path: b.makePath(pkgID), + Name: lockprovider.PackagePinLock, + Target: lockprovider.NewEmptyTarget(), + }) + return b +} + +func (b *PackageLockReqBuilder) makePath(pkgID jcstypes.PackageID) []string { + return []string{lockprovider.PackageLockPathPrefix, strconv.FormatInt(int64(pkgID), 10)} +} diff --git a/common/pkgs/publock/service.go b/common/pkgs/publock/service.go index af5cc74..df2fc39 100644 --- a/common/pkgs/publock/service.go +++ b/common/pkgs/publock/service.go @@ -39,6 +39,7 @@ func NewService() *Service { } svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock() + svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock() return svc } diff --git a/common/pkgs/rpc/hub/hub.pb.go b/common/pkgs/rpc/hub/hub.pb.go index df1bf3c..ef96cf0 100644 --- a/common/pkgs/rpc/hub/hub.pb.go +++ b/common/pkgs/rpc/hub/hub.pb.go @@ -26,7 +26,7 @@ var file_pkgs_rpc_hub_hub_proto_rawDesc = []byte{ 0x0a, 0x16, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x68, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xde, 0x04, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x2c, 0x0a, 0x0d, + 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x92, 0x05, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x2c, 0x0a, 0x0d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x49, 0x4f, 0x50, 0x6c, 0x61, 0x6e, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0c, 0x53, 0x65, @@ -64,11 +64,15 @@ var file_pkgs_rpc_hub_hub_proto_rawDesc = []byte{ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, - 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, - 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, - 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, - 0x3b, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x32, 0x0a, 0x13, 0x50, 0x75, 0x62, 0x53, 0x68, 0x61, 0x72, + 0x64, 0x73, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x52, 0x65, 0x66, 0x73, 0x12, 0x0c, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, + 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, + 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, + 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x75, + 0x62, 0x72, 0x70, 0x63, 0x3b, 0x68, 0x75, 0x62, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var file_pkgs_rpc_hub_hub_proto_goTypes = []any{ @@ -90,21 +94,23 @@ var file_pkgs_rpc_hub_hub_proto_depIdxs = []int32{ 0, // 10: hubrpc.Hub.PubShardsListAll:input_type -> rpc.Request 0, // 11: hubrpc.Hub.PubShardsGC:input_type -> rpc.Request 0, // 12: hubrpc.Hub.PubShardsStats:input_type -> rpc.Request - 2, // 13: hubrpc.Hub.ExecuteIOPlan:output_type -> rpc.Response - 2, // 14: hubrpc.Hub.SendIOStream:output_type -> rpc.Response - 1, // 15: hubrpc.Hub.GetIOStream:output_type -> rpc.ChunkedData - 2, // 16: hubrpc.Hub.SendIOVar:output_type -> rpc.Response - 2, // 17: hubrpc.Hub.GetIOVar:output_type -> rpc.Response - 2, // 18: hubrpc.Hub.Ping:output_type -> rpc.Response - 2, // 19: hubrpc.Hub.GetState:output_type -> rpc.Response - 2, // 20: hubrpc.Hub.NotifyUserAccessTokenInvalid:output_type -> rpc.Response - 2, // 21: hubrpc.Hub.PubShardsStore:output_type -> rpc.Response - 2, // 22: hubrpc.Hub.PubShardsInfo:output_type -> rpc.Response - 2, // 23: hubrpc.Hub.PubShardsListAll:output_type -> rpc.Response - 2, // 24: hubrpc.Hub.PubShardsGC:output_type -> rpc.Response - 2, // 25: hubrpc.Hub.PubShardsStats:output_type -> rpc.Response - 13, // [13:26] is the sub-list for method output_type - 0, // [0:13] is the sub-list for method input_type + 0, // 13: hubrpc.Hub.PubShardsCreateRefs:input_type -> rpc.Request + 2, // 14: hubrpc.Hub.ExecuteIOPlan:output_type -> rpc.Response + 2, // 15: hubrpc.Hub.SendIOStream:output_type -> rpc.Response + 1, // 16: hubrpc.Hub.GetIOStream:output_type -> rpc.ChunkedData + 2, // 17: hubrpc.Hub.SendIOVar:output_type -> rpc.Response + 2, // 18: hubrpc.Hub.GetIOVar:output_type -> rpc.Response + 2, // 19: hubrpc.Hub.Ping:output_type -> rpc.Response + 2, // 20: hubrpc.Hub.GetState:output_type -> rpc.Response + 2, // 21: hubrpc.Hub.NotifyUserAccessTokenInvalid:output_type -> rpc.Response + 2, // 22: hubrpc.Hub.PubShardsStore:output_type -> rpc.Response + 2, // 23: hubrpc.Hub.PubShardsInfo:output_type -> rpc.Response + 2, // 24: hubrpc.Hub.PubShardsListAll:output_type -> rpc.Response + 2, // 25: hubrpc.Hub.PubShardsGC:output_type -> rpc.Response + 2, // 26: hubrpc.Hub.PubShardsStats:output_type -> rpc.Response + 2, // 27: hubrpc.Hub.PubShardsCreateRefs:output_type -> rpc.Response + 14, // [14:28] is the sub-list for method output_type + 0, // [0:14] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/common/pkgs/rpc/hub/hub.proto b/common/pkgs/rpc/hub/hub.proto index c773fb2..4b19ad8 100644 --- a/common/pkgs/rpc/hub/hub.proto +++ b/common/pkgs/rpc/hub/hub.proto @@ -24,4 +24,5 @@ service Hub { rpc PubShardsListAll(rpc.Request) returns(rpc.Response); rpc PubShardsGC(rpc.Request) returns(rpc.Response); rpc PubShardsStats(rpc.Request) returns(rpc.Response); + rpc PubShardsCreateRefs(rpc.Request) returns(rpc.Response); } \ No newline at end of file diff --git a/common/pkgs/rpc/hub/hub_grpc.pb.go b/common/pkgs/rpc/hub/hub_grpc.pb.go index 8d4264a..8da731f 100644 --- a/common/pkgs/rpc/hub/hub_grpc.pb.go +++ b/common/pkgs/rpc/hub/hub_grpc.pb.go @@ -33,6 +33,7 @@ const ( Hub_PubShardsListAll_FullMethodName = "/hubrpc.Hub/PubShardsListAll" Hub_PubShardsGC_FullMethodName = "/hubrpc.Hub/PubShardsGC" Hub_PubShardsStats_FullMethodName = "/hubrpc.Hub/PubShardsStats" + Hub_PubShardsCreateRefs_FullMethodName = "/hubrpc.Hub/PubShardsCreateRefs" ) // HubClient is the client API for Hub service. @@ -52,6 +53,7 @@ type HubClient interface { PubShardsListAll(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) PubShardsGC(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) PubShardsStats(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) + PubShardsCreateRefs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) } type hubClient struct { @@ -227,6 +229,15 @@ func (c *hubClient) PubShardsStats(ctx context.Context, in *rpc.Request, opts .. return out, nil } +func (c *hubClient) PubShardsCreateRefs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Hub_PubShardsCreateRefs_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // HubServer is the server API for Hub service. // All implementations must embed UnimplementedHubServer // for forward compatibility @@ -244,6 +255,7 @@ type HubServer interface { PubShardsListAll(context.Context, *rpc.Request) (*rpc.Response, error) PubShardsGC(context.Context, *rpc.Request) (*rpc.Response, error) PubShardsStats(context.Context, *rpc.Request) (*rpc.Response, error) + PubShardsCreateRefs(context.Context, *rpc.Request) (*rpc.Response, error) mustEmbedUnimplementedHubServer() } @@ -290,6 +302,9 @@ func (UnimplementedHubServer) PubShardsGC(context.Context, *rpc.Request) (*rpc.R func (UnimplementedHubServer) PubShardsStats(context.Context, *rpc.Request) (*rpc.Response, error) { return nil, status.Errorf(codes.Unimplemented, "method PubShardsStats not implemented") } +func (UnimplementedHubServer) PubShardsCreateRefs(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method PubShardsCreateRefs not implemented") +} func (UnimplementedHubServer) mustEmbedUnimplementedHubServer() {} // UnsafeHubServer may be embedded to opt out of forward compatibility for this service. @@ -548,6 +563,24 @@ func _Hub_PubShardsStats_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Hub_PubShardsCreateRefs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HubServer).PubShardsCreateRefs(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Hub_PubShardsCreateRefs_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HubServer).PubShardsCreateRefs(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + // Hub_ServiceDesc is the grpc.ServiceDesc for Hub service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -599,6 +632,10 @@ var Hub_ServiceDesc = grpc.ServiceDesc{ MethodName: "PubShardsStats", Handler: _Hub_PubShardsStats_Handler, }, + { + MethodName: "PubShardsCreateRefs", + Handler: _Hub_PubShardsCreateRefs_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/common/pkgs/rpc/hub/pub_shards.go b/common/pkgs/rpc/hub/pub_shards.go index 1bdc408..0d6dc35 100644 --- a/common/pkgs/rpc/hub/pub_shards.go +++ b/common/pkgs/rpc/hub/pub_shards.go @@ -14,6 +14,7 @@ type PubShardsService interface { PubShardsGC(ctx context.Context, msg *PubShardsGC) (*PubShardsGCResp, *rpc.CodeError) PubShardsListAll(ctx context.Context, msg *PubShardsListAll) (*PubShardsListAllResp, *rpc.CodeError) PubShardsStats(ctx context.Context, msg *PubShardsStats) (*PubShardsStatsResp, *rpc.CodeError) + PubShardsCreateRefs(ctx context.Context, msg *PubShardsCreateRefs) (*PubShardsCreateRefsResp, *rpc.CodeError) } // 将一个分片纳入公共分片存储 @@ -125,3 +126,24 @@ func (c *Client) PubShardsStats(ctx context.Context, msg *PubShardsStats) (*PubS func (s *Server) PubShardsStats(ctx context.Context, msg *rpc.Request) (*rpc.Response, error) { return rpc.UnaryServer(s.svrImpl.PubShardsStats, ctx, msg) } + +type PubShardsCreateRefs struct { + PubShardsID jcstypes.PubShardsID + Password string + FileHashes []jcstypes.FileHash +} +type PubShardsCreateRefsResp struct { + InvalidFileHashes []jcstypes.FileHash +} + +var _ = TokenAuth(Hub_PubShardsCreateRefs_FullMethodName) + +func (c *Client) PubShardsCreateRefs(ctx context.Context, msg *PubShardsCreateRefs) (*PubShardsCreateRefsResp, *rpc.CodeError) { + if c.fusedErr != nil { + return nil, c.fusedErr + } + return rpc.UnaryClient[*PubShardsCreateRefsResp](c.cli.PubShardsCreateRefs, ctx, msg) +} +func (s *Server) PubShardsCreateRefs(ctx context.Context, msg *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.PubShardsCreateRefs, ctx, msg) +} diff --git a/common/types/client.go b/common/types/client.go index 649ab4f..bdece9a 100644 --- a/common/types/client.go +++ b/common/types/client.go @@ -33,6 +33,7 @@ func (Bucket) TableName() string { type Package struct { PackageID PackageID `gorm:"column:PackageID; primaryKey; type:bigint; autoIncrement" json:"packageID"` Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` + Pinned bool `gorm:"column:Pinned; type:boolean; not null" json:"pinned"` BucketID BucketID `gorm:"column:BucketID; type:bigint; not null" json:"bucketID"` CreateTime time.Time `gorm:"column:CreateTime; type:datetime; not null" json:"createTime"` } diff --git a/common/types/coordinator.go b/common/types/coordinator.go index 01dae03..2d21871 100644 --- a/common/types/coordinator.go +++ b/common/types/coordinator.go @@ -142,7 +142,7 @@ type PubShards struct { } func (PubShards) TableName() string { - return "PublicShardStore" + return "PubShards" } func (p *PubShards) String() string { diff --git a/common/types/object_pack.go b/common/types/object_pack.go new file mode 100644 index 0000000..dc0bc41 --- /dev/null +++ b/common/types/object_pack.go @@ -0,0 +1,17 @@ +package jcstypes + +type PubShardsPackFile struct { + Objects []PackObject `json:"objects"` +} + +type PackObject struct { + Object Object `json:"object"` + Blocks []PackObjectBlock `json:"blocks"` +} + +type PackObjectBlock struct { + Index int `json:"index"` + PubShardsID PubShardsID `json:"pubShardsID"` + FileHash FileHash `json:"fileHash"` + Size int64 `json:"size"` +} diff --git a/hub/internal/pubshards/pool.go b/hub/internal/pubshards/pool.go index 5a967e1..82c962b 100644 --- a/hub/internal/pubshards/pool.go +++ b/hub/internal/pubshards/pool.go @@ -96,9 +96,13 @@ func (p *Pool) GetOrLoad(pubStoreID jcstypes.PubShardsID, password string) (*Loa if err != nil { return nil, err } - err = db.AutoMigrate(FileEntry{}) + err = db.AutoMigrate(Shard{}) if err != nil { - return nil, err + return nil, fmt.Errorf("migrate Shard: %w", err) + } + err = db.AutoMigrate(UserRef{}) + if err != nil { + return nil, fmt.Errorf("migrate UserRef: %w", err) } ss.Start(p.stgEventChan) diff --git a/hub/internal/pubshards/pub_shards.go b/hub/internal/pubshards/pub_shards.go index 2aeb68e..e6eff8a 100644 --- a/hub/internal/pubshards/pub_shards.go +++ b/hub/internal/pubshards/pub_shards.go @@ -22,12 +22,25 @@ func (s *LoadedStore) StoreShard(userID jcstypes.UserID, path jcstypes.JPath, ha return stgtypes.FileInfo{}, err } - err = s.ClientFileHashDB.Clauses(clause.Insert{Modifier: "or ignore"}).Create(&FileEntry{ - UserID: userID, - Path: info.Path, - Hash: hash, - Size: size, - }).Error + err = s.ClientFileHashDB.Transaction(func(tx *gorm.DB) error { + err = tx.Clauses(clause.Insert{Modifier: "or ignore"}).Create(&Shard{ + Path: info.Path, + Hash: hash, + Size: size, + }).Error + if err != nil { + return err + } + + err = tx.Clauses(clause.Insert{Modifier: "or ignore"}).Create(&UserRef{ + UserID: userID, + Hash: hash, + }).Error + if err != nil { + return err + } + return nil + }) if err != nil { return stgtypes.FileInfo{}, err } @@ -40,8 +53,8 @@ func (s *LoadedStore) InfoShard(hash jcstypes.FileHash) (stgtypes.FileInfo, erro } func (s *LoadedStore) ListUserAll(userID jcstypes.UserID) ([]stgtypes.FileInfo, error) { - var files []FileEntry - err := s.ClientFileHashDB.Table("Files").Where("UserID = ?", userID).Find(&files).Error + var files []Shard + err := s.ClientFileHashDB.Table("UserRef").Select("Shard.*").Joins("join Shard on UserRef.Hash = Shard.Hash").Where("UserRef.UserID = ?", userID).Find(&files).Error if err != nil { return nil, err } @@ -59,20 +72,21 @@ func (s *LoadedStore) ListUserAll(userID jcstypes.UserID) ([]stgtypes.FileInfo, } func (s *LoadedStore) GC(userID jcstypes.UserID, fileHashes []jcstypes.FileHash) error { + // 从总体上看,GC是在ListUserAll之后调用的,FileHashes的内容只会小于已有的内容,所以创建Ref时可以不检查Hash是否存在 return s.ClientFileHashDB.Transaction(func(tx *gorm.DB) error { - if err := tx.Delete(&FileEntry{}, "UserID = ?", userID).Error; err != nil { - return fmt.Errorf("delete all hashes: %w", err) + if err := tx.Delete(&UserRef{}, "UserID = ?", userID).Error; err != nil { + return fmt.Errorf("delete all user refs: %w", err) } - hashes := make([]FileEntry, len(fileHashes)) + refs := make([]UserRef, len(fileHashes)) for i, hash := range fileHashes { - hashes[i] = FileEntry{ + refs[i] = UserRef{ UserID: userID, Hash: hash, } } - return tx.Clauses(clause.Insert{Modifier: "or ignore"}).Create(&hashes).Error + return tx.Clauses(clause.Insert{Modifier: "or ignore"}).Create(&refs).Error }) } @@ -81,18 +95,56 @@ func (s *LoadedStore) GetUserStats(userID jcstypes.UserID) stgtypes.Stats { return stgtypes.Stats{} } +func (s *LoadedStore) CreateRefs(userID jcstypes.UserID, refs []jcstypes.FileHash) ([]jcstypes.FileHash, error) { + var invalidHashes []jcstypes.FileHash + err := s.ClientFileHashDB.Transaction(func(tx *gorm.DB) error { + var avaiHashes []jcstypes.FileHash + err := tx.Table("Shard").Select("Hash").Where("Hash IN ?", refs).Find(&avaiHashes).Error + if err != nil { + return fmt.Errorf("check avaiable hashes: %w", err) + } + + var avaiRefs []UserRef + avaiHashesMp := make(map[jcstypes.FileHash]bool) + for _, hash := range avaiHashes { + avaiHashesMp[hash] = true + avaiRefs = append(avaiRefs, UserRef{ + UserID: userID, + Hash: hash, + }) + } + + for _, hash := range refs { + if _, ok := avaiHashesMp[hash]; !ok { + invalidHashes = append(invalidHashes, hash) + } + } + + return tx.Clauses(clause.Insert{Modifier: "or ignore"}).Create(&avaiRefs).Error + }) + return invalidHashes, err +} + func (s *LoadedStore) GetAllHashes() ([]jcstypes.FileHash, error) { var hashes []jcstypes.FileHash - return hashes, s.ClientFileHashDB.Distinct("FileHash").Find(&hashes).Error + return hashes, s.ClientFileHashDB.Table("Shard").Select("Hash").Find(&hashes).Error +} + +type Shard struct { + Hash jcstypes.FileHash `gorm:"column:Hash; type:char(68); primaryKey; not null; index" json:"hash"` + Path jcstypes.JPath `gorm:"column:Path; type:varchar(1024); not null; serializer:string" json:"path"` + Size int64 `gorm:"column:Size; type:bigint; not null" json:"size"` +} + +func (t Shard) TableName() string { + return "Shard" } -type FileEntry struct { +type UserRef struct { UserID jcstypes.UserID `gorm:"column:UserID; type:bigint; primaryKey; not null" json:"userID"` - Hash jcstypes.FileHash `gorm:"column:Hash; type:char(68); primaryKey; not null" json:"hash"` - Path jcstypes.JPath `gorm:"column:Path; type:varchar(1024); not null; serializer:string" json:"path"` - Size int64 `gorm:"column:Size; type:bigint; not null" json:"size"` + Hash jcstypes.FileHash `gorm:"column:Hash; type:char(68); primaryKey; not null; index" json:"hash"` } -func (t FileEntry) TableName() string { - return "Files" +func (t UserRef) TableName() string { + return "UserRef" } diff --git a/hub/internal/rpc/pub_shards.go b/hub/internal/rpc/pub_shards.go index 362991c..5971161 100644 --- a/hub/internal/rpc/pub_shards.go +++ b/hub/internal/rpc/pub_shards.go @@ -102,3 +102,24 @@ func (svc *Service) PubShardsStats(ctx context.Context, msg *hubrpc.PubShardsSta Stats: stats, }, nil } + +func (svc *Service) PubShardsCreateRefs(ctx context.Context, msg *hubrpc.PubShardsCreateRefs) (*hubrpc.PubShardsCreateRefsResp, *rpc.CodeError) { + authInfo, ok := rpc.GetAuthInfo(ctx) + if !ok { + return nil, rpc.Failed(ecode.Unauthorized, "unauthorized") + } + + pubShards, err := svc.pubShards.GetOrLoad(msg.PubShardsID, msg.Password) + if err != nil { + return nil, rpc.Failed(ecode.OperationFailed, "load pub shards store: %v", err) + } + + invalids, err := pubShards.CreateRefs(authInfo.UserID, msg.FileHashes) + if err != nil { + return nil, rpc.Failed(ecode.OperationFailed, "create refs: %v", err) + } + + return &hubrpc.PubShardsCreateRefsResp{ + InvalidFileHashes: invalids, + }, nil +} diff --git a/jcsctl/cmd/package/pin.go b/jcsctl/cmd/package/pin.go new file mode 100644 index 0000000..adeccf2 --- /dev/null +++ b/jcsctl/cmd/package/pin.go @@ -0,0 +1,76 @@ +package pkg + +import ( + "fmt" + "strconv" + "strings" + + "github.com/spf13/cobra" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt pinOpt + cd := cobra.Command{ + Use: "pin /", + Args: cobra.ExactArgs(1), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return pin(c, ctx, opt, args, true) + }, + } + cd.Flags().BoolVar(&opt.UseID, "id", false, "treat first argument as package ID") + PackageCmd.AddCommand(&cd) + + var unpinOpt pinOpt + cd = cobra.Command{ + Use: "unpin /", + Args: cobra.ExactArgs(1), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return pin(c, ctx, unpinOpt, args, false) + }, + } + cd.Flags().BoolVar(&unpinOpt.UseID, "id", false, "treat first argument as package ID") + PackageCmd.AddCommand(&cd) +} + +type pinOpt struct { + UseID bool +} + +func pin(c *cobra.Command, ctx *cmd.CommandContext, opt pinOpt, args []string, pin bool) error { + var pkgID jcstypes.PackageID + if opt.UseID { + id, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + return fmt.Errorf("invalid package ID: %v", args[0]) + } + pkgID = jcstypes.PackageID(id) + } else { + comps := strings.Split(args[0], "/") + if len(comps) != 2 { + return fmt.Errorf("invalid package name: %v", args[0]) + } + + getPkg, err := ctx.Client.Package().GetByFullName(cliapi.PackageGetByFullName{ + BucketName: comps[0], + PackageName: comps[1], + }) + if err != nil { + return fmt.Errorf("get package by name %v: %v", args[0], err) + } + pkgID = getPkg.Package.PackageID + } + + if _, err := ctx.Client.Package().Pin(cliapi.PackagePin{ + PackageID: pkgID, + Pin: false, + }); err != nil { + return fmt.Errorf("unpin package %v: %v", pkgID, err) + } + + return nil +} diff --git a/jcsctl/cmd/pubshards/export.go b/jcsctl/cmd/pubshards/export.go new file mode 100644 index 0000000..9207290 --- /dev/null +++ b/jcsctl/cmd/pubshards/export.go @@ -0,0 +1,115 @@ +package pubshards + +import ( + "fmt" + "io" + "os" + "strconv" + "strings" + + "github.com/spf13/cobra" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt exportOption + c := &cobra.Command{ + Use: "export ", + Short: "export package object metadata as a file", + Args: cobra.ExactArgs(1), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return export(c, ctx, opt, args) + }, + } + PubShardsCmd.AddCommand(c) + c.Flags().BoolVarP(&opt.UseID, "id", "i", false, "treat first argumnet as package ID instead of package path") + c.Flags().StringArrayVarP(&opt.PubShardsNames, "pubshards", "p", nil, "pub shards name") + c.Flags().StringVarP(&opt.OutputPath, "output", "o", "", "output file path") +} + +type exportOption struct { + UseID bool + PubShardsNames []string + OutputPath string +} + +func export(c *cobra.Command, ctx *cmd.CommandContext, opt exportOption, args []string) error { + if len(opt.PubShardsNames) == 0 { + return fmt.Errorf("you must specify at least one userspace name") + } + + var pkgID jcstypes.PackageID + + if opt.UseID { + id, err := strconv.ParseInt(args[0], 10, 64) + if err != nil { + return fmt.Errorf("invalid package ID: %s", args[0]) + } + pkgID = jcstypes.PackageID(id) + + } else { + comps := strings.Split(args[0], "/") + if len(comps) != 2 { + return fmt.Errorf("invalid package path: %s", args[0]) + } + + resp, err := ctx.Client.Package().GetByFullName(cliapi.PackageGetByFullName{ + BucketName: comps[0], + PackageName: comps[1], + }) + if err != nil { + return fmt.Errorf("get package by full name: %w", err) + } + pkgID = resp.Package.PackageID + } + + _, err := ctx.Client.Package().Pin(cliapi.PackagePin{ + PackageID: pkgID, + Pin: true, + }) + if err != nil { + return fmt.Errorf("pin package: %w", err) + } + + var pubIDs []jcstypes.PubShardsID + for _, name := range opt.PubShardsNames { + resp, err := ctx.Client.PubShards().Get(cliapi.PubShardsGet{ + Name: name, + }) + if err != nil { + return fmt.Errorf("get user space %v by name: %w", name, err) + } + + pubIDs = append(pubIDs, resp.PubShards.PubShardsID) + } + + resp, err := ctx.Client.PubShards().ExportPackage(cliapi.PubShardsExportPackage{ + PackageID: pkgID, + AvailablePubShards: pubIDs, + }) + if err != nil { + return fmt.Errorf("export package: %w", err) + } + + outputPath := opt.OutputPath + if outputPath == "" { + outputPath = resp.FileName + } + + outputFile, err := os.Create(outputPath) + if err != nil { + return fmt.Errorf("create output file: %w", err) + } + defer outputFile.Close() + + _, err = io.Copy(outputFile, resp.PackFile) + if err != nil { + return fmt.Errorf("write output file: %w", err) + } + + fmt.Printf("Package %v exported to %v, which also set pinned to true\n", pkgID, outputPath) + return nil +} diff --git a/jcsctl/cmd/pubshards/import.go b/jcsctl/cmd/pubshards/import.go new file mode 100644 index 0000000..2744d56 --- /dev/null +++ b/jcsctl/cmd/pubshards/import.go @@ -0,0 +1,103 @@ +package pubshards + +import ( + "fmt" + "os" + "strconv" + "strings" + + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/common/sdks" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/common/ecode" + jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt import2Option + c := &cobra.Command{ + Use: "import ", + Short: "import package object metadata from local file", + Args: cobra.ExactArgs(2), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return import2(c, ctx, opt, args) + }, + } + PubShardsCmd.AddCommand(c) + c.Flags().BoolVarP(&opt.UseID, "id", "i", false, "treat first argumnet as package ID instead of package path") + c.Flags().BoolVar(&opt.Create, "create", false, "create package if not exists") +} + +type import2Option struct { + UseID bool + Create bool +} + +func import2(c *cobra.Command, ctx *cmd.CommandContext, opt import2Option, args []string) error { + var pkgID jcstypes.PackageID + if opt.UseID { + id, err := strconv.ParseInt(args[1], 10, 64) + if err != nil { + return fmt.Errorf("invalid package ID %v: %w", args[1], err) + } + pkgID = jcstypes.PackageID(id) + } else { + comps := strings.Split(args[1], "/") + + resp, err := ctx.Client.Package().GetByFullName(cliapi.PackageGetByFullName{ + BucketName: comps[0], + PackageName: comps[1], + }) + if err != nil { + if !sdks.IsErrorCode(err, string(ecode.DataNotFound)) { + return err + } + + if !opt.Create { + return fmt.Errorf("package not found") + } + + bkt, err := ctx.Client.Bucket().GetByName(cliapi.BucketGetByName{ + Name: comps[0], + }) + if err != nil { + return fmt.Errorf("get bucket %v: %w", comps[0], err) + } + + cpkg, err := ctx.Client.Package().Create(cliapi.PackageCreate{ + BucketID: bkt.Bucket.BucketID, + Name: comps[1], + }) + if err != nil { + return fmt.Errorf("create package %v: %w", args[1], err) + } + pkgID = cpkg.Package.PackageID + } else { + pkgID = resp.Package.PackageID + } + } + + file, err := os.Open(args[0]) + if err != nil { + return fmt.Errorf("open file %v: %w", args[0], err) + } + defer file.Close() + + resp, err := ctx.Client.PubShards().ImportPackage(cliapi.PubShardsImportPackage{ + PackageID: pkgID, + PackFile: file, + }) + if err != nil { + return fmt.Errorf("import package: %w", err) + } + if len(resp.InvalidObjects) > 0 { + fmt.Printf("below objects are invalid and will not be imported:\n") + for _, obj := range resp.InvalidObjects { + fmt.Printf("%v\n", obj.Path) + } + } + + return nil +} diff --git a/jcsctl/cmd/pubshards/ls.go b/jcsctl/cmd/pubshards/ls.go new file mode 100644 index 0000000..09ed206 --- /dev/null +++ b/jcsctl/cmd/pubshards/ls.go @@ -0,0 +1,51 @@ +package pubshards + +import ( + "fmt" + + "github.com/jedib0t/go-pretty/v6/table" + "github.com/spf13/cobra" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt lsOption + c := &cobra.Command{ + Use: "ls", + Short: "list all pub shards", + Args: cobra.NoArgs, + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return ls(c, ctx, opt, args) + }, + } + PubShardsCmd.AddCommand(c) + c.Flags().BoolVarP(&opt.Long, "long", "l", false, "show more details") +} + +type lsOption struct { + Long bool +} + +func ls(c *cobra.Command, ctx *cmd.CommandContext, opt lsOption, args []string) error { + resp, err := ctx.Client.PubShards().List(cliapi.PubShardsList{}) + if err != nil { + return err + } + + if !opt.Long { + for _, p := range resp.PubShards { + fmt.Printf("%v\n", p.Name) + } + return nil + } + + tb := table.NewWriter() + tb.AppendHeader(table.Row{"PubShards ID", "PubShards Name", "UserSpace Name"}) + for i := range resp.PubShards { + tb.AppendRow(table.Row{resp.PubShards[i].PubShardsID, resp.PubShards[i].Name, resp.UserSpaces[i].Name}) + } + fmt.Println(tb.Render()) + return nil +}