| @@ -9,6 +9,7 @@ import ( | |||
| "os" | |||
| "path/filepath" | |||
| "sync" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| @@ -24,9 +25,11 @@ const ( | |||
| ) | |||
| type ShardStore struct { | |||
| stg cdssdk.Storage | |||
| cfg cdssdk.LocalShardStorage | |||
| lock sync.Mutex | |||
| stg cdssdk.Storage | |||
| cfg cdssdk.LocalShardStorage | |||
| lock sync.Mutex | |||
| workingTempFiles map[string]bool | |||
| done chan any | |||
| } | |||
| func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { | |||
| @@ -36,17 +39,67 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor | |||
| } | |||
| return &ShardStore{ | |||
| stg: stg, | |||
| cfg: cfg, | |||
| stg: stg, | |||
| cfg: cfg, | |||
| workingTempFiles: make(map[string]bool), | |||
| done: make(chan any, 1), | |||
| }, nil | |||
| } | |||
| func (s *ShardStore) Start(ch *types.StorageEventChan) { | |||
| s.getLogger().Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) | |||
| go func() { | |||
| removeTempTicker := time.NewTicker(time.Minute * 10) | |||
| for { | |||
| select { | |||
| case <-removeTempTicker.C: | |||
| s.removeUnusedTempFiles() | |||
| case <-s.done: | |||
| return | |||
| } | |||
| } | |||
| }() | |||
| } | |||
| func (s *ShardStore) removeUnusedTempFiles() { | |||
| s.lock.Lock() | |||
| defer s.lock.Unlock() | |||
| log := s.getLogger() | |||
| entries, err := os.ReadDir(filepath.Join(s.cfg.Root, TempDir)) | |||
| if err != nil { | |||
| log.Warnf("read temp dir: %v", err) | |||
| return | |||
| } | |||
| for _, entry := range entries { | |||
| if entry.IsDir() { | |||
| continue | |||
| } | |||
| if s.workingTempFiles[entry.Name()] { | |||
| continue | |||
| } | |||
| path := filepath.Join(s.cfg.Root, TempDir, entry.Name()) | |||
| err = os.Remove(path) | |||
| if err != nil { | |||
| log.Warnf("remove temp file %v: %v", path, err) | |||
| } else { | |||
| log.Infof("remove unused temp file %v", path) | |||
| } | |||
| } | |||
| } | |||
| func (s *ShardStore) Stop() { | |||
| s.getLogger().Infof("local shard store stop") | |||
| select { | |||
| case s.done <- nil: | |||
| default: | |||
| } | |||
| } | |||
| func (s *ShardStore) New() types.ShardWriter { | |||
| @@ -65,6 +118,8 @@ func (s *ShardStore) New() types.ShardWriter { | |||
| return utils.ErrorShardWriter(err) | |||
| } | |||
| s.workingTempFiles[filepath.Base(file.Name())] = true | |||
| return &ShardWriter{ | |||
| path: file.Name(), // file.Name 包含tmpDir路径 | |||
| file: file, | |||
| @@ -194,11 +249,13 @@ func (s *ShardStore) onWritterAbort(w *ShardWriter) { | |||
| s.getLogger().Debugf("writting file %v aborted", w.path) | |||
| s.removeTempFile(w.path) | |||
| delete(s.workingTempFiles, filepath.Base(w.path)) | |||
| } | |||
| func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { | |||
| s.lock.Lock() | |||
| defer s.lock.Unlock() | |||
| defer delete(s.workingTempFiles, filepath.Base(w.path)) | |||
| log := s.getLogger() | |||