| @@ -36,6 +36,20 @@ func (m *MountService) DumpStatus(ctx *gin.Context) { | |||||
| })) | })) | ||||
| } | } | ||||
| func (m *MountService) ReloadFilter(ctx *gin.Context) { | |||||
| // log := logger.WithField("HTTP", "Mount.ReloadFilter") | |||||
| // var req cliapi.MountReloadFilter | |||||
| // if err := ctx.ShouldBindQuery(&req); err != nil { | |||||
| // log.Warnf("binding body: %s", err.Error()) | |||||
| // ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| // return | |||||
| // } | |||||
| m.svc.Mount.ReloadSyncFilter() | |||||
| ctx.JSON(http.StatusOK, types.OK(cliapi.MountReloadFilterResp{})) | |||||
| } | |||||
| func (m *MountService) StartReclaimSpace(ctx *gin.Context) { | func (m *MountService) StartReclaimSpace(ctx *gin.Context) { | ||||
| // log := logger.WithField("HTTP", "Mount.ReclaimSpace") | // log := logger.WithField("HTTP", "Mount.ReclaimSpace") | ||||
| // var req cliapi.MountReclaimSpace | // var req cliapi.MountReclaimSpace | ||||
| @@ -81,6 +81,7 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { | |||||
| rt.GET(cliapi.MountDumpStatusPath, certAuth, s.Mount().DumpStatus) | rt.GET(cliapi.MountDumpStatusPath, certAuth, s.Mount().DumpStatus) | ||||
| rt.POST(cliapi.MountStartReclaimSpacePath, certAuth, s.Mount().StartReclaimSpace) | rt.POST(cliapi.MountStartReclaimSpacePath, certAuth, s.Mount().StartReclaimSpace) | ||||
| rt.POST(cliapi.MountReloadFilterPath, certAuth, s.Mount().ReloadFilter) | |||||
| rt.GET(cliapi.TickTockListJobsPath, certAuth, s.TickTock().ListJobs) | rt.GET(cliapi.TickTockListJobsPath, certAuth, s.TickTock().ListJobs) | ||||
| rt.POST(cliapi.TickTockRunJobPath, certAuth, s.TickTock().RunJob) | rt.POST(cliapi.TickTockRunJobPath, certAuth, s.TickTock().RunJob) | ||||
| @@ -99,6 +99,13 @@ func (m *Mount) Dump() MountStatus { | |||||
| } | } | ||||
| } | } | ||||
| func (m *Mount) ReloadSyncFilter() { | |||||
| if m.vfs == nil { | |||||
| return | |||||
| } | |||||
| m.vfs.ReloadSyncFilter() | |||||
| } | |||||
| func (m *Mount) StartReclaimSpace() { | func (m *Mount) StartReclaimSpace() { | ||||
| if m.vfs == nil { | if m.vfs == nil { | ||||
| return | return | ||||
| @@ -39,8 +39,10 @@ func (m *Mount) Dump() MountStatus { | |||||
| return MountStatus{} | return MountStatus{} | ||||
| } | } | ||||
| func (m *Mount) StartReclaimSpace() { | |||||
| func (m *Mount) ReloadSyncFilter() { | |||||
| } | |||||
| func (m *Mount) StartReclaimSpace() { | |||||
| } | } | ||||
| func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) { | func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) { | ||||
| @@ -24,6 +24,10 @@ import ( | |||||
| clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" | clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" | ||||
| ) | ) | ||||
| const ( | |||||
| SyncFilterConfigName = ".cds.sync.filter" | |||||
| ) | |||||
| type CacheEntry interface { | type CacheEntry interface { | ||||
| fuse.FsEntry | fuse.FsEntry | ||||
| // 在虚拟文件系统中的路径,即不包含缓存目录的路径 | // 在虚拟文件系统中的路径,即不包含缓存目录的路径 | ||||
| @@ -59,6 +63,7 @@ type Cache struct { | |||||
| cacheDone chan any | cacheDone chan any | ||||
| doFullScan chan any | doFullScan chan any | ||||
| activeCache *trie.Trie[*CacheFile] | activeCache *trie.Trie[*CacheFile] | ||||
| syncFilter *SyncFilter | |||||
| } | } | ||||
| func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache { | func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache { | ||||
| @@ -71,11 +76,15 @@ func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downlo | |||||
| cacheDone: make(chan any, 1), | cacheDone: make(chan any, 1), | ||||
| doFullScan: make(chan any, 1), | doFullScan: make(chan any, 1), | ||||
| activeCache: trie.NewTrie[*CacheFile](), | activeCache: trie.NewTrie[*CacheFile](), | ||||
| syncFilter: NewSyncFilter(), | |||||
| } | } | ||||
| } | } | ||||
| func (c *Cache) Start() { | func (c *Cache) Start() { | ||||
| c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName)) | |||||
| go c.scanningCache() | go c.scanningCache() | ||||
| go c.scanningData() | go c.scanningData() | ||||
| } | } | ||||
| @@ -97,6 +106,10 @@ func (c *Cache) GetCacheMetaPath(comps ...string) string { | |||||
| return filepath.Join(comps2...) | return filepath.Join(comps2...) | ||||
| } | } | ||||
| func (c *Cache) ReloadSyncFilter() { | |||||
| c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName)) | |||||
| } | |||||
| func (c *Cache) Dump() CacheStatus { | func (c *Cache) Dump() CacheStatus { | ||||
| c.lock.RLock() | c.lock.RLock() | ||||
| defer c.lock.RUnlock() | defer c.lock.RUnlock() | ||||
| @@ -677,6 +690,10 @@ func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheF | |||||
| shouldUpload = false | shouldUpload = false | ||||
| } | } | ||||
| if !c.syncFilter.ShouldSync(ch.pathComps, info.Size) { | |||||
| shouldUpload = false | |||||
| } | |||||
| // 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传 | // 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传 | ||||
| if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) { | if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) { | ||||
| if time.Since(info.FreeTime) < c.cfg.UploadPendingTime { | if time.Since(info.FreeTime) < c.cfg.UploadPendingTime { | ||||
| @@ -823,6 +840,12 @@ func (c *Cache) scanningData() { | |||||
| // 无条件加载缓存,可能会导致一些不需要被同步到云端的文件在缓存等级降到最低取消跟踪后,又重新被加载进来 | // 无条件加载缓存,可能会导致一些不需要被同步到云端的文件在缓存等级降到最低取消跟踪后,又重新被加载进来 | ||||
| // 不过由于扫描频率不高,所以问题不大 | // 不过由于扫描频率不高,所以问题不大 | ||||
| walkTraceComps = append(walkTraceComps, e[0].Name()) | walkTraceComps = append(walkTraceComps, e[0].Name()) | ||||
| // 第一个元素是data目录,所以要从第二个元素开始 | |||||
| if !c.syncFilter.ShouldSync(walkTraceComps[1:], e[0].Size()) { | |||||
| walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] | |||||
| continue | |||||
| } | |||||
| untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:])) | untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:])) | ||||
| walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] | walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] | ||||
| } | } | ||||
| @@ -0,0 +1,128 @@ | |||||
| package cache | |||||
| import ( | |||||
| "encoding/json" | |||||
| "os" | |||||
| "path/filepath" | |||||
| "strings" | |||||
| "sync" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| ) | |||||
| type SyncFilterRuleConfig struct { | |||||
| Repo FilterRepoRule `json:"repo"` | |||||
| Object FilterObjectRule `json:"object"` | |||||
| } | |||||
| type FilterRepoRule struct { | |||||
| BlackListMode bool `json:"blackListMode"` // 是否黑名单模式 | |||||
| Names []string `json:"names"` // 匹配模式 | |||||
| } | |||||
| type FilterObjectRule struct { | |||||
| BlackListMode bool `json:"blackListMode"` // 是否黑名单模式 | |||||
| Exts []string `json:"exts"` // 文件名后缀 | |||||
| MinSize int64 `json:"minSize"` | |||||
| MaxSize int64 `json:"maxSize"` // 0为不限制大小 | |||||
| } | |||||
| type SyncFilter struct { | |||||
| lock *sync.RWMutex | |||||
| config SyncFilterRuleConfig | |||||
| repoNames map[string]bool | |||||
| exts map[string]bool | |||||
| } | |||||
| func NewSyncFilter() *SyncFilter { | |||||
| return &SyncFilter{ | |||||
| lock: &sync.RWMutex{}, | |||||
| config: SyncFilterRuleConfig{}, | |||||
| repoNames: make(map[string]bool), | |||||
| exts: make(map[string]bool), | |||||
| } | |||||
| } | |||||
| func (f *SyncFilter) ShouldSync(fullPathComps []string, size int64) bool { | |||||
| f.lock.RLock() | |||||
| defer f.lock.RUnlock() | |||||
| return f.shouldSyncUnlocked(fullPathComps, size) | |||||
| } | |||||
| func (f *SyncFilter) shouldSyncUnlocked(fullPathComps []string, size int64) bool { | |||||
| // user/repo/objects/pack/pack-xxx.pack 或者 user/repo/objects/xx/xxxxxxxxx | |||||
| if len(fullPathComps) < 5 { | |||||
| return false | |||||
| } | |||||
| repoName := fullPathComps[1] | |||||
| if f.repoNames[repoName] { | |||||
| if f.config.Repo.BlackListMode { | |||||
| return false | |||||
| } | |||||
| } else if !f.config.Repo.BlackListMode { | |||||
| return false | |||||
| } | |||||
| // ext包含"." | |||||
| ext := filepath.Ext(fullPathComps[len(fullPathComps)-1]) | |||||
| if f.exts[ext] { | |||||
| if f.config.Object.BlackListMode { | |||||
| return false | |||||
| } | |||||
| } else if !f.config.Object.BlackListMode { | |||||
| return false | |||||
| } | |||||
| if size < f.config.Object.MinSize || (f.config.Object.MaxSize > 0 && size > f.config.Object.MaxSize) { | |||||
| return false | |||||
| } | |||||
| if fullPathComps[2] != "objects" { | |||||
| return false | |||||
| } | |||||
| if fullPathComps[3] == "pack" { | |||||
| // 避免同步临时文件 | |||||
| if !strings.HasPrefix(fullPathComps[4], "pack-") { | |||||
| return false | |||||
| } | |||||
| } else if len(fullPathComps[3]) == 2 { | |||||
| // 未压缩的objects | |||||
| } else { | |||||
| return false | |||||
| } | |||||
| return true | |||||
| } | |||||
| func (f *SyncFilter) ReloadConfig(path string) { | |||||
| data, err := os.ReadFile(path) | |||||
| if err != nil { | |||||
| logger.Warnf("loading trace rule config: %v", err) | |||||
| return | |||||
| } | |||||
| cfg := SyncFilterRuleConfig{} | |||||
| err = json.Unmarshal(data, &cfg) | |||||
| if err != nil { | |||||
| logger.Warnf("unmarshal trace rule config: %v", err) | |||||
| return | |||||
| } | |||||
| f.lock.Lock() | |||||
| defer f.lock.Unlock() | |||||
| f.config = cfg | |||||
| f.repoNames = make(map[string]bool) | |||||
| for _, name := range cfg.Repo.Names { | |||||
| f.repoNames[name] = true | |||||
| } | |||||
| f.exts = make(map[string]bool) | |||||
| for _, ext := range cfg.Object.Exts { | |||||
| f.exts[ext] = true | |||||
| } | |||||
| logger.Infof("trace rule config reloaded: repoNames=%v, exts=%v, minSize=%d, maxSize=%d", len(f.repoNames), len(f.exts), f.config.Object.MinSize, f.config.Object.MaxSize) | |||||
| } | |||||
| @@ -43,6 +43,10 @@ func (v *Vfs) Dump() cache.CacheStatus { | |||||
| return v.cache.Dump() | return v.cache.Dump() | ||||
| } | } | ||||
| func (v *Vfs) ReloadSyncFilter() { | |||||
| v.cache.ReloadSyncFilter() | |||||
| } | |||||
| func (v *Vfs) ReclaimSpace() { | func (v *Vfs) ReclaimSpace() { | ||||
| v.cache.ReclaimSpace() | v.cache.ReclaimSpace() | ||||
| } | } | ||||
| @@ -37,6 +37,26 @@ func (c *MountService) DumpStatus(req MountDumpStatus) (*MountDumpStatusResp, er | |||||
| return JSONAPI(&c.cfg, c.httpCli, &req, &MountDumpStatusResp{}) | return JSONAPI(&c.cfg, c.httpCli, &req, &MountDumpStatusResp{}) | ||||
| } | } | ||||
| const MountReloadFilterPath = "/mount/reloadFilter" | |||||
| type MountReloadFilter struct { | |||||
| } | |||||
| func (r *MountReloadFilter) MakeParam() *sdks.RequestParam { | |||||
| return sdks.MakeQueryParam(http.MethodPost, MountReloadFilterPath, r) | |||||
| } | |||||
| type MountReloadFilterResp struct { | |||||
| } | |||||
| func (r *MountReloadFilterResp) ParseResponse(resp *http.Response) error { | |||||
| return sdks.ParseCodeDataJSONResponse(resp, r) | |||||
| } | |||||
| func (c *MountService) ReloadFilter(req MountReloadFilter) (*MountReloadFilterResp, error) { | |||||
| return JSONAPI(&c.cfg, http.DefaultClient, &req, &MountReloadFilterResp{}) | |||||
| } | |||||
| const MountStartReclaimSpacePath = "/mount/startReclaimSpace" | const MountStartReclaimSpacePath = "/mount/startReclaimSpace" | ||||
| type StartMountReclaimSpace struct{} | type StartMountReclaimSpace struct{} | ||||