Browse Source

初步实现WriteAt和ReadAt的逻辑

gitlink
Sydonian 8 months ago
parent
commit
967477210d
6 changed files with 355 additions and 72 deletions
  1. +37
    -0
      client2/internal/mount/vfs/cache/data_loader.go
  2. +233
    -16
      client2/internal/mount/vfs/cache/file.go
  3. +42
    -55
      client2/internal/mount/vfs/cache/file_segment.go
  4. +1
    -1
      client2/internal/mount/vfs/cache/file_segment_test.go
  5. +38
    -0
      client2/internal/mount/vfs/cache/utils.go
  6. +4
    -0
      common/pkgs/downloader/downloader.go

+ 37
- 0
client2/internal/mount/vfs/cache/data_loader.go View File

@@ -0,0 +1,37 @@
package cache

import (
"io"
"os"

stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type LocalLoader struct {
cache *CacheFile
file *os.File
offset int64
// 当前正在加载的数据段。为nil表示没有加载任何数据。
loading *FileSegment
// 最多加载到哪个位置。-1代表无限制。
limit int64
}

// 启动数据加载。通过设置seg的Position和Length来指定要加载的数据范围。
func (l *LocalLoader) BeginLoading(seg *FileSegment) {

}

type RemoteLoader struct {
cache *CacheFile
object stgmod.ObjectDetail
reader io.ReadCloser
offset int64
// 当前正在加载的数据段。为nil表示没有加载任何数据。
loading *FileSegment
// 最多加载到哪个位置。-1代表无限制。
limit int64
}

func (l *RemoteLoader) BeginLoading(seg *FileSegment) {
}

+ 233
- 16
client2/internal/mount/vfs/cache/file.go View File

@@ -1,23 +1,50 @@
package cache

import (
"context"
"io"
"os"
"path/filepath"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/future"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)

type FileInfo struct {
Dirty bool // 本文件是否有未提交的修改
Segments []Range // 数据段列表,按照段开始位置从小到大排列
ObjectID cdssdk.ObjectID // 文件对应的对象ID,仅在文件是一个缓存文件时才有值
Hash string // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过
Size int64 // 文件大小。如果是缓存文件,那么这个字段记录了其内容的总大小,用于判断文件是否完整。如果是本地文件,那么这个字段记录了其实际大小。
ModTime time.Time // 文件的最后修改时间
Perm os.FileMode // 文件的权限
// 文件总大小。可能会超过对应的远端文件的大小。
// 此大小可能与本地缓存文件大小也不用,需要定时将本地缓存文件大小修正到与这个值相同。
Size int64
// 本文件是否有未提交的修改
Dirty bool
// 数据段列表,按照段开始位置从小到大排列
Segments []Range
// 文件对应的对象ID,仅在文件是一个缓存文件时才有值
ObjectID cdssdk.ObjectID
// 文件对应的对象大小,仅在文件是一个缓存文件时才有值。
// 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小
ObjectSize int64
// 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过
Hash string
// 文件的最后修改时间
ModTime time.Time
// 文件的权限
Perm os.FileMode
}

// 返回值代表文件大小是否发生了变化
func (f *FileInfo) EnlargeTo(size int64) bool {
if size > f.Size {
f.Size = size
return true
}
return false
}

type Range struct {
@@ -25,16 +52,54 @@ type Range struct {
Length int64
}

func (r *Range) GetPosition() int64 {
return r.Position
}

func (r *Range) GetLength() int64 {
return r.Length
}

type ReadRequest struct {
Position int64
Length int64
Callback *future.SetVoidFuture
Zeros int64 // 如果>0,那么说明读到了一块全0的数据,值的大小代表这块数据的长度,此时Segment字段为nil
Segment *FileSegment
}

func (r *ReadRequest) GetPosition() int64 {
return r.Position
}

func (r *ReadRequest) GetLength() int64 {
return r.Length
}

type WriteReqeust struct {
Position int64
Length int64
Callback *future.SetVoidFuture
Segment *FileSegment
}

// 所有读写过程共用同一个CacheFile对象。
// 不应该将此结构体保存到对象中
type CacheFile struct {
pathComps []string
name string
info FileInfo
lock sync.RWMutex
segBuffer SegmentBuffer
readers int
writers int
objDownloader *downloader.Downloader
pathComps []string
name string
info FileInfo
lock sync.Mutex
segBuffer SegmentBuffer
readers int
writers int

localLoaders []*LocalLoader
remoteLoaders []*RemoteLoader
pendingReadings []*ReadRequest
pendingWritings []*WriteReqeust
managerChan chan any
}

func loadCacheFile(pathComps []string, infoPath string) (*CacheFile, error) {
@@ -109,9 +174,29 @@ func (f *CacheFile) Release() {

}

// 一个文件段的数据被写入到本地了
func (f *CacheFile) OnSaved(seg *FileSegment) {
// 一个文件段的数据被写入到本地了。err为nil表示成功,否则表示写入失败。
func (f *CacheFile) OnSaved(seg *FileSegment, err error) {

}

// 一个文件段的数据被加载到内存了。err为nil表示成功,否则表示加载失败。
func (f *CacheFile) OnLoaded(seg *FileSegment, err error) {

}

func (f *CacheFile) notifyManager() {
select {
case f.managerChan <- nil:
break
default:
}
}

func (f *CacheFile) managing() {
for {
<-f.managerChan

}
}

type CacheFileReadWriter struct {
@@ -120,11 +205,143 @@ type CacheFileReadWriter struct {
}

func (f *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
f.file.lock.Lock()

if off >= f.file.info.Size {
return 0, io.EOF
}

segIdx := f.file.segBuffer.FirstContains(off)

if segIdx >= 0 {
seg := f.file.segBuffer.Segment(segIdx)

// 读取的数据在当前段内
if off >= seg.Position && off < seg.Position+seg.Length {
readLen := math2.Min(int64(len(buf)), seg.Position+seg.Length-off)
seg.RefCount++
f.file.lock.Unlock()

copy(buf[:readLen], seg.SubSliceAbs(off, readLen))

f.file.lock.Lock()
seg.RefCount--
seg.Type = SegmentDirty
f.file.lock.Unlock()
return int(readLen), nil
}

// if off >= f.file.info.ObjectSize {
// readLen := int64(len(buf))

// if segIdx < f.file.segBuffer.BuzyCount()-1 {
// nextSeg := f.file.segBuffer.Segment(segIdx + 1)
// readLen = math2.Min(readLen, nextSeg.Position-off)
// } else {
// readLen = math2.Min(readLen, f.file.info.Size-off)
// }
// f.file.lock.Unlock()

// clear(buf[:readLen])
// return int(readLen), nil
// }
}

// 没有被缓存的数据,则需要通知加载器去加载

fut := future.NewSetVoid()
req := &ReadRequest{
Position: off,
Callback: fut,
}
insertIdx := FirstContains(f.file.pendingReadings, off)
f.file.pendingReadings = lo2.Insert(f.file.pendingReadings, insertIdx+1, req)
f.file.lock.Unlock()

f.file.notifyManager()

err := fut.Wait(context.Background())
if err != nil {
return 0, err
}

if req.Segment != nil {
// 这里不加锁,因为在引用计数大于0期间,Position不变,而Length虽然可能发生变化,但并发读写是未定义行为,所以不管读到多少数据都可以
readLen := math2.Min(int64(len(buf)), req.Segment.Position+req.Segment.Length-off)
copy(buf[:readLen], req.Segment.SubSliceAbs(off, readLen))

// bufferManager线程会在调用回调之前,给引用计数+1
// TODO 这种做法容易粗心产生遗漏,且不利于实现超时机制
f.file.lock.Lock()
req.Segment.RefCount--
f.file.lock.Unlock()
return int(readLen), nil
}

if req.Zeros > 0 {
clear(buf[:req.Zeros])
return int(req.Zeros), nil
}

return 0, io.EOF
}

func (f *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {
if f.readOnly {
return 0, fuse.ErrPermission
}

f.file.lock.Lock()
// 如果找到一个包含off位置的段,那么就先写满这个段
segIdx := f.file.segBuffer.FirstContains(off)
if segIdx >= 0 {
seg := f.file.segBuffer.Segment(segIdx)

if off >= seg.Position && off < seg.Position+seg.Length {
writeLen := math2.Min(int64(len(buf)), seg.BufferEnd()-off)
seg.RefCount++
f.file.lock.Unlock()

// 不管此时是不是有其他线程在读取数据,因为我们约定并发读写就是未定义行为。
copy(seg.SubSliceAbs(off, writeLen), buf[:writeLen])

f.file.lock.Lock()
seg.RefCount--
seg.EnlargeTo(off + writeLen)
seg.Type = SegmentDirty
f.file.info.EnlargeTo(seg.AvailableEnd())
f.file.lock.Unlock()
f.file.notifyManager()
return int(writeLen), nil
}
}

fut := future.NewSetVoid()
req := &WriteReqeust{
Position: off,
Callback: fut,
}
f.file.pendingWritings = append(f.file.pendingWritings, req)
f.file.lock.Unlock()

err := fut.Wait(context.Background())
if err != nil {
return 0, err
}

// 一些说明参考ReadAt函数
// 由managing线程保证文件的同一个位置只会有一个段,因此这里不考虑在copy期间又有其他线程在写同一个段导致的产生新的重复段
writeLen := math2.Min(int64(len(buf)), req.Segment.BufferEnd()-off)
copy(req.Segment.SubSliceAbs(off, writeLen), buf[:writeLen])

f.file.lock.Lock()
req.Segment.RefCount--
req.Segment.EnlargeTo(off + writeLen)
req.Segment.Type = SegmentDirty
f.file.info.EnlargeTo(req.Segment.AvailableEnd())
f.file.lock.Unlock()
f.file.notifyManager()
return int(writeLen), nil
}

func (f *CacheFileReadWriter) Sync() error {


+ 42
- 55
client2/internal/mount/vfs/cache/file_segment.go View File

@@ -1,17 +1,17 @@
package cache

import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sync2"
)

type SegmentType int

const (
// 数据段刚被初始化
SegmentInit = iota
// 数据来自本地文件
SegmentLocal = iota
SegmentLocal
// 数据来自远端文件,还未写入到本地文件
SegmentRemote
// 数据由用户写入,还未写入到本地文件
@@ -28,16 +28,25 @@ type FileSegment struct {
// 当前段是否正在被保存到本地文件中
IsSaving bool
// 引用计数。当引用计数为0时,可以安全地删除此段
RefBuzyCount int
RefCount int
}

func (s *FileSegment) SubSliceAbs(pos int64) []byte {
return s.Buffer[pos-s.Position:]
func (s *FileSegment) GetPosition() int64 {
return s.Position
}

func (s *FileSegment) GetLength() int64 {
return s.Length
}

func (s *FileSegment) SubSliceAbs(pos int64, len int64) []byte {
start := pos - s.Position
return s.Buffer[start : start+len]
}

// 将当前段拆分为两个段。当前段将持有第一个段,返回值持有第二个段
func (s *FileSegment) SplitAbs(pos int64) *FileSegment {
s2 := s.SubSliceAbs(pos)
s2 := s.Buffer[pos-s.Position:]
s2Len := math2.Max(s.Position+s.Length-pos, 0)

s.Buffer = s.Buffer[:pos-s.Position]
@@ -51,52 +60,35 @@ func (s *FileSegment) SplitAbs(pos int64) *FileSegment {
}
}

func (s *FileSegment) AvailableEnd() int64 {
return s.Position + s.Length
}

func (s *FileSegment) BufferEnd() int64 {
return s.Position + int64(len(s.Buffer))
}

func (s *FileSegment) EnlargeTo(pos int64) {
if pos > s.Position+s.Length {
s.Length = pos - s.Position
}
}

type SegmentBuffer struct {
// 正在使用的文件段缓冲区
Buzys []*FileSegment
// 完全空闲的文件段缓冲区,可以作为新段使用
frees *sync2.BucketPool[*FileSegment]
buzys []*FileSegment
}

func (s *SegmentBuffer) BuzyCount() int {
return len(s.Buzys)
return len(s.buzys)
}

// 申请一个空闲段,如果暂时没有空闲段,则会阻塞等待
func (s *SegmentBuffer) AcquireFree() *FileSegment {
s.frees.GetEmpty()
func (s *SegmentBuffer) Segment(idx int) *FileSegment {
return s.buzys[idx]
}

// 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况:
//
// 1. pos小于第一个段的位置,返回-1
//
// 2. pos大于等于最后一个段的结束位置,返回BuzyCount() - 1
//
// 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段
//
// 注:2、3情况返回的结果是相同的
func (s *SegmentBuffer) FirstContains(pos int64) int {
low, high := 0, len(s.Buzys)-1

for low <= high {
mid := (low + high) / 2

if s.Buzys[mid].Position > pos {
high = mid - 1
} else if s.Buzys[mid].Position < pos {
if mid == s.BuzyCount()-1 {
return mid
}

low = mid

} else {
return mid
}
}

return high
return FirstContains(s.buzys, pos)
}

// 将指定段插入到段缓存的恰当位置
@@ -104,27 +96,22 @@ func (s *SegmentBuffer) Insert(seg *FileSegment) {
index := s.FirstContains(seg.Position)

if index == -1 {
s.Buzys = append([]*FileSegment{seg}, s.Buzys...)
s.buzys = append([]*FileSegment{seg}, s.buzys...)
} else {
// index是最后一个小于Position的位置,所以要加1
s.Buzys = lo2.Insert(s.Buzys, index+1, seg)
s.buzys = lo2.Insert(s.buzys, index+1, seg)
}
}

// 插入一个段到指定索引位置。不会检查插入后Segments是否依然保持有序。
func (s *SegmentBuffer) InsertAt(index int, seg *FileSegment) {
s.Buzys = lo2.Insert(s.Buzys, index, seg)
s.buzys = lo2.Insert(s.buzys, index, seg)
}

// 将指定位置的段从Buzys中移除,并放入Frees中
func (s *SegmentBuffer) FreeAt(index int) {
buf := s.Buzys[index]
s.Buzys = append(s.Buzys[:index], s.Buzys[index+1:]...)
s.Frees = append(s.Frees, buf)
func (s *SegmentBuffer) RemoveAt(index int) {
s.buzys = lo2.RemoveAt(s.buzys, index)
}

// 将指定段从Buzys中移除,并放入Frees中
func (s *SegmentBuffer) Free(seg *FileSegment) {
index := lo.IndexOf(s.Buzys, seg)
s.FreeAt(index)
func (s *SegmentBuffer) Remove(seg *FileSegment) {
s.buzys = lo2.Remove(s.buzys, seg)
}

+ 1
- 1
client2/internal/mount/vfs/cache/file_segment_test.go View File

@@ -16,7 +16,7 @@ func Test_FindSegmentIndex(t *testing.T) {
}{
{
buffer: SegmentBuffer{
Buzys: []*FileSegment{
buzys: []*FileSegment{
{Position: 0}, {Position: 10},
},
},


+ 38
- 0
client2/internal/mount/vfs/cache/utils.go View File

@@ -0,0 +1,38 @@
package cache

type Ranger interface {
GetPosition() int64
GetLength() int64
}

// 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况:
//
// 1. pos小于第一个段的位置,返回-1
//
// 2. pos大于等于最后一个段的结束位置,返回BuzyCount() - 1
//
// 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段
//
// 注:2、3情况返回的结果是相同的
func FirstContains[T Ranger](arr []T, pos int64) int {
low, high := 0, len(arr)-1

for low <= high {
mid := (low + high) / 2

if arr[mid].GetPosition() > pos {
high = mid - 1
} else if arr[mid].GetPosition() < pos {
if mid == len(arr)-1 {
return mid
}

low = mid

} else {
return mid
}
}

return high
}

+ 4
- 0
common/pkgs/downloader/downloader.go View File

@@ -93,6 +93,10 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
return NewDownloadObjectIterator(d, req2s)
}

func (d *Downloader) DownloadObjectByDetail(detail stgmod.ObjectDetail, off int64, length int64) (Downloading, error) {

}

func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {


Loading…
Cancel
Save