From 967477210dff365e79c87d1c5e8270f7dccf1bfc Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 3 Mar 2025 11:24:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0WriteAt?= =?UTF-8?q?=E5=92=8CReadAt=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/mount/vfs/cache/data_loader.go | 37 +++ client2/internal/mount/vfs/cache/file.go | 249 ++++++++++++++++-- .../internal/mount/vfs/cache/file_segment.go | 97 +++---- .../mount/vfs/cache/file_segment_test.go | 2 +- client2/internal/mount/vfs/cache/utils.go | 38 +++ common/pkgs/downloader/downloader.go | 4 + 6 files changed, 355 insertions(+), 72 deletions(-) create mode 100644 client2/internal/mount/vfs/cache/data_loader.go create mode 100644 client2/internal/mount/vfs/cache/utils.go diff --git a/client2/internal/mount/vfs/cache/data_loader.go b/client2/internal/mount/vfs/cache/data_loader.go new file mode 100644 index 0000000..cd35c8c --- /dev/null +++ b/client2/internal/mount/vfs/cache/data_loader.go @@ -0,0 +1,37 @@ +package cache + +import ( + "io" + "os" + + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) + +type LocalLoader struct { + cache *CacheFile + file *os.File + offset int64 + // 当前正在加载的数据段。为nil表示没有加载任何数据。 + loading *FileSegment + // 最多加载到哪个位置。-1代表无限制。 + limit int64 +} + +// 启动数据加载。通过设置seg的Position和Length来指定要加载的数据范围。 +func (l *LocalLoader) BeginLoading(seg *FileSegment) { + +} + +type RemoteLoader struct { + cache *CacheFile + object stgmod.ObjectDetail + reader io.ReadCloser + offset int64 + // 当前正在加载的数据段。为nil表示没有加载任何数据。 + loading *FileSegment + // 最多加载到哪个位置。-1代表无限制。 + limit int64 +} + +func (l *RemoteLoader) BeginLoading(seg *FileSegment) { +} diff --git a/client2/internal/mount/vfs/cache/file.go b/client2/internal/mount/vfs/cache/file.go index 8dc05a6..e574e01 100644 --- a/client2/internal/mount/vfs/cache/file.go +++ b/client2/internal/mount/vfs/cache/file.go @@ -1,23 +1,50 @@ package cache import ( + "context" + "io" "os" "path/filepath" "sync" "time" + "gitlink.org.cn/cloudream/common/pkgs/future" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) type FileInfo struct { - Dirty bool // 本文件是否有未提交的修改 - Segments []Range // 数据段列表,按照段开始位置从小到大排列 - ObjectID cdssdk.ObjectID // 文件对应的对象ID,仅在文件是一个缓存文件时才有值 - Hash string // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过 - Size int64 // 文件大小。如果是缓存文件,那么这个字段记录了其内容的总大小,用于判断文件是否完整。如果是本地文件,那么这个字段记录了其实际大小。 - ModTime time.Time // 文件的最后修改时间 - Perm os.FileMode // 文件的权限 + // 文件总大小。可能会超过对应的远端文件的大小。 + // 此大小可能与本地缓存文件大小也不用,需要定时将本地缓存文件大小修正到与这个值相同。 + Size int64 + // 本文件是否有未提交的修改 + Dirty bool + // 数据段列表,按照段开始位置从小到大排列 + Segments []Range + // 文件对应的对象ID,仅在文件是一个缓存文件时才有值 + ObjectID cdssdk.ObjectID + // 文件对应的对象大小,仅在文件是一个缓存文件时才有值。 + // 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小 + ObjectSize int64 + // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过 + Hash string + // 文件的最后修改时间 + ModTime time.Time + // 文件的权限 + Perm os.FileMode +} + +// 返回值代表文件大小是否发生了变化 +func (f *FileInfo) EnlargeTo(size int64) bool { + if size > f.Size { + f.Size = size + return true + } + return false } type Range struct { @@ -25,16 +52,54 @@ type Range struct { Length int64 } +func (r *Range) GetPosition() int64 { + return r.Position +} + +func (r *Range) GetLength() int64 { + return r.Length +} + +type ReadRequest struct { + Position int64 + Length int64 + Callback *future.SetVoidFuture + Zeros int64 // 如果>0,那么说明读到了一块全0的数据,值的大小代表这块数据的长度,此时Segment字段为nil + Segment *FileSegment +} + +func (r *ReadRequest) GetPosition() int64 { + return r.Position +} + +func (r *ReadRequest) GetLength() int64 { + return r.Length +} + +type WriteReqeust struct { + Position int64 + Length int64 + Callback *future.SetVoidFuture + Segment *FileSegment +} + // 所有读写过程共用同一个CacheFile对象。 // 不应该将此结构体保存到对象中 type CacheFile struct { - pathComps []string - name string - info FileInfo - lock sync.RWMutex - segBuffer SegmentBuffer - readers int - writers int + objDownloader *downloader.Downloader + pathComps []string + name string + info FileInfo + lock sync.Mutex + segBuffer SegmentBuffer + readers int + writers int + + localLoaders []*LocalLoader + remoteLoaders []*RemoteLoader + pendingReadings []*ReadRequest + pendingWritings []*WriteReqeust + managerChan chan any } func loadCacheFile(pathComps []string, infoPath string) (*CacheFile, error) { @@ -109,9 +174,29 @@ func (f *CacheFile) Release() { } -// 一个文件段的数据被写入到本地了 -func (f *CacheFile) OnSaved(seg *FileSegment) { +// 一个文件段的数据被写入到本地了。err为nil表示成功,否则表示写入失败。 +func (f *CacheFile) OnSaved(seg *FileSegment, err error) { + +} + +// 一个文件段的数据被加载到内存了。err为nil表示成功,否则表示加载失败。 +func (f *CacheFile) OnLoaded(seg *FileSegment, err error) { + +} + +func (f *CacheFile) notifyManager() { + select { + case f.managerChan <- nil: + break + default: + } +} + +func (f *CacheFile) managing() { + for { + <-f.managerChan + } } type CacheFileReadWriter struct { @@ -120,11 +205,143 @@ type CacheFileReadWriter struct { } func (f *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { + f.file.lock.Lock() + + if off >= f.file.info.Size { + return 0, io.EOF + } + segIdx := f.file.segBuffer.FirstContains(off) + + if segIdx >= 0 { + seg := f.file.segBuffer.Segment(segIdx) + + // 读取的数据在当前段内 + if off >= seg.Position && off < seg.Position+seg.Length { + readLen := math2.Min(int64(len(buf)), seg.Position+seg.Length-off) + seg.RefCount++ + f.file.lock.Unlock() + + copy(buf[:readLen], seg.SubSliceAbs(off, readLen)) + + f.file.lock.Lock() + seg.RefCount-- + seg.Type = SegmentDirty + f.file.lock.Unlock() + return int(readLen), nil + } + + // if off >= f.file.info.ObjectSize { + // readLen := int64(len(buf)) + + // if segIdx < f.file.segBuffer.BuzyCount()-1 { + // nextSeg := f.file.segBuffer.Segment(segIdx + 1) + // readLen = math2.Min(readLen, nextSeg.Position-off) + // } else { + // readLen = math2.Min(readLen, f.file.info.Size-off) + // } + // f.file.lock.Unlock() + + // clear(buf[:readLen]) + // return int(readLen), nil + // } + } + + // 没有被缓存的数据,则需要通知加载器去加载 + + fut := future.NewSetVoid() + req := &ReadRequest{ + Position: off, + Callback: fut, + } + insertIdx := FirstContains(f.file.pendingReadings, off) + f.file.pendingReadings = lo2.Insert(f.file.pendingReadings, insertIdx+1, req) + f.file.lock.Unlock() + + f.file.notifyManager() + + err := fut.Wait(context.Background()) + if err != nil { + return 0, err + } + + if req.Segment != nil { + // 这里不加锁,因为在引用计数大于0期间,Position不变,而Length虽然可能发生变化,但并发读写是未定义行为,所以不管读到多少数据都可以 + readLen := math2.Min(int64(len(buf)), req.Segment.Position+req.Segment.Length-off) + copy(buf[:readLen], req.Segment.SubSliceAbs(off, readLen)) + + // bufferManager线程会在调用回调之前,给引用计数+1 + // TODO 这种做法容易粗心产生遗漏,且不利于实现超时机制 + f.file.lock.Lock() + req.Segment.RefCount-- + f.file.lock.Unlock() + return int(readLen), nil + } + + if req.Zeros > 0 { + clear(buf[:req.Zeros]) + return int(req.Zeros), nil + } + + return 0, io.EOF } func (f *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) { + if f.readOnly { + return 0, fuse.ErrPermission + } + + f.file.lock.Lock() + // 如果找到一个包含off位置的段,那么就先写满这个段 + segIdx := f.file.segBuffer.FirstContains(off) + if segIdx >= 0 { + seg := f.file.segBuffer.Segment(segIdx) + + if off >= seg.Position && off < seg.Position+seg.Length { + writeLen := math2.Min(int64(len(buf)), seg.BufferEnd()-off) + seg.RefCount++ + f.file.lock.Unlock() + + // 不管此时是不是有其他线程在读取数据,因为我们约定并发读写就是未定义行为。 + copy(seg.SubSliceAbs(off, writeLen), buf[:writeLen]) + + f.file.lock.Lock() + seg.RefCount-- + seg.EnlargeTo(off + writeLen) + seg.Type = SegmentDirty + f.file.info.EnlargeTo(seg.AvailableEnd()) + f.file.lock.Unlock() + f.file.notifyManager() + return int(writeLen), nil + } + } + + fut := future.NewSetVoid() + req := &WriteReqeust{ + Position: off, + Callback: fut, + } + f.file.pendingWritings = append(f.file.pendingWritings, req) + f.file.lock.Unlock() + + err := fut.Wait(context.Background()) + if err != nil { + return 0, err + } + + // 一些说明参考ReadAt函数 + // 由managing线程保证文件的同一个位置只会有一个段,因此这里不考虑在copy期间又有其他线程在写同一个段导致的产生新的重复段 + writeLen := math2.Min(int64(len(buf)), req.Segment.BufferEnd()-off) + copy(req.Segment.SubSliceAbs(off, writeLen), buf[:writeLen]) + f.file.lock.Lock() + req.Segment.RefCount-- + req.Segment.EnlargeTo(off + writeLen) + req.Segment.Type = SegmentDirty + f.file.info.EnlargeTo(req.Segment.AvailableEnd()) + f.file.lock.Unlock() + f.file.notifyManager() + return int(writeLen), nil } func (f *CacheFileReadWriter) Sync() error { diff --git a/client2/internal/mount/vfs/cache/file_segment.go b/client2/internal/mount/vfs/cache/file_segment.go index 8bcbcd7..44d1236 100644 --- a/client2/internal/mount/vfs/cache/file_segment.go +++ b/client2/internal/mount/vfs/cache/file_segment.go @@ -1,17 +1,17 @@ package cache import ( - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/common/utils/sync2" ) type SegmentType int const ( + // 数据段刚被初始化 + SegmentInit = iota // 数据来自本地文件 - SegmentLocal = iota + SegmentLocal // 数据来自远端文件,还未写入到本地文件 SegmentRemote // 数据由用户写入,还未写入到本地文件 @@ -28,16 +28,25 @@ type FileSegment struct { // 当前段是否正在被保存到本地文件中 IsSaving bool // 引用计数。当引用计数为0时,可以安全地删除此段 - RefBuzyCount int + RefCount int } -func (s *FileSegment) SubSliceAbs(pos int64) []byte { - return s.Buffer[pos-s.Position:] +func (s *FileSegment) GetPosition() int64 { + return s.Position +} + +func (s *FileSegment) GetLength() int64 { + return s.Length +} + +func (s *FileSegment) SubSliceAbs(pos int64, len int64) []byte { + start := pos - s.Position + return s.Buffer[start : start+len] } // 将当前段拆分为两个段。当前段将持有第一个段,返回值持有第二个段 func (s *FileSegment) SplitAbs(pos int64) *FileSegment { - s2 := s.SubSliceAbs(pos) + s2 := s.Buffer[pos-s.Position:] s2Len := math2.Max(s.Position+s.Length-pos, 0) s.Buffer = s.Buffer[:pos-s.Position] @@ -51,52 +60,35 @@ func (s *FileSegment) SplitAbs(pos int64) *FileSegment { } } +func (s *FileSegment) AvailableEnd() int64 { + return s.Position + s.Length +} + +func (s *FileSegment) BufferEnd() int64 { + return s.Position + int64(len(s.Buffer)) +} + +func (s *FileSegment) EnlargeTo(pos int64) { + if pos > s.Position+s.Length { + s.Length = pos - s.Position + } +} + type SegmentBuffer struct { // 正在使用的文件段缓冲区 - Buzys []*FileSegment - // 完全空闲的文件段缓冲区,可以作为新段使用 - frees *sync2.BucketPool[*FileSegment] + buzys []*FileSegment } func (s *SegmentBuffer) BuzyCount() int { - return len(s.Buzys) + return len(s.buzys) } -// 申请一个空闲段,如果暂时没有空闲段,则会阻塞等待 -func (s *SegmentBuffer) AcquireFree() *FileSegment { - s.frees.GetEmpty() +func (s *SegmentBuffer) Segment(idx int) *FileSegment { + return s.buzys[idx] } -// 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况: -// -// 1. pos小于第一个段的位置,返回-1 -// -// 2. pos大于等于最后一个段的结束位置,返回BuzyCount() - 1 -// -// 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段 -// -// 注:2、3情况返回的结果是相同的 func (s *SegmentBuffer) FirstContains(pos int64) int { - low, high := 0, len(s.Buzys)-1 - - for low <= high { - mid := (low + high) / 2 - - if s.Buzys[mid].Position > pos { - high = mid - 1 - } else if s.Buzys[mid].Position < pos { - if mid == s.BuzyCount()-1 { - return mid - } - - low = mid - - } else { - return mid - } - } - - return high + return FirstContains(s.buzys, pos) } // 将指定段插入到段缓存的恰当位置 @@ -104,27 +96,22 @@ func (s *SegmentBuffer) Insert(seg *FileSegment) { index := s.FirstContains(seg.Position) if index == -1 { - s.Buzys = append([]*FileSegment{seg}, s.Buzys...) + s.buzys = append([]*FileSegment{seg}, s.buzys...) } else { // index是最后一个小于Position的位置,所以要加1 - s.Buzys = lo2.Insert(s.Buzys, index+1, seg) + s.buzys = lo2.Insert(s.buzys, index+1, seg) } } // 插入一个段到指定索引位置。不会检查插入后Segments是否依然保持有序。 func (s *SegmentBuffer) InsertAt(index int, seg *FileSegment) { - s.Buzys = lo2.Insert(s.Buzys, index, seg) + s.buzys = lo2.Insert(s.buzys, index, seg) } -// 将指定位置的段从Buzys中移除,并放入Frees中 -func (s *SegmentBuffer) FreeAt(index int) { - buf := s.Buzys[index] - s.Buzys = append(s.Buzys[:index], s.Buzys[index+1:]...) - s.Frees = append(s.Frees, buf) +func (s *SegmentBuffer) RemoveAt(index int) { + s.buzys = lo2.RemoveAt(s.buzys, index) } -// 将指定段从Buzys中移除,并放入Frees中 -func (s *SegmentBuffer) Free(seg *FileSegment) { - index := lo.IndexOf(s.Buzys, seg) - s.FreeAt(index) +func (s *SegmentBuffer) Remove(seg *FileSegment) { + s.buzys = lo2.Remove(s.buzys, seg) } diff --git a/client2/internal/mount/vfs/cache/file_segment_test.go b/client2/internal/mount/vfs/cache/file_segment_test.go index 3f0c8ca..a8947ef 100644 --- a/client2/internal/mount/vfs/cache/file_segment_test.go +++ b/client2/internal/mount/vfs/cache/file_segment_test.go @@ -16,7 +16,7 @@ func Test_FindSegmentIndex(t *testing.T) { }{ { buffer: SegmentBuffer{ - Buzys: []*FileSegment{ + buzys: []*FileSegment{ {Position: 0}, {Position: 10}, }, }, diff --git a/client2/internal/mount/vfs/cache/utils.go b/client2/internal/mount/vfs/cache/utils.go new file mode 100644 index 0000000..fa6e0d1 --- /dev/null +++ b/client2/internal/mount/vfs/cache/utils.go @@ -0,0 +1,38 @@ +package cache + +type Ranger interface { + GetPosition() int64 + GetLength() int64 +} + +// 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况: +// +// 1. pos小于第一个段的位置,返回-1 +// +// 2. pos大于等于最后一个段的结束位置,返回BuzyCount() - 1 +// +// 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段 +// +// 注:2、3情况返回的结果是相同的 +func FirstContains[T Ranger](arr []T, pos int64) int { + low, high := 0, len(arr)-1 + + for low <= high { + mid := (low + high) / 2 + + if arr[mid].GetPosition() > pos { + high = mid - 1 + } else if arr[mid].GetPosition() < pos { + if mid == len(arr)-1 { + return mid + } + + low = mid + + } else { + return mid + } + } + + return high +} diff --git a/common/pkgs/downloader/downloader.go b/common/pkgs/downloader/downloader.go index 176234f..66712e0 100644 --- a/common/pkgs/downloader/downloader.go +++ b/common/pkgs/downloader/downloader.go @@ -93,6 +93,10 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { return NewDownloadObjectIterator(d, req2s) } +func (d *Downloader) DownloadObjectByDetail(detail stgmod.ObjectDetail, off int64, length int64) (Downloading, error) { + +} + func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil {