diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 9429433..ee56b03 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "sync" + "time" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -24,9 +25,11 @@ const ( ) type ShardStore struct { - stg cdssdk.Storage - cfg cdssdk.LocalShardStorage - lock sync.Mutex + stg cdssdk.Storage + cfg cdssdk.LocalShardStorage + lock sync.Mutex + workingTempFiles map[string]bool + done chan any } func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { @@ -36,17 +39,67 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor } return &ShardStore{ - stg: stg, - cfg: cfg, + stg: stg, + cfg: cfg, + workingTempFiles: make(map[string]bool), + done: make(chan any, 1), }, nil } func (s *ShardStore) Start(ch *types.StorageEventChan) { s.getLogger().Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) + + go func() { + removeTempTicker := time.NewTicker(time.Minute * 10) + for { + select { + case <-removeTempTicker.C: + s.removeUnusedTempFiles() + case <-s.done: + return + } + } + }() +} + +func (s *ShardStore) removeUnusedTempFiles() { + s.lock.Lock() + defer s.lock.Unlock() + + log := s.getLogger() + + entries, err := os.ReadDir(filepath.Join(s.cfg.Root, TempDir)) + if err != nil { + log.Warnf("read temp dir: %v", err) + return + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + if s.workingTempFiles[entry.Name()] { + continue + } + + path := filepath.Join(s.cfg.Root, TempDir, entry.Name()) + err = os.Remove(path) + if err != nil { + log.Warnf("remove temp file %v: %v", path, err) + } else { + log.Infof("remove unused temp file %v", path) + } + } } func (s *ShardStore) Stop() { s.getLogger().Infof("local shard store stop") + + select { + case s.done <- nil: + default: + } } func (s *ShardStore) New() types.ShardWriter { @@ -65,6 +118,8 @@ func (s *ShardStore) New() types.ShardWriter { return utils.ErrorShardWriter(err) } + s.workingTempFiles[filepath.Base(file.Name())] = true + return &ShardWriter{ path: file.Name(), // file.Name 包含tmpDir路径 file: file, @@ -194,11 +249,13 @@ func (s *ShardStore) onWritterAbort(w *ShardWriter) { s.getLogger().Debugf("writting file %v aborted", w.path) s.removeTempFile(w.path) + delete(s.workingTempFiles, filepath.Base(w.path)) } func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() + defer delete(s.workingTempFiles, filepath.Base(w.path)) log := s.getLogger()