diff --git a/client/internal/mount/vfs/cache/cache.go b/client/internal/mount/vfs/cache/cache.go index 1a3f67c..473b1c4 100644 --- a/client/internal/mount/vfs/cache/cache.go +++ b/client/internal/mount/vfs/cache/cache.go @@ -705,7 +705,9 @@ func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheF } // 上传文件需要完全加载级别的缓存等级 - ch.LevelUp(LevelComplete) + if !ch.LevelUp(LevelComplete) { + return + } fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]} pkg, ok := uploadingPkgs[fullName] @@ -717,20 +719,20 @@ func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheF uploadingPkgs[fullName] = pkg } - obj := &uploadingObject{ + up := &uploadingObject{ pathComps: lo2.ArrayClone(ch.pathComps), cache: ch, } - pkg.upObjs = append(pkg.upObjs, obj) - ch.state.uploading = obj + pkg.upObjs = append(pkg.upObjs, up) + ch.state.uploading = up if info.DataRevision > 0 { - obj.reader = ch.OpenReadWhenScanning() + up.reader = ch.OpenReadWhenScanning() } if info.MetaRevision > 0 { - obj.modTime = info.ModTime - obj.metaRevision = info.MetaRevision + up.modTime = info.ModTime + up.metaRevision = info.MetaRevision } return } diff --git a/client/internal/mount/vfs/cache/file.go b/client/internal/mount/vfs/cache/file.go index 564e348..bbcf6f0 100644 --- a/client/internal/mount/vfs/cache/file.go +++ b/client/internal/mount/vfs/cache/file.go @@ -340,7 +340,6 @@ func loadReadOnlyCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) metaData, err := os.ReadFile(metaPath) if err != nil { - // 如果有数据文件,而没有元数据文件,则创建一个元数据文件 if !os.IsNotExist(err) { return nil, err } @@ -357,6 +356,8 @@ func loadReadOnlyCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) info.MetaRevision = 1 // 未同步的文件视为已修改 info.DataRevision = 1 + // 实际的元数据文件在LevelUp时才创建 + } else { err = serder.JSONToObject(metaData, info) if err != nil { @@ -518,7 +519,7 @@ func (f *CacheFile) LevelUp(level CacheLevel) bool { case LevelReadOnly: metaPath := f.cache.GetCacheMetaPath(f.pathComps...) - metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644) + metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { logger.Warnf("open meta file %v: %v", metaPath, err) return false diff --git a/hub/internal/rpc/ioswitch.go b/hub/internal/rpc/ioswitch.go index a97e7be..dd5375d 100644 --- a/hub/internal/rpc/ioswitch.go +++ b/hub/internal/rpc/ioswitch.go @@ -41,10 +41,10 @@ func (s *Service) SendIOStream(ctx context.Context, req *hubrpc.SendIOStream) (* Debugf("stream input") // 同一批Plan中每个节点的Plan的启动时间有先后,但最多不应该超过30秒 - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx2, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx2, exec.PlanID(req.PlanID)) if sw == nil { return nil, rpc.Failed(errorcode.DataNotFound, "plan not found") } @@ -72,10 +72,10 @@ func (s *Service) GetIOStream(ctx context.Context, req *hubrpc.GetIOStream) (*hu Debugf("stream output") // 同上 - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx2, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx2, exec.PlanID(req.PlanID)) if sw == nil { return nil, rpc.Failed(errorcode.DataNotFound, "plan not found") } @@ -93,10 +93,10 @@ func (s *Service) GetIOStream(ctx context.Context, req *hubrpc.GetIOStream) (*hu } func (s *Service) SendIOVar(ctx context.Context, req *hubrpc.SendIOVar) (*hubrpc.SendIOVarResp, *rpc.CodeError) { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) + ctx2, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID)) + sw := s.swWorker.FindByIDContexted(ctx2, exec.PlanID(req.PlanID)) if sw == nil { return nil, rpc.Failed(errorcode.DataNotFound, "plan not found") }