From c8cb9c79caac50ec3ad1dcc704ff978858f79716 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 9 Apr 2025 10:16:21 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=A1=B9=E7=9B=AE=E9=87=8C?= =?UTF-8?q?=E7=9A=84=E5=BC=95=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/db/bucket.go | 21 -- .../internal/downloader/lrc_strip_iterator.go | 8 +- client/internal/http/bucket.go | 27 +- client/internal/http/server.go | 130 +++++----- client/internal/mount/mount.go | 8 +- client/internal/mount/mount_win.go | 8 +- client/internal/services/bucket.go | 4 +- client/internal/services/storage.go | 12 +- client/internal/services/utils.go | 12 +- client/sdk/api/bucket.go | 26 +- client/sdk/api/package.go | 42 +-- client/sdk/api/presigned.go | 44 ++-- client/sdk/api/storage_test.go | 4 +- client/sdk/api/userspace.go | 24 +- .../distlock/reqbuilder/metadata_object.go | 4 +- .../pkgs/distlock/reqbuilder/shard_store.go | 8 +- common/pkgs/distlock/reqbuilder/storage.go | 8 +- common/pkgs/ioswitch2/ops2/faas.go | 62 ----- common/pkgs/ioswitchlrc/ops2/ec.go | 10 +- common/pkgs/ioswitchlrc/parser/generator.go | 10 +- common/pkgs/mq/hub/client.go | 14 +- common/pkgs/mq/hub/server.go | 4 +- hub/internal/config/config.go | 4 +- hub/internal/http/hub_io.go | 14 +- hub/internal/http/server.go | 12 +- hub/sdk/api/client.go | 51 ++++ hub/sdk/api/config.go | 5 + hub/sdk/api/hub_io.go | 244 ++++++++++++++++++ hub/sdk/api/utils.go | 176 +++++++++++++ 29 files changed, 695 insertions(+), 301 deletions(-) delete mode 100644 common/pkgs/ioswitch2/ops2/faas.go create mode 100644 hub/sdk/api/client.go create mode 100644 hub/sdk/api/config.go create mode 100644 hub/sdk/api/hub_io.go create mode 100644 hub/sdk/api/utils.go diff --git a/client/internal/db/bucket.go b/client/internal/db/bucket.go index e159887..ba115f5 100644 --- a/client/internal/db/bucket.go +++ b/client/internal/db/bucket.go @@ -28,33 +28,12 @@ func (db *BucketDB) GetByName(ctx SQLContext, bucketName string) (types.Bucket, return ret, err } -// GetIDByName 根据BucketName查询BucketID -func (db *BucketDB) GetIDByName(ctx SQLContext, bucketName string) (int64, error) { - var result struct { - BucketID int64 `gorm:"column:BucketID"` - BucketName string `gorm:"column:BucketName"` - } - - err := ctx.Table("Bucket").Select("BucketID, BucketName").Where("BucketName = ?", bucketName).Scan(&result).Error - if err != nil { - return 0, err - } - - return result.BucketID, nil -} - func (*BucketDB) GetAll(ctx SQLContext) ([]types.Bucket, error) { var ret []types.Bucket err := ctx.Table("Bucket").Find(&ret).Error return ret, err } -func (*BucketDB) GetUserBuckets(ctx SQLContext) ([]types.Bucket, error) { - var ret []types.Bucket - err := ctx.Table("Bucket").Find(&ret).Error - return ret, err -} - func (db *BucketDB) Create(ctx SQLContext, bucketName string, createTime time.Time) (types.Bucket, error) { var bucketID int64 err := ctx.Table("Bucket"). diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go index 7822ed0..56f33d3 100644 --- a/client/internal/downloader/lrc_strip_iterator.go +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -9,16 +9,16 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser" ) type LRCStripIterator struct { downloder *Downloader - object cdssdk.Object + object clitypes.Object blocks []downloadBlock - red cdssdk.LRCRedundancy + red clitypes.LRCRedundancy curStripIndex int64 cache *StripCache dataChan chan dataChanEntry @@ -27,7 +27,7 @@ type LRCStripIterator struct { inited bool } -func NewLRCStripIterator(downloder *Downloader, object cdssdk.Object, blocks []downloadBlock, red cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator { +func NewLRCStripIterator(downloder *Downloader, object clitypes.Object, blocks []downloadBlock, red clitypes.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator { if maxPrefetch <= 0 { maxPrefetch = 1 } diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index ad8c80f..6ce343f 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -1,13 +1,14 @@ package http import ( + "fmt" "net/http" "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdsapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" + cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" ) type BucketService struct { @@ -23,7 +24,7 @@ func (s *Server) Bucket() *BucketService { func (s *BucketService) GetByName(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.GetByName") - var req cdsapi.BucketGetByName + var req cliapi.BucketGetByName if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -37,7 +38,7 @@ func (s *BucketService) GetByName(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.BucketGetByNameResp{ + ctx.JSON(http.StatusOK, OK(cliapi.BucketGetByNameResp{ Bucket: bucket, })) } @@ -45,7 +46,7 @@ func (s *BucketService) GetByName(ctx *gin.Context) { func (s *BucketService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.Create") - var req cdsapi.BucketCreate + var req cliapi.BucketCreate if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -59,7 +60,7 @@ func (s *BucketService) Create(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.BucketCreateResp{ + ctx.JSON(http.StatusOK, OK(cliapi.BucketCreateResp{ Bucket: bucket, })) } @@ -67,7 +68,7 @@ func (s *BucketService) Create(ctx *gin.Context) { func (s *BucketService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.Delete") - var req cdsapi.BucketDelete + var req cliapi.BucketDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -83,24 +84,24 @@ func (s *BucketService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } -func (s *BucketService) ListUserBuckets(ctx *gin.Context) { - log := logger.WithField("HTTP", "Bucket.ListUserBuckets") +func (s *BucketService) ListAll(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.ListAll") - var req cdsapi.BucketListUserBucketsReq + var req cliapi.BucketListAll if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - buckets, err := s.svc.BucketSvc().GetUserBuckets() + buckets, err := s.svc.BucketSvc().ListAllBuckets() if err != nil { - log.Warnf("getting user buckets: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get user buckets failed")) + log.Warnf("list all buckets: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("list all buckets: %v", err))) return } - ctx.JSON(http.StatusOK, OK(cdsapi.BucketListUserBucketsResp{ + ctx.JSON(http.StatusOK, OK(cliapi.BucketListAllResp{ Buckets: buckets, })) } diff --git a/client/internal/http/server.go b/client/internal/http/server.go index bce2664..52d45d5 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -4,7 +4,7 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage2/client/internal/services" - cdsapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" + cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" ) type Server struct { @@ -47,86 +47,86 @@ func (s *Server) initRouters() { s.routeV1(s.engine, rt) - rt.GET(cdsapi.ObjectListPathByPath, s.Object().ListByPath) - rt.GET(cdsapi.ObjectListByIDsPath, s.Object().ListByIDs) - rt.GET(cdsapi.ObjectDownloadPath, s.Object().Download) - rt.GET(cdsapi.ObjectDownloadByPathPath, s.Object().DownloadByPath) - rt.POST(cdsapi.ObjectUploadPath, s.Object().Upload) - rt.GET(cdsapi.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) - rt.POST(cdsapi.ObjectUpdateInfoPath, s.Object().UpdateInfo) - rt.POST(cdsapi.ObjectUpdateInfoByPathPath, s.Object().UpdateInfoByPath) - rt.POST(cdsapi.ObjectMovePath, s.Object().Move) - rt.POST(cdsapi.ObjectDeletePath, s.Object().Delete) - rt.POST(cdsapi.ObjectDeleteByPathPath, s.Object().DeleteByPath) - rt.POST(cdsapi.ObjectClonePath, s.Object().Clone) - - rt.GET(cdsapi.PackageGetPath, s.Package().Get) - rt.GET(cdsapi.PackageGetByFullNamePath, s.Package().GetByFullName) - rt.POST(cdsapi.PackageCreatePath, s.Package().Create) - rt.POST(cdsapi.PackageCreateLoadPath, s.Package().CreateLoad) - rt.POST(cdsapi.PackageDeletePath, s.Package().Delete) - rt.POST(cdsapi.PackageClonePath, s.Package().Clone) - rt.GET(cdsapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) + rt.GET(cliapi.ObjectListPathByPath, s.Object().ListByPath) + rt.GET(cliapi.ObjectListByIDsPath, s.Object().ListByIDs) + rt.GET(cliapi.ObjectDownloadPath, s.Object().Download) + rt.GET(cliapi.ObjectDownloadByPathPath, s.Object().DownloadByPath) + rt.POST(cliapi.ObjectUploadPath, s.Object().Upload) + rt.GET(cliapi.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) + rt.POST(cliapi.ObjectUpdateInfoPath, s.Object().UpdateInfo) + rt.POST(cliapi.ObjectUpdateInfoByPathPath, s.Object().UpdateInfoByPath) + rt.POST(cliapi.ObjectMovePath, s.Object().Move) + rt.POST(cliapi.ObjectDeletePath, s.Object().Delete) + rt.POST(cliapi.ObjectDeleteByPathPath, s.Object().DeleteByPath) + rt.POST(cliapi.ObjectClonePath, s.Object().Clone) + + rt.GET(cliapi.PackageGetPath, s.Package().Get) + rt.GET(cliapi.PackageGetByFullNamePath, s.Package().GetByFullName) + rt.POST(cliapi.PackageCreatePath, s.Package().Create) + rt.POST(cliapi.PackageCreateLoadPath, s.Package().CreateLoad) + rt.POST(cliapi.PackageDeletePath, s.Package().Delete) + rt.POST(cliapi.PackageClonePath, s.Package().Clone) + rt.GET(cliapi.PackageListBucketPackagesPath, s.Package().ListBucketPackages) // rt.GET(cdsapi.PackageGetCachedStoragesPath, s.Package().GetCachedStorages) - rt.POST(cdsapi.UserSpaceLoadPackagePath, s.UserSpace().LoadPackage) - rt.POST(cdsapi.UserSpaceCreatePackagePath, s.UserSpace().CreatePackage) - rt.GET(cdsapi.UserSpaceGetPath, s.UserSpace().Get) + rt.POST(cliapi.UserSpaceLoadPackagePath, s.UserSpace().LoadPackage) + rt.POST(cliapi.UserSpaceCreatePackagePath, s.UserSpace().CreatePackage) + rt.GET(cliapi.UserSpaceGetPath, s.UserSpace().Get) // rt.POST(cdsapi.CacheMovePackagePath, s.Cache().MovePackage) - rt.GET(cdsapi.BucketGetByNamePath, s.Bucket().GetByName) - rt.POST(cdsapi.BucketCreatePath, s.Bucket().Create) - rt.POST(cdsapi.BucketDeletePath, s.Bucket().Delete) - rt.GET(cdsapi.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) + rt.GET(cliapi.BucketGetByNamePath, s.Bucket().GetByName) + rt.POST(cliapi.BucketCreatePath, s.Bucket().Create) + rt.POST(cliapi.BucketDeletePath, s.Bucket().Delete) + rt.GET(cliapi.BucketListAllPath, s.Bucket().ListAll) } func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) { v1 := eg.Group("/v1") - v1.GET(cdsapi.ObjectListPathByPath, s.awsAuth.Auth, s.Object().ListByPath) - v1.GET(cdsapi.ObjectListByIDsPath, s.awsAuth.Auth, s.Object().ListByIDs) - v1.GET(cdsapi.ObjectDownloadPath, s.awsAuth.Auth, s.Object().Download) - v1.GET(cdsapi.ObjectDownloadByPathPath, s.awsAuth.Auth, s.Object().DownloadByPath) - v1.POST(cdsapi.ObjectUploadPath, s.awsAuth.AuthWithoutBody, s.Object().Upload) - v1.GET(cdsapi.ObjectGetPackageObjectsPath, s.awsAuth.Auth, s.Object().GetPackageObjects) - v1.POST(cdsapi.ObjectUpdateInfoPath, s.awsAuth.Auth, s.Object().UpdateInfo) - v1.POST(cdsapi.ObjectUpdateInfoByPathPath, s.awsAuth.Auth, s.Object().UpdateInfoByPath) - v1.POST(cdsapi.ObjectMovePath, s.awsAuth.Auth, s.Object().Move) - v1.POST(cdsapi.ObjectDeletePath, s.awsAuth.Auth, s.Object().Delete) - v1.POST(cdsapi.ObjectDeleteByPathPath, s.awsAuth.Auth, s.Object().DeleteByPath) - v1.POST(cdsapi.ObjectClonePath, s.awsAuth.Auth, s.Object().Clone) - - v1.GET(cdsapi.PackageGetPath, s.awsAuth.Auth, s.Package().Get) - v1.GET(cdsapi.PackageGetByFullNamePath, s.awsAuth.Auth, s.Package().GetByFullName) - v1.POST(cdsapi.PackageCreatePath, s.awsAuth.Auth, s.Package().Create) - v1.POST(cdsapi.PackageCreateLoadPath, s.awsAuth.Auth, s.Package().CreateLoad) - v1.POST(cdsapi.PackageDeletePath, s.awsAuth.Auth, s.Package().Delete) - v1.POST(cdsapi.PackageClonePath, s.awsAuth.Auth, s.Package().Clone) - v1.GET(cdsapi.PackageListBucketPackagesPath, s.awsAuth.Auth, s.Package().ListBucketPackages) + v1.GET(cliapi.ObjectListPathByPath, s.awsAuth.Auth, s.Object().ListByPath) + v1.GET(cliapi.ObjectListByIDsPath, s.awsAuth.Auth, s.Object().ListByIDs) + v1.GET(cliapi.ObjectDownloadPath, s.awsAuth.Auth, s.Object().Download) + v1.GET(cliapi.ObjectDownloadByPathPath, s.awsAuth.Auth, s.Object().DownloadByPath) + v1.POST(cliapi.ObjectUploadPath, s.awsAuth.AuthWithoutBody, s.Object().Upload) + v1.GET(cliapi.ObjectGetPackageObjectsPath, s.awsAuth.Auth, s.Object().GetPackageObjects) + v1.POST(cliapi.ObjectUpdateInfoPath, s.awsAuth.Auth, s.Object().UpdateInfo) + v1.POST(cliapi.ObjectUpdateInfoByPathPath, s.awsAuth.Auth, s.Object().UpdateInfoByPath) + v1.POST(cliapi.ObjectMovePath, s.awsAuth.Auth, s.Object().Move) + v1.POST(cliapi.ObjectDeletePath, s.awsAuth.Auth, s.Object().Delete) + v1.POST(cliapi.ObjectDeleteByPathPath, s.awsAuth.Auth, s.Object().DeleteByPath) + v1.POST(cliapi.ObjectClonePath, s.awsAuth.Auth, s.Object().Clone) + + v1.GET(cliapi.PackageGetPath, s.awsAuth.Auth, s.Package().Get) + v1.GET(cliapi.PackageGetByFullNamePath, s.awsAuth.Auth, s.Package().GetByFullName) + v1.POST(cliapi.PackageCreatePath, s.awsAuth.Auth, s.Package().Create) + v1.POST(cliapi.PackageCreateLoadPath, s.awsAuth.Auth, s.Package().CreateLoad) + v1.POST(cliapi.PackageDeletePath, s.awsAuth.Auth, s.Package().Delete) + v1.POST(cliapi.PackageClonePath, s.awsAuth.Auth, s.Package().Clone) + v1.GET(cliapi.PackageListBucketPackagesPath, s.awsAuth.Auth, s.Package().ListBucketPackages) // v1.GET(cdsapi.PackageGetCachedStoragesPath, s.awsAuth.Auth, s.Package().GetCachedStorages) - v1.POST(cdsapi.UserSpaceLoadPackagePath, s.awsAuth.Auth, s.UserSpace().LoadPackage) - v1.POST(cdsapi.UserSpaceCreatePackagePath, s.awsAuth.Auth, s.UserSpace().CreatePackage) - v1.GET(cdsapi.UserSpaceGetPath, s.awsAuth.Auth, s.UserSpace().Get) + v1.POST(cliapi.UserSpaceLoadPackagePath, s.awsAuth.Auth, s.UserSpace().LoadPackage) + v1.POST(cliapi.UserSpaceCreatePackagePath, s.awsAuth.Auth, s.UserSpace().CreatePackage) + v1.GET(cliapi.UserSpaceGetPath, s.awsAuth.Auth, s.UserSpace().Get) // v1.POST(cdsapi.CacheMovePackagePath, s.awsAuth.Auth, s.Cache().MovePackage) - v1.GET(cdsapi.BucketGetByNamePath, s.awsAuth.Auth, s.Bucket().GetByName) - v1.POST(cdsapi.BucketCreatePath, s.awsAuth.Auth, s.Bucket().Create) - v1.POST(cdsapi.BucketDeletePath, s.awsAuth.Auth, s.Bucket().Delete) - v1.GET(cdsapi.BucketListUserBucketsPath, s.awsAuth.Auth, s.Bucket().ListUserBuckets) + v1.GET(cliapi.BucketGetByNamePath, s.awsAuth.Auth, s.Bucket().GetByName) + v1.POST(cliapi.BucketCreatePath, s.awsAuth.Auth, s.Bucket().Create) + v1.POST(cliapi.BucketDeletePath, s.awsAuth.Auth, s.Bucket().Delete) + v1.GET(cliapi.BucketListAllPath, s.awsAuth.Auth, s.Bucket().ListAll) - rt.POST(cdsapi.ObjectNewMultipartUploadPath, s.Object().NewMultipartUpload) - rt.POST(cdsapi.ObjectUploadPartPath, s.Object().UploadPart) - rt.POST(cdsapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload) + rt.POST(cliapi.ObjectNewMultipartUploadPath, s.Object().NewMultipartUpload) + rt.POST(cliapi.ObjectUploadPartPath, s.Object().UploadPart) + rt.POST(cliapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload) - rt.GET(cdsapi.PresignedObjectListByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectListByPath) - rt.GET(cdsapi.PresignedObjectDownloadByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath) - rt.GET(cdsapi.PresignedObjectDownloadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownload) - rt.POST(cdsapi.PresignedObjectUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUpload) + rt.GET(cliapi.PresignedObjectListByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectListByPath) + rt.GET(cliapi.PresignedObjectDownloadByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath) + rt.GET(cliapi.PresignedObjectDownloadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownload) + rt.POST(cliapi.PresignedObjectUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUpload) - rt.POST(cdsapi.PresignedObjectNewMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectNewMultipartUpload) - rt.POST(cdsapi.PresignedObjectUploadPartPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUploadPart) - rt.POST(cdsapi.PresignedObjectCompleteMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectCompleteMultipartUpload) + rt.POST(cliapi.PresignedObjectNewMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectNewMultipartUpload) + rt.POST(cliapi.PresignedObjectUploadPartPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUploadPart) + rt.POST(cliapi.PresignedObjectCompleteMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectCompleteMultipartUpload) } diff --git a/client/internal/mount/mount.go b/client/internal/mount/mount.go index 938f617..f4b73e6 100644 --- a/client/internal/mount/mount.go +++ b/client/internal/mount/mount.go @@ -5,7 +5,6 @@ package mount import ( fusefs "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" db2 "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader" @@ -13,6 +12,7 @@ import ( fuse2 "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs" "gitlink.org.cn/cloudream/storage2/client/internal/uploader" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type MountEvent interface { @@ -71,14 +71,14 @@ func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] { return ch } -func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) { +func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) { } -func (m *Mount) NotifyPackageInvalid(pkg cdssdk.Package) { +func (m *Mount) NotifyPackageInvalid(pkg clitypes.Package) { } -func (m *Mount) NotifyBucketInvalid(bkt cdssdk.Bucket) { +func (m *Mount) NotifyBucketInvalid(bkt clitypes.Bucket) { } diff --git a/client/internal/mount/mount_win.go b/client/internal/mount/mount_win.go index 7e3a782..9305697 100644 --- a/client/internal/mount/mount_win.go +++ b/client/internal/mount/mount_win.go @@ -5,12 +5,12 @@ package mount import ( "fmt" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" "gitlink.org.cn/cloudream/storage2/client/internal/uploader" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type MountEvent interface { @@ -41,14 +41,14 @@ func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] { return ch } -func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) { +func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) { } -func (m *Mount) NotifyPackageInvalid(pkg cdssdk.Package) { +func (m *Mount) NotifyPackageInvalid(pkg clitypes.Package) { } -func (m *Mount) NotifyBucketInvalid(bkt cdssdk.Bucket) { +func (m *Mount) NotifyBucketInvalid(bkt clitypes.Bucket) { } diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index 6e40a55..6c797b3 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -28,8 +28,8 @@ func (svc *BucketService) GetBucketByName(bucketName string) (types.Bucket, erro return svc.DB.Bucket().GetByName(svc.DB.DefCtx(), bucketName) } -func (svc *BucketService) GetUserBuckets() ([]types.Bucket, error) { - return svc.DB.Bucket().GetUserBuckets(svc.DB.DefCtx()) +func (svc *BucketService) ListAllBuckets() ([]types.Bucket, error) { + return svc.DB.Bucket().GetAll(svc.DB.DefCtx()) } // GetBucketPackages 获取指定用户和桶的所有包 diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 5094b31..2b5741e 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -6,7 +6,7 @@ import ( "path" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" @@ -24,7 +24,7 @@ func (svc *Service) UserSpaceSvc() *UserSpaceService { return &UserSpaceService{Service: svc} } -func (svc *UserSpaceService) Get(userspaceID cdssdk.UserSpaceID) (types.UserSpace, error) { +func (svc *UserSpaceService) Get(userspaceID clitypes.UserSpaceID) (types.UserSpace, error) { return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID) } @@ -32,7 +32,7 @@ func (svc *UserSpaceService) GetByName(name string) (types.UserSpace, error) { return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name) } -func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID cdssdk.UserSpaceID, rootPath string) error { +func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return fmt.Errorf("new coordinator client: %w", err) @@ -52,7 +52,7 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID return err } - var pinned []cdssdk.ObjectID + var pinned []clitypes.ObjectID plans := exec.NewPlanBuilder() for _, obj := range details { strg, err := svc.StrategySelector.Select(strategy.Request{ @@ -117,7 +117,7 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID } // 请求节点启动从UserSpace中上传文件的任务。会返回节点ID和任务ID -func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, name string, userspaceID cdssdk.UserSpaceID, path string, userspaceAffinity cdssdk.UserSpaceID) (cdssdk.Package, error) { +func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID clitypes.BucketID, name string, userspaceID clitypes.UserSpaceID, path string, userspaceAffinity clitypes.UserSpaceID) (clitypes.Package, error) { // coorCli, err := stgglb.CoordinatorMQPool.Acquire() // if err != nil { // return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) @@ -152,5 +152,5 @@ func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, na // return createResp.Package, nil // TODO 待实现 - return cdssdk.Package{}, fmt.Errorf("not implemented") + return clitypes.Package{}, fmt.Errorf("not implemented") } diff --git a/client/internal/services/utils.go b/client/internal/services/utils.go index 68468e5..ad7a619 100644 --- a/client/internal/services/utils.go +++ b/client/internal/services/utils.go @@ -1,22 +1,22 @@ package services import ( - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/models/datamap" ) -func getBlockTypeFromRed(red cdssdk.Redundancy) string { +func getBlockTypeFromRed(red clitypes.Redundancy) string { switch red.(type) { - case *cdssdk.NoneRedundancy: + case *clitypes.NoneRedundancy: return datamap.BlockTypeRaw - case *cdssdk.ECRedundancy: + case *clitypes.ECRedundancy: return datamap.BlockTypeEC - case *cdssdk.LRCRedundancy: + case *clitypes.LRCRedundancy: return datamap.BlockTypeEC - case *cdssdk.SegmentRedundancy: + case *clitypes.SegmentRedundancy: return datamap.BlockTypeSegment } return "" diff --git a/client/sdk/api/bucket.go b/client/sdk/api/bucket.go index cb60671..8f34668 100644 --- a/client/sdk/api/bucket.go +++ b/client/sdk/api/bucket.go @@ -4,7 +4,7 @@ import ( "net/http" "gitlink.org.cn/cloudream/common/sdks" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type BucketService struct { @@ -26,7 +26,7 @@ func (r *BucketGetByName) MakeParam() *sdks.RequestParam { } type BucketGetByNameResp struct { - Bucket cdssdk.Bucket `json:"bucket"` + Bucket clitypes.Bucket `json:"bucket"` } func (r *BucketGetByNameResp) ParseResponse(resp *http.Response) error { @@ -48,7 +48,7 @@ func (r *BucketCreate) MakeParam() *sdks.RequestParam { } type BucketCreateResp struct { - Bucket cdssdk.Bucket `json:"bucket"` + Bucket clitypes.Bucket `json:"bucket"` } func (r *BucketCreateResp) ParseResponse(resp *http.Response) error { @@ -62,7 +62,7 @@ func (c *BucketService) Create(req BucketCreate) (*BucketCreateResp, error) { const BucketDeletePath = "/bucket/delete" type BucketDelete struct { - BucketID cdssdk.BucketID `json:"bucketID" binding:"required"` + BucketID clitypes.BucketID `json:"bucketID" binding:"required"` } func (r *BucketDelete) MakeParam() *sdks.RequestParam { @@ -79,23 +79,23 @@ func (c *BucketService) Delete(req BucketDelete) error { return JSONAPINoData(c.cfg, http.DefaultClient, &req) } -const BucketListUserBucketsPath = "/bucket/listUserBuckets" +const BucketListAllPath = "/bucket/listAll" -type BucketListUserBucketsReq struct { +type BucketListAll struct { } -func (r *BucketListUserBucketsReq) MakeParam() *sdks.RequestParam { - return sdks.MakeQueryParam(http.MethodGet, BucketListUserBucketsPath, r) +func (r *BucketListAll) MakeParam() *sdks.RequestParam { + return sdks.MakeQueryParam(http.MethodGet, BucketListAllPath, r) } -type BucketListUserBucketsResp struct { - Buckets []cdssdk.Bucket `json:"buckets"` +type BucketListAllResp struct { + Buckets []clitypes.Bucket `json:"buckets"` } -func (r *BucketListUserBucketsResp) ParseResponse(resp *http.Response) error { +func (r *BucketListAllResp) ParseResponse(resp *http.Response) error { return sdks.ParseCodeDataJSONResponse(resp, r) } -func (c *BucketService) ListUserBuckets(req BucketListUserBucketsReq) (*BucketListUserBucketsResp, error) { - return JSONAPI(c.cfg, http.DefaultClient, &req, &BucketListUserBucketsResp{}) +func (c *BucketService) ListAll(req BucketListAll) (*BucketListAllResp, error) { + return JSONAPI(c.cfg, http.DefaultClient, &req, &BucketListAllResp{}) } diff --git a/client/sdk/api/package.go b/client/sdk/api/package.go index 7c5e6e5..f17e174 100644 --- a/client/sdk/api/package.go +++ b/client/sdk/api/package.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/sdks" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type PackageService struct { @@ -24,7 +24,7 @@ func (c *Client) Package() *PackageService { const PackageGetPath = "/package/get" type PackageGetReq struct { - PackageID cdssdk.PackageID `form:"packageID" url:"packageID" binding:"required"` + PackageID clitypes.PackageID `form:"packageID" url:"packageID" binding:"required"` } func (r *PackageGetReq) MakeParam() *sdks.RequestParam { @@ -32,7 +32,7 @@ func (r *PackageGetReq) MakeParam() *sdks.RequestParam { } type PackageGetResp struct { - cdssdk.Package + clitypes.Package } func (r *PackageGetResp) ParseResponse(resp *http.Response) error { @@ -55,7 +55,7 @@ func (r *PackageGetByFullName) MakeParam() *sdks.RequestParam { } type PackageGetByFullNameResp struct { - Package cdssdk.Package `json:"package"` + Package clitypes.Package `json:"package"` } func (r *PackageGetByFullNameResp) ParseResponse(resp *http.Response) error { @@ -69,8 +69,8 @@ func (c *PackageService) GetByName(req PackageGetByFullName) (*PackageGetByFullN const PackageCreatePath = "/package/create" type PackageCreate struct { - BucketID cdssdk.BucketID `json:"bucketID"` - Name string `json:"name"` + BucketID clitypes.BucketID `json:"bucketID"` + Name string `json:"name"` } func (r *PackageCreate) MakeParam() *sdks.RequestParam { @@ -78,7 +78,7 @@ func (r *PackageCreate) MakeParam() *sdks.RequestParam { } type PackageCreateResp struct { - Package cdssdk.Package `json:"package"` + Package clitypes.Package `json:"package"` } func (r *PackageCreateResp) ParseResponse(resp *http.Response) error { @@ -96,14 +96,14 @@ type PackageCreateLoad struct { Files UploadObjectIterator `json:"-"` } type PackageCreateLoadInfo struct { - BucketID cdssdk.BucketID `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - LoadTo []cdssdk.UserSpaceID `json:"loadTo"` - LoadToPath []string `json:"loadToPath"` + BucketID clitypes.BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + LoadTo []clitypes.UserSpaceID `json:"loadTo"` + LoadToPath []string `json:"loadToPath"` } type PackageCreateLoadResp struct { - Package cdssdk.Package `json:"package"` - Objects []cdssdk.Object `json:"objects"` + Package clitypes.Package `json:"package"` + Objects []clitypes.Object `json:"objects"` } func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadResp, error) { @@ -145,7 +145,7 @@ func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadRe const PackageDeletePath = "/package/delete" type PackageDelete struct { - PackageID cdssdk.PackageID `json:"packageID" binding:"required"` + PackageID clitypes.PackageID `json:"packageID" binding:"required"` } func (r *PackageDelete) MakeParam() *sdks.RequestParam { @@ -165,9 +165,9 @@ func (c *PackageService) Delete(req PackageDelete) error { const PackageClonePath = "/package/clone" type PackageClone struct { - PackageID cdssdk.PackageID `json:"packageID" binding:"required"` - BucketID cdssdk.BucketID `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` + PackageID clitypes.PackageID `json:"packageID" binding:"required"` + BucketID clitypes.BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` } func (r *PackageClone) MakeParam() *sdks.RequestParam { @@ -175,7 +175,7 @@ func (r *PackageClone) MakeParam() *sdks.RequestParam { } type PackageCloneResp struct { - Package cdssdk.Package `json:"package"` + Package clitypes.Package `json:"package"` } func (r *PackageCloneResp) ParseResponse(resp *http.Response) error { @@ -189,7 +189,7 @@ func (c *PackageService) Clone(req PackageClone) (*PackageCloneResp, error) { const PackageListBucketPackagesPath = "/package/listBucketPackages" type PackageListBucketPackages struct { - BucketID cdssdk.BucketID `form:"bucketID" url:"bucketID" binding:"required"` + BucketID clitypes.BucketID `form:"bucketID" url:"bucketID" binding:"required"` } func (r *PackageListBucketPackages) MakeParam() *sdks.RequestParam { @@ -197,7 +197,7 @@ func (r *PackageListBucketPackages) MakeParam() *sdks.RequestParam { } type PackageListBucketPackagesResp struct { - Packages []cdssdk.Package `json:"packages"` + Packages []clitypes.Package `json:"packages"` } func (r *PackageListBucketPackagesResp) ParseResponse(resp *http.Response) error { @@ -211,7 +211,7 @@ func (c *PackageService) ListBucketPackages(req PackageListBucketPackages) (*Pac const PackageGetCachedStoragesPath = "/package/getCachedStorages" type PackageGetCachedStoragesReq struct { - PackageID cdssdk.PackageID `form:"packageID" url:"packageID" binding:"required"` + PackageID clitypes.PackageID `form:"packageID" url:"packageID" binding:"required"` } func (r *PackageGetCachedStoragesReq) MakeParam() *sdks.RequestParam { diff --git a/client/sdk/api/presigned.go b/client/sdk/api/presigned.go index 5ea69d7..49a2390 100644 --- a/client/sdk/api/presigned.go +++ b/client/sdk/api/presigned.go @@ -10,7 +10,7 @@ import ( v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/google/go-querystring/query" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type PresignedService struct { @@ -36,10 +36,10 @@ func (c *PresignedService) ObjectListByPath(req PresignedObjectListByPath, expir const PresignedObjectDownloadByPathPath = "/v1/presigned/object/downloadByPath" type PresignedObjectDownloadByPath struct { - PackageID cdssdk.PackageID `form:"packageID" url:"packageID" binding:"required"` - Path string `form:"path" url:"path" binding:"required"` - Offset int64 `form:"offset" url:"offset,omitempty"` - Length *int64 `form:"length" url:"length,omitempty"` + PackageID clitypes.PackageID `form:"packageID" url:"packageID" binding:"required"` + Path string `form:"path" url:"path" binding:"required"` + Offset int64 `form:"offset" url:"offset,omitempty"` + Length *int64 `form:"length" url:"length,omitempty"` } func (c *PresignedService) ObjectDownloadByPath(req PresignedObjectDownloadByPath, expireIn int) (string, error) { @@ -49,9 +49,9 @@ func (c *PresignedService) ObjectDownloadByPath(req PresignedObjectDownloadByPat const PresignedObjectDownloadPath = "/v1/presigned/object/download" type PresignedObjectDownload struct { - ObjectID cdssdk.ObjectID `form:"objectID" url:"objectID" binding:"required"` - Offset int64 `form:"offset" url:"offset,omitempty"` - Length *int64 `form:"length" url:"length,omitempty"` + ObjectID clitypes.ObjectID `form:"objectID" url:"objectID" binding:"required"` + Offset int64 `form:"offset" url:"offset,omitempty"` + Length *int64 `form:"length" url:"length,omitempty"` } func (c *PresignedService) ObjectDownload(req PresignedObjectDownload, expireIn int) (string, error) { @@ -61,15 +61,15 @@ func (c *PresignedService) ObjectDownload(req PresignedObjectDownload, expireIn const PresignedObjectUploadPath = "/v1/presigned/object/upload" type PresignedObjectUpload struct { - PackageID cdssdk.PackageID `form:"packageID" binding:"required" url:"packageID"` - Path string `form:"path" binding:"required" url:"path"` - Affinity cdssdk.UserSpaceID `form:"affinity" url:"affinity,omitempty"` - LoadTo []cdssdk.UserSpaceID `form:"loadTo" url:"loadTo,omitempty"` - LoadToPath []string `form:"loadToPath" url:"loadToPath,omitempty"` + PackageID clitypes.PackageID `form:"packageID" binding:"required" url:"packageID"` + Path string `form:"path" binding:"required" url:"path"` + Affinity clitypes.UserSpaceID `form:"affinity" url:"affinity,omitempty"` + LoadTo []clitypes.UserSpaceID `form:"loadTo" url:"loadTo,omitempty"` + LoadToPath []string `form:"loadToPath" url:"loadToPath,omitempty"` } type PresignedObjectUploadResp struct { - Object cdssdk.Object `json:"object"` + Object clitypes.Object `json:"object"` } func (c *PresignedService) ObjectUpload(req PresignedObjectUpload, expireIn int) (string, error) { @@ -79,12 +79,12 @@ func (c *PresignedService) ObjectUpload(req PresignedObjectUpload, expireIn int) const PresignedObjectNewMultipartUploadPath = "/v1/presigned/object/newMultipartUpload" type PresignedObjectNewMultipartUpload struct { - PackageID cdssdk.PackageID `form:"packageID" binding:"required" url:"packageID"` - Path string `form:"path" binding:"required" url:"path"` + PackageID clitypes.PackageID `form:"packageID" binding:"required" url:"packageID"` + Path string `form:"path" binding:"required" url:"path"` } type PresignedObjectNewMultipartUploadResp struct { - Object cdssdk.Object `json:"object"` + Object clitypes.Object `json:"object"` } func (c *PresignedService) ObjectNewMultipartUpload(req PresignedObjectNewMultipartUpload, expireIn int) (string, error) { @@ -94,8 +94,8 @@ func (c *PresignedService) ObjectNewMultipartUpload(req PresignedObjectNewMultip const PresignedObjectUploadPartPath = "/v1/presigned/object/uploadPart" type PresignedObjectUploadPart struct { - ObjectID cdssdk.ObjectID `form:"objectID" binding:"required" url:"objectID"` - Index int `form:"index" binding:"required" url:"index"` + ObjectID clitypes.ObjectID `form:"objectID" binding:"required" url:"objectID"` + Index int `form:"index" binding:"required" url:"index"` } type PresignedUploadPartResp struct{} @@ -107,12 +107,12 @@ func (c *PresignedService) ObjectUploadPart(req PresignedObjectUploadPart, expir const PresignedObjectCompleteMultipartUploadPath = "/v1/presigned/object/completeMultipartUpload" type PresignedObjectCompleteMultipartUpload struct { - ObjectID cdssdk.ObjectID `form:"objectID" binding:"required" url:"objectID"` - Indexes []int `form:"indexes" binding:"required" url:"indexes"` + ObjectID clitypes.ObjectID `form:"objectID" binding:"required" url:"objectID"` + Indexes []int `form:"indexes" binding:"required" url:"indexes"` } type PresignedObjectCompleteMultipartUploadResp struct { - Object cdssdk.Object `json:"object"` + Object clitypes.Object `json:"object"` } func (c *PresignedService) ObjectCompleteMultipartUpload(req PresignedObjectCompleteMultipartUpload, expireIn int) (string, error) { diff --git a/client/sdk/api/storage_test.go b/client/sdk/api/storage_test.go index 4fdedc3..e2b89b4 100644 --- a/client/sdk/api/storage_test.go +++ b/client/sdk/api/storage_test.go @@ -9,7 +9,7 @@ import ( "github.com/google/uuid" . "github.com/smartystreets/goconvey/convey" "gitlink.org.cn/cloudream/common/pkgs/iterator" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) func Test_PackageGet(t *testing.T) { @@ -73,7 +73,7 @@ func Test_Object(t *testing.T) { fileData[i] = byte(i) } - stgAff := cdssdk.UserSpaceID(2) + stgAff := clitypes.UserSpaceID(2) pkgName := uuid.NewString() createResp, err := cli.Package().Create(PackageCreate{ diff --git a/client/sdk/api/userspace.go b/client/sdk/api/userspace.go index f0da6f0..0890072 100644 --- a/client/sdk/api/userspace.go +++ b/client/sdk/api/userspace.go @@ -4,15 +4,15 @@ import ( "net/http" "gitlink.org.cn/cloudream/common/sdks" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) const UserSpaceLoadPackagePath = "/userspace/loadPackage" type UserSpaceLoadPackageReq struct { - PackageID cdssdk.PackageID `json:"packageID" binding:"required"` - UserSpaceID cdssdk.UserSpaceID `json:"userSpaceID" binding:"required"` - RootPath string `json:"rootPath"` + PackageID clitypes.PackageID `json:"packageID" binding:"required"` + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID" binding:"required"` + RootPath string `json:"rootPath"` } func (r *UserSpaceLoadPackageReq) MakeParam() *sdks.RequestParam { @@ -32,11 +32,11 @@ func (c *Client) UserSpaceLoadPackage(req UserSpaceLoadPackageReq) (*UserSpaceLo const UserSpaceCreatePackagePath = "/userspace/createPackage" type UserSpaceCreatePackageReq struct { - UserSpaceID cdssdk.UserSpaceID `json:"userSpaceID" binding:"required"` - Path string `json:"path" binding:"required"` - BucketID cdssdk.BucketID `json:"bucketID" binding:"required"` - Name string `json:"name" binding:"required"` - SpaceAffinity cdssdk.UserSpaceID `json:"spaceAffinity"` + UserSpaceID clitypes.UserSpaceID `json:"userSpaceID" binding:"required"` + Path string `json:"path" binding:"required"` + BucketID clitypes.BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + SpaceAffinity clitypes.UserSpaceID `json:"spaceAffinity"` } func (r *UserSpaceCreatePackageReq) MakeParam() *sdks.RequestParam { @@ -44,7 +44,7 @@ func (r *UserSpaceCreatePackageReq) MakeParam() *sdks.RequestParam { } type UserSpaceCreatePackageResp struct { - Package cdssdk.Package `json:"package"` + Package clitypes.Package `json:"package"` } func (r *UserSpaceCreatePackageResp) ParseResponse(resp *http.Response) error { @@ -58,7 +58,7 @@ func (c *Client) UserSpaceCreatePackage(req UserSpaceCreatePackageReq) (*UserSpa const UserSpaceGetPath = "/userspace/get" type UserSpaceGet struct { - UserSpaceID cdssdk.UserSpaceID `form:"userSpaceID" url:"userSpaceID" binding:"required"` + UserSpaceID clitypes.UserSpaceID `form:"userSpaceID" url:"userSpaceID" binding:"required"` } func (r *UserSpaceGet) MakeParam() *sdks.RequestParam { @@ -66,7 +66,7 @@ func (r *UserSpaceGet) MakeParam() *sdks.RequestParam { } type UserSpaceGetResp struct { - cdssdk.UserSpace + clitypes.UserSpace } func (r *UserSpaceGetResp) ParseResponse(resp *http.Response) error { diff --git a/common/pkgs/distlock/reqbuilder/metadata_object.go b/common/pkgs/distlock/reqbuilder/metadata_object.go index 4767b2d..228070d 100644 --- a/common/pkgs/distlock/reqbuilder/metadata_object.go +++ b/common/pkgs/distlock/reqbuilder/metadata_object.go @@ -2,7 +2,7 @@ package reqbuilder import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/lockprovider" ) @@ -14,7 +14,7 @@ func (b *MetadataLockReqBuilder) Object() *MetadataObjectLockReqBuilder { return &MetadataObjectLockReqBuilder{MetadataLockReqBuilder: b} } -func (b *MetadataObjectLockReqBuilder) CreateOne(packageID cdssdk.PackageID, objectPath string) *MetadataObjectLockReqBuilder { +func (b *MetadataObjectLockReqBuilder) CreateOne(packageID clitypes.PackageID, objectPath string) *MetadataObjectLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath("Object"), Name: lockprovider.MetadataCreateLock, diff --git a/common/pkgs/distlock/reqbuilder/shard_store.go b/common/pkgs/distlock/reqbuilder/shard_store.go index 756ce34..f1907ac 100644 --- a/common/pkgs/distlock/reqbuilder/shard_store.go +++ b/common/pkgs/distlock/reqbuilder/shard_store.go @@ -4,8 +4,8 @@ import ( "strconv" "gitlink.org.cn/cloudream/common/pkgs/distlock" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/lockprovider" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type ShardStoreLockReqBuilder struct { @@ -15,7 +15,7 @@ type ShardStoreLockReqBuilder struct { func (b *LockRequestBuilder) Shard() *ShardStoreLockReqBuilder { return &ShardStoreLockReqBuilder{LockRequestBuilder: b} } -func (b *ShardStoreLockReqBuilder) Buzy(stgID cdssdk.StorageID) *ShardStoreLockReqBuilder { +func (b *ShardStoreLockReqBuilder) Buzy(stgID cortypes.StorageID) *ShardStoreLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(stgID), Name: lockprovider.ShardStoreBuzyLock, @@ -24,7 +24,7 @@ func (b *ShardStoreLockReqBuilder) Buzy(stgID cdssdk.StorageID) *ShardStoreLockR return b } -func (b *ShardStoreLockReqBuilder) GC(stgID cdssdk.StorageID) *ShardStoreLockReqBuilder { +func (b *ShardStoreLockReqBuilder) GC(stgID cortypes.StorageID) *ShardStoreLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(stgID), Name: lockprovider.ShardStoreGCLock, @@ -33,6 +33,6 @@ func (b *ShardStoreLockReqBuilder) GC(stgID cdssdk.StorageID) *ShardStoreLockReq return b } -func (b *ShardStoreLockReqBuilder) makePath(hubID cdssdk.StorageID) []string { +func (b *ShardStoreLockReqBuilder) makePath(hubID cortypes.StorageID) []string { return []string{lockprovider.ShardStoreLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} } diff --git a/common/pkgs/distlock/reqbuilder/storage.go b/common/pkgs/distlock/reqbuilder/storage.go index 84086f2..10bf2f2 100644 --- a/common/pkgs/distlock/reqbuilder/storage.go +++ b/common/pkgs/distlock/reqbuilder/storage.go @@ -4,8 +4,8 @@ import ( "strconv" "gitlink.org.cn/cloudream/common/pkgs/distlock" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/lockprovider" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type StorageLockReqBuilder struct { @@ -16,7 +16,7 @@ func (b *LockRequestBuilder) Storage() *StorageLockReqBuilder { return &StorageLockReqBuilder{LockRequestBuilder: b} } -func (b *StorageLockReqBuilder) Buzy(storageID cdssdk.StorageID) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) Buzy(storageID cortypes.StorageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.StorageBuzyLock, @@ -25,7 +25,7 @@ func (b *StorageLockReqBuilder) Buzy(storageID cdssdk.StorageID) *StorageLockReq return b } -func (b *StorageLockReqBuilder) GC(storageID cdssdk.StorageID) *StorageLockReqBuilder { +func (b *StorageLockReqBuilder) GC(storageID cortypes.StorageID) *StorageLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(storageID), Name: lockprovider.StorageGCLock, @@ -34,6 +34,6 @@ func (b *StorageLockReqBuilder) GC(storageID cdssdk.StorageID) *StorageLockReqBu return b } -func (b *StorageLockReqBuilder) makePath(storageID cdssdk.StorageID) []string { +func (b *StorageLockReqBuilder) makePath(storageID cortypes.StorageID) []string { return []string{lockprovider.StorageLockPathPrefix, strconv.FormatInt(int64(storageID), 10)} } diff --git a/common/pkgs/ioswitch2/ops2/faas.go b/common/pkgs/ioswitch2/ops2/faas.go deleted file mode 100644 index 5cf006f..0000000 --- a/common/pkgs/ioswitch2/ops2/faas.go +++ /dev/null @@ -1,62 +0,0 @@ -package ops2 - -/* -import ( - "fmt" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" -) - -type InternalFaaSGalMultiply struct { - Coefs [][]byte `json:"coefs"` - InputFilePathes []exec.VarID `json:"inputFilePathes"` // 输入的文件的路径 - OutputFilePathes []exec.VarID `json:"outputFilePathes"` // 输出的文件的路径 - ChunkSize int `json:"chunkSize"` - StorageID cdssdk.StorageID `json:"storageID"` -} - -func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgHubs, err := exec.GetValueByType[*hubpool.HubPool](ctx) - if err != nil { - return err - } - - fass, err := hubpool.GetComponent[types.InternalFaaSCall](stgHubs, o.StorageID) - if err != nil { - return fmt.Errorf("getting faas component: %w", err) - } - - tmp, err := hubpool.GetComponent[types.TempStore](stgHubs, o.StorageID) - if err != nil { - return fmt.Errorf("getting temp store component: %w", err) - } - - inputVars, err := exec.BindArray[*exec.StringValue](e, ctx.Context, o.InputFilePathes) - if err != nil { - return err - } - - var outputs []string - for i := 0; i < len(o.OutputFilePathes); i++ { - outputs = append(outputs, tmp.CreateTemp()) - } - var outputVars []*exec.StringValue - for _, output := range outputs { - outputVars = append(outputVars, &exec.StringValue{Value: output}) - } - - inputs := lo.Map(inputVars, func(v *exec.StringValue, idx int) string { return v.Value }) - - err = fass.GalMultiply(ctx.Context, o.Coefs, inputs, outputs, o.ChunkSize) - if err != nil { - return fmt.Errorf("faas gal multiply: %w", err) - } - - exec.PutArray(e, o.OutputFilePathes, outputVars) - return nil -} -*/ diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 6e2f627..88d00a4 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -8,9 +8,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ec" "gitlink.org.cn/cloudream/storage2/common/pkgs/ec/lrc" ) @@ -114,12 +114,12 @@ func (o *GalMultiply) String() string { type LRCConstructAnyNode struct { dag.NodeBase - LRC cdssdk.LRCRedundancy + LRC clitypes.LRCRedundancy InputIndexes []int OutputIndexes []int } -func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCConstructAnyNode { +func (b *GraphNodeBuilder) NewLRCConstructAny(lrc clitypes.LRCRedundancy) *LRCConstructAnyNode { node := &LRCConstructAnyNode{ LRC: lrc, } @@ -168,11 +168,11 @@ func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { type LRCConstructGroupNode struct { dag.NodeBase - LRC cdssdk.LRCRedundancy + LRC clitypes.LRCRedundancy TargetBlockIndex int } -func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc cdssdk.LRCRedundancy) *LRCConstructGroupNode { +func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc clitypes.LRCRedundancy) *LRCConstructGroupNode { node := &LRCConstructGroupNode{ LRC: lrc, } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index 30b167e..688a5fe 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -6,14 +6,14 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/math2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/ops2" ) type GenerateContext struct { - LRC cdssdk.LRCRedundancy + LRC clitypes.LRCRedundancy DAG *ops2.GraphNodeBuilder To []ioswitchlrc.To ToNodes map[ioswitchlrc.To]ops2.ToNode @@ -27,7 +27,7 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) } ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, + LRC: clitypes.DefaultLRCRedundancy, DAG: ops2.NewGraphNodeBuilder(), To: toes, ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), @@ -124,7 +124,7 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 提供数据块+编码块中的k个块,重建任意块,包括完整文件。 func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, + LRC: clitypes.DefaultLRCRedundancy, DAG: ops2.NewGraphNodeBuilder(), To: toes, ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), @@ -245,7 +245,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 输入同一组的多个块,恢复出剩下缺少的一个块。 func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, + LRC: clitypes.DefaultLRCRedundancy, DAG: ops2.NewGraphNodeBuilder(), To: toes, ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), diff --git a/common/pkgs/mq/hub/client.go b/common/pkgs/mq/hub/client.go index 7a30149..fe64671 100644 --- a/common/pkgs/mq/hub/client.go +++ b/common/pkgs/mq/hub/client.go @@ -4,16 +4,16 @@ import ( "sync" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Client struct { rabbitCli *mq.RabbitMQTransport - id cdssdk.HubID + id cortypes.HubID } -func NewClient(id cdssdk.HubID, cfg mq.Config) (*Client, error) { +func NewClient(id cortypes.HubID, cfg mq.Config) (*Client, error) { rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.MakeHubQueueName(int64(id)), "") if err != nil { return nil, err @@ -30,23 +30,23 @@ func (c *Client) Close() { } type Pool interface { - Acquire(id cdssdk.HubID) (*Client, error) + Acquire(id cortypes.HubID) (*Client, error) Release(cli *Client) } type pool struct { mqcfg mq.Config - shareds map[cdssdk.HubID]*Client + shareds map[cortypes.HubID]*Client lock sync.Mutex } func NewPool(mqcfg mq.Config) Pool { return &pool{ mqcfg: mqcfg, - shareds: make(map[cdssdk.HubID]*Client), + shareds: make(map[cortypes.HubID]*Client), } } -func (p *pool) Acquire(id cdssdk.HubID) (*Client, error) { +func (p *pool) Acquire(id cortypes.HubID) (*Client, error) { p.lock.Lock() defer p.lock.Unlock() diff --git a/common/pkgs/mq/hub/server.go b/common/pkgs/mq/hub/server.go index 2e1c735..4d3edc8 100644 --- a/common/pkgs/mq/hub/server.go +++ b/common/pkgs/mq/hub/server.go @@ -2,9 +2,9 @@ package hub import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" mymq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Service interface { @@ -20,7 +20,7 @@ type Server struct { rabbitSvr mq.RabbitMQServer } -func NewServer(svc Service, id cdssdk.HubID, cfg mq.Config) (*Server, error) { +func NewServer(svc Service, id cortypes.HubID, cfg mq.Config) (*Server, error) { srv := &Server{ service: svc, } diff --git a/hub/internal/config/config.go b/hub/internal/config/config.go index d3ee692..dd58079 100644 --- a/hub/internal/config/config.go +++ b/hub/internal/config/config.go @@ -4,15 +4,15 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" c "gitlink.org.cn/cloudream/common/utils/config" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Config struct { - ID cdssdk.HubID `json:"id"` + ID cortypes.HubID `json:"id"` Local stgglb.LocalMachineInfo `json:"local"` GRPC *grpc.Config `json:"grpc"` Logger log.Config `json:"logger"` diff --git a/hub/internal/http/hub_io.go b/hub/internal/http/hub_io.go index b11e07f..eda4478 100644 --- a/hub/internal/http/hub_io.go +++ b/hub/internal/http/hub_io.go @@ -13,10 +13,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" + hubapi "gitlink.org.cn/cloudream/storage2/hub/sdk/api" ) type IOService struct { @@ -32,7 +32,7 @@ func (s *Server) IOSvc() *IOService { func (s *IOService) GetStream(ctx *gin.Context) { log := logger.WithField("HTTP", "HubIO.GetStream") - req, err := serder.JSONToObjectStreamEx[cdsapi.GetStreamReq](ctx.Request.Body) + req, err := serder.JSONToObjectStreamEx[hubapi.GetStreamReq](ctx.Request.Body) if err != nil { log.Warnf("deserializing request: %v", err) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -91,7 +91,7 @@ func (s *IOService) SendStream(ctx *gin.Context) { return } - info, err := serder.JSONToObjectEx[cdsapi.SendStreamInfo](infoData) + info, err := serder.JSONToObjectEx[hubapi.SendStreamInfo](infoData) if err != nil { logger.Warnf("deserializing info data: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, fmt.Sprintf("deserializing info data: %v", err))) @@ -145,7 +145,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { return } - req, err := serder.JSONToObjectEx[cdsapi.ExecuteIOPlanReq](data) + req, err := serder.JSONToObjectEx[hubapi.ExecuteIOPlanReq](data) if err != nil { log.Warnf("deserializing request: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -182,7 +182,7 @@ func (s *IOService) SendVar(ctx *gin.Context) { return } - req, err := serder.JSONToObjectEx[cdsapi.SendVarReq](data) + req, err := serder.JSONToObjectEx[hubapi.SendVarReq](data) if err != nil { log.Warnf("deserializing request: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -213,7 +213,7 @@ func (s *IOService) GetVar(ctx *gin.Context) { return } - req, err := serder.JSONToObjectEx[cdsapi.GetVarReq](data) + req, err := serder.JSONToObjectEx[hubapi.GetVarReq](data) if err != nil { log.Warnf("deserializing request: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -241,7 +241,7 @@ func (s *IOService) GetVar(ctx *gin.Context) { resp := Response{ Code: errorcode.OK, - Data: cdsapi.GetVarResp{ + Data: hubapi.GetVarResp{ Value: v, }, } diff --git a/hub/internal/http/server.go b/hub/internal/http/server.go index 70550ee..9a13bec 100644 --- a/hub/internal/http/server.go +++ b/hub/internal/http/server.go @@ -3,7 +3,7 @@ package http import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" + hubapi "gitlink.org.cn/cloudream/storage2/hub/sdk/api" ) type Server struct { @@ -38,9 +38,9 @@ func (s *Server) Serve() error { } func (s *Server) initRouters() { - s.engine.GET(cdsapi.GetStreamPath, s.IOSvc().GetStream) - s.engine.POST(cdsapi.SendStreamPath, s.IOSvc().SendStream) - s.engine.POST(cdsapi.ExecuteIOPlanPath, s.IOSvc().ExecuteIOPlan) - s.engine.POST(cdsapi.SendVarPath, s.IOSvc().SendVar) - s.engine.GET(cdsapi.GetVarPath, s.IOSvc().GetVar) + s.engine.GET(hubapi.GetStreamPath, s.IOSvc().GetStream) + s.engine.POST(hubapi.SendStreamPath, s.IOSvc().SendStream) + s.engine.POST(hubapi.ExecuteIOPlanPath, s.IOSvc().ExecuteIOPlan) + s.engine.POST(hubapi.SendVarPath, s.IOSvc().SendVar) + s.engine.GET(hubapi.GetVarPath, s.IOSvc().GetVar) } diff --git a/hub/sdk/api/client.go b/hub/sdk/api/client.go new file mode 100644 index 0000000..a16d713 --- /dev/null +++ b/hub/sdk/api/client.go @@ -0,0 +1,51 @@ +package api + +import ( + "gitlink.org.cn/cloudream/common/sdks" +) + +type response[T any] struct { + Code string `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +func (r *response[T]) ToError() *sdks.CodeMessageError { + return &sdks.CodeMessageError{ + Code: r.Code, + Message: r.Message, + } +} + +type Client struct { + cfg *Config +} + +func NewClient(cfg *Config) *Client { + return &Client{ + cfg: cfg, + } +} + +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) +} + +type pool struct { + cfg *Config +} + +func NewPool(cfg *Config) Pool { + return &pool{ + cfg: cfg, + } +} +func (p *pool) Acquire() (*Client, error) { + cli := NewClient(p.cfg) + return cli, nil +} + +func (p *pool) Release(cli *Client) { + +} diff --git a/hub/sdk/api/config.go b/hub/sdk/api/config.go new file mode 100644 index 0000000..02a22b9 --- /dev/null +++ b/hub/sdk/api/config.go @@ -0,0 +1,5 @@ +package api + +type Config struct { + URL string `json:"url"` +} diff --git a/hub/sdk/api/hub_io.go b/hub/sdk/api/hub_io.go new file mode 100644 index 0000000..d013575 --- /dev/null +++ b/hub/sdk/api/hub_io.go @@ -0,0 +1,244 @@ +package api + +import ( + "fmt" + "io" + "net/url" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +// TODO2 重新梳理代码 + +const GetStreamPath = "/hubIO/getStream" + +type GetStreamReq struct { + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` + SignalID exec.VarID `json:"signalID"` + Signal exec.VarValue `json:"signal"` +} + +func (c *Client) GetStream(req GetStreamReq) (io.ReadCloser, error) { + targetUrl, err := url.JoinPath(c.cfg.URL, GetStreamPath) + if err != nil { + return nil, err + } + + body, err := serder.ObjectToJSONEx(req) + if err != nil { + return nil, fmt.Errorf("request to json: %w", err) + } + + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ + Body: body, + }) + if err != nil { + return nil, err + } + + cr := http2.NewChunkedReader(resp.Body) + _, str, err := cr.NextPart() + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + + return io2.DelegateReadCloser(str, func() error { + cr.Close() + return nil + }), nil +} + +const SendStreamPath = "/hubIO/sendStream" + +type SendStreamReq struct { + SendStreamInfo + Stream io.ReadCloser +} +type SendStreamInfo struct { + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` +} + +func (c *Client) SendStream(req SendStreamReq) error { + targetUrl, err := url.JoinPath(c.cfg.URL, SendStreamPath) + if err != nil { + return err + } + + pr, pw := io.Pipe() + errCh := make(chan error, 1) + go func() { + cw := http2.NewChunkedWriter(pw) + + infoJSON, err := serder.ObjectToJSONEx(req) + if err != nil { + cw.Abort(fmt.Sprintf("info to json: %v", err)) + errCh <- fmt.Errorf("info to json: %w", err) + return + } + + if err := cw.WriteDataPart("info", infoJSON); err != nil { + cw.Close() + errCh <- fmt.Errorf("write info: %w", err) + return + } + + _, err = cw.WriteStreamPart("stream", req.Stream) + if err != nil { + cw.Close() + errCh <- fmt.Errorf("write stream: %w", err) + return + } + + err = cw.Finish() + if err != nil { + errCh <- fmt.Errorf("finish chunked writer: %w", err) + return + } + }() + + resp, err := http2.PostChunked2(targetUrl, http2.Chunked2RequestParam{ + Body: pr, + }) + if err != nil { + return err + } + + err = <-errCh + if err != nil { + return err + } + + codeResp, err := ParseJSONResponse[response[any]](resp) + if err != nil { + return err + } + + if codeResp.Code == errorcode.OK { + return nil + } + + return codeResp.ToError() +} + +const ExecuteIOPlanPath = "/hubIO/executeIOPlan" + +type ExecuteIOPlanReq struct { + Plan exec.Plan `json:"plan"` +} + +func (c *Client) ExecuteIOPlan(req ExecuteIOPlanReq) error { + targetUrl, err := url.JoinPath(c.cfg.URL, ExecuteIOPlanPath) + if err != nil { + return err + } + + body, err := serder.ObjectToJSONEx(req) + if err != nil { + return fmt.Errorf("request to json: %w", err) + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: body, + }) + if err != nil { + return err + } + + codeResp, err := ParseJSONResponse[response[any]](resp) + if err != nil { + return err + } + + if codeResp.Code == errorcode.OK { + return nil + } + + return codeResp.ToError() +} + +const SendVarPath = "/hubIO/sendVar" + +type SendVarReq struct { + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` + VarValue exec.VarValue `json:"varValue"` +} + +func (c *Client) SendVar(req SendVarReq) error { + targetUrl, err := url.JoinPath(c.cfg.URL, SendVarPath) + if err != nil { + return err + } + + body, err := serder.ObjectToJSONEx(req) + if err != nil { + return fmt.Errorf("request to json: %w", err) + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: body, + }) + if err != nil { + return err + } + + jsonResp, err := ParseJSONResponse[response[any]](resp) + if err != nil { + return err + } + + if jsonResp.Code == errorcode.OK { + return nil + } + + return jsonResp.ToError() +} + +const GetVarPath = "/hubIO/getVar" + +type GetVarReq struct { + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` + SignalID exec.VarID `json:"signalID"` + Signal exec.VarValue `json:"signal"` +} + +type GetVarResp struct { + Value exec.VarValue `json:"value"` +} + +func (c *Client) GetVar(req GetVarReq) (*GetVarResp, error) { + targetUrl, err := url.JoinPath(c.cfg.URL, GetVarPath) + if err != nil { + return nil, err + } + + body, err := serder.ObjectToJSONEx(req) + if err != nil { + return nil, fmt.Errorf("request to json: %w", err) + } + + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ + Body: body, + }) + if err != nil { + return nil, err + } + + jsonResp, err := ParseJSONResponse[response[GetVarResp]](resp) + if err != nil { + return nil, err + } + + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil + } + + return nil, jsonResp.ToError() +} diff --git a/hub/sdk/api/utils.go b/hub/sdk/api/utils.go new file mode 100644 index 0000000..77b93fc --- /dev/null +++ b/hub/sdk/api/utils.go @@ -0,0 +1,176 @@ +package api + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "mime/multipart" + "net/http" + ul "net/url" + "path/filepath" + "strings" + + "github.com/google/go-querystring/query" + "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/sdks" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +func MakeIPFSFilePath(fileHash string) string { + return filepath.Join("ipfs", fileHash) +} + +func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { + var ret TBody + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var err error + if ret, err = serder.JSONToObjectStreamEx[TBody](resp.Body); err != nil { + return ret, fmt.Errorf("parsing response: %w", err) + } + + return ret, nil + } + + cont, err := io.ReadAll(resp.Body) + if err != nil { + return ret, fmt.Errorf("unknow response content type: %s, status: %d", contType, resp.StatusCode) + } + strCont := string(cont) + + return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)]) +} + +func JSONAPI[Resp sdks.APIResponse, Req sdks.APIRequest](cfg *Config, cli *http.Client, req Req, resp Resp) (Resp, error) { + + param := req.MakeParam() + + httpReq, err := param.MakeRequest(cfg.URL) + if err != nil { + return resp, err + } + + httpResp, err := cli.Do(httpReq) + if err != nil { + return resp, err + } + + err = resp.ParseResponse(httpResp) + return resp, err +} + +func JSONAPINoData[Req sdks.APIRequest](cfg *Config, cli *http.Client, req Req) error { + param := req.MakeParam() + + httpReq, err := param.MakeRequest(cfg.URL) + if err != nil { + return err + } + + resp, err := cli.Do(httpReq) + if err != nil { + return err + } + + return sdks.ParseCodeDataJSONResponse(resp, any(nil)) +} + +func calcSha256(body sdks.RequestBody) string { + hasher := sha256.New() + switch body := body.(type) { + case *sdks.StringBody: + hasher.Write([]byte(body.Value)) + return hex.EncodeToString(hasher.Sum(nil)) + + case *sdks.BytesBody: + hasher.Write(body.Value) + return hex.EncodeToString(hasher.Sum(nil)) + + case *sdks.StreamBody: + return "" + + default: + hash := sha256.Sum256([]byte("")) + return hex.EncodeToString(hash[:]) + } +} + +func PostMultiPart(cfg *Config, url string, info any, files http2.MultiPartFileIterator) (*http.Response, error) { + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + return nil, err + } + + pr, pw := io.Pipe() + muWriter := multipart.NewWriter(pw) + + req.Header.Set("Content-Type", fmt.Sprintf("%s;boundary=%s", http2.ContentTypeMultiPart, muWriter.Boundary())) + + writeResult := make(chan error, 1) + go func() { + writeResult <- func() error { + defer pw.Close() + defer muWriter.Close() + + if info != nil { + mp, err := query.Values(info) + if err != nil { + return fmt.Errorf("formValues object to map failed, err: %w", err) + } + + for k, v := range mp { + err := muWriter.WriteField(k, v[0]) + if err != nil { + return fmt.Errorf("write form field failed, err: %w", err) + } + } + } + + for { + file, err := files.MoveNext() + if err == iterator.ErrNoMoreItem { + break + } + if err != nil { + return fmt.Errorf("opening file: %w", err) + } + + err = sendFileOnePart(muWriter, file.FieldName, file.FileName, file.File) + file.File.Close() + if err != nil { + return err + } + } + + return nil + }() + }() + + req.Body = pr + + cli := http.Client{} + resp, err := cli.Do(req) + if err != nil { + return nil, err + } + + writeErr := <-writeResult + if writeErr != nil { + return nil, writeErr + } + + return resp, nil +} + +func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) error { + w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName)) + if err != nil { + return fmt.Errorf("create form file failed, err: %w", err) + } + + _, err = io.Copy(w, file) + return err +}