From 9c982a0426a1bb81e96cd08fe714a1879c064f3d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 8 Apr 2025 14:56:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=83=A8=E5=88=86=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/accessstat/access_stat.go | 32 +++---- client/internal/config/config.go | 30 +++---- client/internal/db/db.go | 9 +- client/internal/db/object_access_stat.go | 7 -- client/internal/db/package.go | 47 +++++++++++ client/internal/downloader/downloader.go | 8 +- client/internal/downloader/iterator.go | 6 +- .../internal/downloader/lrc_strip_iterator.go | 6 +- .../internal/downloader/strategy/selector.go | 2 +- client/internal/downloader/strip_iterator.go | 2 +- client/internal/http/object.go | 68 +++++++-------- client/internal/mount/mount_win.go | 8 +- client/internal/mount/vfs/cache/cache.go | 23 +++-- client/internal/mount/vfs/cache/file.go | 6 +- client/internal/mount/vfs/fuse_bucket.go | 20 ++--- client/internal/mount/vfs/fuse_dir.go | 48 +++++------ client/internal/mount/vfs/fuse_file.go | 6 +- client/internal/mount/vfs/fuse_package.go | 34 ++++---- client/internal/mount/vfs/fuse_root.go | 8 +- client/internal/mount/vfs/vfs.go | 10 +-- client/internal/publics/publics.go | 9 -- client/internal/services/object.go | 84 ++----------------- client/internal/services/package.go | 2 +- client/internal/services/service.go | 18 ++-- client/internal/services/storage.go | 57 +++++++------ client/internal/task/task.go | 40 --------- client/internal/uploader/create_load.go | 2 +- client/internal/uploader/update.go | 2 +- client/internal/uploader/uploader.go | 18 ++-- client/types/types.go | 24 ++++++ client/types/utils.go | 18 ++++ common/globals/globals.go | 14 +++- common/globals/utils.go | 4 +- common/models/datamap/datamap.go | 18 ---- common/pkgs/accessstat/access_stat.go | 76 ----------------- common/pkgs/accessstat/config.go | 7 -- common/pkgs/connectivity/collector.go | 20 ++--- common/pkgs/ioswitch2/ops2/s2s.go | 2 +- common/pkgs/ioswitch2/parser/opt/ec.go | 2 +- common/pkgs/ioswitch2/parser/opt/s2s.go | 6 +- common/pkgs/mq/agent/server.go | 2 +- common/pkgs/mq/agent/storage.go | 50 +++++------ common/pkgs/sysevent/publisher.go | 6 +- common/pkgs/sysevent/sysevent.go | 6 +- 44 files changed, 368 insertions(+), 499 deletions(-) delete mode 100644 client/internal/publics/publics.go delete mode 100644 client/internal/task/task.go create mode 100644 client/types/utils.go delete mode 100644 common/pkgs/accessstat/access_stat.go delete mode 100644 common/pkgs/accessstat/config.go diff --git a/client/internal/accessstat/access_stat.go b/client/internal/accessstat/access_stat.go index b7d1982..7cc416f 100644 --- a/client/internal/accessstat/access_stat.go +++ b/client/internal/accessstat/access_stat.go @@ -1,40 +1,40 @@ package accessstat import ( - "fmt" "sync" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage2/client/internal/db" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type AccessStatEvent interface{} type AccessStat struct { cfg Config - stats []coormq.AddAccessStatEntry + stats []db.AddAccessStatEntry lock sync.Mutex + db *db.DB } -func NewAccessStat(cfg Config) *AccessStat { +func NewAccessStat(cfg Config, db *db.DB) *AccessStat { return &AccessStat{ cfg: cfg, + db: db, } } -func (p *AccessStat) AddAccessCounter(objID cdssdk.ObjectID, pkgID cdssdk.PackageID, stgID cdssdk.StorageID, value float64) { +func (p *AccessStat) AddAccessCounter(objID clitypes.ObjectID, pkgID clitypes.PackageID, spaceID clitypes.UserSpaceID, value float64) { p.lock.Lock() defer p.lock.Unlock() - p.stats = append(p.stats, coormq.AddAccessStatEntry{ - ObjectID: objID, - PackageID: pkgID, - StorageID: stgID, - Counter: value, + p.stats = append(p.stats, db.AddAccessStatEntry{ + ObjectID: objID, + PackageID: pkgID, + UserSpaceID: spaceID, + Counter: value, }) } @@ -42,12 +42,6 @@ func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { ch := sync2.NewUnboundChannel[AccessStatEvent]() go func() { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - ch.Send(fmt.Errorf("new coordinator client: %w", err)) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - ticker := time.NewTicker(p.cfg.ReportInterval) for { <-ticker.C @@ -61,7 +55,7 @@ func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { continue } - err := coorCli.AddAccessStat(coormq.ReqAddAccessStat(st)) + err := db.DoTx11(p.db, p.db.Package().BatchAddPackageAccessStat, st) if err != nil { logger.Errorf("add all package access stat counter: %v", err) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index b69a910..36f4ed3 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -5,29 +5,29 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/utils/config" + "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" - "gitlink.org.cn/cloudream/storage2/client/types" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Config struct { - Local types.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` - Logger logger.Config `json:"logger"` - RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` - Connectivity connectivity.Config `json:"connectivity"` - Downloader downloader.Config `json:"downloader"` - DownloadStrategy strategy.Config `json:"downloadStrategy"` - StorageID cortypes.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 - AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法 - AuthSecretKey string `json:"authSecretKey"` - MaxHTTPBodySize int64 `json:"maxHttpBodySize"` - DB db.Config `json:"db"` + Local stgglb.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + Logger logger.Config `json:"logger"` + RabbitMQ mq.Config `json:"rabbitMQ"` + DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` + Downloader downloader.Config `json:"downloader"` + DownloadStrategy strategy.Config `json:"downloadStrategy"` + StorageID cortypes.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 + AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法 + AuthSecretKey string `json:"authSecretKey"` + MaxHTTPBodySize int64 `json:"maxHttpBodySize"` + DB db.Config `json:"db"` } var cfg Config diff --git a/client/internal/db/db.go b/client/internal/db/db.go index c9d6fbd..588f2d3 100644 --- a/client/internal/db/db.go +++ b/client/internal/db/db.go @@ -3,7 +3,6 @@ package db import ( _ "github.com/go-sql-driver/mysql" "github.com/sirupsen/logrus" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config" "gorm.io/driver/mysql" "gorm.io/gorm" ) @@ -12,7 +11,7 @@ type DB struct { db *gorm.DB } -func NewDB(cfg *config.Config) (*DB, error) { +func NewDB(cfg *Config) (*DB, error) { mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{}) if err != nil { logrus.Fatalf("failed to connect to database: %v", err) @@ -29,6 +28,12 @@ func (db *DB) DoTx(do func(tx SQLContext) error) error { }) } +func DoTx11[T any](db *DB, do func(tx SQLContext, t T) error, t T) error { + return db.db.Transaction(func(tx *gorm.DB) error { + return do(SQLContext{tx}, t) + }) +} + func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) { var ret R err := db.db.Transaction(func(tx *gorm.DB) error { diff --git a/client/internal/db/object_access_stat.go b/client/internal/db/object_access_stat.go index 255fc0e..9ddcc5a 100644 --- a/client/internal/db/object_access_stat.go +++ b/client/internal/db/object_access_stat.go @@ -54,13 +54,6 @@ func (*ObjectAccessStatDB) BatchGetByObjectIDOnStorage(ctx SQLContext, objIDs [] return ret, err } -type AddAccessStatEntry struct { - ObjectID types.ObjectID `json:"objectID"` - PackageID types.PackageID `json:"packageID"` - UserSpaceID types.UserSpaceID `json:"userSpaceID"` - Counter float64 `json:"counter"` -} - func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []AddAccessStatEntry) error { if len(entries) == 0 { return nil diff --git a/client/internal/db/package.go b/client/internal/db/package.go index 20f7a1f..eea249f 100644 --- a/client/internal/db/package.go +++ b/client/internal/db/package.go @@ -163,3 +163,50 @@ func (*PackageDB) Move(ctx SQLContext, packageID types.PackageID, newBktID types err := ctx.Table("Package").Where("PackageID = ?", packageID).Update("BucketID", newBktID).Update("Name", newName).Error return err } + +type AddAccessStatEntry struct { + ObjectID types.ObjectID `json:"objectID"` + PackageID types.PackageID `json:"packageID"` + UserSpaceID types.UserSpaceID `json:"userSpaceID"` + Counter float64 `json:"counter"` +} + +func (db *PackageDB) BatchAddPackageAccessStat(ctx SQLContext, entries []AddAccessStatEntry) error { + pkgIDs := make([]types.PackageID, len(entries)) + objIDs := make([]types.ObjectID, len(entries)) + for i, e := range entries { + pkgIDs[i] = e.PackageID + objIDs[i] = e.ObjectID + } + + avaiPkgIDs, err := db.Package().BatchTestPackageID(ctx, pkgIDs) + if err != nil { + return fmt.Errorf("batch test package id: %w", err) + } + + avaiObjIDs, err := db.Object().BatchTestObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch test object id: %w", err) + } + + var willAdds []AddAccessStatEntry + for _, e := range entries { + if avaiPkgIDs[e.PackageID] && avaiObjIDs[e.ObjectID] { + willAdds = append(willAdds, e) + } + } + + if len(willAdds) > 0 { + err := db.PackageAccessStat().BatchAddCounter(ctx, willAdds) + if err != nil { + return fmt.Errorf("batch add package access stat counter: %w", err) + } + + err = db.ObjectAccessStat().BatchAddCounter(ctx, willAdds) + if err != nil { + return fmt.Errorf("batch add object access stat counter: %w", err) + } + } + + return nil +} diff --git a/client/internal/downloader/downloader.go b/client/internal/downloader/downloader.go index 433e6d6..19755e7 100644 --- a/client/internal/downloader/downloader.go +++ b/client/internal/downloader/downloader.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) const ( @@ -40,12 +40,12 @@ type Downloader struct { strips *StripCache cfg Config conn *connectivity.Collector - stgAgts *agtpool.AgentPool + stgPool *pool.Pool selector *strategy.Selector db *db.DB } -func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.AgentPool, sel *strategy.Selector, db *db.DB) Downloader { +func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB) Downloader { if cfg.MaxStripCacheCount == 0 { cfg.MaxStripCacheCount = DefaultMaxStripCacheCount } @@ -55,7 +55,7 @@ func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.Ag strips: ch, cfg: cfg, conn: conn, - stgAgts: stgAgts, + stgPool: stgPool, selector: sel, db: db, } diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index df90cb1..5c428c5 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -12,8 +12,8 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" - "gitlink.org.cn/cloudream/storage2/client/internal/publics" "gitlink.org.cn/cloudream/storage2/client/types" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" @@ -64,7 +64,7 @@ func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) { Detail: *req.Detail, Range: math2.NewRange(req.Raw.Offset, req.Raw.Length), DestHub: destHub, - DestLocation: publics.Local.LocationID, + DestLocation: stgglb.Local.LocationID, }) if err != nil { return nil, fmt.Errorf("selecting download strategy: %w", err) @@ -132,7 +132,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, i.downloader.stgAgts) + exec.SetValueByType(exeCtx, i.downloader.stgPool) exec := plans.Execute(exeCtx) go exec.Wait(context.TODO()) diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go index 26cad4f..7822ed0 100644 --- a/client/internal/downloader/lrc_strip_iterator.go +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -98,8 +98,8 @@ func (s *LRCStripIterator) Close() { func (s *LRCStripIterator) downloading() { var froms []ioswitchlrc.From for _, b := range s.blocks { - stg := b.Space - froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *stg.MasterHub, stg.Storage, b.Block.Index)) + space := b.Space + froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *space.MasterHub, space, b.Block.Index)) } toExec, hd := ioswitchlrc.NewToDriverWithRange(-1, math2.Range{ @@ -114,7 +114,7 @@ func (s *LRCStripIterator) downloading() { } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, s.downloder.stgAgts) + exec.SetValueByType(exeCtx, s.downloder.stgPool) exec := plans.Execute(exeCtx) diff --git a/client/internal/downloader/strategy/selector.go b/client/internal/downloader/strategy/selector.go index d0964ac..0968f0b 100644 --- a/client/internal/downloader/strategy/selector.go +++ b/client/internal/downloader/strategy/selector.go @@ -10,9 +10,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sort2" + "gitlink.org.cn/cloudream/storage2/client/internal/metacache" "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/consts" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) diff --git a/client/internal/downloader/strip_iterator.go b/client/internal/downloader/strip_iterator.go index 45aa6f7..b0c2d84 100644 --- a/client/internal/downloader/strip_iterator.go +++ b/client/internal/downloader/strip_iterator.go @@ -217,7 +217,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, s.downloader.stgAgts) + exec.SetValueByType(exeCtx, s.downloader.stgPool) exec := plans.Execute(exeCtx) ctx, cancel := context.WithCancel(context.Background()) diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 4dbc3e9..40c7fa4 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -14,9 +14,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage2/client/internal/config" - cdsapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" + cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type ObjectService struct { @@ -32,7 +32,7 @@ func (s *Server) Object() *ObjectService { func (s *ObjectService) ListByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.ListByPath") - var req cdsapi.ObjectListByPath + var req cliapi.ObjectListByPath if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -52,7 +52,7 @@ func (s *ObjectService) ListByPath(ctx *gin.Context) { func (s *ObjectService) ListByIDs(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.ListByIDs") - var req cdsapi.ObjectListByIDs + var req cliapi.ObjectListByIDs if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -66,11 +66,11 @@ func (s *ObjectService) ListByIDs(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectListByIDsResp{Objects: objs})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectListByIDsResp{Objects: objs})) } type ObjectUploadReq struct { - Info cdsapi.ObjectUploadInfo `form:"info" binding:"required"` + Info cliapi.ObjectUploadInfo `form:"info" binding:"required"` Files []*multipart.FileHeader `form:"files"` } @@ -125,18 +125,18 @@ func (s *ObjectService) Upload(ctx *gin.Context) { return } - uploadeds := make([]cdssdk.Object, len(pathes)) + uploadeds := make([]clitypes.Object, len(pathes)) for i := range pathes { uploadeds[i] = ret.Objects[pathes[i]] } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadResp{Uploadeds: uploadeds})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectUploadResp{Uploadeds: uploadeds})) } func (s *ObjectService) Download(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Download") - var req cdsapi.ObjectDownload + var req cliapi.ObjectDownload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -184,14 +184,14 @@ func (s *ObjectService) Download(ctx *gin.Context) { func (s *ObjectService) DownloadByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.DownloadByPath") - var req cdsapi.ObjectDownloadByPath + var req cliapi.ObjectDownloadByPath if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + resp, err := s.svc.ObjectSvc().GetByPath(cliapi.ObjectListByPath{ PackageID: req.PackageID, Path: req.Path, }) if err != nil { @@ -241,7 +241,7 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) { func (s *ObjectService) UpdateInfo(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.UpdateInfo") - var req cdsapi.ObjectUpdateInfo + var req cliapi.ObjectUpdateInfo if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -255,20 +255,20 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUpdateInfoResp{Successes: sucs})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectUpdateInfoResp{Successes: sucs})) } func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.UpdateInfoByPath") - var req cdsapi.ObjectUpdateInfoByPath + var req cliapi.ObjectUpdateInfoByPath if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + resp, err := s.svc.ObjectSvc().GetByPath(cliapi.ObjectListByPath{ PackageID: req.PackageID, Path: req.Path, }) if err != nil { @@ -282,7 +282,7 @@ func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) { return } - sucs, err := s.svc.ObjectSvc().UpdateInfo([]cdsapi.UpdatingObject{{ + sucs, err := s.svc.ObjectSvc().UpdateInfo([]cliapi.UpdatingObject{{ ObjectID: resp.Objects[0].ObjectID, UpdateTime: req.UpdateTime, }}) @@ -294,13 +294,13 @@ func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) { if len(sucs) == 0 { } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUpdateInfoByPathResp{})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectUpdateInfoByPathResp{})) } func (s *ObjectService) Move(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Move") - var req cdsapi.ObjectMove + var req cliapi.ObjectMove if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -314,13 +314,13 @@ func (s *ObjectService) Move(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectMoveResp{Successes: sucs})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectMoveResp{Successes: sucs})) } func (s *ObjectService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Delete") - var req cdsapi.ObjectDelete + var req cliapi.ObjectDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -340,14 +340,14 @@ func (s *ObjectService) Delete(ctx *gin.Context) { func (s *ObjectService) DeleteByPath(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.DeleteByPath") - var req cdsapi.ObjectDeleteByPath + var req cliapi.ObjectDeleteByPath if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - resp, err := s.svc.ObjectSvc().GetByPath(cdsapi.ObjectListByPath{ + resp, err := s.svc.ObjectSvc().GetByPath(cliapi.ObjectListByPath{ PackageID: req.PackageID, Path: req.Path, }) if err != nil { @@ -360,7 +360,7 @@ func (s *ObjectService) DeleteByPath(ctx *gin.Context) { return } - err = s.svc.ObjectSvc().Delete([]cdssdk.ObjectID{resp.Objects[0].ObjectID}) + err = s.svc.ObjectSvc().Delete([]clitypes.ObjectID{resp.Objects[0].ObjectID}) if err != nil { log.Warnf("deleting objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed")) @@ -373,7 +373,7 @@ func (s *ObjectService) DeleteByPath(ctx *gin.Context) { func (s *ObjectService) Clone(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Clone") - var req cdsapi.ObjectClone + var req cliapi.ObjectClone if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -387,13 +387,13 @@ func (s *ObjectService) Clone(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCloneResp{Objects: objs})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectCloneResp{Objects: objs})) } func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") - var req cdsapi.ObjectGetPackageObjects + var req cliapi.ObjectGetPackageObjects if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -407,13 +407,13 @@ func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectGetPackageObjectsResp{Objects: objs})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectGetPackageObjectsResp{Objects: objs})) } func (s *ObjectService) NewMultipartUpload(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.NewMultipartUpload") - var req cdsapi.ObjectNewMultipartUpload + var req cliapi.ObjectNewMultipartUpload if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -427,11 +427,11 @@ func (s *ObjectService) NewMultipartUpload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectNewMultipartUploadResp{Object: obj})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectNewMultipartUploadResp{Object: obj})) } type ObjectUploadPartReq struct { - Info cdsapi.ObjectUploadPartInfo `form:"info" binding:"required"` + Info cliapi.ObjectUploadPartInfo `form:"info" binding:"required"` File *multipart.FileHeader `form:"file"` } @@ -460,13 +460,13 @@ func (s *ObjectService) UploadPart(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadPartResp{})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectUploadPartResp{})) } func (s *ObjectService) CompleteMultipartUpload(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.CompleteMultipartUpload") - var req cdsapi.ObjectCompleteMultipartUpload + var req cliapi.ObjectCompleteMultipartUpload if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -480,5 +480,5 @@ func (s *ObjectService) CompleteMultipartUpload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCompleteMultipartUploadResp{Object: obj})) + ctx.JSON(http.StatusOK, OK(cliapi.ObjectCompleteMultipartUploadResp{Object: obj})) } diff --git a/client/internal/mount/mount_win.go b/client/internal/mount/mount_win.go index ef24924..7e3a782 100644 --- a/client/internal/mount/mount_win.go +++ b/client/internal/mount/mount_win.go @@ -7,10 +7,10 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sync2" + "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + "gitlink.org.cn/cloudream/storage2/client/internal/uploader" ) type MountEvent interface { @@ -29,7 +29,7 @@ type MountingFailedEvent struct { type Mount struct { } -func NewMount(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount { +func NewMount(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount { return &Mount{} } diff --git a/client/internal/mount/vfs/cache/cache.go b/client/internal/mount/vfs/cache/cache.go index 2b2df5a..8f75893 100644 --- a/client/internal/mount/vfs/cache/cache.go +++ b/client/internal/mount/vfs/cache/cache.go @@ -13,14 +13,14 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/lo2" + db2 "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + "gitlink.org.cn/cloudream/storage2/client/internal/uploader" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type CacheEntry interface { @@ -161,7 +161,7 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile { } // 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。 -func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { +func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile { c.lock.Lock() defer c.lock.Unlock() @@ -399,7 +399,7 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error { type uploadingPackage struct { bktName string pkgName string - pkg cdssdk.Package + pkg clitypes.Package upObjs []*uploadingObject } @@ -521,8 +521,7 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) { var sucPkgs []*uploadingPackage var failedPkgs []*uploadingPackage for _, pkg := range pkgs { - // TODO 用户ID - p, err := c.db.Package().GetUserPackageByName(c.db.DefCtx(), 1, pkg.bktName, pkg.pkgName) + p, err := c.db.Package().GetUserPackageByName(c.db.DefCtx(), pkg.bktName, pkg.pkgName) if err != nil { logger.Warnf("get user package %v/%v: %v", pkg.bktName, pkg.pkgName, err) failedPkgs = append(failedPkgs, pkg) @@ -551,7 +550,7 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) { /// 3. 开始上传每个Package for _, p := range sucPkgs { - uploader, err := c.uploader.BeginUpdate(1, p.pkg.PackageID, 0, nil, nil) + uploader, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil) if err != nil { logger.Warnf("begin update package %v/%v: %v", p.bktName, p.pkgName, err) continue @@ -570,7 +569,7 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) { counter := io2.Counter(&rd) - err = uploader.Upload(cdssdk.JoinObjectPath(o.pathComps[2:]...), counter) + err = uploader.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter) if err != nil { logger.Warnf("upload object %v: %v", o.pathComps, err) upFailed++ @@ -596,8 +595,8 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) { continue } - oldPath := cdssdk.JoinObjectPath(o.pathComps[2:]...) - newPath := cdssdk.JoinObjectPath(o.cache.pathComps[2:]...) + oldPath := clitypes.JoinObjectPath(o.pathComps[2:]...) + newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...) if o.isDeleted { uploader.CancelObject(oldPath) diff --git a/client/internal/mount/vfs/cache/file.go b/client/internal/mount/vfs/cache/file.go index 6274601..1a26476 100644 --- a/client/internal/mount/vfs/cache/file.go +++ b/client/internal/mount/vfs/cache/file.go @@ -8,12 +8,12 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type FileInfo struct { @@ -81,7 +81,7 @@ type CacheFile struct { cache *Cache pathComps []string info FileInfo - remoteObj *cdssdk.Object + remoteObj *clitypes.Object rwLock *sync.RWMutex readers []*CacheFileHandle writers []*CacheFileHandle @@ -199,7 +199,7 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { return ch, nil } -func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object) (*CacheFile, error) { +func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Object) (*CacheFile, error) { metaPath := cache.GetCacheMetaPath(pathComps...) dataPath := cache.GetCacheDataPath(pathComps...) diff --git a/client/internal/mount/vfs/fuse_bucket.go b/client/internal/mount/vfs/fuse_bucket.go index 094cfb9..ee802ce 100644 --- a/client/internal/mount/vfs/fuse_bucket.go +++ b/client/internal/mount/vfs/fuse_bucket.go @@ -6,10 +6,10 @@ import ( "os" "time" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gorm.io/gorm" ) @@ -70,7 +70,7 @@ func (r *FuseBucket) Child(ctx context.Context, name string) (fuse.FsEntry, erro if ca == nil { // TODO UserID - pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.bktName, name) + pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), r.bktName, name) if err == nil { dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{ ModTime: pkg.CreateTime, @@ -119,7 +119,7 @@ func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) { return nil, err } - pkgMap := make(map[string]*cdssdk.Package) + pkgMap := make(map[string]*clitypes.Package) for _, pkg := range pkgs { p := pkg pkgMap[pkg.Name] = &p @@ -158,7 +158,7 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error // TODO 用户ID,失败了可以打个日志 // TODO 生成系统事件 // 不关注创建是否成功,仅尝试一下 - r.vfs.db.DoTx(func(tx db2.SQLContext) error { + r.vfs.db.DoTx(func(tx db.SQLContext) error { db := r.vfs.db bkt, err := db.Bucket().GetByName(tx, r.bktName) if err != nil { @@ -192,10 +192,10 @@ func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fu func (r *FuseBucket) RemoveChild(ctx context.Context, name string) error { // TODO 生成系统事件 - return r.vfs.db.DoTx(func(tx db2.SQLContext) error { + return r.vfs.db.DoTx(func(tx db.SQLContext) error { d := r.vfs.db - pkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, name) + pkg, err := d.Package().GetUserPackageByName(tx, r.bktName, name) if err == nil { has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, "") if err != nil { @@ -231,8 +231,8 @@ func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName stri } d := r.vfs.db - return d.DoTx(func(tx db2.SQLContext) error { - _, err := d.Package().GetUserPackageByName(tx, 1, newParentPath[0], newName) + return d.DoTx(func(tx db.SQLContext) error { + _, err := d.Package().GetUserPackageByName(tx, newParentPath[0], newName) if err == nil { // 目标节点已经存在,不能重命名,直接退出 return fuse.ErrExists @@ -242,7 +242,7 @@ func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName stri newBkt, err := d.Bucket().GetByName(tx, newParentPath[0]) if err == nil { - oldPkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, oldName) + oldPkg, err := d.Package().GetUserPackageByName(tx, r.bktName, oldName) if err == nil { err = d.Package().Move(tx, oldPkg.PackageID, newBkt.BucketID, newName) if err != nil { diff --git a/client/internal/mount/vfs/fuse_dir.go b/client/internal/mount/vfs/fuse_dir.go index 2b9b78c..8848b5e 100644 --- a/client/internal/mount/vfs/fuse_dir.go +++ b/client/internal/mount/vfs/fuse_dir.go @@ -6,11 +6,11 @@ import ( "strings" "time" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" + db2 "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gorm.io/gorm" ) @@ -72,12 +72,12 @@ func (r *FuseDir) Child(ctx context.Context, name string) (fuse.FsEntry, error) db := r.vfs.db err := db.DoTx(func(tx db2.SQLContext) error { - pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) + pkg, err := db.Package().GetUserPackageByName(tx, r.pathComps[0], r.pathComps[1]) if err != nil { return err } - objPath := cdssdk.JoinObjectPath(childPathComps[2:]...) + objPath := clitypes.JoinObjectPath(childPathComps[2:]...) obj, err := db.Object().GetByPath(tx, pkg.PackageID, objPath) if err == nil { ret = newFileFromObject(r.vfs, childPathComps, obj) @@ -87,7 +87,7 @@ func (r *FuseDir) Child(ctx context.Context, name string) (fuse.FsEntry, error) return err } - has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) + has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, objPath+clitypes.ObjectPathSeparator) if err != nil { return err } @@ -146,20 +146,20 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { db := r.vfs.db db.DoTx(func(tx db2.SQLContext) error { // TODO UserID - pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) + pkg, err := db.Package().GetUserPackageByName(tx, r.pathComps[0], r.pathComps[1]) if err != nil { return err } - objPath := cdssdk.JoinObjectPath(r.pathComps[2:]...) + objPath := clitypes.JoinObjectPath(r.pathComps[2:]...) - objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) + objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, objPath+clitypes.ObjectPathSeparator) if err != nil { return err } for _, dir := range coms { - dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator) - pathComps := lo2.AppendNew(r.pathComps, cdssdk.BaseName(dir)) + dir = strings.TrimSuffix(dir, clitypes.ObjectPathSeparator) + pathComps := lo2.AppendNew(r.pathComps, clitypes.BaseName(dir)) cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{ ModTime: time.Now(), @@ -172,7 +172,7 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { } for _, obj := range objs { - pathComps := lo2.AppendNew(r.pathComps, cdssdk.BaseName(obj.Path)) + pathComps := lo2.AppendNew(r.pathComps, clitypes.BaseName(obj.Path)) file := newFileFromObject(r.vfs, pathComps, obj) dbEntries[file.Name()] = file } @@ -222,14 +222,14 @@ func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse. func (r *FuseDir) RemoveChild(ctx context.Context, name string) error { pathComps := lo2.AppendNew(r.pathComps, name) - joinedPath := cdssdk.JoinObjectPath(pathComps[2:]...) + joinedPath := clitypes.JoinObjectPath(pathComps[2:]...) d := r.vfs.db // TODO 生成系统事件 return r.vfs.db.DoTx(func(tx db2.SQLContext) error { - pkg, err := d.Package().GetUserPackageByName(tx, 1, pathComps[0], pathComps[1]) + pkg, err := d.Package().GetUserPackageByName(tx, pathComps[0], pathComps[1]) if err == nil { - has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, joinedPath+cdssdk.ObjectPathSeparator) + has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, joinedPath+clitypes.ObjectPathSeparator) if err != nil { return err } @@ -256,7 +256,7 @@ func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, newParentNode := newParent.(FuseNode) newParentPath := newParentNode.PathComps() newChildPath := lo2.AppendNew(newParentPath, newName) - newChildPathJoined := cdssdk.JoinObjectPath(newChildPath[2:]...) + newChildPathJoined := clitypes.JoinObjectPath(newChildPath[2:]...) // 不允许移动任何内容到Package层级以上 if len(newParentPath) < 2 { @@ -264,12 +264,12 @@ func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, } oldChildPath := lo2.AppendNew(r.PathComps(), oldName) - oldChildPathJoined := cdssdk.JoinObjectPath(oldChildPath[2:]...) + oldChildPathJoined := clitypes.JoinObjectPath(oldChildPath[2:]...) // 先更新远程,再更新本地,因为远程使用事务更新,可以回滚,而本地不行 d := r.vfs.db return r.vfs.db.DoTx(func(tx db2.SQLContext) error { - newPkg, err := d.Package().GetUserPackageByName(tx, 1, newParentPath[0], newParentPath[1]) + newPkg, err := d.Package().GetUserPackageByName(tx, newParentPath[0], newParentPath[1]) if err != nil { if err == gorm.ErrRecordNotFound { return fuse.ErrNotExists @@ -277,7 +277,7 @@ func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, return err } - oldPkg, err := d.Package().GetUserPackageByName(tx, 1, oldChildPath[0], oldChildPath[1]) + oldPkg, err := d.Package().GetUserPackageByName(tx, oldChildPath[0], oldChildPath[1]) if err != nil { if err == gorm.ErrRecordNotFound { return fuse.ErrNotExists @@ -291,7 +291,7 @@ func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, return fuse.ErrExists } - has, err := d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+cdssdk.ObjectPathSeparator) + has, err := d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator) if err != nil { return err } @@ -305,15 +305,15 @@ func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, oldObj.PackageID = newPkg.PackageID oldObj.Path = newChildPathJoined - err = d.Object().BatchUpdate(tx, []cdssdk.Object{oldObj}) + err = d.Object().BatchUpdate(tx, []clitypes.Object{oldObj}) if err != nil { return err } } err = d.Object().MoveByPrefix(tx, - oldPkg.PackageID, oldChildPathJoined+cdssdk.ObjectPathSeparator, - newPkg.PackageID, newChildPathJoined+cdssdk.ObjectPathSeparator, + oldPkg.PackageID, oldChildPathJoined+clitypes.ObjectPathSeparator, + newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator, ) if err != nil { return err @@ -327,12 +327,12 @@ func (r *FuseDir) loadCacheDir() *cache.CacheDir { var createOpt *cache.CreateDirOption err := r.vfs.db.DoTx(func(tx db2.SQLContext) error { - pkg, err := r.vfs.db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) + pkg, err := r.vfs.db.Package().GetUserPackageByName(tx, r.pathComps[0], r.pathComps[1]) if err != nil { return err } - has, err := r.vfs.db.Object().HasObjectWithPrefix(tx, pkg.PackageID, cdssdk.JoinObjectPath(r.pathComps[2:]...)) + has, err := r.vfs.db.Object().HasObjectWithPrefix(tx, pkg.PackageID, clitypes.JoinObjectPath(r.pathComps[2:]...)) if err != nil { return err } diff --git a/client/internal/mount/vfs/fuse_file.go b/client/internal/mount/vfs/fuse_file.go index f1c61bb..7a8a236 100644 --- a/client/internal/mount/vfs/fuse_file.go +++ b/client/internal/mount/vfs/fuse_file.go @@ -4,9 +4,9 @@ import ( "os" "time" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gorm.io/gorm" ) @@ -28,7 +28,7 @@ func newFileFromCache(info cache.CacheEntryInfo, vfs *Vfs) *FuseFileNode { } } -func newFileFromObject(vfs *Vfs, pathComps []string, obj cdssdk.Object) *FuseFileNode { +func newFileFromObject(vfs *Vfs, pathComps []string, obj clitypes.Object) *FuseFileNode { return &FuseFileNode{ vfs: vfs, pathComps: pathComps, @@ -95,7 +95,7 @@ func (n *FuseFileNode) loadCacheFile() *cache.CacheFile { return n.vfs.cache.LoadFile(n.pathComps, nil) } - cdsObj, err := n.vfs.db.Object().GetByFullPath(n.vfs.db.DefCtx(), n.pathComps[0], n.pathComps[1], cdssdk.JoinObjectPath(n.pathComps[2:]...)) + cdsObj, err := n.vfs.db.Object().GetByFullPath(n.vfs.db.DefCtx(), n.pathComps[0], n.pathComps[1], clitypes.JoinObjectPath(n.pathComps[2:]...)) if err == nil { file := n.vfs.cache.LoadFile(n.pathComps, &cdsObj) if file == nil { diff --git a/client/internal/mount/vfs/fuse_package.go b/client/internal/mount/vfs/fuse_package.go index 1ab1a77..805bd54 100644 --- a/client/internal/mount/vfs/fuse_package.go +++ b/client/internal/mount/vfs/fuse_package.go @@ -6,11 +6,11 @@ import ( "strings" "time" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" + db2 "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gorm.io/gorm" ) @@ -76,7 +76,7 @@ func (r *FusePackage) Child(ctx context.Context, name string) (fuse.FsEntry, err db := r.vfs.db err := db.DoTx(func(tx db2.SQLContext) error { - pkg, err := db.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + pkg, err := db.Package().GetUserPackageByName(tx, r.bktName, r.pkgName) if err != nil { return err } @@ -90,7 +90,7 @@ func (r *FusePackage) Child(ctx context.Context, name string) (fuse.FsEntry, err return err } - has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+cdssdk.ObjectPathSeparator) + has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+clitypes.ObjectPathSeparator) if err != nil { return err } @@ -148,7 +148,7 @@ func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) { db := r.vfs.db db.DoTx(func(tx db2.SQLContext) error { - pkg, err := db.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + pkg, err := db.Package().GetUserPackageByName(tx, r.bktName, r.pkgName) if err != nil { return err } @@ -159,7 +159,7 @@ func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) { } for _, dir := range coms { - dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator) + dir = strings.TrimSuffix(dir, clitypes.ObjectPathSeparator) pathComps := []string{r.bktName, r.pkgName, dir} cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{ ModTime: time.Now(), @@ -223,9 +223,9 @@ func (r *FusePackage) RemoveChild(ctx context.Context, name string) error { // TODO 生成系统事件 d := r.vfs.db return r.vfs.db.DoTx(func(tx db2.SQLContext) error { - pkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + pkg, err := d.Package().GetUserPackageByName(tx, r.bktName, r.pkgName) if err == nil { - has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+cdssdk.ObjectPathSeparator) + has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+clitypes.ObjectPathSeparator) if err != nil { return err } @@ -252,7 +252,7 @@ func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName str newParentNode := newParent.(FuseNode) newParentPath := newParentNode.PathComps() newChildPath := lo2.AppendNew(newParentPath, newName) - newChildPathJoined := cdssdk.JoinObjectPath(newChildPath[2:]...) + newChildPathJoined := clitypes.JoinObjectPath(newChildPath[2:]...) // 不允许移动任何内容到Package层级以上 if len(newParentPath) < 2 { @@ -260,12 +260,12 @@ func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName str } oldChildPath := lo2.AppendNew(r.PathComps(), oldName) - oldChildPathJoined := cdssdk.JoinObjectPath(oldChildPath[2:]...) + oldChildPathJoined := clitypes.JoinObjectPath(oldChildPath[2:]...) // 先更新远程,再更新本地,因为远程使用事务更新,可以回滚,而本地不行 d := r.vfs.db return r.vfs.db.DoTx(func(tx db2.SQLContext) error { - newPkg, err := d.Package().GetUserPackageByName(tx, 1, newParentPath[0], newParentPath[1]) + newPkg, err := d.Package().GetUserPackageByName(tx, newParentPath[0], newParentPath[1]) if err != nil { if err == gorm.ErrRecordNotFound { return fuse.ErrNotExists @@ -273,7 +273,7 @@ func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName str return err } - oldPkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + oldPkg, err := d.Package().GetUserPackageByName(tx, r.bktName, r.pkgName) if err != nil { if err == gorm.ErrRecordNotFound { return fuse.ErrNotExists @@ -287,7 +287,7 @@ func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName str return fuse.ErrExists } - has, err := d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+cdssdk.ObjectPathSeparator) + has, err := d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator) if err != nil { return err } @@ -301,15 +301,15 @@ func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName str oldObj.PackageID = newPkg.PackageID oldObj.Path = newChildPathJoined - err = d.Object().BatchUpdate(tx, []cdssdk.Object{oldObj}) + err = d.Object().BatchUpdate(tx, []clitypes.Object{oldObj}) if err != nil { return err } } err = d.Object().MoveByPrefix(tx, - oldPkg.PackageID, oldChildPathJoined+cdssdk.ObjectPathSeparator, - newPkg.PackageID, newChildPathJoined+cdssdk.ObjectPathSeparator, + oldPkg.PackageID, oldChildPathJoined+clitypes.ObjectPathSeparator, + newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator, ) if err != nil { return err @@ -321,7 +321,7 @@ func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName str func (r *FusePackage) loadCacheDir() *cache.CacheDir { var createOpt *cache.CreateDirOption - pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.bktName, r.pkgName) + pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), r.bktName, r.pkgName) if err == nil { createOpt = &cache.CreateDirOption{ ModTime: pkg.CreateTime, diff --git a/client/internal/mount/vfs/fuse_root.go b/client/internal/mount/vfs/fuse_root.go index fe3ff5c..a1bf776 100644 --- a/client/internal/mount/vfs/fuse_root.go +++ b/client/internal/mount/vfs/fuse_root.go @@ -5,10 +5,10 @@ import ( "os" "time" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + db2 "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" + clitypes "gitlink.org.cn/cloudream/storage2/client/types" "gorm.io/gorm" ) @@ -104,7 +104,7 @@ func (r *FuseRoot) listChildren() ([]fuse.FsEntry, error) { return nil, err } - bktMap := make(map[string]*cdssdk.Bucket) + bktMap := make(map[string]*clitypes.Bucket) for _, bkt := range bkts { b := bkt bktMap[bkt.Name] = &b @@ -145,7 +145,7 @@ func (r *FuseRoot) NewDir(ctx context.Context, name string) (fuse.FsDir, error) // TODO 用户ID,失败了可以打个日志 // TODO 生成系统事件 // 不关注创建是否成功,仅尝试一下 - r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), 1, name, cache.ModTime()) + r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), name, cache.ModTime()) return newBucketFromCache(cache.Info(), r.vfs), nil } diff --git a/client/internal/mount/vfs/vfs.go b/client/internal/mount/vfs/vfs.go index af0e19c..77f07c3 100644 --- a/client/internal/mount/vfs/vfs.go +++ b/client/internal/mount/vfs/vfs.go @@ -1,21 +1,21 @@ package vfs import ( + "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/db2" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + "gitlink.org.cn/cloudream/storage2/client/internal/uploader" ) type Vfs struct { - db *db2.DB + db *db.DB config *config.Config cache *cache.Cache } -func NewVfs(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Vfs { +func NewVfs(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Vfs { return &Vfs{ db: db, config: cfg, diff --git a/client/internal/publics/publics.go b/client/internal/publics/publics.go deleted file mode 100644 index bdef69b..0000000 --- a/client/internal/publics/publics.go +++ /dev/null @@ -1,9 +0,0 @@ -package publics - -import "gitlink.org.cn/cloudream/storage2/client/types" - -var Local types.LocalMachineInfo - -func Init(local types.LocalMachineInfo) { - Local = local -} diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 4f64bfa..229ef93 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -11,11 +11,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/sdk/api" "gitlink.org.cn/cloudream/storage2/client/types" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" "gitlink.org.cn/cloudream/storage2/common/models/datamap" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/plans" "gorm.io/gorm" @@ -681,81 +680,16 @@ func (svc *ObjectService) NewMultipartUploadObject(packageID types.PackageID, pa return obj, nil } -func (svc *Service) AddMultipartUploadPart(objectID types.ObjectID, block types.ObjectBlock) error { - err := svc.DB.DoTx(func(tx db.SQLContext) error { - obj, err := svc.DB.Object().GetByID(tx, objectID) - if err != nil { - return fmt.Errorf("getting object by id: %w", err) - } - - _, ok := obj.Redundancy.(*types.MultipartUploadRedundancy) - if !ok { - return fmt.Errorf("object is not a multipart upload object") - } - - blks, err := svc.DB.ObjectBlock().BatchGetByObjectID(tx, []types.ObjectID{obj.ObjectID}) - if err != nil { - return fmt.Errorf("batch getting object blocks: %w", err) - } - - blks = lo.Reject(blks, func(blk types.ObjectBlock, idx int) bool { return blk.Index == block.Index }) - blks = append(blks, block) - - blks = sort2.Sort(blks, func(a, b types.ObjectBlock) int { return a.Index - b.Index }) - - totalSize := int64(0) - var hashes [][]byte - for _, blk := range blks { - totalSize += blk.Size - hashes = append(hashes, blk.FileHash.GetHashBytes()) - } - - newObjHash := types.CalculateCompositeHash(hashes) - obj.Size = totalSize - obj.FileHash = newObjHash - obj.UpdateTime = time.Now() - - err = svc.DB.ObjectBlock().DeleteByObjectIDIndex(tx, objectID, block.Index) - if err != nil { - return fmt.Errorf("delete object block: %w", err) - } - - err = svc.DB.ObjectBlock().Create(tx, objectID, block.Index, block.UserSpaceID, block.FileHash, block.Size) - if err != nil { - return fmt.Errorf("create object block: %w", err) - } - - err = svc.DB.Object().BatchUpdate(tx, []types.Object{obj}) - if err != nil { - return fmt.Errorf("update object: %w", err) - } - - return nil - }) - if err != nil { - logger.Warnf("add multipart upload part: %s", err.Error()) - return err - } - - return nil -} - func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, indexes []int) (types.Object, error) { if len(indexes) == 0 { return types.Object{}, fmt.Errorf("no block indexes specified") } - details, err := svc.ObjectSvc().GetObjectDetails([]types.ObjectID{objectID}) + objDe, err := db.DoTx12(svc.DB, svc.DB.Object().GetDetail, objectID) if err != nil { return types.Object{}, err } - if details[0] == nil { - return types.Object{}, fmt.Errorf("object %v not found", objectID) - } - - objDe := details[0] - _, ok := objDe.Object.Redundancy.(*types.MultipartUploadRedundancy) if !ok { return types.Object{}, fmt.Errorf("object %v is not a multipart upload", objectID) @@ -771,28 +705,28 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index } var compBlks []types.ObjectBlock - var compBlkStgs []stgmod.StorageDetail - var targetStg stgmod.StorageDetail + var compBlkSpaces []types.UserSpaceDetail + var targetSpace types.UserSpaceDetail for i, idx := range indexes { blk, ok := objBlkMap[idx] if !ok { return types.Object{}, fmt.Errorf("block %d not found in object %v", idx, objectID) } - stg := svc.StorageMeta.Get(blk.UserSpaceID) + stg := svc.UserSpaceMeta.Get(blk.UserSpaceID) if stg == nil { return types.Object{}, fmt.Errorf("storage of user space %d not found", blk.UserSpaceID) } compBlks = append(compBlks, blk) - compBlkStgs = append(compBlkStgs, *stg) + compBlkSpaces = append(compBlkSpaces, *stg) if i == 0 { - targetStg = *stg + targetSpace = *stg } } bld := exec.NewPlanBuilder() - err = plans.CompleteMultipart(compBlks, compBlkStgs, targetStg, "shard", bld) + err = plans.CompleteMultipart(compBlks, compBlkSpaces, targetSpace, "shard", bld) if err != nil { return types.Object{}, err } @@ -813,7 +747,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index Blocks: []types.ObjectBlock{{ ObjectID: objectID, Index: 0, - UserSpaceID: targetStg.Storage.StorageID, + UserSpaceID: targetSpace.UserSpace.UserSpaceID, FileHash: shardInfo.Hash, Size: shardInfo.Size, }}, diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 4f3055f..10b1c0c 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -6,9 +6,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/models/datamap" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" ) // PackageService 提供对包相关操作的服务接口 diff --git a/client/internal/services/service.go b/client/internal/services/service.go index bcff86f..e398e00 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -1,17 +1,15 @@ -// services 包提供了服务层的封装,主要负责协调分布锁和任务管理器之间的交互。 - package services import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage2/client/internal/accessstat" "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" + "gitlink.org.cn/cloudream/storage2/client/internal/metacache" "gitlink.org.cn/cloudream/storage2/client/internal/task" - "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" + "gitlink.org.cn/cloudream/storage2/client/internal/uploader" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" ) // Service 结构体封装了分布锁服务和任务管理服务。 @@ -22,7 +20,7 @@ type Service struct { AccessStat *accessstat.AccessStat Uploader *uploader.Uploader StrategySelector *strategy.Selector - StorageMeta *metacache.StorageMeta + UserSpaceMeta *metacache.UserSpaceMeta DB *db.DB evtPub *sysevent.Publisher } @@ -34,7 +32,7 @@ func NewService( accStat *accessstat.AccessStat, uploder *uploader.Uploader, strategySelector *strategy.Selector, - storageMeta *metacache.StorageMeta, + userSpaceMeta *metacache.UserSpaceMeta, db *db.DB, evtPub *sysevent.Publisher, ) (*Service, error) { @@ -45,7 +43,7 @@ func NewService( AccessStat: accStat, Uploader: uploder, StrategySelector: strategySelector, - StorageMeta: storageMeta, + UserSpaceMeta: userSpaceMeta, DB: db, evtPub: evtPub, }, nil diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index fd5056f..cf0517e 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -8,11 +8,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/storage2/client/types" + "gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" @@ -36,10 +34,6 @@ func (svc *UserSpaceService) GetByName(name string) (types.UserSpace, error) { return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name) } -func (svc *UserSpaceService) GetDetails(stgIDs []cdssdk.UserSpaceID) ([]*stgmod.UserSpaceDetail, error) { - -} - func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID cdssdk.UserSpaceID, rootPath string) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -98,23 +92,23 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID } } - mutex, err := reqbuilder.NewBuilder(). - // 保护在userspace目录中下载的文件 - UserSpace().Buzy(userspaceID). - // 保护下载文件时同时保存到IPFS的文件 - Shard().Buzy(userspaceID). - MutexLock(svc.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } + // TODO2 加锁 + // mutex, err := reqbuilder.NewBuilder(). + // // 保护在userspace目录中下载的文件 + // UserSpace().Buzy(userspaceID). + // // 保护下载文件时同时保存到IPFS的文件 + // Shard().Buzy(userspaceID). + // MutexLock(svc.DistLock) + // if err != nil { + // return fmt.Errorf("acquire locks failed, err: %w", err) + // } + // defer mutex.Unlock() // 记录访问统计 for _, obj := range details.Objects { svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1) } - defer mutex.Unlock() - drv := plans.Execute(exec.NewExecContext()) _, err = drv.Wait(context.Background()) if err != nil { @@ -128,22 +122,27 @@ func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID // 请求节点启动从UserSpace中上传文件的任务。会返回节点ID和任务ID func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, name string, userspaceID cdssdk.UserSpaceID, path string, userspaceAffinity cdssdk.UserSpaceID) (cdssdk.Package, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - stgResp, err := coorCli.GetUserSpaceDetails(coormq.ReqGetUserSpaceDetails([]cdssdk.UserSpaceID{userspaceID})) - if err != nil { - return cdssdk.Package{}, fmt.Errorf("getting userspace info: %w", err) + // coorCli, err := stgglb.CoordinatorMQPool.Acquire() + // if err != nil { + // return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) + // } + // defer stgglb.CoordinatorMQPool.Release(coorCli) + + // stgResp, err := coorCli.GetUserSpaceDetails(coormq.ReqGetUserSpaceDetails([]cdssdk.UserSpaceID{userspaceID})) + // if err != nil { + // return cdssdk.Package{}, fmt.Errorf("getting userspace info: %w", err) + // } + + spaceDetail := svc.UserSpaceMeta.Get(userspaceID) + if spaceDetail == nil { + return cdssdk.Package{}, fmt.Errorf("userspace not found: %d", userspaceID) } - if stgResp.UserSpaces[0].UserSpace.ShardStore == nil { + if spaceDetail.UserSpace.ShardStore == nil { return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled") } - agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.UserSpaces[0].MasterHub.HubID) + agentCli, err := stgglb.AgentMQPool.Acquire(spaceDetail.MasterHub.HubID) if err != nil { return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err) } diff --git a/client/internal/task/task.go b/client/internal/task/task.go deleted file mode 100644 index b3294f6..0000000 --- a/client/internal/task/task.go +++ /dev/null @@ -1,40 +0,0 @@ -package task - -import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" // 引入分布式锁服务 - "gitlink.org.cn/cloudream/common/pkgs/task" // 引入任务处理相关的包 - "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" // 引入网络连接状态收集器 - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" -) - -// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务和网络连接状态收集器 -type TaskContext struct { - distlock *distlock.Service - connectivity *connectivity.Collector - stgAgts *agtpool.AgentPool -} - -// CompleteFn 类型定义了任务完成时的回调函数,用于设置任务的执行结果 -type CompleteFn = task.CompleteFn - -// Manager 类型定义了任务管理器,用于创建、管理和调度任务 -type Manager = task.Manager[TaskContext] - -// TaskBody 类型定义了任务的主体部分,包含了任务实际执行的逻辑 -type TaskBody = task.TaskBody[TaskContext] - -// Task 类型定义了具体的任务,包括任务的上下文、主体和完成选项 -type Task = task.Task[TaskContext] - -// CompleteOption 类型定义了任务完成时的选项,可用于定制任务完成的处理方式 -type CompleteOption = task.CompleteOption - -// NewManager 创建一个新的任务管理器实例,接受一个分布式锁服务和一个网络连接状态收集器作为参数 -// 返回一个初始化好的任务管理器实例 -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool) Manager { - return task.NewManager(TaskContext{ - distlock: distlock, - connectivity: connectivity, - stgAgts: stgAgts, - }) -} diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index e7cf197..f92515a 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -53,7 +53,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, u.uploader.stgAgts) + exec.SetValueByType(exeCtx, u.uploader.stgPool) exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(stream), hd) ret, err := exec.Wait(context.TODO()) diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index 6986817..956b902 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -60,7 +60,7 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error { } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, w.uploader.stgAgts) + exec.SetValueByType(exeCtx, w.uploader.stgPool) exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(stream), hd) ret, err := exec.Wait(context.TODO()) diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index d3a5c2f..5da253b 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -14,29 +14,29 @@ import ( "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/metacache" - "gitlink.org.cn/cloudream/storage2/client/internal/publics" "gitlink.org.cn/cloudream/storage2/client/types" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) type Uploader struct { distlock *distlock.Service connectivity *connectivity.Collector - stgAgts *agtpool.AgentPool + stgPool *pool.Pool spaceMeta *metacache.UserSpaceMeta db *db.DB } -func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { +func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { return &Uploader{ distlock: distlock, connectivity: connectivity, - stgAgts: stgAgts, + stgPool: stgPool, spaceMeta: spaceMeta, } } @@ -65,9 +65,9 @@ func (u *Uploader) BeginUpdate(pkgID types.PackageID, affinity types.UserSpaceID } uploadSpaces = append(uploadSpaces, UploadSpaceInfo{ - Space: space, + Space: *space, Delay: latency, - IsSameLocation: space.MasterHub.LocationID == publics.Local.LocationID, + IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID, }) } @@ -217,7 +217,7 @@ func (u *Uploader) UploadPart(objID types.ObjectID, index int, stream io.Reader) userStgs = append(userStgs, UploadSpaceInfo{ Space: *space, Delay: delay, - IsSameLocation: space.MasterHub.LocationID == publics.Local.LocationID, + IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID, }) } @@ -247,7 +247,7 @@ func (u *Uploader) UploadPart(objID types.ObjectID, index int, stream io.Reader) } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, u.stgAgts) + exec.SetValueByType(exeCtx, u.stgPool) exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(stream), hd) ret, err := exec.Wait(context.TODO()) diff --git a/client/types/types.go b/client/types/types.go index f6b3ee5..337ca69 100644 --- a/client/types/types.go +++ b/client/types/types.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "time" "github.com/samber/lo" @@ -71,14 +72,26 @@ func (ObjectBlock) TableName() string { type UserSpace struct { UserSpaceID UserSpaceID `gorm:"column:UserSpaceID; primaryKey; type:bigint" json:"userSpaceID"` + // 用户空间名称 + Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` // 用户空间所在的存储节点 StorageID cotypes.StorageID `gorm:"column:StorageID; type:bigint; not null" json:"storageID"` + // 用户在指定存储节点的凭证信息,比如用户账户,AK/SK等 + Credential cotypes.StorageCredential `gorm:"column:Credential; type:json; not null; serializer:union" json:"credential"` + // 用户空间的分片存储配置,如果为空,则表示不使用分片存储 + ShardStore *cotypes.ShardStoreUserConfig `gorm:"column:ShardStore; type:json;" json:"shardStore"` + // 用户空间信息的版本号,每一次更改都需要更新版本号 + Revision int64 `gorm:"column:Revision; type:bigint; not null" json:"revision"` } func (UserSpace) TableName() string { return "UserSpace" } +func (s UserSpace) String() string { + return fmt.Sprintf("%v[id=%v,storageID=%v,rev=%v]", s.Name, s.UserSpaceID, s.StorageID, s.Revision) +} + type PackageAccessStat struct { PackageID PackageID `gorm:"column:PackageID; primaryKey; type:bigint" json:"packageID"` // 发起读取(调度)的用户空间ID @@ -199,3 +212,14 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock { return sort2.Sort(lo.Values(grps), func(l, r GrouppedObjectBlock) int { return l.Index - r.Index }) } + +type UserSpaceDetail struct { + UserID cotypes.UserID + UserSpace UserSpace + Storage cotypes.Storage + MasterHub *cotypes.Hub +} + +func (d UserSpaceDetail) String() string { + return d.UserSpace.String() +} diff --git a/client/types/utils.go b/client/types/utils.go new file mode 100644 index 0000000..408bbe0 --- /dev/null +++ b/client/types/utils.go @@ -0,0 +1,18 @@ +package types + +import ( + "strings" +) + +func JoinObjectPath(comps ...string) string { + return strings.Join(comps, ObjectPathSeparator) +} + +func SplitObjectPath(pat string) []string { + return strings.Split(pat, ObjectPathSeparator) +} + +func BaseName(pat string) string { + idx := strings.LastIndex(pat, ObjectPathSeparator) + return pat[idx+1:] +} diff --git a/common/globals/globals.go b/common/globals/globals.go index 291be35..3398b69 100644 --- a/common/globals/globals.go +++ b/common/globals/globals.go @@ -1,15 +1,21 @@ package stgglb import ( - stgmodels "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/coordinator/types" ) -var Local *stgmodels.LocalMachineInfo +type LocalMachineInfo struct { + ExternalIP string `json:"externalIP"` + LocalIP string `json:"localIP"` + LocationID types.LocationID `json:"locationID"` +} + +var Local *LocalMachineInfo // InitLocal // // @Description: 初始化本地机器信息 // @param info -func InitLocal(info *stgmodels.LocalMachineInfo) { - Local = info +func InitLocal(info LocalMachineInfo) { + Local = &info } diff --git a/common/globals/utils.go b/common/globals/utils.go index 042a61d..5797b33 100644 --- a/common/globals/utils.go +++ b/common/globals/utils.go @@ -1,9 +1,9 @@ package stgglb -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +import cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" // 根据当前节点与目标地址的距离关系,选择合适的地址 -func SelectGRPCAddress(hub cdssdk.Hub, addr cdssdk.GRPCAddressInfo) (string, int) { +func SelectGRPCAddress(hub cortypes.Hub, addr cortypes.GRPCAddressInfo) (string, int) { if Local != nil && Local.LocationID == hub.LocationID { return addr.LocalIP, addr.LocalGRPCPort } diff --git a/common/models/datamap/datamap.go b/common/models/datamap/datamap.go index dfa5877..0d5732e 100644 --- a/common/models/datamap/datamap.go +++ b/common/models/datamap/datamap.go @@ -28,7 +28,6 @@ type SysEventSource interface { var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEventSource]( (*SourceCoordinator)(nil), - (*SourceScanner)(nil), (*SourceHub)(nil), )), "type") @@ -49,23 +48,6 @@ func (s *SourceCoordinator) String() string { return "Coordinator" } -type SourceScanner struct { - serder.Metadata `union:"Scanner"` - Type string `json:"type"` -} - -func (s *SourceScanner) GetSourceType() string { - return "Scanner" -} - -func (s *SourceScanner) OnUnionSerializing() { - s.Type = s.GetSourceType() -} - -func (s *SourceScanner) String() string { - return "Scanner" -} - type SourceHub struct { serder.Metadata `union:"Hub"` Type string `json:"type"` diff --git a/common/pkgs/accessstat/access_stat.go b/common/pkgs/accessstat/access_stat.go deleted file mode 100644 index b7d1982..0000000 --- a/common/pkgs/accessstat/access_stat.go +++ /dev/null @@ -1,76 +0,0 @@ -package accessstat - -import ( - "fmt" - "sync" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/sync2" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" -) - -type AccessStatEvent interface{} - -type AccessStat struct { - cfg Config - stats []coormq.AddAccessStatEntry - lock sync.Mutex -} - -func NewAccessStat(cfg Config) *AccessStat { - return &AccessStat{ - cfg: cfg, - } -} - -func (p *AccessStat) AddAccessCounter(objID cdssdk.ObjectID, pkgID cdssdk.PackageID, stgID cdssdk.StorageID, value float64) { - p.lock.Lock() - defer p.lock.Unlock() - - p.stats = append(p.stats, coormq.AddAccessStatEntry{ - ObjectID: objID, - PackageID: pkgID, - StorageID: stgID, - Counter: value, - }) -} - -func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { - ch := sync2.NewUnboundChannel[AccessStatEvent]() - - go func() { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - ch.Send(fmt.Errorf("new coordinator client: %w", err)) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - ticker := time.NewTicker(p.cfg.ReportInterval) - for { - <-ticker.C - - p.lock.Lock() - st := p.stats - p.stats = nil - p.lock.Unlock() - - if len(st) == 0 { - continue - } - - err := coorCli.AddAccessStat(coormq.ReqAddAccessStat(st)) - if err != nil { - logger.Errorf("add all package access stat counter: %v", err) - - p.lock.Lock() - p.stats = append(p.stats, st...) - p.lock.Unlock() - continue - } - } - }() - return ch -} diff --git a/common/pkgs/accessstat/config.go b/common/pkgs/accessstat/config.go deleted file mode 100644 index b67e8e6..0000000 --- a/common/pkgs/accessstat/config.go +++ /dev/null @@ -1,7 +0,0 @@ -package accessstat - -import "time" - -type Config struct { - ReportInterval time.Duration -} diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index 819e7fe..ac33363 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -6,13 +6,13 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) type Connectivity struct { - ToHubID cdssdk.HubID + ToHubID cortypes.HubID Latency *time.Duration TestTime time.Time } @@ -22,7 +22,7 @@ type Collector struct { onCollected func(collector *Collector) collectNow chan any close chan any - connectivities map[cdssdk.HubID]Connectivity + connectivities map[cortypes.HubID]Connectivity lock *sync.RWMutex } @@ -31,7 +31,7 @@ func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector cfg: cfg, collectNow: make(chan any), close: make(chan any), - connectivities: make(map[cdssdk.HubID]Connectivity), + connectivities: make(map[cortypes.HubID]Connectivity), lock: &sync.RWMutex{}, onCollected: onCollected, } @@ -39,7 +39,7 @@ func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector return rpt } -func NewCollectorWithInitData(cfg *Config, onCollected func(collector *Collector), initData map[cdssdk.HubID]Connectivity) Collector { +func NewCollectorWithInitData(cfg *Config, onCollected func(collector *Collector), initData map[cortypes.HubID]Connectivity) Collector { rpt := Collector{ cfg: cfg, collectNow: make(chan any), @@ -52,11 +52,11 @@ func NewCollectorWithInitData(cfg *Config, onCollected func(collector *Collector return rpt } -func (r *Collector) GetAll() map[cdssdk.HubID]Connectivity { +func (r *Collector) GetAll() map[cortypes.HubID]Connectivity { r.lock.RLock() defer r.lock.RUnlock() - ret := make(map[cdssdk.HubID]Connectivity) + ret := make(map[cortypes.HubID]Connectivity) for k, v := range r.connectivities { ret[k] = v } @@ -147,7 +147,7 @@ func (r *Collector) testing() { r.lock.Lock() // 删除所有hub的记录,然后重建,避免hub数量变化时导致残余数据 - r.connectivities = make(map[cdssdk.HubID]Connectivity) + r.connectivities = make(map[cortypes.HubID]Connectivity) for _, con := range cons { r.connectivities[con.ToHubID] = con } @@ -158,13 +158,13 @@ func (r *Collector) testing() { } } -func (r *Collector) ping(hub cdssdk.Hub) Connectivity { +func (r *Collector) ping(hub cortypes.Hub) Connectivity { log := logger.WithType[Collector]("").WithField("HubID", hub.HubID) var ip string var port int switch addr := hub.Address.(type) { - case *cdssdk.GRPCAddressInfo: + case *cortypes.GRPCAddressInfo: if hub.LocationID == stgglb.Local.LocationID { ip = addr.LocalIP port = addr.LocalGRPCPort diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go index 7747e19..6d1864b 100644 --- a/common/pkgs/ioswitch2/ops2/s2s.go +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -39,7 +39,7 @@ func (o *S2STransfer) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } // 传输文件 - dstPath, err := s2s.Transfer(ctx.Context, o.Src, srcPath.Path) + dstPath, err := s2s.Transfer(ctx.Context, &o.Src, srcPath.Path) if err != nil { return err } diff --git a/common/pkgs/ioswitch2/parser/opt/ec.go b/common/pkgs/ioswitch2/parser/opt/ec.go index 5768448..60569f0 100644 --- a/common/pkgs/ioswitch2/parser/opt/ec.go +++ b/common/pkgs/ioswitch2/parser/opt/ec.go @@ -88,7 +88,7 @@ func UseECMultiplier(ctx *state.GenerateState) { return true } - if !factory.GetBuilder(&srNode.From.Space).FeatureDesc().HasBypassHTTPRead() { + if !factory.GetBuilder(&srNode.From.Space).FeatureDesc().HasBypassHTTPRead { return true } diff --git a/common/pkgs/ioswitch2/parser/opt/s2s.go b/common/pkgs/ioswitch2/parser/opt/s2s.go index aa838a7..686e87d 100644 --- a/common/pkgs/ioswitch2/parser/opt/s2s.go +++ b/common/pkgs/ioswitch2/parser/opt/s2s.go @@ -21,7 +21,7 @@ func UseS2STransfer(ctx *state.GenerateState) { } fromStgBld := factory.GetBuilder(&fromShard.Space) - if !fromStgBld.FeatureDesc().HasBypassRead() { + if !fromStgBld.FeatureDesc().HasBypassRead { continue } @@ -47,12 +47,12 @@ func UseS2STransfer(ctx *state.GenerateState) { switch dstNode := dstNode.(type) { case *ops2.ShardWriteNode: dstStgBld := factory.GetBuilder(&dstNode.UserSpace) - if !dstStgBld.FeatureDesc().HasBypassWrite() { + if !dstStgBld.FeatureDesc().HasBypassWrite { failed = true break } - if !s2s.CanTransfer(dstNode.UserSpace) { + if !s2s.CanTransfer(&dstNode.UserSpace) { failed = true break } diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index df39606..09b7eda 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -8,7 +8,7 @@ import ( ) type Service interface { - StorageService + // UserSpaceService CacheService diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index bc200bf..88ee3ab 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -1,46 +1,48 @@ package agent +/* import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -type StorageService interface { - StorageCreatePackage(msg *StorageCreatePackage) (*StorageCreatePackageResp, *mq.CodeMessage) +type UserSpaceService interface { + UserSpaceCreatePackage(msg *UserSpaceCreatePackage) (*UserSpaceCreatePackageResp, *mq.CodeMessage) } -// 启动从Storage上传Package的任务 -var _ = Register(Service.StorageCreatePackage) +// 启动从UserSpace上传Package的任务 +var _ = Register(Service.UserSpaceCreatePackage) -type StorageCreatePackage struct { +type UserSpaceCreatePackage struct { mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketID cdssdk.BucketID `json:"bucketID"` - Name string `json:"name"` - StorageID cdssdk.StorageID `json:"storageID"` - Path string `json:"path"` - StorageAffinity cdssdk.StorageID `json:"storageAffinity"` + UserID cdssdk.UserID `json:"userID"` + BucketID cdssdk.BucketID `json:"bucketID"` + Name string `json:"name"` + UserSpaceID cdssdk.UserSpaceID `json:"userspaceID"` + Path string `json:"path"` + UserSpaceAffinity cdssdk.UserSpaceID `json:"userspaceAffinity"` } -type StorageCreatePackageResp struct { +type UserSpaceCreatePackageResp struct { mq.MessageBodyBase Package cdssdk.Package `json:"package"` } -func ReqStorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, stgAffinity cdssdk.StorageID) *StorageCreatePackage { - return &StorageCreatePackage{ - UserID: userID, - BucketID: bucketID, - Name: name, - StorageID: storageID, - Path: path, - StorageAffinity: stgAffinity, +func ReqUserSpaceCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, userspaceID cdssdk.UserSpaceID, path string, stgAffinity cdssdk.UserSpaceID) *UserSpaceCreatePackage { + return &UserSpaceCreatePackage{ + UserID: userID, + BucketID: bucketID, + Name: name, + UserSpaceID: userspaceID, + Path: path, + UserSpaceAffinity: stgAffinity, } } -func RespStorageCreatePackage(pkg cdssdk.Package) *StorageCreatePackageResp { - return &StorageCreatePackageResp{ +func RespUserSpaceCreatePackage(pkg cdssdk.Package) *UserSpaceCreatePackageResp { + return &UserSpaceCreatePackageResp{ Package: pkg, } } -func (client *Client) StorageCreatePackage(msg *StorageCreatePackage, opts ...mq.RequestOption) (*StorageCreatePackageResp, error) { - return mq.Request(Service.StorageCreatePackage, client.rabbitCli, msg, opts...) +func (client *Client) UserSpaceCreatePackage(msg *UserSpaceCreatePackage, opts ...mq.RequestOption) (*UserSpaceCreatePackageResp, error) { + return mq.Request(Service.UserSpaceCreatePackage, client.rabbitCli, msg, opts...) } +*/ diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go index 3c3a5e6..fd804fc 100644 --- a/common/pkgs/sysevent/publisher.go +++ b/common/pkgs/sysevent/publisher.go @@ -7,7 +7,7 @@ import ( "github.com/streadway/amqp" "gitlink.org.cn/cloudream/common/pkgs/async" "gitlink.org.cn/cloudream/common/utils/serder" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/common/models/datamap" ) type PublisherEvent interface{} @@ -104,8 +104,8 @@ func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] { } // Publish 发布事件,会自动补齐必要信息 -func (p *Publisher) Publish(eventBody stgmod.SysEventBody) { - p.eventChan.Send(stgmod.SysEvent{ +func (p *Publisher) Publish(eventBody datamap.SysEventBody) { + p.eventChan.Send(datamap.SysEvent{ Timestamp: time.Now(), Source: p.thisSource, Body: eventBody, diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go index 4c9ef00..ed5bf12 100644 --- a/common/pkgs/sysevent/sysevent.go +++ b/common/pkgs/sysevent/sysevent.go @@ -1,7 +1,7 @@ package sysevent import ( - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/common/models/datamap" ) const ( @@ -9,6 +9,6 @@ const ( ExchangeName = "SysEventExchange" ) -type SysEvent = stgmod.SysEvent +type SysEvent = datamap.SysEvent -type Source = stgmod.SysEventSource +type Source = datamap.SysEventSource