From 367faa32512451f0b4895c4c89189e92dd8fb73c Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 16 Jun 2025 17:32:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E7=94=A8=E6=88=B7=E7=A9=BA?= =?UTF-8?q?=E9=97=B4=E5=90=8C=E6=AD=A5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/migrate.go | 1 + client/internal/cmdline/serve.go | 24 +- client/internal/cmdline/test.go | 19 +- client/internal/cmdline/vfstest.go | 15 +- client/internal/config/config.go | 2 +- client/internal/db/space_sync_task.go | 33 +++ client/internal/db/string_serializer.go | 99 ++++++++ client/internal/http/v1/object.go | 9 +- client/internal/http/v1/package.go | 9 +- client/internal/http/v1/presigned.go | 10 +- client/internal/http/v1/server.go | 5 +- client/internal/http/v1/space_syncer.go | 103 ++++++++ client/internal/http/v1/user_space.go | 25 +- client/internal/mount/vfs/cache/cache.go | 2 +- client/internal/services/service.go | 4 + client/internal/services/user_space.go | 158 +----------- client/internal/spacesyncer/execute.go | 38 +++ client/internal/spacesyncer/execute_diff.go | 240 ++++++++++++++++++ client/internal/spacesyncer/execute_full.go | 159 ++++++++++++ client/internal/spacesyncer/filter.go | 39 +++ client/internal/spacesyncer/space_syncer.go | 191 ++++++++++++++ client/internal/spacesyncer/trigger.go | 109 ++++++++ client/internal/uploader/create_load.go | 9 +- client/internal/uploader/update.go | 9 +- client/internal/uploader/uploader.go | 4 +- client/internal/uploader/user_space_upload.go | 61 +++-- client/sdk/api/v1/space_syncer.go | 89 +++++++ client/types/path.go | 157 ++++++++++++ client/types/space_syncer.go | 151 +++++++++++ client/types/types.go | 2 +- common/pkgs/ioswitch2/fromto.go | 10 +- common/pkgs/ioswitch2/ops2/base_store.go | 18 +- common/pkgs/ioswitch2/ops2/s2s.go | 16 +- common/pkgs/ioswitch2/parser/gen/generator.go | 4 +- 
.../ioswitch2/plans/complete_multipart.go | 2 +- common/pkgs/ioswitchlrc/fromto.go | 2 + common/pkgs/ioswitchlrc/ops2/base_store.go | 18 +- common/pkgs/ioswitchlrc/parser/passes.go | 2 +- common/pkgs/storage/efile/ec_multiplier.go | 5 +- common/pkgs/storage/local/base_store.go | 76 ++---- common/pkgs/storage/local/dir_reader.go | 117 +++++++++ common/pkgs/storage/local/multipart_upload.go | 13 +- common/pkgs/storage/local/s2s.go | 14 +- common/pkgs/storage/local/shard_store.go | 16 +- common/pkgs/storage/obs/obs_test.go | 2 +- common/pkgs/storage/obs/s2s.go | 17 +- common/pkgs/storage/obs/shard_store.go | 3 +- common/pkgs/storage/s3/base_store.go | 91 ++++--- common/pkgs/storage/s3/dir_reader.go | 61 +++++ common/pkgs/storage/s3/multipart_upload.go | 20 +- common/pkgs/storage/s3/shard_store.go | 40 +-- common/pkgs/storage/s3/utils.go | 9 + common/pkgs/storage/types/base_store.go | 38 ++- common/pkgs/storage/types/s2s.go | 2 +- common/pkgs/storage/types/shard_store.go | 2 +- common/pkgs/storage/types/types.go | 2 +- common/pkgs/storage/types/utils.go | 18 +- 57 files changed, 1959 insertions(+), 435 deletions(-) create mode 100644 client/internal/db/space_sync_task.go create mode 100644 client/internal/db/string_serializer.go create mode 100644 client/internal/http/v1/space_syncer.go create mode 100644 client/internal/spacesyncer/execute.go create mode 100644 client/internal/spacesyncer/execute_diff.go create mode 100644 client/internal/spacesyncer/execute_full.go create mode 100644 client/internal/spacesyncer/filter.go create mode 100644 client/internal/spacesyncer/space_syncer.go create mode 100644 client/internal/spacesyncer/trigger.go create mode 100644 client/sdk/api/v1/space_syncer.go create mode 100644 client/types/path.go create mode 100644 client/types/space_syncer.go create mode 100644 common/pkgs/storage/local/dir_reader.go create mode 100644 common/pkgs/storage/s3/dir_reader.go diff --git a/client/internal/cmdline/migrate.go 
b/client/internal/cmdline/migrate.go index 849a3ac..762f10d 100644 --- a/client/internal/cmdline/migrate.go +++ b/client/internal/cmdline/migrate.go @@ -46,6 +46,7 @@ func migrate(configPath string) { migrateOne(db, clitypes.Package{}) migrateOne(db, clitypes.PinnedObject{}) migrateOne(db, clitypes.UserSpace{}) + migrateOne(db, clitypes.SpaceSyncTask{}) fmt.Println("migrate success") } diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index fd4259d..264451a 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -18,6 +18,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/repl" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" @@ -140,7 +141,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { // 元数据缓存 metaCacheHost := metacache.NewHost(db) go metaCacheHost.Serve() - stgMeta := metaCacheHost.AddStorageMeta() + spaceMeta := metaCacheHost.AddStorageMeta() hubMeta := metaCacheHost.AddHubMeta() conMeta := metaCacheHost.AddConnectivity() @@ -159,19 +160,24 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { stgPool := pool.NewPool() // 下载策略 - strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) + strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, spaceMeta, hubMeta, conMeta) // 下载器 dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) + uploader := uploader.NewUploader(publock, conCol, stgPool, spaceMeta, db) // 定时任务 - tktk := ticktock.New(config.Cfg().TickTock, db, stgMeta, stgPool, evtPub, publock) + tktk := 
ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock) tktk.Start() defer tktk.Stop() + // 用户空间同步功能 + spaceSync := spacesyncer.New(db, stgPool, spaceMeta) + spaceSyncChan := spaceSync.Start() + defer spaceSync.Stop() + // 交互式命令行 rep := repl.New(db, tktk) replCh := rep.Start() @@ -189,7 +195,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, mnt, stgPool, spaceSync) // HTTP接口 httpCfgJSON := config.Cfg().HTTP @@ -217,6 +223,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { evtPubEvt := evtPubChan.Receive() conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() + spaceSyncEvt := spaceSyncChan.Receive() replEvt := replCh.Receive() httpEvt := httpChan.Receive() mntEvt := mntChan.Receive() @@ -295,6 +302,13 @@ loop: } acStatEvt = acStatChan.Receive() + case e := <-spaceSyncEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive space sync event: %v", e.Err) + break loop + } + spaceSyncEvt = spaceSyncChan.Receive() + case e := <-replEvt.Chan(): if e.Err != nil { logger.Errorf("receive repl event: %v", err) diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index b8e53df..551f01d 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -17,7 +17,9 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" 
"gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" @@ -58,7 +60,7 @@ func doTest(svc *services.Service) { ft = ioswitch2.NewFromTo() ft.AddFrom(ioswitch2.NewFromShardstore("Full1AE5436AF72D8EF93923486E0E167315CEF0C91898064DADFAC22216FFBC5E3D", *space1, ioswitch2.RawStream())) - ft.AddTo(ioswitch2.NewToBaseStore(*space2, "test3.txt")) + ft.AddTo(ioswitch2.NewToBaseStore(*space2, clitypes.PathFromComps("test3.txt"))) plans := exec.NewPlanBuilder() parser.Parse(ft, plans) fmt.Println(plans) @@ -178,7 +180,12 @@ func test(configPath string) { // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool) + // 用户空间同步功能 + spaceSync := spacesyncer.New(db, stgPool, stgMeta) + spaceSyncChan := spaceSync.Start() + defer spaceSync.Stop() + + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool, spaceSync) go func() { doTest(svc) @@ -189,6 +196,7 @@ func test(configPath string) { evtPubEvt := evtPubChan.Receive() conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() + spaceSyncEvt := spaceSyncChan.Receive() loop: for { @@ -262,6 +270,13 @@ loop: break loop } acStatEvt = acStatChan.Receive() + + case e := <-spaceSyncEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive space sync event: %v", e.Err) + break loop + } + spaceSyncEvt = spaceSyncChan.Receive() } } } diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 3336c26..a9b4993 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -19,6 +19,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/vfstest" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" 
"gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" @@ -158,6 +159,11 @@ func vfsTest(configPath string, opts serveHTTPOptions) { // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) + // 用户空间同步功能 + spaceSync := spacesyncer.New(db, stgPool, stgMeta) + spaceSyncChan := spaceSync.Start() + defer spaceSync.Stop() + // 挂载 mntCfg := config.Cfg().Mount if !opts.DisableMount && mntCfg != nil && mntCfg.Enabled { @@ -171,7 +177,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool, spaceSync) // HTTP接口 httpCfgJSON := config.Cfg().HTTP @@ -208,6 +214,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { evtPubEvt := evtPubChan.Receive() conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() + spaceSyncEvt := spaceSyncChan.Receive() httpEvt := httpChan.Receive() mntEvt := mntChan.Receive() @@ -284,6 +291,12 @@ loop: } acStatEvt = acStatChan.Receive() + case e := <-spaceSyncEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive space sync event: %v", e.Err) + } + spaceSyncEvt = spaceSyncChan.Receive() + case e := <-httpEvt.Chan(): if e.Err != nil { logger.Errorf("receive http event: %v", err) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 79eb990..7a7cb50 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -39,7 +39,7 @@ var cfg Config // TODO 这里的modeulName参数弄成可配置的更好 func Init(configPath string) error { if configPath == "" { - return config.DefaultLoad("client", &cfg) + return config.Load("config.json", &cfg) } return config.Load(configPath, &cfg) } diff --git 
a/client/internal/db/space_sync_task.go b/client/internal/db/space_sync_task.go new file mode 100644 index 0000000..006ecc9 --- /dev/null +++ b/client/internal/db/space_sync_task.go @@ -0,0 +1,33 @@ +package db + +import "gitlink.org.cn/cloudream/jcs-pub/client/types" + +type SpaceSyncTaskDB struct { + *DB +} + +func (db *DB) SpaceSyncTask() *SpaceSyncTaskDB { + return &SpaceSyncTaskDB{db} +} + +func (db *SpaceSyncTaskDB) Create(ctx SQLContext, task *types.SpaceSyncTask) error { + return ctx.Create(task).Error +} + +func (db *SpaceSyncTaskDB) GetAll(ctx SQLContext) ([]types.SpaceSyncTask, error) { + var tasks []types.SpaceSyncTask + err := ctx.Order("TaskID ASC").Find(&tasks).Error + return tasks, err +} + +func (*SpaceSyncTaskDB) Delete(ctx SQLContext, taskID types.SpaceSyncTaskID) error { + return ctx.Delete(&types.SpaceSyncTask{}, taskID).Error +} + +func (*SpaceSyncTaskDB) BatchDelete(ctx SQLContext, taskIDs []types.SpaceSyncTaskID) error { + if len(taskIDs) == 0 { + return nil + } + + return ctx.Where("TaskID IN (?)", taskIDs).Delete(&types.SpaceSyncTask{}).Error +} diff --git a/client/internal/db/string_serializer.go b/client/internal/db/string_serializer.go new file mode 100644 index 0000000..fa80ccd --- /dev/null +++ b/client/internal/db/string_serializer.go @@ -0,0 +1,99 @@ +package db + +import ( + "context" + "fmt" + "reflect" + + "gorm.io/gorm/schema" +) + +// 必须给结构体(而不是指针)实现此接口。FromString实现为静态方法 +type StringDBValuer interface { + ToString() (string, error) + FromString(str string) (any, error) +} + +type StringSerializer struct { +} + +func (StringSerializer) Scan(ctx context.Context, field *schema.Field, dst reflect.Value, dbValue interface{}) error { + if dbValue == nil { + fieldValue := reflect.New(field.FieldType) + field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem()) + return nil + } + + str := "" + switch v := dbValue.(type) { + case []byte: + str = string(v) + case string: + str = v + default: + return fmt.Errorf("expected []byte or string, 
got: %T", dbValue) + } + + if field.FieldType.Kind() == reflect.Struct { + val := reflect.Zero(field.FieldType) + + sv, ok := val.Interface().(StringDBValuer) + if !ok { + return fmt.Errorf("ref of field type %v is not StringDBValuer", field.FieldType) + } + + v2, err := sv.FromString(str) + if err != nil { + return err + } + + field.ReflectValueOf(ctx, dst).Set(reflect.ValueOf(v2)) + return nil + } + + if field.FieldType.Kind() == reflect.Ptr { + val := reflect.Zero(field.FieldType.Elem()) + + sv, ok := val.Interface().(StringDBValuer) + if !ok { + return fmt.Errorf("field type %v is not StringDBValuer", field.FieldType) + } + + v2, err := sv.FromString(str) + if err != nil { + return err + } + + field.ReflectValueOf(ctx, dst).Set(reflect.ValueOf(v2)) + return nil + } + + return fmt.Errorf("unsupported field type: %v", field.FieldType) +} + +func (StringSerializer) Value(ctx context.Context, field *schema.Field, dst reflect.Value, fieldValue interface{}) (interface{}, error) { + val := reflect.ValueOf(fieldValue) + if val.Kind() == reflect.Struct { + sv, ok := val.Interface().(StringDBValuer) + if !ok { + return nil, fmt.Errorf("ref of field type %v is not StringDBValuer", field.FieldType) + } + + return sv.ToString() + } + + if val.Kind() == reflect.Ptr { + sv, ok := val.Elem().Interface().(StringDBValuer) + if !ok { + return nil, fmt.Errorf("field type %v is not StringDBValuer", field.FieldType) + } + + return sv.ToString() + } + + return nil, fmt.Errorf("unsupported field type: %v", field.FieldType) +} + +func init() { + schema.RegisterSerializer("string", StringSerializer{}) +} diff --git a/client/internal/http/v1/object.go b/client/internal/http/v1/object.go index a87fc18..008e637 100644 --- a/client/internal/http/v1/object.go +++ b/client/internal/http/v1/object.go @@ -84,7 +84,12 @@ func (s *ObjectService) Upload(ctx *gin.Context) { return } - up, err := s.svc.Uploader.BeginUpdate(req.Info.PackageID, req.Info.Affinity, req.Info.CopyTo, req.Info.CopyToPath) 
+ copyToPath := make([]clitypes.JPath, 0, len(req.Info.CopyToPath)) + for _, p := range req.Info.CopyToPath { + copyToPath = append(copyToPath, clitypes.PathFromJcsPathString(p)) + } + + up, err := s.svc.Uploader.BeginUpdate(req.Info.PackageID, req.Info.Affinity, req.Info.CopyTo, copyToPath) if err != nil { log.Warnf("begin update: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, fmt.Sprintf("begin update: %v", err))) @@ -109,7 +114,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { } path = filepath.ToSlash(path) - err = up.Upload(path, f) + err = up.Upload(clitypes.PathFromJcsPathString(path), f) if err != nil { log.Warnf("uploading file: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, fmt.Sprintf("uploading file %v: %v", file.Filename, err))) diff --git a/client/internal/http/v1/package.go b/client/internal/http/v1/package.go index a390db8..3466b33 100644 --- a/client/internal/http/v1/package.go +++ b/client/internal/http/v1/package.go @@ -110,7 +110,12 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) { return } - up, err := s.svc.Uploader.BeginCreateUpload(req.Info.BucketID, req.Info.Name, req.Info.CopyTo, req.Info.CopyToPath) + copyToPath := make([]clitypes.JPath, 0, len(req.Info.CopyToPath)) + for _, p := range req.Info.CopyToPath { + copyToPath = append(copyToPath, clitypes.PathFromJcsPathString(p)) + } + + up, err := s.svc.Uploader.BeginCreateUpload(req.Info.BucketID, req.Info.Name, req.Info.CopyTo, copyToPath) if err != nil { log.Warnf("begin package create upload: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "%v", err)) @@ -135,7 +140,7 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) { } path = filepath.ToSlash(path) - err = up.Upload(path, f) + err = up.Upload(clitypes.PathFromJcsPathString(path), f) if err != nil { log.Warnf("uploading file: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, fmt.Sprintf("uploading 
file %v: %v", file.Filename, err))) diff --git a/client/internal/http/v1/presigned.go b/client/internal/http/v1/presigned.go index 36a94d1..b9d611a 100644 --- a/client/internal/http/v1/presigned.go +++ b/client/internal/http/v1/presigned.go @@ -14,6 +14,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" ) @@ -155,7 +156,12 @@ func (s *PresignedService) ObjectUpload(ctx *gin.Context) { return } - up, err := s.svc.Uploader.BeginUpdate(req.PackageID, req.Affinity, req.CopyTo, req.CopyToPath) + copyToPath := make([]clitypes.JPath, 0, len(req.CopyToPath)) + for _, p := range req.CopyToPath { + copyToPath = append(copyToPath, clitypes.PathFromJcsPathString(p)) + } + + up, err := s.svc.Uploader.BeginUpdate(req.PackageID, req.Affinity, req.CopyTo, copyToPath) if err != nil { log.Warnf("begin update: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, fmt.Sprintf("begin update: %v", err))) @@ -165,7 +171,7 @@ func (s *PresignedService) ObjectUpload(ctx *gin.Context) { path := filepath.ToSlash(req.Path) - err = up.Upload(path, ctx.Request.Body) + err = up.Upload(clitypes.PathFromJcsPathString(path), ctx.Request.Body) if err != nil { log.Warnf("uploading file: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, fmt.Sprintf("uploading file %v: %v", req.Path, err))) diff --git a/client/internal/http/v1/server.go b/client/internal/http/v1/server.go index 1e141ba..075ad6e 100644 --- a/client/internal/http/v1/server.go +++ b/client/internal/http/v1/server.go @@ -52,7 +52,6 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { rt.POST(cliapi.UserSpaceUpdatePath, certAuth, s.UserSpace().Update) rt.POST(cliapi.UserSpaceDeletePath, certAuth, s.UserSpace().Delete) 
rt.POST(cliapi.UserSpaceTestPath, certAuth, s.UserSpace().Test) - rt.POST(cliapi.UserSpaceSpaceToSpacePath, certAuth, s.UserSpace().SpaceToSpace) rt.GET(cliapi.BucketGetByNamePath, certAuth, s.Bucket().GetByName) rt.POST(cliapi.BucketCreatePath, certAuth, s.Bucket().Create) @@ -63,6 +62,10 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { rt.POST(cliapi.ObjectUploadPartPath, certAuth, s.Object().UploadPart) rt.POST(cliapi.ObjectCompleteMultipartUploadPath, certAuth, s.Object().CompleteMultipartUpload) + rt.POST(cliapi.SpaceSyncerCreateTaskPath, certAuth, s.SpaceSyncer().CreateTask) + rt.GET(cliapi.SpaceSyncerGetTaskPath, certAuth, s.SpaceSyncer().GetTask) + rt.POST(cliapi.SpaceSyncerCancelTaskPath, certAuth, s.SpaceSyncer().CancelTask) + rt.GET(cliapi.PresignedObjectListByPathPath, signAuth, s.Presigned().ObjectListByPath) rt.GET(cliapi.PresignedObjectDownloadByPathPath, signAuth, s.Presigned().ObjectDownloadByPath) rt.GET(cliapi.PresignedObjectDownloadPath, signAuth, s.Presigned().ObjectDownload) diff --git a/client/internal/http/v1/space_syncer.go b/client/internal/http/v1/space_syncer.go new file mode 100644 index 0000000..1414869 --- /dev/null +++ b/client/internal/http/v1/space_syncer.go @@ -0,0 +1,103 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/ecode" +) + +type SpaceSyncerService struct { + *Server +} + +func (s *Server) SpaceSyncer() *SpaceSyncerService { + return &SpaceSyncerService{s} +} + +func (s *SpaceSyncerService) CreateTask(ctx *gin.Context) { + log := logger.WithField("HTTP", "SpaceSyncer.CreateTask") + + req, err := types.ShouldBindJSONEx[cliapi.SpaceSyncerCreateTask](ctx) + if err != nil { + log.Warnf("binding body: %s", 
err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument")) + return + } + + if len(req.DestPathes) != len(req.DestUserSpaceIDs) { + log.Warnf("destPathes and destUserSpaceIDs should have the same length") + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "destPathes and destUserSpaceIDs should have the same length")) + return + } + + if len(req.DestPathes) == 0 { + log.Warnf("must have at least one dest") + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "must have at least one dest")) + return + } + + dests := make([]clitypes.SpaceSyncDest, 0, len(req.DestUserSpaceIDs)) + for i, id := range req.DestUserSpaceIDs { + dests = append(dests, clitypes.SpaceSyncDest{ + DestUserSpaceID: clitypes.UserSpaceID(id), + DestPath: clitypes.PathFromJcsPathString(req.DestPathes[i]), + }) + } + + info, err := s.svc.SpaceSyncer.CreateTask(clitypes.SpaceSyncTask{ + Trigger: req.Trigger, + Mode: req.Mode, + Filters: req.Filters, + Options: req.Options, + SrcUserSpaceID: req.SrcUserSpaceID, + SrcPath: clitypes.PathFromJcsPathString(req.SrcPath), + Dests: dests, + }) + if err != nil { + log.Warnf("start task: %s", err.Error()) + ctx.JSON(http.StatusInternalServerError, types.Failed(ecode.OperationFailed, "start task: %v", err)) + return + } + + ctx.JSON(http.StatusOK, types.OK(cliapi.SpaceSyncerCreateTaskResp{ + Task: info.Task, + })) +} + +func (s *SpaceSyncerService) CancelTask(ctx *gin.Context) { + log := logger.WithField("HTTP", "SpaceSyncer.CancelTask") + + var req cliapi.SpaceSyncerCancelTask + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument")) + return + } + + s.svc.SpaceSyncer.CancelTask(req.TaskID) + ctx.JSON(http.StatusOK, types.OK(cliapi.SpaceSyncerCancelTaskResp{})) +} + +func (s *SpaceSyncerService) GetTask(ctx *gin.Context) { + log := 
logger.WithField("HTTP", "SpaceSyncer.GetTask") + + var req cliapi.SpaceSyncerGetTask + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument")) + return + } + + task := s.svc.SpaceSyncer.GetTask(req.TaskID) + if task == nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.DataNotFound, "task not found")) + return + } + + ctx.JSON(http.StatusOK, types.OK(cliapi.SpaceSyncerGetTaskResp{Task: *task})) +} diff --git a/client/internal/http/v1/user_space.go b/client/internal/http/v1/user_space.go index 5351dc2..ddafb3b 100644 --- a/client/internal/http/v1/user_space.go +++ b/client/internal/http/v1/user_space.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" ) @@ -51,7 +52,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { return } - pkg, err := s.svc.Uploader.UserSpaceUpload(req.UserSpaceID, req.Path, req.BucketID, req.Name, req.SpaceAffinity) + pkg, err := s.svc.Uploader.UserSpaceUpload(req.UserSpaceID, clitypes.PathFromJcsPathString(req.Path), req.BucketID, req.Name, req.SpaceAffinity) if err != nil { log.Warnf("userspace create package: %s", err.Error()) ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, fmt.Sprintf("userspace create package: %v", err))) @@ -166,25 +167,3 @@ func (s *UserSpaceService) Test(ctx *gin.Context) { ctx.JSON(http.StatusOK, types.OK(resp)) } - -func (s *UserSpaceService) SpaceToSpace(ctx *gin.Context) { - log := logger.WithField("HTTP", "UserSpace.SpaceToSpace") - - var req cliapi.UserSpaceSpaceToSpace - if err := ctx.ShouldBindJSON(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - 
ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "missing argument or invalid argument")) - return - } - - ret, err := s.svc.UserSpaceSvc().SpaceToSpace(req.SrcUserSpaceID, req.SrcPath, req.DstUserSpaceID, req.DstPath) - if err != nil { - log.Warnf("space2space: %s", err.Error()) - ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "space2space failed")) - return - } - - ctx.JSON(http.StatusOK, types.OK(cliapi.UserSpaceSpaceToSpaceResp{ - SpaceToSpaceResult: ret, - })) -} diff --git a/client/internal/mount/vfs/cache/cache.go b/client/internal/mount/vfs/cache/cache.go index 4a2c6d9..593d1d9 100644 --- a/client/internal/mount/vfs/cache/cache.go +++ b/client/internal/mount/vfs/cache/cache.go @@ -1003,7 +1003,7 @@ func (c *Cache) doUploading(pkgs []*syncPackage) { counter := io2.Counter(&rd) - err = upder.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter, uploader.UploadOption{ + err = upder.Upload(clitypes.PathFromComps(o.pathComps[2:]...), counter, uploader.UploadOption{ CreateTime: o.modTime, }) if err != nil { diff --git a/client/internal/services/service.go b/client/internal/services/service.go index c7e20c2..fca0cb4 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" @@ -25,6 +26,7 @@ type Service struct { EvtPub *sysevent.Publisher Mount *mount.Mount StgPool *pool.Pool + SpaceSyncer *spacesyncer.SpaceSyncer } func NewService( @@ -38,6 +40,7 @@ func NewService( evtPub *sysevent.Publisher, mount *mount.Mount, stgPool *pool.Pool, + spaceSyncer 
*spacesyncer.SpaceSyncer, ) *Service { return &Service{ PubLock: publock, @@ -50,5 +53,6 @@ func NewService( EvtPub: evtPub, Mount: mount, StgPool: stgPool, + SpaceSyncer: spaceSyncer, } } diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 9a052a5..6a39c58 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -3,13 +3,8 @@ package services import ( "context" "fmt" - "path" - "strings" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/trie" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gorm.io/gorm" @@ -22,7 +17,6 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) type UserSpaceService struct { @@ -58,7 +52,7 @@ func (svc *UserSpaceService) Create(req cliapi.UserSpaceCreate) (*cliapi.UserSpa Credential: req.Credential, ShardStore: req.ShardStore, Features: req.Features, - WorkingDir: req.WorkingDir, + WorkingDir: clitypes.PathFromJcsPathString(req.WorkingDir), Revision: 0, } err = db2.UserSpace().Create(tx, &space) @@ -170,7 +164,7 @@ func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTe Name: "test", Storage: req.Storage, Credential: req.Credential, - WorkingDir: req.WorikingDir, + WorkingDir: clitypes.PathFromJcsPathString(req.WorikingDir), }, } blder := factory.GetBuilder(&detail) @@ -179,8 +173,7 @@ func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTe return nil, ecode.Newf(ecode.OperationFailed, "%v", err) } - // TODO 可以考虑增加一个专门用于检查配置的接口F - _, err = baseStore.ListAll("") + err = baseStore.Test() if err != nil { return nil, 
ecode.Newf(ecode.OperationFailed, "%v", err) } @@ -202,6 +195,8 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users return err } + rootJPath := clitypes.PathFromJcsPathString(rootPath) + var pinned []clitypes.ObjectID plans := exec.NewPlanBuilder() for _, obj := range details { @@ -227,7 +222,9 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users return fmt.Errorf("unsupported download strategy: %T", strg) } - ft.AddTo(ioswitch2.NewToBaseStore(*destStg, path.Join(rootPath, obj.Object.Path))) + objPath := clitypes.PathFromJcsPathString(obj.Object.Path) + dstPath := rootJPath.ConcatNew(objPath) + ft.AddTo(ioswitch2.NewToBaseStore(*destStg, dstPath)) // 顺便保存到同存储服务的分片存储中 if destStg.UserSpace.ShardStore != nil { ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), "")) @@ -262,142 +259,3 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users return nil } - -func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPath string, dstSpaceID clitypes.UserSpaceID, dstPath string) (clitypes.SpaceToSpaceResult, error) { - srcSpace := svc.UserSpaceMeta.Get(srcSpaceID) - if srcSpace == nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace not found: %d", srcSpaceID) - } - - srcStore, err := svc.StgPool.GetBaseStore(srcSpace) - if err != nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get source userspace store: %w", err) - } - - dstSpace := svc.UserSpaceMeta.Get(dstSpaceID) - if dstSpace == nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace not found: %d", dstSpaceID) - } - - dstStore, err := svc.StgPool.GetBaseStore(dstSpace) - if err != nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get destination userspace store: %w", err) - } - - srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator) - dstPath = strings.Trim(dstPath, cdssdk.ObjectPathSeparator) - - if srcPath == "" { - 
return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source path is empty") - } - - if dstPath == "" { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination path is empty") - } - - entries, cerr := srcStore.ListAll(srcPath) - if cerr != nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr) - } - - srcPathComps := clitypes.SplitObjectPath(srcPath) - srcDirCompLen := len(srcPathComps) - 1 - - entryTree := trie.NewTrie[*types.ListEntry]() - for _, e := range entries { - pa, ok := strings.CutSuffix(e.Path, clitypes.ObjectPathSeparator) - comps := clitypes.SplitObjectPath(pa) - e.Path = pa - - e2 := e - entryTree.CreateWords(comps[srcDirCompLen:]).Value = &e2 - e2.IsDir = e2.IsDir || ok - } - - entryTree.Iterate(func(path []string, node *trie.Node[*types.ListEntry], isWordNode bool) trie.VisitCtrl { - if node.Value == nil { - return trie.VisitContinue - } - - if node.Value.IsDir && len(node.WordNexts) > 0 { - node.Value = nil - return trie.VisitContinue - } - - if !node.Value.IsDir && len(node.WordNexts) == 0 { - node.WordNexts = nil - } - - return trie.VisitContinue - }) - - var filePathes []string - var dirPathes []string - entryTree.Iterate(func(path []string, node *trie.Node[*types.ListEntry], isWordNode bool) trie.VisitCtrl { - if node.Value == nil { - return trie.VisitContinue - } - - if node.Value.IsDir { - dirPathes = append(dirPathes, node.Value.Path) - } else { - filePathes = append(filePathes, node.Value.Path) - } - - return trie.VisitContinue - }) - - mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpaceID).Buzy(dstSpaceID).MutexLock(svc.PubLock) - if err != nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("acquire lock: %w", err) - } - defer mutex.Unlock() - - var success []string - var failed []string - - for _, f := range filePathes { - newPath := strings.Replace(f, srcPath, dstPath, 1) - - ft := ioswitch2.NewFromTo() - ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, f)) - 
ft.AddTo(ioswitch2.NewToBaseStore(*dstSpace, newPath)) - - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - failed = append(failed, f) - logger.Warnf("s2s: parse plan of file %v: %v", f, err) - continue - } - exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, svc.StgPool) - _, cerr := plans.Execute(exeCtx).Wait(context.Background()) - if cerr != nil { - failed = append(failed, f) - logger.Warnf("s2s: execute plan of file %v: %v", f, cerr) - continue - } - - success = append(success, f) - } - - newDirPathes := make([]string, 0, len(dirPathes)) - for i := range dirPathes { - newDirPathes = append(newDirPathes, strings.Replace(dirPathes[i], srcPath, dstPath, 1)) - } - - for _, d := range newDirPathes { - err := dstStore.Mkdir(d) - if err != nil { - failed = append(failed, d) - } else { - success = append(success, d) - } - } - - return clitypes.SpaceToSpaceResult{ - Success: success, - Failed: failed, - }, nil -} diff --git a/client/internal/spacesyncer/execute.go b/client/internal/spacesyncer/execute.go new file mode 100644 index 0000000..224a17e --- /dev/null +++ b/client/internal/spacesyncer/execute.go @@ -0,0 +1,38 @@ +package spacesyncer + +import ( + "gitlink.org.cn/cloudream/common/pkgs/trie" + "gitlink.org.cn/cloudream/jcs-pub/client/types" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" +) + +func execute(syncer *SpaceSyncer, task *task) { + switch mode := task.Task.Mode.(type) { + case *types.SpaceSyncModeFull: + executeFull(syncer, task) + case *types.SpaceSyncModeDiff: + executeDiff(syncer, task, mode) + } +} + +func createDirNode(tree *trie.Trie[*stgtypes.DirEntry], pathComps []string, e *stgtypes.DirEntry) { + var ptr = &tree.Root + for _, c := range pathComps { + ptr.Value = nil + ptr = ptr.Create(c) + } + ptr.Value = e +} + +func removeDirNode(tree *trie.Trie[*stgtypes.DirEntry], pathComps []string) { + var ptr = &tree.Root + for _, c := range pathComps { + ptr.Value = nil + next := 
ptr.WalkNext(c)
+		if next == nil { break }
+		ptr = next
+	}
+
+	ptr.Value = nil
+	ptr.RemoveSelf(true)
+}
diff --git a/client/internal/spacesyncer/execute_diff.go b/client/internal/spacesyncer/execute_diff.go
new file mode 100644
index 0000000..ceb4b16
--- /dev/null
+++ b/client/internal/spacesyncer/execute_diff.go
@@ -0,0 +1,244 @@
+package spacesyncer
+
+import (
+	"context"
+	"io"
+	"time"
+
+	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
+	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	"gitlink.org.cn/cloudream/common/pkgs/trie"
+	"gitlink.org.cn/cloudream/common/utils/math2"
+	clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
+	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
+	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
+	stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
+)
+
+func executeDiff(syncer *SpaceSyncer, task *task, mode *clitypes.SpaceSyncModeDiff) {
+	log := logger.WithField("Mod", logMod).WithField("TaskID", task.Task.TaskID)
+
+	startTime := time.Now()
+	log.Infof("begin diff sync task")
+	defer func() {
+		log.Infof("diff sync task finished, time: %v", time.Since(startTime))
+	}()
+
+	srcSpace := syncer.spaceMeta.Get(task.Task.SrcUserSpaceID)
+	if srcSpace == nil {
+		log.Warnf("src space %v not found", task.Task.SrcUserSpaceID)
+		return
+	}
+
+	// guard against an empty dest list, otherwise Dests[0] below panics
+	if len(task.Task.Dests) == 0 {
+		log.Warnf("diff mode requires at least one dest")
+		return
+	}
+	if len(task.Task.Dests) > 1 {
+		log.Warnf("diff mode only support one dest now")
+	}
+
+	dstSpace := syncer.spaceMeta.Get(task.Task.Dests[0].DestUserSpaceID)
+	if dstSpace == nil {
+		log.Warnf("dest space %v not found", task.Task.Dests[0].DestUserSpaceID)
+		return
+	}
+
+	srcBase, err := syncer.stgPool.GetBaseStore(srcSpace)
+	if err != nil {
+		log.Warnf("get src base store error: %v", err)
+		return
+	}
+
+	dstBase, err := syncer.stgPool.GetBaseStore(dstSpace)
+	if err != nil {
+		log.Warnf("get dst base store error: %v", err)
+		return
+	}
+
+	filter := buildFilter(task)
+
+	srcReader := srcBase.ReadDir(task.Task.SrcPath)
+	dstReader := 
dstBase.ReadDir(task.Task.Dests[0].DestPath) + + dirTree := trie.NewTrie[srcDstDirEntry]() + + for { + e, err := srcReader.Next() + if err == io.EOF { + break + } + if err != nil { + log.Warnf("read src dir: %v", err) + return + } + + if !filter(e) { + continue + } + + rela := e.Path.Clone() + rela.DropFrontN(task.Task.SrcPath.Len()) + + ne := e + ne.Path = rela.Clone() + if !filter(ne) { + continue + } + + diffCreateSrcNode(dirTree, rela, &e) + } + + for { + e, err := dstReader.Next() + if err == io.EOF { + break + } + if err != nil { + log.Warnf("read dst dir: %v", err) + return + } + + if !filter(e) { + continue + } + + rela := e.Path.Clone() + rela.DropFrontN(task.Task.Dests[0].DestPath.Len()) + + ne := e + ne.Path = rela.Clone() + if !filter(ne) { + continue + } + + diffCreateDstNode(dirTree, rela, &e) + } + + var willSync []stgtypes.DirEntry + var willMkdirs []clitypes.JPath + + dirTree.Iterate(func(path []string, node *trie.Node[srcDstDirEntry], isWordNode bool) trie.VisitCtrl { + if node.Value.src == nil { + // 目前不支持删除多余文件 + return trie.VisitContinue + } + + if node.Value.src.IsDir { + if node.Value.dst == nil { + if node.IsEmpty() { + willMkdirs = append(willMkdirs, clitypes.PathFromComps(path...)) + } + } + } else { + if node.Value.dst == nil { + // 目标路径不存在(不是文件也不是目录),需要同步 + if node.IsEmpty() { + willSync = append(willSync, *node.Value.src) + } + } else if !node.Value.dst.IsDir { + // 目标路径是个文件,但文件指纹不同,需要同步 + if !cmpFile(mode, node.Value.src, node.Value.dst) { + willSync = append(willSync, *node.Value.src) + } + } + + // 目标路径是个目录,则不进行同步 + } + + return trie.VisitContinue + }) + + willSyncCnt := len(willSync) + for len(willSync) > 0 { + syncs := willSync[:math2.Min(len(willSync), 50)] + willSync = willSync[len(syncs):] + + ft := ioswitch2.NewFromTo() + for _, s := range syncs { + ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, s.Path)) + rela := s.Path.Clone() + rela.DropFrontN(task.Task.SrcPath.Len()) + dstPath := 
task.Task.Dests[0].DestPath.ConcatNew(rela)
+			to := ioswitch2.NewToBaseStore(*dstSpace, dstPath)
+			to.Option.ModTime = s.ModTime
+			ft.AddTo(to)
+		}
+
+		planBld := exec.NewPlanBuilder()
+		err := parser.Parse(ft, planBld)
+		if err != nil {
+			log.Warnf("parse fromto: %v", err)
+			return
+		}
+
+		execCtx := exec.NewWithContext(task.Context)
+		exec.SetValueByType(execCtx, syncer.stgPool)
+		_, err = planBld.Execute(execCtx).Wait(context.Background())
+		if err != nil {
+			log.Warnf("execute plan: %v", err)
+			return
+		}
+	}
+
+	log.Infof("%v files synced", willSyncCnt)
+
+	if !task.Task.Options.NoEmptyDirectories && len(willMkdirs) > 0 {
+		for _, p := range willMkdirs {
+			// p was built from trie comps and is already relative to SrcPath; do not strip again
+			rela := p.Clone()
+			dstPath := task.Task.Dests[0].DestPath.ConcatNew(rela)
+			err := dstBase.Mkdir(dstPath)
+			if err != nil {
+				log.Warnf("mkdir: %v", err)
+				continue
+			}
+		}
+	}
+}
+
+func diffCreateSrcNode(tree *trie.Trie[srcDstDirEntry], path clitypes.JPath, e *stgtypes.DirEntry) {
+	var ptr = &tree.Root
+	for _, c := range path.Comps() {
+		if ptr.Value.src != nil && ptr.Value.src.IsDir {
+			ptr.Value.src = nil
+		}
+		ptr = ptr.Create(c)
+	}
+
+	ptr.Value.src = e
+}
+
+func diffCreateDstNode(tree *trie.Trie[srcDstDirEntry], path clitypes.JPath, e *stgtypes.DirEntry) {
+	var ptr = &tree.Root
+	for _, c := range path.Comps() {
+		if ptr.Value.src != nil && ptr.Value.src.IsDir {
+			ptr.Value.src = nil
+		}
+
+		if ptr.Value.dst != nil && ptr.Value.dst.IsDir {
+			ptr.Value.dst = nil
+		}
+
+		ptr = ptr.Create(c)
+	}
+
+	ptr.Value.dst = e
+}
+
+type srcDstDirEntry struct {
+	src *stgtypes.DirEntry
+	dst *stgtypes.DirEntry
+}
+
+func cmpFile(diff *clitypes.SpaceSyncModeDiff, src, dst *stgtypes.DirEntry) bool {
+	if diff.IncludeSize && src.Size != dst.Size {
+		return false
+	}
+
+	// NOTE(review): if ModTime is time.Time, prefer !src.ModTime.Equal(dst.ModTime) — TODO confirm its type
+	if diff.IncludeModTime && src.ModTime != dst.ModTime {
+		return false
+	}
+
+	return true
+}
diff --git a/client/internal/spacesyncer/execute_full.go b/client/internal/spacesyncer/execute_full.go
new 
file mode 100644 index 0000000..f80eb67 --- /dev/null +++ b/client/internal/spacesyncer/execute_full.go @@ -0,0 +1,159 @@ +package spacesyncer + +import ( + "context" + "fmt" + "io" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/trie" + "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" +) + +func executeFull(syncer *SpaceSyncer, task *task) { + log := logger.WithField("Mod", logMod).WithField("TaskID", task.Task.TaskID) + + startTime := time.Now() + log.Infof("begin full sync task") + defer func() { + log.Infof("full sync task finished, time: %v", time.Since(startTime)) + }() + + srcSpace := syncer.spaceMeta.Get(task.Task.SrcUserSpaceID) + if srcSpace == nil { + log.Warnf("src space %v not found", task.Task.SrcUserSpaceID) + return + } + + dstSpaceIDs := make([]types.UserSpaceID, len(task.Task.Dests)) + for i := range task.Task.Dests { + dstSpaceIDs[i] = task.Task.Dests[i].DestUserSpaceID + } + dstSpaces := syncer.spaceMeta.GetMany(dstSpaceIDs) + + for i := range dstSpaces { + if dstSpaces[i] == nil { + log.Warnf("dst space %v not found", dstSpaceIDs[i]) + return + } + } + + srcBase, err := syncer.stgPool.GetBaseStore(srcSpace) + if err != nil { + log.Warnf("get src base store: %v", err) + return + } + + filter := buildFilter(task) + + srcDirReader := srcBase.ReadDir(task.Task.SrcPath) + defer srcDirReader.Close() + + srcDirTree := trie.NewTrie[*stgtypes.DirEntry]() + fileCnt := 0 + for { + isEOF := false + ft := ioswitch2.NewFromTo() + cnt := 0 + for { + e, err := srcDirReader.Next() + if err == io.EOF { + isEOF = true + break + } + if err != nil { + log.Warnf("read src dir: %v", err) + return + } + + rela := e.Path.Clone() + rela.DropFrontN(task.Task.SrcPath.Len()) + 
+ ne := e + ne.Path = rela.Clone() + if !filter(ne) { + continue + } + + if e.IsDir { + // 如果是一个目录,则创建对应的Dir节点,且在创建过程中清除掉路径上的Dir信息(仅保留最后一个Dir节点) + createDirNode(srcDirTree, rela.Comps(), &e) + continue + } + + fmt.Printf("rela: %v\n", rela) + + // 如果是一个文件,那么它路径上的目录都可以在写入时一并创建,所以可以清理掉路径上的Dir节点 + removeDirNode(srcDirTree, rela.Comps()) + ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, e.Path)) + for i, dst := range dstSpaces { + dstPath := task.Task.Dests[i].DestPath.Clone() + dstPath.Concat(rela) + ft.AddTo(ioswitch2.NewToBaseStore(*dst, dstPath)) + } + cnt++ + fileCnt++ + + // 每一批转发50个文件 + if cnt > 50 { + break + } + } + if len(ft.Froms) > 0 { + planBld := exec.NewPlanBuilder() + err := parser.Parse(ft, planBld) + if err != nil { + log.Warnf("parse fromto to plan: %v", err) + return + } + + execCtx := exec.NewWithContext(task.Context) + exec.SetValueByType(execCtx, syncer.stgPool) + _, err = planBld.Execute(execCtx).Wait(context.Background()) + if err != nil { + log.Warnf("execute plan: %v", err) + return + } + } + if isEOF { + break + } + } + + log.Infof("%v files synced", fileCnt) + + if !task.Task.Options.NoEmptyDirectories { + dstBases := make([]stgtypes.BaseStore, len(dstSpaces)) + for i := range dstSpaces { + dstBases[i], err = syncer.stgPool.GetBaseStore(dstSpaces[i]) + if err != nil { + log.Warnf("get dst base store: %v", err) + continue + } + } + + srcDirTree.Iterate(func(path []string, node *trie.Node[*stgtypes.DirEntry], isWordNode bool) trie.VisitCtrl { + if node.Value == nil { + return trie.VisitContinue + } + + for i, base := range dstBases { + if base != nil { + dirPath := task.Task.Dests[i].DestPath.Clone() + dirPath.ConcatComps(path) + err := base.Mkdir(dirPath) + if err != nil { + log.Warnf("mkdir %v at user space %v: %v", dirPath, dstSpaces[i].String(), err) + } + } + } + + return trie.VisitContinue + }) + } +} diff --git a/client/internal/spacesyncer/filter.go b/client/internal/spacesyncer/filter.go new file mode 100644 index 0000000..1ee7c15 
--- /dev/null +++ b/client/internal/spacesyncer/filter.go @@ -0,0 +1,39 @@ +package spacesyncer + +import ( + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" +) + +type FilterFn func(info stgtypes.DirEntry) bool + +func buildFilter(task *task) FilterFn { + var fns []FilterFn + for _, f := range task.Task.Filters { + switch f := f.(type) { + case *clitypes.SpaceSyncFilterSize: + fns = append(fns, filterSize(f)) + } + } + + return func(info stgtypes.DirEntry) bool { + for _, fn := range fns { + if !fn(info) { + return false + } + } + return true + } +} + +func filterSize(filter *clitypes.SpaceSyncFilterSize) FilterFn { + return func(info stgtypes.DirEntry) bool { + if filter.MinSize > 0 && info.Size < filter.MinSize { + return false + } + if filter.MaxSize > 0 && info.Size > filter.MaxSize { + return false + } + return true + } +} diff --git a/client/internal/spacesyncer/space_syncer.go b/client/internal/spacesyncer/space_syncer.go new file mode 100644 index 0000000..a28302a --- /dev/null +++ b/client/internal/spacesyncer/space_syncer.go @@ -0,0 +1,191 @@ +package spacesyncer + +import ( + "context" + "fmt" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + stgpool "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" +) + +const ( + logMod = "SpaceSyncer" +) + +type SpaceSyncerEvent interface { + IsSpaceSyncerEvent() bool +} + +type SpaceSyncer struct { + db *db.DB + stgPool *stgpool.Pool + spaceMeta *metacache.UserSpaceMeta + lock sync.Mutex + tasks map[clitypes.SpaceSyncTaskID]*task +} + +func New(db *db.DB, stgPool *stgpool.Pool, spaceMeta *metacache.UserSpaceMeta) *SpaceSyncer { + return &SpaceSyncer{ + db: db, + stgPool: stgPool, + spaceMeta: 
spaceMeta,
+		tasks:     make(map[clitypes.SpaceSyncTaskID]*task),
+	}
+}
+
+func (s *SpaceSyncer) Start() *async.UnboundChannel[SpaceSyncerEvent] {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	log := logger.WithField("Mod", logMod)
+
+	ch := async.NewUnboundChannel[SpaceSyncerEvent]()
+
+	allTask, err := db.DoTx01(s.db, s.db.SpaceSyncTask().GetAll)
+	if err != nil {
+		log.Warnf("load task from db: %v", err)
+	} else {
+		var rms []clitypes.SpaceSyncTaskID
+		for _, t := range allTask {
+			ctx, cancel := context.WithCancel(context.Background())
+			tsk := task{
+				Task:     t,
+				Context:  ctx,
+				CancelFn: cancel,
+			}
+
+			switch tr := t.Trigger.(type) {
+			case *clitypes.SpaceSyncTriggerOnce:
+				// Once tasks that did not finish before shutdown are not resumed
+				rms = append(rms, t.TaskID)
+			case *clitypes.SpaceSyncTriggerInterval:
+				s.tasks[t.TaskID] = &tsk
+				triggerInterval(s, &tsk, tr)
+			case *clitypes.SpaceSyncTriggerAt:
+				s.tasks[t.TaskID] = &tsk
+				triggerAt(s, &tsk, tr)
+			}
+
+			log.Infof("load task %v from db", t.TaskID)
+		}
+
+		if len(rms) > 0 {
+			err := s.db.SpaceSyncTask().BatchDelete(s.db.DefCtx(), rms)
+			if err != nil {
+				log.Warnf("batch delete task: %v", err)
+			} else {
+				log.Infof("%v once task deleted", len(rms))
+			}
+		}
+	}
+
+	return ch
+}
+
+func (s *SpaceSyncer) Stop() {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	for _, t := range s.tasks {
+		t.CancelFn()
+	}
+
+	s.tasks = make(map[clitypes.SpaceSyncTaskID]*task)
+}
+
+func (s *SpaceSyncer) CreateTask(t clitypes.SpaceSyncTask) (*TaskInfo, error) {
+	log := logger.WithField("Mod", logMod)
+
+	d := s.db
+	err := d.DoTx(func(tx db.SQLContext) error {
+		err := d.SpaceSyncTask().Create(tx, &t)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("creating space sync task: %w", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	tsk := task{
+		Task:     t,
+		Context:  ctx,
+		CancelFn: cancel,
+	}
+
+	s.lock.Lock()
+	s.tasks[t.TaskID] = &tsk
+	s.lock.Unlock()
+
+	switch tr := t.Trigger.(type) {
+	case *clitypes.SpaceSyncTriggerOnce:
+		triggerOnce(s, &tsk)
+ + case *clitypes.SpaceSyncTriggerInterval: + triggerInterval(s, &tsk, tr) + + case *clitypes.SpaceSyncTriggerAt: + triggerAt(s, &tsk, tr) + } + + log.Infof("task %v created", t.TaskID) + + return &TaskInfo{ + Task: t, + }, nil +} + +func (s *SpaceSyncer) CancelTask(taskID clitypes.SpaceSyncTaskID) { + log := logger.WithField("Mod", logMod) + + s.lock.Lock() + defer s.lock.Unlock() + + t := s.tasks[taskID] + if t == nil { + log.Infof("task %v not found, cancel aborted", taskID) + return + } + + t.CancelFn() + delete(s.tasks, taskID) + + err := s.db.SpaceSyncTask().Delete(s.db.DefCtx(), taskID) + if err != nil { + log.Warnf("delete task %v from db: %v", taskID, err) + } + + log.Infof("task %v canceled", taskID) +} + +func (s *SpaceSyncer) GetTask(taskID clitypes.SpaceSyncTaskID) *clitypes.SpaceSyncTask { + s.lock.Lock() + defer s.lock.Unlock() + + tsk := s.tasks[taskID] + if tsk == nil { + return nil + } + + // TODO 考虑复制一份状态,防止修改 + t := tsk.Task + return &t +} + +type TaskInfo struct { + Task clitypes.SpaceSyncTask +} + +type task struct { + Task clitypes.SpaceSyncTask + Context context.Context + CancelFn func() +} diff --git a/client/internal/spacesyncer/trigger.go b/client/internal/spacesyncer/trigger.go new file mode 100644 index 0000000..620cdca --- /dev/null +++ b/client/internal/spacesyncer/trigger.go @@ -0,0 +1,109 @@ +package spacesyncer + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/sort2" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" +) + +func triggerOnce(syncer *SpaceSyncer, task *task) { + go func() { + log := logger.WithField("Mod", logMod) + execute(syncer, task) + + syncer.lock.Lock() + defer syncer.lock.Unlock() + + tsk := syncer.tasks[task.Task.TaskID] + if tsk == nil { + return + } + + tsk.CancelFn() + delete(syncer.tasks, task.Task.TaskID) + + err := syncer.db.SpaceSyncTask().Delete(syncer.db.DefCtx(), task.Task.TaskID) + if err != nil { + log.Warnf("delete task %v 
from db: %v", task.Task.TaskID, err) + } + }() +} + +func triggerInterval(syncer *SpaceSyncer, task *task, trigger *clitypes.SpaceSyncTriggerInterval) { + go func() { + log := logger.WithField("Mod", logMod) + + ticker := time.NewTicker(time.Duration(trigger.Interval) * time.Second) + defer ticker.Stop() + + loop: + for { + select { + case <-ticker.C: + execute(syncer, task) + case <-task.Context.Done(): + break loop + } + } + + syncer.lock.Lock() + defer syncer.lock.Unlock() + + tsk := syncer.tasks[task.Task.TaskID] + if tsk == nil { + return + } + + tsk.CancelFn() + delete(syncer.tasks, task.Task.TaskID) + + err := syncer.db.SpaceSyncTask().Delete(syncer.db.DefCtx(), task.Task.TaskID) + if err != nil { + log.Warnf("delete task %v from db: %v", task.Task.TaskID, err) + } + }() +} + +func triggerAt(syncer *SpaceSyncer, task *task, trigger *clitypes.SpaceSyncTriggerAt) { + go func() { + log := logger.WithField("Mod", logMod) + + atTimes := sort2.Sort(trigger.At, func(l, r time.Time) int { + return l.Compare(r) + }) + + loop: + for _, at := range atTimes { + nowTime := time.Now() + if nowTime.After(at) { + continue + } + + select { + case <-time.After(at.Sub(nowTime)): + execute(syncer, task) + + case <-task.Context.Done(): + break loop + } + } + + syncer.lock.Lock() + defer syncer.lock.Unlock() + + tsk := syncer.tasks[task.Task.TaskID] + if tsk == nil { + return + } + + tsk.CancelFn() + delete(syncer.tasks, task.Task.TaskID) + + err := syncer.db.SpaceSyncTask().Delete(syncer.db.DefCtx(), task.Task.TaskID) + if err != nil { + log.Warnf("delete task %v from db: %v", task.Task.TaskID, err) + } + }() +} diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index b0f83c0..4f749a8 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "path" "sync" "time" @@ -20,7 +19,7 @@ import ( type CreateUploader struct { pkg types.Package targetSpaces 
[]types.UserSpaceDetail - copyRoots []string + copyRoots []types.JPath uploader *Uploader pubLock *publock.Mutex successes []db.AddObjectEntry @@ -33,7 +32,7 @@ type CreateUploadResult struct { Objects map[string]types.Object } -func (u *CreateUploader) Upload(pa string, stream io.Reader, opts ...UploadOption) error { +func (u *CreateUploader) Upload(pa types.JPath, stream io.Reader, opts ...UploadOption) error { opt := UploadOption{} if len(opts) > 0 { opt = opts[0] @@ -50,7 +49,7 @@ func (u *CreateUploader) Upload(pa string, stream io.Reader, opts ...UploadOptio ft.AddFrom(fromExec) for i, space := range u.targetSpaces { ft.AddTo(ioswitch2.NewToShardStore(space, ioswitch2.RawStream(), "shardInfo")) - ft.AddTo(ioswitch2.NewToBaseStore(space, path.Join(u.copyRoots[i], pa))) + ft.AddTo(ioswitch2.NewToBaseStore(space, u.copyRoots[i].ConcatNew(pa))) spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID) } @@ -75,7 +74,7 @@ func (u *CreateUploader) Upload(pa string, stream io.Reader, opts ...UploadOptio // 记录上传结果 shardInfo := ret["fileHash"].(*ops2.FileInfoValue) u.successes = append(u.successes, db.AddObjectEntry{ - Path: pa, + Path: pa.String(), Size: shardInfo.Size, FileHash: shardInfo.Hash, CreateTime: opt.CreateTime, diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index 4c51d01..25f7117 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "path" "sync" "time" @@ -24,7 +23,7 @@ type UpdateUploader struct { targetSpace types.UserSpaceDetail pubLock *publock.Mutex copyToSpaces []types.UserSpaceDetail - copyToPath []string + copyToPath []types.JPath successes []db.AddObjectEntry lock sync.Mutex commited bool @@ -45,7 +44,7 @@ type UploadOption struct { CreateTime time.Time // 设置文件的上传时间,如果为0值,则使用开始上传时的时间。 } -func (w *UpdateUploader) Upload(pat string, stream io.Reader, opts ...UploadOption) error { +func (w *UpdateUploader) Upload(pat types.JPath, 
stream io.Reader, opts ...UploadOption) error { opt := UploadOption{} if len(opts) > 0 { opt = opts[0] @@ -61,7 +60,7 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader, opts ...UploadOpti AddTo(ioswitch2.NewToShardStore(w.targetSpace, ioswitch2.RawStream(), "shardInfo")) for i, space := range w.copyToSpaces { - ft.AddTo(ioswitch2.NewToBaseStore(space, path.Join(w.copyToPath[i], pat))) + ft.AddTo(ioswitch2.NewToBaseStore(space, w.copyToPath[i].ConcatNew(pat))) } plans := exec.NewPlanBuilder() @@ -85,7 +84,7 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader, opts ...UploadOpti // 记录上传结果 shardInfo := ret["shardInfo"].(*ops2.FileInfoValue) w.successes = append(w.successes, db.AddObjectEntry{ - Path: pat, + Path: pat.String(), Size: shardInfo.Size, FileHash: shardInfo.Hash, CreateTime: opt.CreateTime, diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 3e19da0..0bfd7ba 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -43,7 +43,7 @@ func NewUploader(pubLock *publock.Service, connectivity *connectivity.Collector, } } -func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, copyTo []clitypes.UserSpaceID, copyToPath []string) (*UpdateUploader, error) { +func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserSpaceID, copyTo []clitypes.UserSpaceID, copyToPath []clitypes.JPath) (*UpdateUploader, error) { spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx()) if err != nil { return nil, fmt.Errorf("getting user space ids: %w", err) @@ -137,7 +137,7 @@ func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity c return spaces[0] } -func (u *Uploader) BeginCreateUpload(bktID clitypes.BucketID, pkgName string, copyTo []clitypes.UserSpaceID, copyToPath []string) (*CreateUploader, error) { +func (u *Uploader) BeginCreateUpload(bktID clitypes.BucketID, pkgName string, copyTo 
[]clitypes.UserSpaceID, copyToPath []clitypes.JPath) (*CreateUploader, error) { getSpaces := u.spaceMeta.GetMany(copyTo) spacesStgs := make([]clitypes.UserSpaceDetail, len(copyTo)) diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index e062b19..d181124 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -3,8 +3,8 @@ package uploader import ( "context" "fmt" + "io" "math" - "strings" "time" "github.com/samber/lo" @@ -21,7 +21,7 @@ import ( cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) -func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath string, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) { +func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath clitypes.JPath, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) { srcSpace := u.spaceMeta.Get(userSpaceID) if srcSpace == nil { return nil, fmt.Errorf("user space %d not found", userSpaceID) @@ -105,11 +105,6 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st delPkg() return nil, fmt.Errorf("getting base store: %w", err) } - entries, err := store.ListAll(rootPath) - if err != nil { - delPkg() - return nil, fmt.Errorf("listing base store: %w", err) - } mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSapce.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) if err != nil { @@ -118,10 +113,35 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st } defer mutex.Unlock() - adds, err := u.uploadFromBaseStore(srcSpace, &targetSapce.Space, entries, rootPath) - if err != nil { - delPkg() - return nil, fmt.Errorf("uploading from base store: %w", err) + dirReader := store.ReadDir(rootPath) + var adds 
[]db.AddObjectEntry + entries := make([]types.DirEntry, 0, 50) + for { + eof := false + for len(entries) < 50 { + entry, err := dirReader.Next() + if err == io.EOF { + eof = true + break + } + if err != nil { + delPkg() + return nil, fmt.Errorf("reading dir: %w", err) + } + + entries = append(entries, entry) + } + + as, err := u.uploadFromBaseStore(srcSpace, &targetSapce.Space, entries, rootPath) + if err != nil { + delPkg() + return nil, fmt.Errorf("uploading from base store: %w", err) + } + adds = append(adds, as...) + entries = entries[:0] + if eof { + break + } } _, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds) @@ -133,7 +153,7 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st return &pkg, nil } -func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.ListEntry, rootPath string) ([]db.AddObjectEntry, error) { +func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.DirEntry, rootPath clitypes.JPath) ([]db.AddObjectEntry, error) { ft := ioswitch2.FromTo{} for _, e := range entries { @@ -143,7 +163,7 @@ func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targe } ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, e.Path)) - ft.AddTo(ioswitch2.NewToShardStore(*targetSpace, ioswitch2.RawStream(), e.Path)) + ft.AddTo(ioswitch2.NewToShardStore(*targetSpace, ioswitch2.RawStream(), e.Path.String())) } plans := exec.NewPlanBuilder() @@ -159,21 +179,22 @@ func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targe return nil, fmt.Errorf("executing plan: %w", err) } - cleanRoot := strings.TrimSuffix(rootPath, clitypes.ObjectPathSeparator) - adds := make([]db.AddObjectEntry, 0, len(ret)) for _, e := range entries { if e.IsDir { continue } - pat := strings.TrimPrefix(e.Path, cleanRoot+clitypes.ObjectPathSeparator) - if pat == 
cleanRoot { - pat = clitypes.BaseName(e.Path) + + pat := e.Path.Clone() + pat.DropFrontN(rootPath.Len() - 1) + // 如果对象路径和RootPath相同(即RootPath是一个文件),则用文件名作为对象Path + if pat.Len() > 1 { + pat.DropFrontN(1) } - info := ret[e.Path].(*ops2.FileInfoValue) + info := ret[e.Path.String()].(*ops2.FileInfoValue) adds = append(adds, db.AddObjectEntry{ - Path: pat, + Path: pat.String(), Size: info.Size, FileHash: info.Hash, CreateTime: time.Now(), diff --git a/client/sdk/api/v1/space_syncer.go b/client/sdk/api/v1/space_syncer.go new file mode 100644 index 0000000..cd74e0c --- /dev/null +++ b/client/sdk/api/v1/space_syncer.go @@ -0,0 +1,89 @@ +package api + +import ( + "net/http" + + "gitlink.org.cn/cloudream/common/sdks" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" +) + +type SpaceSyncerService struct { + *Client +} + +func (c *Client) SpaceSyncer() *SpaceSyncerService { + return &SpaceSyncerService{ + Client: c, + } +} + +const SpaceSyncerCreateTaskPath = "/spaceSyncer/createTask" + +type SpaceSyncerCreateTask struct { + Trigger clitypes.SpaceSyncTrigger `json:"trigger" binding:"required"` + Mode clitypes.SpaceSyncMode `json:"mode" binding:"required"` + Filters []clitypes.SpaceSyncFilter `json:"filters"` + Options clitypes.SpaceSyncOptions `json:"options" binding:"required"` + SrcUserSpaceID clitypes.UserSpaceID `json:"srcUserSpaceID" binding:"required"` + SrcPath string `json:"srcPath"` + DestUserSpaceIDs []clitypes.UserSpaceID `json:"destUserSpaceIDs" binding:"required"` + DestPathes []string `json:"destPathes" binding:"required"` +} + +func (r *SpaceSyncerCreateTask) MakeParam() *sdks.RequestParam { + return sdks.MakeJSONParam(http.MethodPost, SpaceSyncerCreateTaskPath, r) +} + +type SpaceSyncerCreateTaskResp struct { + Task clitypes.SpaceSyncTask `json:"task"` +} + +func (r *SpaceSyncerCreateTaskResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *SpaceSyncerService) CreateTask(req 
SpaceSyncerCreateTask) (*SpaceSyncerCreateTaskResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &SpaceSyncerCreateTaskResp{}) +} + +const SpaceSyncerGetTaskPath = "/spaceSyncer/getTask" + +type SpaceSyncerGetTask struct { + TaskID clitypes.SpaceSyncTaskID `url:"taskID" binding:"required"` +} + +func (r *SpaceSyncerGetTask) MakeParam() *sdks.RequestParam { + return sdks.MakeQueryParam(http.MethodGet, SpaceSyncerGetTaskPath, r) +} + +type SpaceSyncerGetTaskResp struct { + Task clitypes.SpaceSyncTask `json:"task"` +} + +func (r *SpaceSyncerGetTaskResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *SpaceSyncerService) GetTask(req SpaceSyncerGetTask) (*SpaceSyncerGetTaskResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &SpaceSyncerGetTaskResp{}) +} + +const SpaceSyncerCancelTaskPath = "/spaceSyncer/cancelTask" + +type SpaceSyncerCancelTask struct { + TaskID clitypes.SpaceSyncTaskID `json:"taskID" binding:"required"` +} + +func (r *SpaceSyncerCancelTask) MakeParam() *sdks.RequestParam { + return sdks.MakeJSONParam(http.MethodPost, SpaceSyncerCancelTaskPath, r) +} + +type SpaceSyncerCancelTaskResp struct{} + +func (r *SpaceSyncerCancelTaskResp) ParseResponse(resp *http.Response) error { + return sdks.ParseCodeDataJSONResponse(resp, r) +} + +func (c *SpaceSyncerService) CancelTask(req SpaceSyncerCancelTask) (*SpaceSyncerCancelTaskResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &SpaceSyncerCancelTaskResp{}) +} diff --git a/client/types/path.go b/client/types/path.go new file mode 100644 index 0000000..2b234c7 --- /dev/null +++ b/client/types/path.go @@ -0,0 +1,157 @@ +package types + +import ( + "path/filepath" + "strings" + + "github.com/samber/lo" +) + +type JPath struct { + comps []string +} + +func (p *JPath) Len() int { + return len(p.comps) +} + +func (p *JPath) Comp(idx int) string { + return p.comps[idx] +} + +func (p *JPath) Comps() []string { + return p.comps +} + +func (p *JPath) 
LastComp() string { + if len(p.comps) == 0 { + return "" + } + + return p.comps[len(p.comps)-1] +} + +func (p *JPath) Push(comp string) { + p.comps = append(p.comps, comp) +} + +func (p *JPath) Pop() string { + if len(p.comps) == 0 { + return "" + } + comp := p.comps[len(p.comps)-1] + p.comps = p.comps[:len(p.comps)-1] + return comp +} + +func (p *JPath) SplitParent() JPath { + if len(p.comps) <= 1 { + return JPath{} + } + + parent := JPath{ + comps: make([]string, len(p.comps)-1), + } + copy(parent.comps, p.comps[:len(p.comps)-1]) + p.comps = p.comps[len(p.comps)-1:] + return parent +} + +func (p *JPath) DropFrontN(cnt int) { + if cnt >= len(p.comps) { + p.comps = nil + return + } + if cnt <= 0 { + return + } + + p.comps = p.comps[cnt:] +} + +func (p *JPath) Concat(other JPath) { + p.comps = append(p.comps, other.comps...) +} + +func (p *JPath) ConcatNew(other JPath) JPath { + clone := p.Clone() + clone.Concat(other) + return clone +} + +func (p *JPath) ConcatComps(comps []string) { + p.comps = append(p.comps, comps...) +} + +func (p *JPath) ConcatCompsNew(comps ...string) JPath { + clone := p.Clone() + clone.ConcatComps(comps) + return clone +} + +func (p *JPath) Clone() JPath { + clone := JPath{ + comps: make([]string, len(p.comps)), + } + copy(clone.comps, p.comps) + return clone +} + +func (p *JPath) JoinOSPath() string { + return filepath.Join(p.comps...) 
+} + +func (p JPath) String() string { + return strings.Join(p.comps, ObjectPathSeparator) +} + +func (p JPath) ToString() (string, error) { + return p.String(), nil +} + +func (p JPath) FromString(s string) (any, error) { + p2 := PathFromJcsPathString(s) + return p2, nil +} + +func (p JPath) MarshalJSON() ([]byte, error) { + return []byte(`"` + p.String() + `"`), nil +} + +func (p *JPath) UnmarshalJSON(data []byte) error { + s := string(data) + s = strings.Trim(s, `"`) + p2 := PathFromJcsPathString(s) + *p = p2 + return nil +} + +func PathFromComps(comps ...string) JPath { + c2 := make([]string, len(comps)) + copy(c2, comps) + return JPath{ + comps: c2, + } +} + +func PathFromOSPathString(s string) JPath { + cleaned := filepath.Clean(s) + comps := strings.Split(cleaned, string(filepath.Separator)) + return JPath{ + comps: lo.Reject(comps, func(s string, idx int) bool { return s == "" }), + } +} + +func PathFromJcsPathString(s string) JPath { + comps := strings.Split(s, ObjectPathSeparator) + + i := 0 + for ; i < len(comps) && len(comps[i]) == 0; i++ { + } + + comps = comps[i:] + + return JPath{ + comps: comps, + } +} diff --git a/client/types/space_syncer.go b/client/types/space_syncer.go new file mode 100644 index 0000000..12ed6f0 --- /dev/null +++ b/client/types/space_syncer.go @@ -0,0 +1,151 @@ +package types + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type SpaceSyncTaskID int64 + +type SpaceSyncTrigger interface { + IsSpaceSyncTrigger() bool +} + +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SpaceSyncTrigger]( + (*SpaceSyncTriggerOnce)(nil), + (*SpaceSyncTriggerInterval)(nil), + (*SpaceSyncTriggerAt)(nil), +)), "type") + +// 仅同步一次 +type SpaceSyncTriggerOnce struct { + serder.Metadata `union:"Once"` + Type string `json:"type"` +} + +func (*SpaceSyncTriggerOnce) IsSpaceSyncTrigger() bool { + return true +} + +func (m *SpaceSyncTriggerOnce) 
OnUnionSerializing() { + m.Type = "Once" +} + +// 隔一段时间同步一次 +type SpaceSyncTriggerInterval struct { + serder.Metadata `union:"Interval"` + Type string `json:"type"` + Interval int64 `json:"interval"` // 单位秒 +} + +func (*SpaceSyncTriggerInterval) IsSpaceSyncTrigger() bool { + return true +} + +func (m *SpaceSyncTriggerInterval) OnUnionSerializing() { + m.Type = "Interval" +} + +// 在固定时间点同步 +type SpaceSyncTriggerAt struct { + serder.Metadata `union:"At"` + Type string `json:"type"` + At []time.Time `json:"at"` +} + +func (*SpaceSyncTriggerAt) IsSpaceSyncTrigger() bool { + return true +} + +func (m *SpaceSyncTriggerAt) OnUnionSerializing() { + m.Type = "At" +} + +type SpaceSyncMode interface { + IsSpaceSyncMode() bool +} + +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SpaceSyncMode]( + (*SpaceSyncModeFull)(nil), + (*SpaceSyncModeDiff)(nil), +)), "type") + +type SpaceSyncModeFull struct { + serder.Metadata `union:"Full"` + Type string `json:"type"` +} + +func (*SpaceSyncModeFull) IsSpaceSyncMode() bool { + return true +} + +func (m *SpaceSyncModeFull) OnUnionSerializing() { + m.Type = "Full" +} + +type SpaceSyncModeDiff struct { + serder.Metadata `union:"Diff"` + Type string `json:"type"` + // 将文件的大小作为文件指纹的一部分 + IncludeSize bool `json:"includeSize"` + // 将文件的修改时间作为文件指纹的一部分 + IncludeModTime bool `json:"includeModTime"` + // TODO 删除目录路径多余的文件 + // DeleteExtras bool `json:"deleteExtras"` +} + +func (*SpaceSyncModeDiff) IsSpaceSyncMode() bool { + return true +} + +func (m *SpaceSyncModeDiff) OnUnionSerializing() { + m.Type = "Diff" +} + +type SpaceSyncFilter interface { + IsSpaceSyncfilter() bool +} + +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SpaceSyncFilter]( + (*SpaceSyncFilterSize)(nil), +)), "type") + +type SpaceSyncFilterSize struct { + serder.Metadata `union:"Size"` + Type string `json:"type"` + // 最小文件大小。为0则不限制最小文件大小 + MinSize int64 `json:"minSize"` + // 最大文件大小,为0则不限制最大文件大小 + MaxSize int64 `json:"maxSize"` 
+} + +func (f *SpaceSyncFilterSize) IsSpaceSyncfilter() bool { + return true +} + +type SpaceSyncOptions struct { + // 不保留空文件夹 + NoEmptyDirectories bool `json:"noEmptyDirectories"` +} + +type SpaceSyncDest struct { + DestUserSpaceID UserSpaceID `json:"destUserSpaceID"` + DestPath JPath `json:"destPath"` +} + +type SpaceSyncTask struct { + TaskID SpaceSyncTaskID `gorm:"column:TaskID; primaryKey; type:bigint; autoIncrement" json:"taskID"` + Trigger SpaceSyncTrigger `gorm:"column:Trigger; type:json; not null; serializer:union" json:"trigger"` + Mode SpaceSyncMode `gorm:"column:Mode; type:json; not null; serializer:union" json:"mode"` + Filters []SpaceSyncFilter `gorm:"column:Filters; type:json; not null; serializer:union" json:"filters"` + Options SpaceSyncOptions `gorm:"column:Options; type:json; not null; serializer:union" json:"options"` + SrcUserSpaceID UserSpaceID `gorm:"column:SrcUserSpaceID; type:bigint; not null" json:"srcUserSpaceID"` + SrcPath JPath `gorm:"column:SrcPath; type:varchar(255); not null; serializer:string" json:"srcPath"` + Dests []SpaceSyncDest `gorm:"column:Dests; type:json; not null; serializer:union" json:"dests"` +} + +func (SpaceSyncTask) TableName() string { + return "SpaceSyncTask" +} diff --git a/client/types/types.go b/client/types/types.go index 14b4f64..9e33d5a 100644 --- a/client/types/types.go +++ b/client/types/types.go @@ -83,7 +83,7 @@ type UserSpace struct { // 存储服务特性功能的配置 Features []cortypes.StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` // 各种组件保存数据的根目录。组件工作过程中都会以这个目录为根(除了BaseStore)。 - WorkingDir string `gorm:"column:WorkingDir; type:varchar(1024); not null" json:"workingDir"` + WorkingDir JPath `gorm:"column:WorkingDir; type:varchar(1024); not null; serializer:string" json:"workingDir"` // 工作目录在存储系统中的真实路径。当工作路径在挂载点内时,这个字段记录的是挂载背后的真实路径。部分直接与存储系统交互的组件需要知道真实路径。 // RealWorkingDir string `gorm:"column:RealWorkingDir; type:varchar(1024); not null" json:"realWorkingDir"` // 
用户空间信息的版本号,每一次更改都需要更新版本号 diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index a7c9976..1820af5 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -4,6 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) type From interface { @@ -128,10 +129,10 @@ func (f *FromShardStore) GetStreamIndex() StreamIndex { type FromBaseStore struct { UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath } -func NewFromBaseStore(space clitypes.UserSpaceDetail, path string) *FromBaseStore { +func NewFromBaseStore(space clitypes.UserSpaceDetail, path clitypes.JPath) *FromBaseStore { return &FromBaseStore{ UserSpace: space, Path: path, @@ -209,10 +210,11 @@ func (t *ToShardStore) GetRange() math2.Range { type ToBaseStore struct { UserSpace clitypes.UserSpaceDetail - ObjectPath string + ObjectPath clitypes.JPath + Option types.WriteOption } -func NewToBaseStore(space clitypes.UserSpaceDetail, objectPath string) *ToBaseStore { +func NewToBaseStore(space clitypes.UserSpaceDetail, objectPath clitypes.JPath) *ToBaseStore { return &ToBaseStore{ UserSpace: space, ObjectPath: objectPath, diff --git a/common/pkgs/ioswitch2/ops2/base_store.go b/common/pkgs/ioswitch2/ops2/base_store.go index 844ec24..b7842d6 100644 --- a/common/pkgs/ioswitch2/ops2/base_store.go +++ b/common/pkgs/ioswitch2/ops2/base_store.go @@ -24,7 +24,7 @@ func init() { type BaseRead struct { Output exec.VarID UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath Option types.OpenOption } @@ -120,8 +120,9 @@ func (o *BaseReadDyn) String() string { type BaseWrite struct { Input exec.VarID UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath FileInfo exec.VarID + Option types.WriteOption } func (o *BaseWrite) Execute(ctx 
*exec.ExecContext, e *exec.Executor) error { @@ -146,7 +147,7 @@ func (o *BaseWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer input.Stream.Close() - ret, err := store.Write(o.Path, input.Stream) + ret, err := store.Write(o.Path, input.Stream, o.Option) if err != nil { return err } @@ -165,11 +166,11 @@ type BaseReadNode struct { dag.NodeBase From ioswitch2.From UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath Option types.OpenOption } -func (b *GraphNodeBuilder) NewBaseRead(from ioswitch2.From, userSpace clitypes.UserSpaceDetail, path string, opt types.OpenOption) *BaseReadNode { +func (b *GraphNodeBuilder) NewBaseRead(from ioswitch2.From, userSpace clitypes.UserSpaceDetail, path clitypes.JPath, opt types.OpenOption) *BaseReadNode { node := &BaseReadNode{ From: from, UserSpace: userSpace, @@ -253,14 +254,16 @@ type BaseWriteNode struct { dag.NodeBase To ioswitch2.To UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath + Option types.WriteOption } -func (b *GraphNodeBuilder) NewBaseWrite(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, path string) *BaseWriteNode { +func (b *GraphNodeBuilder) NewBaseWrite(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, path clitypes.JPath, opt types.WriteOption) *BaseWriteNode { node := &BaseWriteNode{ To: to, UserSpace: userSpace, Path: path, + Option: opt, } b.AddNode(node) @@ -293,5 +296,6 @@ func (t *BaseWriteNode) GenerateOp() (exec.Op, error) { UserSpace: t.UserSpace, Path: t.Path, FileInfo: t.FileInfoVar().Var().VarID, + Option: t.Option, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/s2s.go b/common/pkgs/ioswitch2/ops2/s2s.go index aa58c26..92c63e3 100644 --- a/common/pkgs/ioswitch2/ops2/s2s.go +++ b/common/pkgs/ioswitch2/ops2/s2s.go @@ -16,9 +16,9 @@ func init() { type S2STransfer struct { SrcSpace clitypes.UserSpaceDetail - SrcPath string + SrcPath clitypes.JPath DstSpace clitypes.UserSpaceDetail - DstPath string + DstPath clitypes.JPath Output 
exec.VarID } @@ -58,7 +58,7 @@ type S2STransferDyn struct { SrcSpace clitypes.UserSpaceDetail SrcFileInfo exec.VarID DstSpace clitypes.UserSpaceDetail - DstPath string + DstPath clitypes.JPath Output exec.VarID } @@ -102,12 +102,12 @@ func (o *S2STransferDyn) String() string { type S2STransferNode struct { dag.NodeBase SrcSpace clitypes.UserSpaceDetail - SrcPath string + SrcPath clitypes.JPath DstSpace clitypes.UserSpaceDetail - DstPath string + DstPath clitypes.JPath } -func (b *GraphNodeBuilder) NewS2STransfer(srcSpace clitypes.UserSpaceDetail, srcPath string, dstSpace clitypes.UserSpaceDetail, dstPath string) *S2STransferNode { +func (b *GraphNodeBuilder) NewS2STransfer(srcSpace clitypes.UserSpaceDetail, srcPath clitypes.JPath, dstSpace clitypes.UserSpaceDetail, dstPath clitypes.JPath) *S2STransferNode { n := &S2STransferNode{ SrcSpace: srcSpace, SrcPath: srcPath, @@ -141,10 +141,10 @@ type S2STransferDynNode struct { dag.NodeBase SrcSpace clitypes.UserSpaceDetail DstSpace clitypes.UserSpaceDetail - DstPath string + DstPath clitypes.JPath } -func (b *GraphNodeBuilder) NewS2STransferDyn(srcSpace clitypes.UserSpaceDetail, dstSpace clitypes.UserSpaceDetail, dstPath string) *S2STransferDynNode { +func (b *GraphNodeBuilder) NewS2STransferDyn(srcSpace clitypes.UserSpaceDetail, dstSpace clitypes.UserSpaceDetail, dstPath clitypes.JPath) *S2STransferDynNode { n := &S2STransferDynNode{ SrcSpace: srcSpace, DstSpace: dstSpace, diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index be6567a..a536315 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -350,7 +350,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) case *ioswitch2.ToShardStore: tempFileName := types.MakeTempDirPath(&t.UserSpace, os2.GenerateRandomFileName(20)) - write := ctx.DAG.NewBaseWrite(t, t.UserSpace, tempFileName) + write := ctx.DAG.NewBaseWrite(t, 
t.UserSpace, tempFileName, types.WriteOption{}) if err := setEnvBySpace(write, &t.UserSpace); err != nil { return nil, fmt.Errorf("set node env by user space: %w", err) } @@ -370,7 +370,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) return n, nil case *ioswitch2.ToBaseStore: - n := ctx.DAG.NewBaseWrite(t, t.UserSpace, t.ObjectPath) + n := ctx.DAG.NewBaseWrite(t, t.UserSpace, t.ObjectPath, t.Option) if err := setEnvBySpace(n, &t.UserSpace); err != nil { return nil, fmt.Errorf("set node env by user space: %w", err) diff --git a/common/pkgs/ioswitch2/plans/complete_multipart.go b/common/pkgs/ioswitch2/plans/complete_multipart.go index 3b69a85..8ca52c2 100644 --- a/common/pkgs/ioswitch2/plans/complete_multipart.go +++ b/common/pkgs/ioswitch2/plans/complete_multipart.go @@ -38,7 +38,7 @@ func CompleteMultipart(blocks []clitypes.ObjectBlock, blockSpaces []clitypes.Use } // TODO 应该采取更合理的方式同时支持Parser和直接生成DAG - br := da.NewBaseWrite(nil, targetSpace, types.MakeTempDirPath(&targetSpace, os2.GenerateRandomFileName(20))) + br := da.NewBaseWrite(nil, targetSpace, types.MakeTempDirPath(&targetSpace, os2.GenerateRandomFileName(20)), types.WriteOption{}) if err := setEnvBySpace(br, &targetSpace); err != nil { return fmt.Errorf("set node env by user space: %w", err) } diff --git a/common/pkgs/ioswitchlrc/fromto.go b/common/pkgs/ioswitchlrc/fromto.go index 1fe709b..bf1e298 100644 --- a/common/pkgs/ioswitchlrc/fromto.go +++ b/common/pkgs/ioswitchlrc/fromto.go @@ -4,6 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) type From interface { @@ -91,6 +92,7 @@ type ToNode struct { DataIndex int Range math2.Range FileHashStoreKey string + Option types.WriteOption } func NewToStorage(space clitypes.UserSpaceDetail, dataIndex int, fileHashStoreKey string) *ToNode { diff 
--git a/common/pkgs/ioswitchlrc/ops2/base_store.go b/common/pkgs/ioswitchlrc/ops2/base_store.go index 394343f..f358594 100644 --- a/common/pkgs/ioswitchlrc/ops2/base_store.go +++ b/common/pkgs/ioswitchlrc/ops2/base_store.go @@ -24,7 +24,7 @@ func init() { type BaseRead struct { Output exec.VarID UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath Option types.OpenOption } @@ -119,8 +119,9 @@ func (o *BaseReadDyn) String() string { type BaseWrite struct { Input exec.VarID UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath WriteResult exec.VarID + Option types.WriteOption } func (o *BaseWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -145,7 +146,7 @@ func (o *BaseWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer input.Stream.Close() - ret, err := store.Write(o.Path, input.Stream) + ret, err := store.Write(o.Path, input.Stream, o.Option) if err != nil { return err } @@ -164,11 +165,11 @@ type BaseReadNode struct { dag.NodeBase From ioswitchlrc.From UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath Option types.OpenOption } -func (b *GraphNodeBuilder) NewBaseRead(from ioswitchlrc.From, userSpace clitypes.UserSpaceDetail, path string, opt types.OpenOption) *BaseReadNode { +func (b *GraphNodeBuilder) NewBaseRead(from ioswitchlrc.From, userSpace clitypes.UserSpaceDetail, path clitypes.JPath, opt types.OpenOption) *BaseReadNode { node := &BaseReadNode{ From: from, UserSpace: userSpace, @@ -252,14 +253,16 @@ type BaseWriteNode struct { dag.NodeBase To ioswitchlrc.To UserSpace clitypes.UserSpaceDetail - Path string + Path clitypes.JPath + Option types.WriteOption } -func (b *GraphNodeBuilder) NewBaseWrite(to ioswitchlrc.To, userSpace clitypes.UserSpaceDetail, path string) *BaseWriteNode { +func (b *GraphNodeBuilder) NewBaseWrite(to ioswitchlrc.To, userSpace clitypes.UserSpaceDetail, path clitypes.JPath, opt types.WriteOption) *BaseWriteNode { node := &BaseWriteNode{ To: to, 
UserSpace: userSpace, Path: path, + Option: opt, } b.AddNode(node) @@ -292,5 +295,6 @@ func (t *BaseWriteNode) GenerateOp() (exec.Op, error) { UserSpace: t.UserSpace, Path: t.Path, WriteResult: t.FileInfoVar().Var().VarID, + Option: t.Option, }, nil } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index badb059..c7035da 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -103,7 +103,7 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: tempFileName := types.MakeTempDirPath(&t.UserSpace, os2.GenerateRandomFileName(20)) - write := ctx.DAG.NewBaseWrite(t, t.UserSpace, tempFileName) + write := ctx.DAG.NewBaseWrite(t, t.UserSpace, tempFileName, t.Option) if err := setEnvBySpace(write, &t.UserSpace); err != nil { return nil, fmt.Errorf("set node env by user space: %w", err) diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go index 5e0da05..f03292c 100644 --- a/common/pkgs/storage/efile/ec_multiplier.go +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -48,7 +48,7 @@ func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPRequest, chunk } fileName := os2.GenerateRandomFileName(10) - tempDir := path.Join(m.blder.detail.UserSpace.WorkingDir, types.TempWorkingDir) + tempDir := path.Join(m.blder.detail.UserSpace.WorkingDir.String(), types.TempWorkingDir) m.outputs = make([]string, len(coef)) for i := range m.outputs { m.outputs[i] = path.Join(tempDir, fmt.Sprintf("%s_%d", fileName, i)) @@ -97,7 +97,8 @@ func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPRequest, chunk ret := make([]types.FileInfo, len(r.Data)) for i, data := range r.Data { ret[i] = types.FileInfo{ - Path: m.outputs[i], + // TODO 要确认一下output的格式 + Path: clitypes.PathFromJcsPathString(m.outputs[i]), Size: data.Size, Hash: 
clitypes.NewFullHashFromString(data.Sha256), } diff --git a/common/pkgs/storage/local/base_store.go b/common/pkgs/storage/local/base_store.go index e3c1819..758b675 100644 --- a/common/pkgs/storage/local/base_store.go +++ b/common/pkgs/storage/local/base_store.go @@ -3,7 +3,6 @@ package local import ( "crypto/sha256" "io" - "io/fs" "os" "path/filepath" @@ -25,8 +24,10 @@ func NewBaseStore(root string, detail *clitypes.UserSpaceDetail) (*BaseStore, er }, nil } -func (s *BaseStore) Write(objPath string, stream io.Reader) (types.FileInfo, error) { - absObjPath := filepath.Join(s.root, objPath) +func (s *BaseStore) Write(pat clitypes.JPath, stream io.Reader, opt types.WriteOption) (types.FileInfo, error) { + log := s.getLogger() + + absObjPath := filepath.Join(s.root, pat.String()) err := os.MkdirAll(filepath.Dir(absObjPath), 0755) if err != nil { @@ -47,15 +48,22 @@ func (s *BaseStore) Write(objPath string, stream io.Reader) (types.FileInfo, err return types.FileInfo{}, err } + if !opt.ModTime.IsZero() { + err := os.Chtimes(absObjPath, opt.ModTime, opt.ModTime) + if err != nil { + log.Warnf("change file %v mod time: %v", absObjPath, err) + } + } + return types.FileInfo{ - Path: objPath, + Path: pat, Size: counter.Count(), Hash: clitypes.NewFullHash(hasher.Sum()), }, nil } -func (s *BaseStore) Read(objPath string, opt types.OpenOption) (io.ReadCloser, error) { - absObjPath := filepath.Join(s.root, objPath) +func (s *BaseStore) Read(objPath clitypes.JPath, opt types.OpenOption) (io.ReadCloser, error) { + absObjPath := filepath.Join(s.root, objPath.JoinOSPath()) file, err := os.Open(absObjPath) if err != nil { return nil, err @@ -78,8 +86,8 @@ func (s *BaseStore) Read(objPath string, opt types.OpenOption) (io.ReadCloser, e return ret, nil } -func (s *BaseStore) Mkdir(path string) error { - absObjPath := filepath.Join(s.root, path) +func (s *BaseStore) Mkdir(path clitypes.JPath) error { + absObjPath := filepath.Join(s.root, path.JoinOSPath()) err := 
os.MkdirAll(absObjPath, 0755) if err != nil { return err @@ -88,54 +96,17 @@ func (s *BaseStore) Mkdir(path string) error { return nil } -func (s *BaseStore) ListAll(path string) ([]types.ListEntry, error) { - absObjPath := filepath.Join(s.root, path) - - var es []types.ListEntry - err := filepath.WalkDir(absObjPath, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - - relaPath, err := filepath.Rel(s.root, path) - if err != nil { - return err - } - - if d.IsDir() { - es = append(es, types.ListEntry{ - Path: filepath.ToSlash(relaPath), - Size: 0, - IsDir: true, - }) - return nil - } - info, err := d.Info() - if err != nil { - return err - } - - es = append(es, types.ListEntry{ - Path: filepath.ToSlash(relaPath), - Size: info.Size(), - IsDir: false, - }) - return nil - }) - if os.IsNotExist(err) { - return nil, nil +func (s *BaseStore) ReadDir(pat clitypes.JPath) types.DirReader { + return &DirReader{ + absRootPath: filepath.Join(s.root, pat.JoinOSPath()), + rootJPath: pat.Clone(), } - if err != nil { - return nil, err - } - - return es, nil } func (s *BaseStore) CleanTemps() { log := s.getLogger() - tempDir := filepath.Join(s.root, s.detail.UserSpace.WorkingDir, types.TempWorkingDir) + tempDir := filepath.Join(s.root, s.detail.UserSpace.WorkingDir.JoinOSPath(), types.TempWorkingDir) entries, err := os.ReadDir(tempDir) if err != nil { log.Warnf("read temp dir: %v", err) @@ -163,6 +134,11 @@ func (s *BaseStore) CleanTemps() { } } +func (s *BaseStore) Test() error { + _, err := os.Stat(s.root) + return err +} + func (s *BaseStore) getLogger() logger.Logger { return logger.WithField("BaseStore", "Local").WithField("UserSpace", s.detail.UserSpace) } diff --git a/common/pkgs/storage/local/dir_reader.go b/common/pkgs/storage/local/dir_reader.go new file mode 100644 index 0000000..e09ad4a --- /dev/null +++ b/common/pkgs/storage/local/dir_reader.go @@ -0,0 +1,117 @@ +package local + +import ( + "io" + "os" + "path/filepath" + + clitypes 
"gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" +) + +type DirReader struct { + // 完整的根路径(包括ReadDir的path参数),比如包括了盘符 + absRootPath string + // ReadDir函数传递进来的path参数 + rootJPath clitypes.JPath + init bool + curEntries []dirEntry +} + +func (r *DirReader) Next() (types.DirEntry, error) { + if !r.init { + info, err := os.Stat(r.absRootPath) + if err != nil { + return types.DirEntry{}, err + } + + if !info.IsDir() { + r.init = true + return types.DirEntry{ + Path: r.rootJPath, + Size: info.Size(), + ModTime: info.ModTime(), + IsDir: false, + }, nil + } + + es, err := os.ReadDir(r.absRootPath) + if err != nil { + return types.DirEntry{}, err + } + + for _, e := range es { + r.curEntries = append(r.curEntries, dirEntry{ + dir: clitypes.JPath{}, + entry: e, + }) + } + + r.init = true + } + if len(r.curEntries) == 0 { + return types.DirEntry{}, io.EOF + } + + entry := r.curEntries[0] + r.curEntries = r.curEntries[1:] + + if entry.entry.IsDir() { + es, err := os.ReadDir(filepath.Join(r.absRootPath, entry.dir.JoinOSPath(), entry.entry.Name())) + if err != nil { + return types.DirEntry{}, nil + } + + // 多个entry对象共享同一个JPath对象,但因为不会修改JPath,所以没问题 + dir := entry.dir.Clone() + dir.Push(entry.entry.Name()) + for _, e := range es { + r.curEntries = append(r.curEntries, dirEntry{ + dir: dir, + entry: e, + }) + } + } + + info, err := entry.entry.Info() + if err != nil { + return types.DirEntry{}, err + } + + p := r.rootJPath.ConcatNew(entry.dir) + p.Push(entry.entry.Name()) + + if entry.entry.IsDir() { + return types.DirEntry{ + Path: p, + Size: 0, + ModTime: info.ModTime(), + IsDir: true, + }, nil + } + + return types.DirEntry{ + Path: p, + Size: info.Size(), + ModTime: info.ModTime(), + IsDir: false, + }, nil +} + +func (r *DirReader) Close() { + +} + +type dirEntry struct { + dir clitypes.JPath + entry os.DirEntry +} + +type fileInfoDirEntry struct { + info os.FileInfo +} + +func (d fileInfoDirEntry) Name() string { 
return d.info.Name() } +func (d fileInfoDirEntry) IsDir() bool { return d.info.IsDir() } +func (d fileInfoDirEntry) Type() os.FileMode { return d.info.Mode().Type() } +func (d fileInfoDirEntry) Info() (os.FileInfo, error) { return d.info, nil } diff --git a/common/pkgs/storage/local/multipart_upload.go b/common/pkgs/storage/local/multipart_upload.go index bdfd68d..2ffacdb 100644 --- a/common/pkgs/storage/local/multipart_upload.go +++ b/common/pkgs/storage/local/multipart_upload.go @@ -18,8 +18,9 @@ import ( ) type Multiparter struct { - detail *clitypes.UserSpaceDetail - feat *cortypes.MultipartUploadFeature + detail *clitypes.UserSpaceDetail + localStg *cortypes.LocalCred + feat *cortypes.MultipartUploadFeature } func (*Multiparter) MinPartSize() int64 { @@ -31,7 +32,7 @@ func (*Multiparter) MaxPartSize() int64 { } func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) { - tempDir := filepath.Join(m.detail.UserSpace.WorkingDir, types.TempWorkingDir) + tempDir := filepath.Join(m.localStg.RootDir, m.detail.UserSpace.WorkingDir.JoinOSPath(), types.TempWorkingDir) absTempDir, err := filepath.Abs(tempDir) if err != nil { return nil, fmt.Errorf("get abs temp dir %v: %v", tempDir, err) @@ -51,7 +52,7 @@ func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) absTempDir: absTempDir, tempFileName: tempFileName, tempPartsDir: tempPartsDir, - joinedFilePath: types.PathJoin(m.detail.UserSpace.WorkingDir, types.TempWorkingDir, tempFileName+".joined"), + joinedFileJPath: m.detail.UserSpace.WorkingDir.ConcatCompsNew(types.TempWorkingDir, tempFileName+".joined"), absJoinedFilePath: absJoinedFilePath, uploadID: tempPartsDir, }, nil @@ -79,7 +80,7 @@ type MultipartTask struct { absTempDir string // 应该要是绝对路径 tempFileName string tempPartsDir string - joinedFilePath string + joinedFileJPath clitypes.JPath absJoinedFilePath string uploadID string } @@ -115,7 +116,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts 
[]types.UploadedPar h := hasher.Sum(nil) return types.FileInfo{ - Path: i.joinedFilePath, + Path: i.joinedFileJPath, Size: size, Hash: clitypes.NewFullHash(h), }, nil diff --git a/common/pkgs/storage/local/s2s.go b/common/pkgs/storage/local/s2s.go index b3a1ffb..bede4f5 100644 --- a/common/pkgs/storage/local/s2s.go +++ b/common/pkgs/storage/local/s2s.go @@ -4,6 +4,7 @@ import ( "context" "io" "os" + "path/filepath" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" @@ -11,9 +12,10 @@ import ( ) type S2STransfer struct { - feat *cortypes.S2STransferFeature - detail *clitypes.UserSpaceDetail - dstPath string + feat *cortypes.S2STransferFeature + detail *clitypes.UserSpaceDetail + localStg *cortypes.LocalCred + dstPath clitypes.JPath } // 只有同一个机器的存储之间才可以进行数据直传 @@ -35,16 +37,16 @@ func (*S2STransfer) CanTransfer(src, dst *clitypes.UserSpaceDetail) bool { } // 执行数据直传 -func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath string, dstPath string) (types.FileInfo, error) { +func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath clitypes.JPath, dstPath clitypes.JPath) (types.FileInfo, error) { s.dstPath = dstPath - copy, err := os.OpenFile(s.dstPath, os.O_WRONLY|os.O_CREATE, 0644) + copy, err := os.OpenFile(filepath.Join(s.localStg.RootDir, s.dstPath.JoinOSPath()), os.O_WRONLY|os.O_CREATE, 0644) if err != nil { return types.FileInfo{}, err } defer copy.Close() - srcFile, err := os.Open(srcPath) + srcFile, err := os.Open(filepath.Join(s.localStg.RootDir, srcPath.JoinOSPath())) if err != nil { return types.FileInfo{}, err } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index ff54b4e..52451b2 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -22,7 +22,7 @@ type ShardStore struct { } func NewShardStore(root string, detail 
*clitypes.UserSpaceDetail) (*ShardStore, error) { - storeAbsRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir)) + storeAbsRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.WorkingDir.JoinOSPath(), types.ShardStoreWorkingDir)) if err != nil { return nil, fmt.Errorf("get abs root: %w", err) } @@ -43,8 +43,8 @@ func (s *ShardStore) Stop() { s.getLogger().Infof("component stop") } -func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (types.FileInfo, error) { - fullTempPath := filepath.Join(s.stgRoot, path) +func (s *ShardStore) Store(path clitypes.JPath, hash clitypes.FileHash, size int64) (types.FileInfo, error) { + fullTempPath := filepath.Join(s.stgRoot, path.JoinOSPath()) s.lock.Lock() defer s.lock.Unlock() @@ -77,7 +77,7 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ return types.FileInfo{ Hash: hash, Size: size, - Path: s.getSlashFilePathFromHash(hash), + Path: s.getJPathFromHash(hash), }, nil } @@ -94,7 +94,7 @@ func (s *ShardStore) Info(hash clitypes.FileHash) (types.FileInfo, error) { return types.FileInfo{ Hash: hash, Size: info.Size(), - Path: s.getSlashFilePathFromHash(hash), + Path: s.getJPathFromHash(hash), }, nil } @@ -126,7 +126,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { infos = append(infos, types.FileInfo{ Hash: fileHash, Size: info.Size(), - Path: s.getSlashFilePathFromHash(fileHash), + Path: s.getJPathFromHash(fileHash), }) return nil }) @@ -207,6 +207,6 @@ func (s *ShardStore) getFilePathFromHash(hash clitypes.FileHash) string { return filepath.Join(s.storeAbsRoot, hash.GetHashPrefix(2), string(hash)) } -func (s *ShardStore) getSlashFilePathFromHash(hash clitypes.FileHash) string { - return types.PathJoin(s.detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir, hash.GetHashPrefix(2), string(hash)) +func (s *ShardStore) getJPathFromHash(hash clitypes.FileHash) clitypes.JPath { + return 
s.detail.UserSpace.WorkingDir.ConcatCompsNew(types.ShardStoreWorkingDir, hash.GetHashPrefix(2), string(hash)) } diff --git a/common/pkgs/storage/obs/obs_test.go b/common/pkgs/storage/obs/obs_test.go index 65bf268..5b35554 100644 --- a/common/pkgs/storage/obs/obs_test.go +++ b/common/pkgs/storage/obs/obs_test.go @@ -38,7 +38,7 @@ func Test_S2S(t *testing.T) { SK: "", }, }, - }, "test_data/test03.txt", "atest.txt") + }, clitypes.PathFromComps("test_data/test03.txt"), clitypes.PathFromComps("atest.txt")) defer s2s.Close() So(err, ShouldEqual, nil) diff --git a/common/pkgs/storage/obs/s2s.go b/common/pkgs/storage/obs/s2s.go index 3a37626..d44bc52 100644 --- a/common/pkgs/storage/obs/s2s.go +++ b/common/pkgs/storage/obs/s2s.go @@ -14,6 +14,7 @@ import ( omsregion "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/region" "gitlink.org.cn/cloudream/common/utils/os2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + stgs3 "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/s3" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -38,12 +39,12 @@ func NewS2STransfer(detail *clitypes.UserSpaceDetail, stgType *cortypes.OBSType, // 判断是否能从指定的源存储中直传到当前存储的目的路径 func (*S2STransfer) CanTransfer(src, dst *clitypes.UserSpaceDetail) bool { - req := makeRequest(src, "") + req := makeRequest(src, clitypes.JPath{}) return req != nil } // 执行数据直传。返回传输后的文件路径 -func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath string, dstPath string) (types.FileInfo, error) { +func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath clitypes.JPath, dstPath clitypes.JPath) (types.FileInfo, error) { req := makeRequest(src, srcPath) if req == nil { return types.FileInfo{}, fmt.Errorf("unsupported source storage type: %T", src.UserSpace.Storage) @@ -72,8 +73,8 @@ func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetai } 
// 先上传成一个临时文件 - tempDir := types.PathJoin(s.detail.UserSpace.WorkingDir, types.TempWorkingDir) - tempPrefix := types.PathJoin(tempDir, os2.GenerateRandomFileName(10)) + "/" + tempDir := stgs3.JoinKey(s.detail.UserSpace.WorkingDir.String(), types.TempWorkingDir) + tempPrefix := stgs3.JoinKey(tempDir, os2.GenerateRandomFileName(10)) + "/" taskType := model.GetCreateTaskReqTaskTypeEnum().OBJECT s.omsCli = oms.NewOmsClient(cli) @@ -110,8 +111,8 @@ func (s *S2STransfer) Transfer(ctx context.Context, src *clitypes.UserSpaceDetai _, err = obsCli.CopyObject(ctx, &awss3.CopyObjectInput{ Bucket: aws.String(bkt), - CopySource: aws.String(types.PathJoin(bkt, tempPrefix, srcPath)), - Key: aws.String(dstPath), + CopySource: aws.String(stgs3.JoinKey(bkt, tempPrefix, srcPath.String())), + Key: aws.String(dstPath.String()), }) if err != nil { return types.FileInfo{}, fmt.Errorf("copy object: %w", err) @@ -177,7 +178,7 @@ func (s *S2STransfer) Close() { } } -func makeRequest(srcStg *clitypes.UserSpaceDetail, srcPath string) *model.SrcNodeReq { +func makeRequest(srcStg *clitypes.UserSpaceDetail, srcPath clitypes.JPath) *model.SrcNodeReq { switch srcType := srcStg.UserSpace.Storage.(type) { case *cortypes.OBSType: cloudType := "HuaweiCloud" @@ -193,7 +194,7 @@ func makeRequest(srcStg *clitypes.UserSpaceDetail, srcPath string) *model.SrcNod Ak: &cred.AK, Sk: &cred.SK, Bucket: &srcType.Bucket, - ObjectKey: &[]string{srcPath}, + ObjectKey: &[]string{srcPath.String()}, } default: diff --git a/common/pkgs/storage/obs/shard_store.go b/common/pkgs/storage/obs/shard_store.go index 3137c77..a6f91cb 100644 --- a/common/pkgs/storage/obs/shard_store.go +++ b/common/pkgs/storage/obs/shard_store.go @@ -38,10 +38,11 @@ func (s *ShardStore) MakeHTTPReadRequest(fileHash clitypes.FileHash) (types.HTTP return types.HTTPRequest{}, err } + filePath := s.GetFilePathFromHash(fileHash) getSigned, err := cli.CreateSignedUrl(&obs.CreateSignedUrlInput{ Method: "GET", Bucket: s.Bucket, - Key: 
s.GetFilePathFromHash(fileHash), + Key: filePath.String(), Expires: 3600, }) if err != nil { diff --git a/common/pkgs/storage/s3/base_store.go b/common/pkgs/storage/s3/base_store.go index 221bc67..9709546 100644 --- a/common/pkgs/storage/s3/base_store.go +++ b/common/pkgs/storage/s3/base_store.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -17,6 +18,10 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) +const ( + ModTimeHeader = "X-JCS-ModTime" +) + type BaseStore struct { Detail *clitypes.UserSpaceDetail Bucket string @@ -37,17 +42,29 @@ func NewBaseStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, }, nil } -func (s *BaseStore) Write(objPath string, stream io.Reader) (types.FileInfo, error) { - key := objPath +func (s *BaseStore) Write(pat clitypes.JPath, stream io.Reader, opt types.WriteOption) (types.FileInfo, error) { + key := pat + meta := make(map[string]string) + if opt.ModTime.IsZero() { + mt, _ := time.Now().MarshalText() + meta[ModTimeHeader] = string(mt) + } else { + mt, err := opt.ModTime.MarshalText() + if err != nil { + return types.FileInfo{}, err + } + meta[ModTimeHeader] = string(mt) + } counter := io2.Counter(stream) if s.opt.UseAWSSha256 { resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(key), + Key: aws.String(key.String()), Body: counter, ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, + Metadata: meta, }) if err != nil { return types.FileInfo{}, err @@ -70,9 +87,10 @@ func (s *BaseStore) Write(objPath string, stream io.Reader) (types.FileInfo, err hashStr := io2.NewReadHasher(sha256.New(), counter) _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ - Bucket: aws.String(s.Bucket), - Key: aws.String(key), - Body: counter, + Bucket: aws.String(s.Bucket), + Key: aws.String(key.String()), + Body: counter, + Metadata: meta, }) if err != nil { 
return types.FileInfo{}, err @@ -85,8 +103,8 @@ func (s *BaseStore) Write(objPath string, stream io.Reader) (types.FileInfo, err }, nil } -func (s *BaseStore) Read(objPath string, opt types.OpenOption) (io.ReadCloser, error) { - key := objPath +func (s *BaseStore) Read(pat clitypes.JPath, opt types.OpenOption) (io.ReadCloser, error) { + key := pat rngStr := fmt.Sprintf("bytes=%d-", opt.Offset) if opt.Length >= 0 { @@ -95,7 +113,7 @@ func (s *BaseStore) Read(objPath string, opt types.OpenOption) (io.ReadCloser, e resp, err := s.cli.GetObject(context.TODO(), &s3.GetObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(key), + Key: aws.String(key.String()), Range: aws.String(rngStr), }) @@ -106,51 +124,21 @@ func (s *BaseStore) Read(objPath string, opt types.OpenOption) (io.ReadCloser, e return resp.Body, nil } -func (s *BaseStore) Mkdir(path string) error { +func (s *BaseStore) Mkdir(path clitypes.JPath) error { _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(path + "/"), + Key: aws.String(path.String() + "/"), Body: bytes.NewReader([]byte{}), }) return err } -func (s *BaseStore) ListAll(path string) ([]types.ListEntry, error) { - key := path - // TODO 待测试 - - input := &s3.ListObjectsInput{ - Bucket: aws.String(s.Bucket), - Prefix: aws.String(key), - Delimiter: aws.String("/"), - } - - var objs []types.ListEntry - - var marker *string - for { - input.Marker = marker - resp, err := s.cli.ListObjects(context.Background(), input) - if err != nil { - return nil, err - } - - for _, obj := range resp.Contents { - objs = append(objs, types.ListEntry{ - Path: *obj.Key, - Size: *obj.Size, - IsDir: false, - }) - } - - if !*resp.IsTruncated { - break - } - - marker = resp.NextMarker +func (s *BaseStore) ReadDir(path clitypes.JPath) types.DirReader { + return &DirReader{ + cli: s.cli, + bucket: s.Bucket, + rootPath: path.Clone(), } - - return objs, nil } func (s *BaseStore) CleanTemps() { @@ -162,7 +150,7 @@ 
func (s *BaseStore) CleanTemps() { for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(types.PathJoin(s.Detail.UserSpace.WorkingDir, types.TempWorkingDir, "/")), + Prefix: aws.String(JoinKey(s.Detail.UserSpace.WorkingDir.String(), types.TempWorkingDir, "/")), Marker: marker, }) @@ -206,6 +194,15 @@ func (s *BaseStore) CleanTemps() { } } +func (s *BaseStore) Test() error { + _, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ + Bucket: aws.String(s.Bucket), + Prefix: aws.String(""), + MaxKeys: aws.Int32(1), + }) + return err +} + func (s *BaseStore) getLogger() logger.Logger { return logger.WithField("BaseStore", "S3").WithField("Storage", s.Detail.UserSpace.Storage.String()) } diff --git a/common/pkgs/storage/s3/dir_reader.go b/common/pkgs/storage/s3/dir_reader.go new file mode 100644 index 0000000..d7f0c53 --- /dev/null +++ b/common/pkgs/storage/s3/dir_reader.go @@ -0,0 +1,61 @@ +package s3 + +import ( + "context" + "io" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" +) + +type DirReader struct { + cli *s3.Client + bucket string + rootPath clitypes.JPath + marker *string + curInfos []types.DirEntry + eof bool +} + +func (r *DirReader) Next() (types.DirEntry, error) { + if len(r.curInfos) > 0 { + e := r.curInfos[0] + r.curInfos = r.curInfos[1:] + return e, nil + } + if r.eof { + return types.DirEntry{}, io.EOF + } + + resp, err := r.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ + Bucket: aws.String(r.bucket), + Prefix: aws.String(r.rootPath.String()), + Marker: r.marker, + }) + if err != nil { + return types.DirEntry{}, err + } + + for _, obj := range resp.Contents { + key := clitypes.PathFromJcsPathString(*obj.Key) + + r.curInfos = append(r.curInfos, types.DirEntry{ + Path: key, + Size: *obj.Size, 
+ ModTime: *obj.LastModified, + IsDir: false, + }) + } + + if !*resp.IsTruncated { + r.eof = true + } + r.marker = resp.NextMarker + + return r.Next() +} +func (r *DirReader) Close() { + +} diff --git a/common/pkgs/storage/s3/multipart_upload.go b/common/pkgs/storage/s3/multipart_upload.go index 614fcf6..d79281e 100644 --- a/common/pkgs/storage/s3/multipart_upload.go +++ b/common/pkgs/storage/s3/multipart_upload.go @@ -42,12 +42,14 @@ func (*Multiparter) MaxPartSize() int64 { func (m *Multiparter) Initiate(ctx context.Context) (types.MultipartTask, error) { tempFileName := os2.GenerateRandomFileName(10) - tempDir := types.PathJoin(m.detail.UserSpace.WorkingDir, types.TempWorkingDir) - tempFilePath := types.PathJoin(tempDir, tempFileName) + tempDir := m.detail.UserSpace.WorkingDir.Clone() + tempDir.Push(types.TempWorkingDir) + tempFilePath := tempDir.Clone() + tempFilePath.Push(tempFileName) resp, err := m.cli.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ Bucket: aws.String(m.bucket), - Key: aws.String(tempFilePath), + Key: aws.String(tempFilePath.String()), ChecksumAlgorithm: s3types.ChecksumAlgorithmSha256, }) if err != nil { @@ -87,9 +89,9 @@ func (m *Multiparter) UploadPart(ctx context.Context, init types.MultipartInitSt type MultipartTask struct { cli *s3.Client bucket string - tempDir string + tempDir clitypes.JPath tempFileName string - tempFilePath string + tempFilePath clitypes.JPath uploadID string } @@ -97,7 +99,7 @@ func (i *MultipartTask) InitState() types.MultipartInitState { return types.MultipartInitState{ UploadID: i.uploadID, Bucket: i.bucket, - Key: i.tempFilePath, + Key: i.tempFilePath.String(), } } @@ -120,7 +122,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar _, err := i.cli.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ Bucket: aws.String(i.bucket), - Key: aws.String(i.tempFilePath), + Key: aws.String(i.tempFilePath.String()), UploadId: aws.String(i.uploadID), MultipartUpload: 
&s3types.CompletedMultipartUpload{ Parts: s3Parts, @@ -132,7 +134,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar headResp, err := i.cli.HeadObject(ctx, &s3.HeadObjectInput{ Bucket: aws.String(i.bucket), - Key: aws.String(i.tempFilePath), + Key: aws.String(i.tempFilePath.String()), }) if err != nil { return types.FileInfo{}, err @@ -151,7 +153,7 @@ func (i *MultipartTask) JoinParts(ctx context.Context, parts []types.UploadedPar func (i *MultipartTask) Close() { i.cli.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ Bucket: aws.String(i.bucket), - Key: aws.String(i.tempFilePath), + Key: aws.String(i.tempFilePath.String()), UploadId: aws.String(i.uploadID), }) } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index ab36236..12e376f 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -19,17 +19,19 @@ type ShardStoreOption struct { type ShardStore struct { Detail *clitypes.UserSpaceDetail Bucket string - workingDir string + workingDir clitypes.JPath cli *s3.Client opt ShardStoreOption lock sync.Mutex } func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, opt ShardStoreOption) (*ShardStore, error) { + wd := detail.UserSpace.WorkingDir.Clone() + wd.Push(types.ShardStoreWorkingDir) return &ShardStore{ Detail: detail, Bucket: bkt, - workingDir: types.PathJoin(detail.UserSpace.WorkingDir, types.ShardStoreWorkingDir), + workingDir: wd, cli: cli, opt: opt, }, nil @@ -43,7 +45,7 @@ func (s *ShardStore) Stop() { s.getLogger().Infof("component stop") } -func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (types.FileInfo, error) { +func (s *ShardStore) Store(path clitypes.JPath, hash clitypes.FileHash, size int64) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() @@ -51,13 +53,12 @@ func (s *ShardStore) Store(path string, hash clitypes.FileHash, size int64) (typ 
log.Debugf("write file %v finished, size: %v, hash: %v", path, size, hash) - blockDir := s.GetFileDirFromHash(hash) - newPath := types.PathJoin(blockDir, string(hash)) + newPath := s.GetFilePathFromHash(hash) _, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{ Bucket: aws.String(s.Bucket), - CopySource: aws.String(types.PathJoin(s.Bucket, path)), - Key: aws.String(newPath), + CopySource: aws.String(JoinKey(s.Bucket, path.String())), + Key: aws.String(newPath.String()), }) if err != nil { log.Warnf("copy file %v to %v: %v", path, newPath, err) @@ -78,7 +79,7 @@ func (s *ShardStore) Info(hash clitypes.FileHash) (types.FileInfo, error) { filePath := s.GetFilePathFromHash(hash) info, err := s.cli.HeadObject(context.TODO(), &s3.HeadObjectInput{ Bucket: aws.String(s.Bucket), - Key: aws.String(filePath), + Key: aws.String(filePath.String()), }) if err != nil { s.getLogger().Warnf("get file %v: %v", filePath, err) @@ -102,7 +103,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(s.workingDir), + Prefix: aws.String(s.workingDir.String()), Marker: marker, }) @@ -112,7 +113,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { } for _, obj := range resp.Contents { - key := types.PathBase(*obj.Key) + key := BaseKey(*obj.Key) fileHash, err := clitypes.ParseHash(key) if err != nil { @@ -122,7 +123,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { infos = append(infos, types.FileInfo{ Hash: fileHash, Size: *obj.Size, - Path: *obj.Key, + Path: clitypes.PathFromJcsPathString(*obj.Key), }) } @@ -150,7 +151,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(s.workingDir), + Prefix: aws.String(s.workingDir.String()), Marker: marker, }) @@ -160,7 +161,7 @@ 
func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { } for _, obj := range resp.Contents { - key := types.PathBase(*obj.Key) + key := BaseKey(*obj.Key) fileHash, err := clitypes.ParseHash(key) if err != nil { continue @@ -212,10 +213,15 @@ func (s *ShardStore) getLogger() logger.Logger { return logger.WithField("ShardStore", "S3").WithField("UserSpace", s.Detail) } -func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) string { - return types.PathJoin(s.workingDir, hash.GetHashPrefix(2)) +func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) clitypes.JPath { + p := s.workingDir.Clone() + p.Push(hash.GetHashPrefix(2)) + return p } -func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) string { - return types.PathJoin(s.workingDir, hash.GetHashPrefix(2), string(hash)) +func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) clitypes.JPath { + p := s.workingDir.Clone() + p.Push(hash.GetHashPrefix(2)) + p.Push(string(hash)) + return p } diff --git a/common/pkgs/storage/s3/utils.go b/common/pkgs/storage/s3/utils.go index 02fa87d..88b1f5e 100644 --- a/common/pkgs/storage/s3/utils.go +++ b/common/pkgs/storage/s3/utils.go @@ -3,6 +3,7 @@ package s3 import ( "encoding/base64" "fmt" + "path" ) func DecodeBase64Hash(hash string) ([]byte, error) { @@ -17,3 +18,11 @@ func DecodeBase64Hash(hash string) ([]byte, error) { return hashBytes, nil } + +func JoinKey(comps ...string) string { + return path.Join(comps...) 
+} + +func BaseKey(key string) string { + return path.Base(key) +} diff --git a/common/pkgs/storage/types/base_store.go b/common/pkgs/storage/types/base_store.go index fd6998a..41e1e32 100644 --- a/common/pkgs/storage/types/base_store.go +++ b/common/pkgs/storage/types/base_store.go @@ -3,25 +3,36 @@ package types import ( "fmt" "io" + "time" + + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" ) -type ListEntry struct { - Path string - Size int64 - IsDir bool +type DirEntry struct { + Path clitypes.JPath + Size int64 + ModTime time.Time + IsDir bool +} + +type DirReader interface { + // 读取下一个目录条目。如果没有更多条目,那么应该返回io.EOF + Next() (DirEntry, error) + Close() } type BaseStore interface { - Write(path string, stream io.Reader) (FileInfo, error) - Read(path string, opt OpenOption) (io.ReadCloser, error) + Write(path clitypes.JPath, stream io.Reader, opt WriteOption) (FileInfo, error) + Read(path clitypes.JPath, opt OpenOption) (io.ReadCloser, error) // 创建指定路径的文件夹。对于不支持空文件夹的存储系统来说,可以采用创建以/结尾的对象的方式来模拟文件夹。 - Mkdir(path string) error - // 返回指定路径下的所有文件,文件路径是包含path在内的完整路径。返回结果的第一条一定是路径本身,可能是文件,也可能是目录。 - // 如果路径不存在,那么不会返回错误,而是返回一个空列表。 - // 返回的内容严格按照存储系统的原始结果来,比如当存储系统是一个对象存储时,那么就可能不会包含目录,或者包含用于模拟的以“/”结尾的对象。 - ListAll(path string) ([]ListEntry, error) + Mkdir(path clitypes.JPath) error + // 返回指定路径下的所有文件,文件路径是包含path在内的完整路径。返回结果的第一条一定是路径本身,可能是文件,也可能是目录,路径不存在时,Next应该直接返回io.EOF。 + // Next必须按照目录的层级关系返回,但不一定要按照文件名排序。 + ReadDir(path clitypes.JPath) DirReader // 清空临时目录。只应该在此存储服务未被使用时调用 CleanTemps() + // 测试存储服务是否可用 + Test() error } type OpenOption struct { @@ -65,3 +76,8 @@ func (o *OpenOption) String() string { return fmt.Sprintf("%s:%s", rangeStart, rangeEnd) } + +type WriteOption struct { + // 文件修改时间,如果为0,则使用当前时间 + ModTime time.Time +} diff --git a/common/pkgs/storage/types/s2s.go b/common/pkgs/storage/types/s2s.go index 12d0230..ec4c704 100644 --- a/common/pkgs/storage/types/s2s.go +++ b/common/pkgs/storage/types/s2s.go @@ -10,6 +10,6 @@ type S2STransfer interface { // 
【静态方法】判断是否能从指定的源存储中直传到当前存储的目的路径。仅在生成计划时使用 CanTransfer(src, dst *clitypes.UserSpaceDetail) bool // 从远端获取文件并保存到本地路径 - Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath string, dstPath string) (FileInfo, error) + Transfer(ctx context.Context, src *clitypes.UserSpaceDetail, srcPath clitypes.JPath, dstPath clitypes.JPath) (FileInfo, error) Close() } diff --git a/common/pkgs/storage/types/shard_store.go b/common/pkgs/storage/types/shard_store.go index 2a0f902..b7ff90c 100644 --- a/common/pkgs/storage/types/shard_store.go +++ b/common/pkgs/storage/types/shard_store.go @@ -10,7 +10,7 @@ type ShardStore interface { Start(ch *StorageEventChan) Stop() // 将存储系统中已有的文件作为分片纳入管理范围 - Store(path string, hash clitypes.FileHash, size int64) (FileInfo, error) + Store(path clitypes.JPath, hash clitypes.FileHash, size int64) (FileInfo, error) // 获得指定文件信息 Info(fileHash clitypes.FileHash) (FileInfo, error) // 获取所有文件信息,尽量保证操作是原子的 diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 8a80304..0458dbb 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -43,7 +43,7 @@ type FeatureDesc struct{} type FileInfo struct { // 分片在存储系统中的路径,可以通过BaseStore读取的 - Path string + Path clitypes.JPath // 文件大小 Size int64 // 分片的哈希值,不一定有值,根据来源不同,可能为空 diff --git a/common/pkgs/storage/types/utils.go b/common/pkgs/storage/types/utils.go index a6cea72..bd51c3b 100644 --- a/common/pkgs/storage/types/utils.go +++ b/common/pkgs/storage/types/utils.go @@ -1,8 +1,6 @@ package types import ( - "path" - clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -19,15 +17,9 @@ func FindFeature[T cortypes.StorageFeature](detail *clitypes.UserSpaceDetail) T return def } -func PathJoin(comps ...string) string { - return path.Join(comps...) 
-} - -func PathBase(p string) string { - return path.Base(p) -} - -func MakeTempDirPath(detail *clitypes.UserSpaceDetail, comps ...string) string { - cs := append([]string{detail.UserSpace.WorkingDir, TempWorkingDir}, comps...) - return PathJoin(cs...) +func MakeTempDirPath(detail *clitypes.UserSpaceDetail, comps ...string) clitypes.JPath { + p := detail.UserSpace.WorkingDir.Clone() + p.Push(TempWorkingDir) + p.ConcatComps(comps) + return p }