From 06f18a2ede35bb87eaa47d7f372e250adbdefe8f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 13 Nov 2024 10:46:42 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E8=AE=A1=E5=85=B1=E4=BA=AB=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E7=9B=B8=E5=85=B3=E6=8E=A5=E5=8F=A3=EF=BC=9B=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E5=88=86=E7=89=87=E5=AD=98=E5=82=A8=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/mq/cache.go | 2 +- agent/internal/mq/storage.go | 228 +++++------------ agent/internal/task/cache_move_package.go | 9 +- agent/internal/task/storage_load_package.go | 39 +-- common/models/models.go | 5 + common/pkgs/ioswitch2/ops2/shard_store.go | 10 +- common/pkgs/ioswitchlrc/ops2/shard_store.go | 10 +- common/pkgs/mq/agent/storage.go | 9 +- common/pkgs/storage/local/shard_store.go | 234 ++++++++++++------ common/pkgs/storage/local/shared_store.go | 225 +++++++++++++++++ common/pkgs/storage/local/writer.go | 66 ----- common/pkgs/storage/mgr/create_sharedstore.go | 19 +- common/pkgs/storage/types/shard_store.go | 16 +- common/pkgs/storage/types/shared_store.go | 13 + common/pkgs/storage/utils/utils.go | 28 +-- common/utils/utils.go | 7 - 16 files changed, 498 insertions(+), 422 deletions(-) create mode 100644 common/pkgs/storage/local/shared_store.go delete mode 100644 common/pkgs/storage/local/writer.go diff --git a/agent/internal/mq/cache.go b/agent/internal/mq/cache.go index ea85a46..19fe06d 100644 --- a/agent/internal/mq/cache.go +++ b/agent/internal/mq/cache.go @@ -36,7 +36,7 @@ func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMes return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) } - err = store.Purge(msg.Avaiables) + err = store.GC(msg.Avaiables) if err != nil { return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err)) } diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index 4340c96..dfd4161 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -1,26 +1,15 @@ package mq import ( - "fmt" - "io/fs" - "os" - "path/filepath" - "strconv" "time" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" - "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" + stgmod "gitlink.org.cn/cloudream/storage/common/models" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/common/utils" ) func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { @@ -68,70 +57,21 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return mq.ReplyOK(agtmq.NewStorageCheckResp( - err.Error(), - nil, - )) + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } defer stgglb.CoordinatorMQPool.Release(coorCli) - // TODO UserID。应该设计两种接口,一种需要UserID,一种不需要。 - getStg, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{msg.StorageID})) + shared, err := svc.stgMgr.GetSharedStore(msg.StorageID) if err != nil { - return mq.ReplyOK(agtmq.NewStorageCheckResp( - err.Error(), - nil, - )) - } - if getStg.Storages[0] == nil { - return nil, mq.Failed(errorcode.OperationFailed, "storage not found") - } - if getStg.Storages[0].Shared == nil { - return nil, mq.Failed(errorcode.OperationFailed, "storage has no shared storage") + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - entries, err := os.ReadDir(utils.MakeStorageLoadDirectory(getStg.Storages[0].Shared.LoadBase)) + loaded, err := shared.ListLoadedPackages() if err != nil { - logger.Warnf("list storage directory failed, err: %s", err.Error()) - return mq.ReplyOK(agtmq.NewStorageCheckResp( - err.Error(), - nil, - )) - } - - var stgPkgs []model.StoragePackage - - userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() }) - for _, dir := range userDirs { - userIDInt, err := strconv.ParseInt(dir.Name(), 10, 64) - if err != nil { - logger.Warnf("parsing user id %s: %s", dir.Name(), err.Error()) - continue - } - - pkgDir := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storages[0].Shared.LoadBase), dir.Name()) - pkgDirs, err := os.ReadDir(pkgDir) - if err != nil { - logger.Warnf("reading package dir %s: %s", pkgDir, err.Error()) - continue - } - - for _, pkg := range pkgDirs { - pkgIDInt, err := strconv.ParseInt(pkg.Name(), 10, 64) - if err != nil { - logger.Warnf("parsing package dir %s: %s", pkg.Name(), err.Error()) - continue - } - - stgPkgs = append(stgPkgs, model.StoragePackage{ - StorageID: msg.StorageID, - PackageID: cdssdk.PackageID(pkgIDInt), - UserID: cdssdk.UserID(userIDInt), - }) - } + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, stgPkgs)) + return mq.ReplyOK(agtmq.NewStorageCheckResp(loaded)) } func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.CodeMessage) { @@ -141,121 +81,71 @@ func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.C } defer stgglb.CoordinatorMQPool.Release(coorCli) - // TODO UserID。应该设计两种接口,一种需要UserID,一种不需要。 - getStg, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{msg.StorageID})) + shared, err := svc.stgMgr.GetSharedStore(msg.StorageID) if err != nil { return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - if getStg.Storages[0] == nil { - return nil, mq.Failed(errorcode.OperationFailed, "storage not found") - } - if getStg.Storages[0].Shared == nil { - return nil, mq.Failed(errorcode.OperationFailed, "storage has no shared storage") - } - entries, err := os.ReadDir(utils.MakeStorageLoadDirectory(getStg.Storages[0].Shared.LoadBase)) - if err != nil { - logger.Warnf("list storage directory failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "list directory files failed") - } - - // userID->pkgID->pkg - userPkgs := make(map[string]map[string]bool) + var loadeds []stgmod.LoadedPackageID for _, pkg := range msg.Packages { - userIDStr := fmt.Sprintf("%d", pkg.UserID) - - pkgs, ok := userPkgs[userIDStr] - if !ok { - pkgs = make(map[string]bool) - userPkgs[userIDStr] = pkgs - } - - pkgIDStr := fmt.Sprintf("%d", pkg.PackageID) - pkgs[pkgIDStr] = true + loadeds = append(loadeds, stgmod.LoadedPackageID{ + UserID: pkg.UserID, + PackageID: pkg.PackageID, + }) } - userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() }) - for _, dir := range userDirs { - pkgMap, ok := userPkgs[dir.Name()] - // 第一级目录名是UserID,先删除UserID在StoragePackage表里没出现过的文件夹 - if !ok { - rmPath := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storages[0].Shared.LoadBase), dir.Name()) - err := os.RemoveAll(rmPath) - if err != nil { - logger.Warnf("removing user dir %s: %s", rmPath, err.Error()) - } else { - logger.Debugf("user dir %s removed by gc", rmPath) - } - continue - } - - pkgDir := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storages[0].Shared.LoadBase), dir.Name()) - // 遍历每个UserID目录的packages目录里的内容 - pkgs, err := os.ReadDir(pkgDir) - if err != nil { - logger.Warnf("reading package dir %s: %s", pkgDir, err.Error()) - continue - } - - for _, pkg := range pkgs { - if !pkgMap[pkg.Name()] { - rmPath := filepath.Join(pkgDir, pkg.Name()) - err := os.RemoveAll(rmPath) - if err != nil { - logger.Warnf("removing package dir %s: %s", rmPath, err.Error()) - } else { - logger.Debugf("package dir %s removed by gc", rmPath) - } - } - } + err = shared.PackageGC(loadeds) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } return mq.ReplyOK(agtmq.RespStorageGC()) } func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getStg, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{msg.StorageID})) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - if getStg.Storages[0] == nil { - return nil, mq.Failed(errorcode.OperationFailed, "storage not found") - } - if getStg.Storages[0].Shared == nil { - return nil, mq.Failed(errorcode.OperationFailed, "storage has no shared storage") - } - - fullPath := filepath.Clean(filepath.Join(getStg.Storages[0].Shared.LoadBase, msg.Path)) - - var uploadFilePathes []string - err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { - if err != nil { - return nil - } - - if !fi.IsDir() { - uploadFilePathes = append(uploadFilePathes, fname) - } - - return nil - }) - if err != nil { - logger.Warnf("opening directory %s: %s", fullPath, err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") - } - - objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) - tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.StorageAffinity)) - return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) + return nil, mq.Failed(errorcode.OperationFailed, "not implemented") + // coorCli, err := stgglb.CoordinatorMQPool.Acquire() + // if err != nil { + // logger.Warnf("new coordinator client: %s", err.Error()) + + // return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + // } + // defer stgglb.CoordinatorMQPool.Release(coorCli) + + // getStg, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{msg.StorageID})) + // if err != nil { + // return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + // } + // if getStg.Storages[0] == nil { + // return nil, mq.Failed(errorcode.OperationFailed, "storage not found") + // } + // if getStg.Storages[0].Shared == nil { + // return nil, mq.Failed(errorcode.OperationFailed, "storage has no shared storage") + // } + + // fullPath := filepath.Clean(filepath.Join(getStg.Storages[0].Shared.LoadBase, msg.Path)) + + // var uploadFilePathes []string + // err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { + // if err != nil { + // return nil + // } + + // if !fi.IsDir() { + // uploadFilePathes = append(uploadFilePathes, fname) + // } + + // return nil + // }) + // if err != nil { + // logger.Warnf("opening directory %s: %s", fullPath, err.Error()) + + // return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") + // } + + // objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) + // tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.StorageAffinity)) + // return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage) (*agtmq.WaitStorageCreatePackageResp, *mq.CodeMessage) { diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 74c1b2f..a6f51f8 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -2,7 +2,6 @@ package task import ( "fmt" - "io" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -74,16 +73,10 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { } defer obj.File.Close() - writer := store.New() - _, err = io.Copy(writer, obj.File) + _, err = store.Create(obj.File) if err != nil { - writer.Abort() return fmt.Errorf("writing to store: %w", err) } - _, err = writer.Finish() - if err != nil { - return fmt.Errorf("finishing store: %w", err) - } ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, t.storageID, 1) } diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index af4dd65..02436d0 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -4,8 +4,6 @@ import ( "fmt" "io" "math" - "os" - "path/filepath" "time" "github.com/samber/lo" @@ -23,7 +21,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - "gitlink.org.cn/cloudream/storage/common/utils" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) type StorageLoadPackage struct { @@ -71,23 +69,11 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e } defer stgglb.CoordinatorMQPool.Release(coorCli) - getStgResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{t.storageID})) + shared, err := ctx.stgMgr.GetSharedStore(t.storageID) if err != nil { - return fmt.Errorf("request to coordinator: %w", err) + return fmt.Errorf("get shared store of storage %v: %w", t.storageID, err) } - if getStgResp.Storages[0] == nil { - return fmt.Errorf("storage not found") - } - if getStgResp.Storages[0].Shared == nil { - return fmt.Errorf("storage has shared storage") - } - t.PackagePath = utils.MakeLoadedPackagePath(t.userID, t.packageID) - fullLocalPath := filepath.Join(getStgResp.Storages[0].Shared.LoadBase, t.PackagePath) - - if err = os.MkdirAll(fullLocalPath, 0755); err != nil { - return fmt.Errorf("creating output directory: %w", err) - } getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.packageID)) if err != nil { @@ -113,7 +99,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e defer mutex.Unlock() for _, obj := range getObjectDetails.Objects { - err := t.downloadOne(coorCli, shardstore, fullLocalPath, obj) + err := t.downloadOne(coorCli, shardstore, shared, obj) if err != nil { return err } @@ -129,7 +115,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e return err } -func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, shardStore types.ShardStore, dir string, obj stgmod.ObjectDetail) error { +func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, shardStore types.ShardStore, shared types.SharedStore, obj stgmod.ObjectDetail) error { var file io.ReadCloser switch red := obj.Object.Redundancy.(type) { @@ -160,20 +146,7 @@ func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, shardStore type } defer file.Close() - fullPath := filepath.Join(dir, obj.Object.Path) - - lastDirPath := filepath.Dir(fullPath) - if err := os.MkdirAll(lastDirPath, 0755); err != nil { - return fmt.Errorf("creating object last dir: %w", err) - } - - outputFile, err := os.Create(fullPath) - if err != nil { - return fmt.Errorf("creating object file: %w", err) - } - defer outputFile.Close() - - if _, err := io.Copy(outputFile, file); err != nil { + if _, err := shared.WritePackageObject(t.userID, t.packageID, obj.Object.Path, file); err != nil { return fmt.Errorf("writting object to file: %w", err) } diff --git a/common/models/models.go b/common/models/models.go index 988ae38..0031093 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -146,3 +146,8 @@ const ( AliCloud = "AliCloud" SugonCloud = "SugonCloud" ) + +type LoadedPackageID struct { + UserID cdssdk.UserID + PackageID cdssdk.PackageID +} diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 90d4329..0253138 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -98,19 +98,11 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer input.Stream.Close() - writer := store.New() - defer writer.Abort() - - _, err = io.Copy(writer, input.Stream) + fileInfo, err := store.Create(input.Stream) if err != nil { return fmt.Errorf("writing file to shard store: %w", err) } - fileInfo, err := writer.Finish() - if err != nil { - return fmt.Errorf("finishing writing file to shard store: %w", err) - } - e.PutVar(o.FileHash, &FileHashValue{ Hash: fileInfo.Hash, }) diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index f15fae0..3f22f50 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -98,15 +98,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer input.Stream.Close() - writer := store.New() - defer writer.Abort() - - _, err = io.Copy(writer, input.Stream) - if err != nil { - return fmt.Errorf("writing file to shard store: %w", err) - } - - fileInfo, err := writer.Finish() + fileInfo, err := store.Create(input.Stream) if err != nil { return fmt.Errorf("finishing writing file to shard store: %w", err) } diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/agent/storage.go index 8fb186b..8fac030 100644 --- a/common/pkgs/mq/agent/storage.go +++ b/common/pkgs/mq/agent/storage.go @@ -4,6 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" ) @@ -96,8 +97,7 @@ type StorageCheck struct { } type StorageCheckResp struct { mq.MessageBodyBase - DirectoryState string `json:"directoryState"` - Packages []model.StoragePackage `json:"packages"` + Packages []stgmod.LoadedPackageID `json:"packages"` } func NewStorageCheck(storageID cdssdk.StorageID) *StorageCheck { @@ -105,10 +105,9 @@ func NewStorageCheck(storageID cdssdk.StorageID) *StorageCheck { StorageID: storageID, } } -func NewStorageCheckResp(dirState string, packages []model.StoragePackage) *StorageCheckResp { +func NewStorageCheckResp(packages []stgmod.LoadedPackageID) *StorageCheckResp { return &StorageCheckResp{ - DirectoryState: dirState, - Packages: packages, + Packages: packages, } } func (client *Client) StorageCheck(msg *StorageCheck, opts ...mq.RequestOption) (*StorageCheckResp, error) { diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index ee56b03..557ed6f 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -2,12 +2,14 @@ package local import ( "crypto/sha256" + "encoding/hex" "errors" "fmt" "io" "io/fs" "os" "path/filepath" + "strings" "sync" "time" @@ -15,13 +17,11 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) const ( TempDir = "tmp" BlocksDir = "blocks" - SvcName = "LocalShardStore" ) type ShardStore struct { @@ -47,10 +47,12 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor } func (s *ShardStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) + s.getLogger().Infof("component start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) go func() { removeTempTicker := time.NewTicker(time.Minute * 10) + defer removeTempTicker.Stop() + for { select { case <-removeTempTicker.C: @@ -83,18 +85,24 @@ func (s *ShardStore) removeUnusedTempFiles() { continue } + info, err := entry.Info() + if err != nil { + log.Warnf("get temp file %v info: %v", entry.Name(), err) + continue + } + path := filepath.Join(s.cfg.Root, TempDir, entry.Name()) err = os.Remove(path) if err != nil { log.Warnf("remove temp file %v: %v", path, err) } else { - log.Infof("remove unused temp file %v", path) + log.Infof("remove unused temp file %v, size: %v, last mod time: %v", path, info.Size(), info.ModTime()) } } } func (s *ShardStore) Stop() { - s.getLogger().Infof("local shard store stop") + s.getLogger().Infof("component stop") select { case s.done <- nil: @@ -102,7 +110,23 @@ func (s *ShardStore) Stop() { } } -func (s *ShardStore) New() types.ShardWriter { +func (s *ShardStore) Create(stream io.Reader) (types.FileInfo, error) { + file, err := s.createTempFile() + if err != nil { + return types.FileInfo{}, err + } + + size, hash, err := s.writeTempFile(file, stream) + if err != nil { + // Name是文件完整路径 + s.onCreateFailed(file.Name()) + return types.FileInfo{}, err + } + + return s.onCreateFinished(file.Name(), size, hash) +} + +func (s *ShardStore) createTempFile() (*os.File, error) { s.lock.Lock() defer s.lock.Unlock() @@ -110,25 +134,107 @@ func (s *ShardStore) New() types.ShardWriter { err := os.MkdirAll(tmpDir, 0755) if err != nil { - return utils.ErrorShardWriter(err) + s.lock.Unlock() + return nil, err } file, err := os.CreateTemp(tmpDir, "tmp-*") if err != nil { - return utils.ErrorShardWriter(err) + s.lock.Unlock() + return nil, err } s.workingTempFiles[filepath.Base(file.Name())] = true - return &ShardWriter{ - path: file.Name(), // file.Name 包含tmpDir路径 - file: file, - hasher: sha256.New(), - owner: s, + return file, nil +} + +func (s *ShardStore) writeTempFile(file *os.File, stream io.Reader) (int64, cdssdk.FileHash, error) { + buf := make([]byte, 32*1024) + size := int64(0) + + hasher := sha256.New() + for { + n, err := stream.Read(buf) + if n > 0 { + size += int64(n) + io2.WriteAll(hasher, buf[:n]) + err := io2.WriteAll(file, buf[:n]) + if err != nil { + return 0, "", err + } + } + if err == io.EOF { + break + } + if err != nil { + return 0, "", err + } } + + h := hasher.Sum(nil) + return size, cdssdk.FileHash(strings.ToUpper(hex.EncodeToString(h))), nil } -// 使用F函数创建Option对象 +func (s *ShardStore) onCreateFinished(tempFilePath string, size int64, hash cdssdk.FileHash) (types.FileInfo, error) { + s.lock.Lock() + defer s.lock.Unlock() + defer delete(s.workingTempFiles, filepath.Base(tempFilePath)) + + log := s.getLogger() + + log.Debugf("write file %v finished, size: %v, hash: %v", tempFilePath, size, hash) + + blockDir := s.getFileDirFromHash(hash) + err := os.MkdirAll(blockDir, 0755) + if err != nil { + s.removeTempFile(tempFilePath) + log.Warnf("make block dir %v: %v", blockDir, err) + return types.FileInfo{}, fmt.Errorf("making block dir: %w", err) + } + + newPath := filepath.Join(blockDir, string(hash)) + _, err = os.Stat(newPath) + if os.IsNotExist(err) { + err = os.Rename(tempFilePath, newPath) + if err != nil { + s.removeTempFile(tempFilePath) + log.Warnf("rename %v to %v: %v", tempFilePath, newPath, err) + return types.FileInfo{}, fmt.Errorf("rename file: %w", err) + } + + } else if err != nil { + s.removeTempFile(tempFilePath) + log.Warnf("get file %v stat: %v", newPath, err) + return types.FileInfo{}, fmt.Errorf("get file stat: %w", err) + } else { + s.removeTempFile(tempFilePath) + } + + return types.FileInfo{ + Hash: hash, + Size: size, + Description: tempFilePath, + }, nil +} + +func (s *ShardStore) onCreateFailed(tempFilePath string) { + s.lock.Lock() + defer s.lock.Unlock() + + s.getLogger().Debugf("writting file %v aborted", tempFilePath) + s.removeTempFile(tempFilePath) + delete(s.workingTempFiles, filepath.Base(tempFilePath)) +} + +func (s *ShardStore) removeTempFile(path string) { + err := os.Remove(path) + if err != nil { + s.getLogger().Warnf("removing temp file %v: %v", path, err) + } +} + +// 使用NewOpen函数创建Option对象 func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { s.lock.Lock() defer s.lock.Unlock() @@ -212,22 +318,46 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { return infos, nil } -func (s *ShardStore) Purge(removes []cdssdk.FileHash) error { +func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error { s.lock.Lock() defer s.lock.Unlock() + avais := make(map[cdssdk.FileHash]bool) + for _, hash := range avaiables { + avais[hash] = true + } + cnt := 0 - for _, hash := range removes { - fileName := string(hash) + blockDir := filepath.Join(s.cfg.Root, BlocksDir) + err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } - path := filepath.Join(s.cfg.Root, BlocksDir, fileName[:2], fileName) - err := os.Remove(path) + if d.IsDir() { + return nil + } + + info, err := d.Info() if err != nil { - s.getLogger().Warnf("remove file %v: %v", path, err) - } else { - cnt++ + return err } + + fileHash := cdssdk.FileHash(filepath.Base(info.Name())) + if !avais[fileHash] { + err = os.Remove(path) + if err != nil { + s.getLogger().Warnf("remove file %v: %v", path, err) + } else { + cnt++ + } + } + + return nil + }) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err } s.getLogger().Infof("purge %d files", cnt) @@ -243,66 +373,8 @@ func (s *ShardStore) Stats() types.Stats { } } -func (s *ShardStore) onWritterAbort(w *ShardWriter) { - s.lock.Lock() - defer s.lock.Unlock() - - s.getLogger().Debugf("writting file %v aborted", w.path) - s.removeTempFile(w.path) - delete(s.workingTempFiles, filepath.Base(w.path)) -} - -func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { - s.lock.Lock() - defer s.lock.Unlock() - defer delete(s.workingTempFiles, filepath.Base(w.path)) - - log := s.getLogger() - - log.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash) - - blockDir := s.getFileDirFromHash(hash) - err := os.MkdirAll(blockDir, 0755) - if err != nil { - s.removeTempFile(w.path) - log.Warnf("make block dir %v: %v", blockDir, err) - return types.FileInfo{}, fmt.Errorf("making block dir: %w", err) - } - - newPath := filepath.Join(blockDir, string(hash)) - _, err = os.Stat(newPath) - if os.IsNotExist(err) { - err = os.Rename(w.path, newPath) - if err != nil { - s.removeTempFile(w.path) - log.Warnf("rename %v to %v: %v", w.path, newPath, err) - return types.FileInfo{}, fmt.Errorf("rename file: %w", err) - } - - } else if err != nil { - s.removeTempFile(w.path) - log.Warnf("get file %v stat: %v", newPath, err) - return types.FileInfo{}, fmt.Errorf("get file stat: %w", err) - } else { - s.removeTempFile(w.path) - } - - return types.FileInfo{ - Hash: hash, - Size: w.size, - Description: w.path, - }, nil -} - -func (s *ShardStore) removeTempFile(path string) { - err := os.Remove(path) - if err != nil { - s.getLogger().Warnf("removing temp file %v: %v", path, err) - } -} - func (s *ShardStore) getLogger() logger.Logger { - return logger.WithField("S", SvcName).WithField("Storage", s.stg.String()) + return logger.WithField("ShardStore", "Local").WithField("Storage", s.stg.String()) } func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string { diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go new file mode 100644 index 0000000..b285ab4 --- /dev/null +++ b/common/pkgs/storage/local/shared_store.go @@ -0,0 +1,225 @@ +package local + +import ( + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strconv" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type SharedStore struct { + stg cdssdk.Storage + cfg cdssdk.LocalSharedStorage + // lock sync.Mutex +} + +func NewSharedStore(stg cdssdk.Storage, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) { + _, ok := stg.Address.(*cdssdk.LocalStorageAddress) + if !ok { + return nil, fmt.Errorf("storage address(%T) is not local", stg) + } + + return &SharedStore{ + stg: stg, + cfg: cfg, + }, nil +} + +func (s *SharedStore) Start(ch *types.StorageEventChan) { + s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase) +} + +func (s *SharedStore) Stop() { + s.getLogger().Infof("component stop") +} + +func (s *SharedStore) WritePackageObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, stream io.Reader) (string, error) { + relaPath := filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", pkgID), path) + fullPath := filepath.Join(s.cfg.LoadBase, relaPath) + err := os.MkdirAll(filepath.Dir(fullPath), 0755) + if err != nil { + return "", err + } + + f, err := os.Create(fullPath) + if err != nil { + return "", err + } + defer f.Close() + + _, err = io.Copy(f, stream) + if err != nil { + return "", err + } + + return filepath.ToSlash(relaPath), nil +} + +func (s *SharedStore) ListLoadedPackages() ([]stgmod.LoadedPackageID, error) { + entries, err := os.ReadDir(s.cfg.LoadBase) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + s.getLogger().Warnf("list package directory: %v", err) + return nil, err + } + + var loadeds []stgmod.LoadedPackageID + for _, e := range entries { + if !e.IsDir() { + continue + } + + uid, err := strconv.ParseInt(e.Name(), 10, 64) + if err != nil { + continue + } + + userID := cdssdk.UserID(uid) + pkgs, err := s.listUserPackages(userID, fmt.Sprintf("%v", userID)) + if err != nil { + continue + } + + loadeds = append(loadeds, pkgs...) + } + + return loadeds, nil +} + +func (s *SharedStore) listUserPackages(userID cdssdk.UserID, userIDStr string) ([]stgmod.LoadedPackageID, error) { + userDir := filepath.Join(s.cfg.LoadBase, userIDStr) + entries, err := os.ReadDir(userDir) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + s.getLogger().Warnf("list package directory: %v", err) + return nil, err + } + + var pkgs []stgmod.LoadedPackageID + for _, e := range entries { + if !e.IsDir() { + continue + } + + pkgID, err := strconv.ParseInt(e.Name(), 10, 64) + if err != nil { + continue + } + + pkgs = append(pkgs, stgmod.LoadedPackageID{ + UserID: userID, + PackageID: cdssdk.PackageID(pkgID), + }) + } + + return pkgs, nil +} + +func (s *SharedStore) PackageGC(avaiables []stgmod.LoadedPackageID) error { + log := s.getLogger() + + entries, err := os.ReadDir(s.cfg.LoadBase) + if err != nil { + log.Warnf("list storage directory: %s", err.Error()) + return err + } + + // userID->pkgID->pkg + userPkgs := make(map[string]map[string]bool) + for _, pkg := range avaiables { + userIDStr := fmt.Sprintf("%v", pkg.UserID) + + pkgs, ok := userPkgs[userIDStr] + if !ok { + pkgs = make(map[string]bool) + userPkgs[userIDStr] = pkgs + } + + pkgIDStr := fmt.Sprintf("%v", pkg.PackageID) + pkgs[pkgIDStr] = true + } + + userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() }) + for _, dir := range userDirs { + pkgMap, ok := userPkgs[dir.Name()] + // 第一级目录名是UserID,先删除UserID在StoragePackage表里没出现过的文件夹 + if !ok { + rmPath := filepath.Join(s.cfg.LoadBase, dir.Name()) + err := os.RemoveAll(rmPath) + if err != nil { + log.Warnf("removing user dir %s: %s", rmPath, err.Error()) + } else { + log.Debugf("user dir %s removed by gc", rmPath) + } + continue + } + + pkgDir := filepath.Join(s.cfg.LoadBase, dir.Name()) + // 遍历每个UserID目录的packages目录里的内容 + pkgs, err := os.ReadDir(pkgDir) + if err != nil { + log.Warnf("reading package dir %s: %s", pkgDir, err.Error()) + continue + } + + for _, pkg := range pkgs { + if !pkgMap[pkg.Name()] { + rmPath := filepath.Join(pkgDir, pkg.Name()) + err := os.RemoveAll(rmPath) + if err != nil { + log.Warnf("removing package dir %s: %s", rmPath, err.Error()) + } else { + log.Debugf("package dir %s removed by gc", rmPath) + } + } + } + } + + return nil +} + +func (s *SharedStore) getLogger() logger.Logger { + return logger.WithField("SharedStore", "Local").WithField("Storage", s.stg.String()) +} + +type PackageWriter struct { + pkgRoot string + fullDirPath string +} + +func (w *PackageWriter) Root() string { + return w.pkgRoot +} + +func (w *PackageWriter) Write(path string, stream io.Reader) (string, error) { + fullFilePath := filepath.Join(w.fullDirPath, path) + err := os.MkdirAll(filepath.Dir(fullFilePath), 0755) + if err != nil { + return "", err + } + + f, err := os.Create(fullFilePath) + if err != nil { + return "", err + } + defer f.Close() + + _, err = io.Copy(f, stream) + if err != nil { + return "", err + } + + return filepath.ToSlash(filepath.Join(w.pkgRoot, path)), nil +} diff --git a/common/pkgs/storage/local/writer.go b/common/pkgs/storage/local/writer.go deleted file mode 100644 index 8994ae8..0000000 --- a/common/pkgs/storage/local/writer.go +++ /dev/null @@ -1,66 +0,0 @@ -package local - -import ( - "encoding/hex" - "fmt" - "hash" - "os" - "strings" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -) - -type ShardWriter struct { - path string - file *os.File - hasher hash.Hash - size int64 - closed bool - owner *ShardStore -} - -func (w *ShardWriter) Write(data []byte) (int, error) { - n, err := w.file.Write(data) - if err != nil { - return 0, err - } - - w.hasher.Write(data[:n]) - w.size += int64(n) - return n, nil -} - -// 取消写入 -func (w *ShardWriter) Abort() error { - if w.closed { - return nil - } - w.closed = true - - err := w.file.Close() - w.owner.onWritterAbort(w) - return err -} - -// 结束写入,获得文件哈希值 -func (w *ShardWriter) Finish() (types.FileInfo, error) { - if w.closed { - return types.FileInfo{}, fmt.Errorf("stream closed") - } - w.closed = true - - err := w.file.Close() - if err != nil { - w.owner.onWritterAbort(w) - return types.FileInfo{}, err - } - - sum := w.hasher.Sum(nil) - info, err := w.owner.onWritterFinish(w, cdssdk.FileHash(strings.ToUpper(hex.EncodeToString(sum)))) - if err != nil { - // 无需再调onWritterAbort, onWritterFinish会处理 - return types.FileInfo{}, err - } - return info, nil -} diff --git a/common/pkgs/storage/mgr/create_sharedstore.go b/common/pkgs/storage/mgr/create_sharedstore.go index 670af06..b5dca09 100644 --- a/common/pkgs/storage/mgr/create_sharedstore.go +++ b/common/pkgs/storage/mgr/create_sharedstore.go @@ -1,10 +1,27 @@ package mgr import ( + "fmt" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) func createSharedStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error { - return nil + switch confg := detail.Storage.SharedStore.(type) { + case *cdssdk.LocalSharedStorage: + store, err := local.NewSharedStore(detail.Storage, *confg) + if err != nil { + return fmt.Errorf("new local shard store: %v", err) + } + + store.Start(ch) + stg.Shared = store + return nil + + default: + return fmt.Errorf("unsupported shard store type: %T", confg) + } } diff --git a/common/pkgs/storage/types/shard_store.go b/common/pkgs/storage/types/shard_store.go index f67e7c7..61badd8 100644 --- a/common/pkgs/storage/types/shard_store.go +++ b/common/pkgs/storage/types/shard_store.go @@ -24,16 +24,16 @@ type StoreEvent interface { type ShardStore interface { StorageComponent - // 准备写入一个新文件,写入后获得FileHash - New() ShardWriter + // 写入一个新文件,写入后获得FileHash + Create(stream io.Reader) (FileInfo, error) // 使用F函数创建Option对象 Open(opt OpenOption) (io.ReadCloser, error) // 获得指定文件信息 Info(fileHash cdssdk.FileHash) (FileInfo, error) // 获取所有文件信息,尽量保证操作是原子的 ListAll() ([]FileInfo, error) - // 删除指定的文件 - Purge(removes []cdssdk.FileHash) error + // 垃圾清理。只保留availables中的文件,删除其他文件 + GC(avaiables []cdssdk.FileHash) error // 获得存储系统信息 Stats() Stats } @@ -63,14 +63,6 @@ type Stats struct { Description string } -type ShardWriter interface { - io.Writer - // 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响,方便使用defer语句 - Abort() error - // 结束写入,获得文件哈希值 - Finish() (FileInfo, error) -} - type OpenOption struct { FileHash cdssdk.FileHash Offset int64 diff --git a/common/pkgs/storage/types/shared_store.go b/common/pkgs/storage/types/shared_store.go index bab3f34..c32b617 100644 --- a/common/pkgs/storage/types/shared_store.go +++ b/common/pkgs/storage/types/shared_store.go @@ -1,5 +1,18 @@ package types +import ( + "io" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) + type SharedStore interface { StorageComponent + // 写入一个文件到Package的调度目录下,返回值为文件路径:userID/pkgID/path + WritePackageObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, stream io.Reader) (string, error) + // 获取所有已加载的Package信息 + ListLoadedPackages() ([]stgmod.LoadedPackageID, error) + // 垃圾回收,删除过期的Package + PackageGC(avaiables []stgmod.LoadedPackageID) error } diff --git a/common/pkgs/storage/utils/utils.go b/common/pkgs/storage/utils/utils.go index f6941b8..8a7f6b6 100644 --- a/common/pkgs/storage/utils/utils.go +++ b/common/pkgs/storage/utils/utils.go @@ -1,26 +1,12 @@ package utils -import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +import ( + "path/filepath" + "strconv" -type errorShardWriter struct { - err error -} - -func (w *errorShardWriter) Write(data []byte) (int, error) { - return 0, w.err -} - -// 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。 -// 方便defer机制 -func (w *errorShardWriter) Abort() error { - return w.err -} - -// 结束写入,获得文件哈希值 -func (w *errorShardWriter) Finish() (types.FileInfo, error) { - return types.FileInfo{}, w.err -} + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) -func ErrorShardWriter(err error) types.ShardWriter { - return &errorShardWriter{err: err} +func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { + return filepath.Join("packages", strconv.FormatInt(int64(userID), 10), strconv.FormatInt(int64(packageID), 10)) } diff --git a/common/utils/utils.go b/common/utils/utils.go index 29a98a3..3b9d1e8 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -2,15 +2,8 @@ package utils import ( "path/filepath" - "strconv" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string { - return filepath.Join("packages", strconv.FormatInt(int64(userID), 10), strconv.FormatInt(int64(packageID), 10)) -} - func MakeStorageLoadDirectory(stgDir string) string { return filepath.Join(stgDir, "packages") }