Browse Source

将stgmgr添加到ExecContext中

gitlink
Sydonian 1 year ago
parent
commit
1dfb79190a
22 changed files with 129 additions and 79 deletions
  1. +13
    -5
      agent/internal/cmd/serve.go
  2. +1
    -3
      agent/internal/grpc/io.go
  3. +4
    -1
      agent/internal/grpc/service.go
  4. +1
    -2
      agent/internal/http/hub_io.go
  5. +7
    -2
      agent/internal/http/service.go
  6. +1
    -0
      agent/internal/task/create_package.go
  7. +4
    -1
      client/internal/task/task.go
  8. +1
    -0
      client/internal/task/upload_objects.go
  9. +5
    -2
      client/main.go
  10. +7
    -6
      common/pkgs/cmd/upload_objects.go
  11. +6
    -3
      common/pkgs/downloader/downloader.go
  12. +6
    -7
      common/pkgs/downloader/iterator.go
  13. +1
    -1
      common/pkgs/downloader/lrc.go
  14. +4
    -2
      common/pkgs/downloader/lrc_strip_iterator.go
  15. +4
    -3
      common/pkgs/downloader/strip_iterator.go
  16. +2
    -2
      common/pkgs/ioswitch2/ops2/shard_store.go
  17. +2
    -2
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  18. +2
    -1
      common/pkgs/storage/local/shard_store.go
  19. +42
    -30
      scanner/internal/event/check_package_redundancy.go
  20. +5
    -4
      scanner/internal/event/clean_pinned.go
  21. +4
    -1
      scanner/internal/event/event.go
  22. +7
    -1
      scanner/main.go

+ 13
- 5
agent/internal/cmd/serve.go View File

@@ -47,8 +47,10 @@ func serve(configPath string) {
stgglb.InitMQPool(&config.Cfg().RabbitMQ)
stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})

// 获取Hub配置
hubCfg := downloadHubConfig()

// 初始化存储服务管理器
stgMgr := mgr.NewManager()
for _, stg := range hubCfg.Storages {
err := stgMgr.InitStorage(stg)
@@ -58,9 +60,11 @@ func serve(configPath string) {
}
}

sw := exec.NewWorker()
// 初始化执行器
worker := exec.NewWorker()

httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&sw))
// 初始化HTTP服务
httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgMgr))
if err != nil {
logger.Fatalf("new http server failed, err: %s", err.Error())
}
@@ -101,19 +105,23 @@ func serve(configPath string) {
})
conCol.CollectInPlace()

// 启动访问统计服务
acStat := accessstat.NewAccessStat(accessstat.Config{
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
})
go serveAccessStat(acStat)

// 初始化分布式锁服务
distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Fatalf("new ipfs failed, err: %s", err.Error())
}

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)
// 初始化下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr)

// 初始化任务管理器
taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr)

// 启动命令服务器
@@ -127,14 +135,14 @@ func serve(configPath string) {
})
go serveAgentServer(agtSvr)

//面向客户端收发数据
// 启动GRPC服务
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgMgr))
go serveGRPC(s, lis)

go serveDistLock(distlock)


+ 1
- 3
agent/internal/grpc/io.go View File

@@ -29,9 +29,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
defer s.swWorker.Remove(sw)

execCtx := exec.NewWithContext(ctx)

// TODO2 注入依赖

exec.SetValueByType(execCtx, s.stgMgr)
_, err = sw.Run(execCtx)
if err != nil {
return nil, fmt.Errorf("running io plan: %w", err)


+ 4
- 1
agent/internal/grpc/service.go View File

@@ -3,15 +3,18 @@ package grpc
import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

type Service struct {
agentserver.AgentServer
swWorker *exec.Worker
stgMgr *mgr.Manager
}

func NewService(swWorker *exec.Worker) *Service {
func NewService(swWorker *exec.Worker, stgMgr *mgr.Manager) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,
}
}

+ 1
- 2
agent/internal/http/hub_io.go View File

@@ -168,8 +168,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) {
defer s.svc.swWorker.Remove(sw)

execCtx := exec.NewWithContext(ctx.Request.Context())

// TODO 注入依赖
exec.SetValueByType(execCtx, s.svc.stgMgr)
_, err = sw.Run(execCtx)
if err != nil {
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err)))


+ 7
- 2
agent/internal/http/service.go View File

@@ -1,13 +1,18 @@
package http

import "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

type Service struct {
swWorker *exec.Worker
stgMgr *mgr.Manager
}

func NewService(swWorker *exec.Worker) *Service {
func NewService(swWorker *exec.Worker, stgMgr *mgr.Manager) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,
}
}

+ 1
- 0
agent/internal/task/create_package.go View File

@@ -87,6 +87,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c
uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.Package.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{
Distlock: ctx.distlock,
Connectivity: ctx.connectivity,
StgMgr: ctx.stgMgr,
})
if err != nil {
err = fmt.Errorf("uploading objects: %w", err)


+ 4
- 1
client/internal/task/task.go View File

@@ -4,12 +4,14 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock" // 引入分布式锁服务
"gitlink.org.cn/cloudream/common/pkgs/task" // 引入任务处理相关的包
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" // 引入网络连接状态收集器
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务和网络连接状态收集器
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *mgr.Manager
}

// CompleteFn 类型定义了任务完成时的回调函数,用于设置任务的执行结果
@@ -29,9 +31,10 @@ type CompleteOption = task.CompleteOption

// NewManager 创建一个新的任务管理器实例,接受一个分布式锁服务和一个网络连接状态收集器作为参数
// 返回一个初始化好的任务管理器实例
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *mgr.Manager) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
stgMgr: stgMgr,
})
}

+ 1
- 0
client/internal/task/upload_objects.go View File

@@ -41,6 +41,7 @@ func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, c
ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{
Distlock: ctx.distlock, // 使用任务上下文中的分布式锁。
Connectivity: ctx.connectivity, // 使用任务上下文中的网络连接性信息。
StgMgr: ctx.stgMgr,
})

t.Result = ret // 存储上传结果。


+ 5
- 2
client/main.go View File

@@ -19,6 +19,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

func main() {
@@ -85,9 +86,11 @@ func main() {
})
go serveAccessStat(acStat)

taskMgr := task.NewManager(distlockSvc, &conCol)
stgMgr := mgr.NewManager()

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)
taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr)

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr)

svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat)
if err != nil {


+ 7
- 6
common/pkgs/cmd/upload_objects.go View File

@@ -24,6 +24,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

type UploadObjects struct {
@@ -52,6 +53,7 @@ type UploadStorageInfo struct {
type UploadObjectsContext struct {
Distlock *distlock.Service
Connectivity *connectivity.Collector
StgMgr *mgr.Manager
}

func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects {
@@ -114,7 +116,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult
}
defer ipfsMutex.Unlock()

rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userStgs, t.nodeAffinity)
rets, err := uploadAndUpdatePackage(ctx, t.packageID, t.objectIter, userStgs, t.nodeAffinity)
if err != nil {
return nil, err
}
@@ -147,7 +149,7 @@ func chooseUploadNode(nodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) Up
return nodes[0]
}

func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
func uploadAndUpdatePackage(ctx *UploadObjectsContext, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -172,7 +174,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo
defer objInfo.File.Close()

uploadTime := time.Now()
fileHash, err := uploadFile(objInfo.File, uploadNode)
fileHash, err := uploadFile(ctx, objInfo.File, uploadNode)
if err != nil {
return fmt.Errorf("uploading file: %w", err)
}
@@ -213,7 +215,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo
return uploadRets, nil
}

func uploadFile(file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, error) {
func uploadFile(ctx *UploadObjectsContext, file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, error) {
ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(-1)
ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*uploadStg.Storage.MasterHub, uploadStg.Storage.Storage, -1, "fileHash"))
@@ -225,9 +227,8 @@ func uploadFile(file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, e
return "", fmt.Errorf("parsing plan: %w", err)
}

// TODO2 注入依赖
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, ctx.StgMgr)
exec := plans.Execute(exeCtx)
exec.BeginWrite(io.NopCloser(file), hd)
ret, err := exec.Wait(context.TODO())


+ 6
- 3
common/pkgs/downloader/downloader.go View File

@@ -11,6 +11,7 @@ import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

const (
@@ -38,11 +39,12 @@ type Downloading struct {

type Downloader struct {
strips *StripCache
conn *connectivity.Collector
cfg Config
conn *connectivity.Collector
stgMgr *mgr.Manager
}

func NewDownloader(cfg Config, conn *connectivity.Collector) Downloader {
func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *mgr.Manager) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}
@@ -50,8 +52,9 @@ func NewDownloader(cfg Config, conn *connectivity.Collector) Downloader {
ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
return Downloader{
strips: ch,
conn: conn,
cfg: cfg,
conn: conn,
stgMgr: stgMgr,
}
}



+ 6
- 7
common/pkgs/downloader/iterator.go View File

@@ -205,7 +205,7 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2
return nil, fmt.Errorf("no storage has this object")
}

logger.Debugf("downloading object %v from storage %v(%v)", obj.Raw.ObjectID, stg)
logger.Debugf("downloading object %v from storage %v", obj.Raw.ObjectID, stg)
return iter.downloadFromStorage(stg, obj)
}

@@ -224,7 +224,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
if i > 0 {
logStrs = append(logStrs, ", ")
}
logStrs = append(logStrs, fmt.Sprintf("%v@%v(%v)", b.Block.Index, b.Storage))
logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Block.Index, b.Storage))
}
logger.Debug(logStrs...)

@@ -237,7 +237,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
}

firstStripIndex := readPos / ecRed.StripSize()
stripIter := NewStripIterator(req.Detail.Object, blocks, ecRed, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
stripIter := NewStripIterator(iter.downloader, req.Detail.Object, blocks, ecRed, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
defer stripIter.Close()

for totalReadLen > 0 {
@@ -274,7 +274,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
return nil, fmt.Errorf("no enough blocks to reconstruct the object %v , want %d, get only %d", req.Raw.ObjectID, ecRed.K, len(blocks))
}

logger.Debugf("downloading ec object %v from storage %v(%v)", req.Raw.ObjectID, stg)
logger.Debugf("downloading ec object %v from storage %v", req.Raw.ObjectID, stg)
return iter.downloadFromStorage(stg, req)
}

@@ -395,7 +395,7 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai
len := req.Raw.Length
toExec.Range.Length = &len
}
// TODO FileHash应该是FileHash类型
ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *stg.MasterHub, stg.Storage, -1)).AddTo(toExec)
strHandle = handle

@@ -405,9 +405,8 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai
return nil, fmt.Errorf("parsing plan: %w", err)
}

// TODO2 注入依赖
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, iter.downloader.stgMgr)
exec := plans.Execute(exeCtx)
go exec.Wait(context.TODO())



+ 1
- 1
common/pkgs/downloader/lrc.go View File

@@ -53,7 +53,7 @@ func (iter *DownloadObjectIterator) downloadLRCObject(req downloadReqeust2, red
}

firstStripIndex := readPos / int64(red.K) / int64(red.ChunkSize)
stripIter := NewLRCStripIterator(req.Detail.Object, blocks, red, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
stripIter := NewLRCStripIterator(iter.downloader, req.Detail.Object, blocks, red, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
defer stripIter.Close()

for totalReadLen > 0 {


+ 4
- 2
common/pkgs/downloader/lrc_strip_iterator.go View File

@@ -14,6 +14,7 @@ import (
)

type LRCStripIterator struct {
downloder *Downloader
object cdssdk.Object
blocks []downloadBlock
red *cdssdk.LRCRedundancy
@@ -25,12 +26,13 @@ type LRCStripIterator struct {
inited bool
}

func NewLRCStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
func NewLRCStripIterator(downloder *Downloader, object cdssdk.Object, blocks []downloadBlock, red *cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
if maxPrefetch <= 0 {
maxPrefetch = 1
}

iter := &LRCStripIterator{
downloder: downloder,
object: object,
blocks: blocks,
red: red,
@@ -110,8 +112,8 @@ func (s *LRCStripIterator) downloading() {
return
}

// TODO2 注入依赖
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, s.downloder.stgMgr)

exec := plans.Execute(exeCtx)



+ 4
- 3
common/pkgs/downloader/strip_iterator.go View File

@@ -25,6 +25,7 @@ type Strip struct {
}

type StripIterator struct {
downloader *Downloader
object cdssdk.Object
blocks []downloadBlock
red *cdssdk.ECRedundancy
@@ -45,12 +46,13 @@ type dataChanEntry struct {
Error error
}

func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
func NewStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
if maxPrefetch <= 0 {
maxPrefetch = 1
}

iter := &StripIterator{
downloader: downloader,
object: object,
blocks: blocks,
red: red,
@@ -210,9 +212,8 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
return 0, err
}

// TODo2 注入依赖
exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, s.downloader.stgMgr)
exec := plans.Execute(exeCtx)

ctx, cancel := context.WithCancel(context.Background())


+ 2
- 2
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -40,7 +40,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}
@@ -82,7 +82,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 2
- 2
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -40,7 +40,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}
@@ -82,7 +82,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 2
- 1
common/pkgs/storage/local/shard_store.go View File

@@ -36,10 +36,11 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor
}

func (s *ShardStore) Start(ch *types.StorageEventChan) {
logger.Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize)
}

func (s *ShardStore) Stop() {
logger.Infof("local shard store stop")
}

func (s *ShardStore) New() types.ShardWriter {


+ 42
- 30
scanner/internal/event/check_package_redundancy.go View File

@@ -147,43 +147,43 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
switch newRed := newRed.(type) {
case *cdssdk.RepRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
updating, err = t.noneToRep(obj, newRed, newRepStgs)
updating, err = t.noneToRep(execCtx, obj, newRed, newRepStgs)

case *cdssdk.ECRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
updating, err = t.noneToEC(obj, newRed, newECStgs)
updating, err = t.noneToEC(execCtx, obj, newRed, newECStgs)

case *cdssdk.LRCRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
updating, err = t.noneToLRC(obj, newRed, selectedNodes)
updating, err = t.noneToLRC(execCtx, obj, newRed, selectedNodes)
}

case *cdssdk.RepRedundancy:
switch newRed := newRed.(type) {
case *cdssdk.RepRedundancy:
updating, err = t.repToRep(obj, srcRed, rechoosedRepStgs)
updating, err = t.repToRep(execCtx, obj, srcRed, rechoosedRepStgs)

case *cdssdk.ECRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
updating, err = t.repToEC(obj, newRed, newECStgs)
updating, err = t.repToEC(execCtx, obj, newRed, newECStgs)
}

case *cdssdk.ECRedundancy:
switch newRed := newRed.(type) {
case *cdssdk.RepRedundancy:
log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
updating, err = t.ecToRep(obj, srcRed, newRed, newRepStgs)
updating, err = t.ecToRep(execCtx, obj, srcRed, newRed, newRepStgs)

case *cdssdk.ECRedundancy:
uploadNodes := t.rechooseNodesForEC(obj, srcRed, userAllStorages)
updating, err = t.ecToEC(obj, srcRed, newRed, uploadNodes)
updating, err = t.ecToEC(execCtx, obj, srcRed, newRed, uploadNodes)
}

case *cdssdk.LRCRedundancy:
switch newRed := newRed.(type) {
case *cdssdk.LRCRedundancy:
uploadNodes := t.rechooseNodesForLRC(obj, srcRed, userAllStorages)
updating, err = t.lrcToLRC(obj, srcRed, newRed, uploadNodes)
updating, err = t.lrcToLRC(execCtx, obj, srcRed, newRed, uploadNodes)
}
}

@@ -426,7 +426,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, stgs []*StorageLoa
return chosen
}

func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
if len(obj.Blocks) == 0 {
return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
}
@@ -465,7 +465,9 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.
}

// TODO 添加依赖
ret, err := plans.Execute(exec.NewExecContext()).Wait(context.Background())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ret, err := plans.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -487,7 +489,7 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.
}, nil
}

func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -521,8 +523,9 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
return nil, fmt.Errorf("parsing plan: %w", err)
}

// TODO 添加依赖
ioRet, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ioRet, err := plans.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -544,7 +547,7 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
}, nil
}

func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -577,8 +580,9 @@ func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.
return nil, fmt.Errorf("parsing plan: %w", err)
}

// TODO 添加依赖
ioRet, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ioRet, err := plans.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -600,7 +604,7 @@ func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.
}, nil
}

func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
if len(obj.Blocks) == 0 {
return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
}
@@ -639,7 +643,9 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R
}

// TODO 添加依赖
ret, err := plans.Execute(exec.NewExecContext()).Wait(context.Background())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ret, err := plans.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -661,11 +667,11 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R
}, nil
}

func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
return t.noneToEC(obj, red, uploadNodes)
func (t *CheckPackageRedundancy) repToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
return t.noneToEC(ctx, obj, red, uploadNodes)
}

func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -715,7 +721,9 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk
}

// TODO 添加依赖
ioRet, err := planBlder.Execute(exec.NewExecContext()).Wait(context.TODO())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -737,7 +745,7 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk
}, nil
}

func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -805,8 +813,9 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
}

// 如果没有任何Plan,Wait会直接返回成功
// TODO 添加依赖
ret, err := planBlder.Execute(exec.NewExecContext()).Wait(context.TODO())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ret, err := planBlder.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}
@@ -831,7 +840,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
}, nil
}

func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -871,10 +880,12 @@ func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssd
// return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadNodes)
}

return t.reconstructLRC(obj, blocksGrpByIndex, srcRed, uploadNodes)
return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadNodes)
}

/*
TODO2 修复这一块的代码

func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
grped := make(map[int]stgmod.GrouppedObjectBlock)
for _, b := range grpedBlocks {
@@ -938,7 +949,7 @@ func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssd
}, nil
}
*/
func (t *CheckPackageRedundancy) reconstructLRC(obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
var chosenBlocks []stgmod.GrouppedObjectBlock
for _, block := range grpBlocks {
if len(block.StorageIDs) > 0 && block.Index < red.M() {
@@ -1001,8 +1012,9 @@ func (t *CheckPackageRedundancy) reconstructLRC(obj stgmod.ObjectDetail, grpBloc
fmt.Printf("plans: %v\n", planBlder)

// 如果没有任何Plan,Wait会直接返回成功
// TODO 添加依赖
ret, err := planBlder.Execute(exec.NewExecContext()).Wait(context.TODO())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ret, err := planBlder.Execute(execCtx).Wait(context.Background())
if err != nil {
return nil, fmt.Errorf("executing io plan: %w", err)
}


+ 5
- 4
scanner/internal/event/clean_pinned.go View File

@@ -817,13 +817,13 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg
return entry
}

func (t *CleanPinned) executePlans(execCtx ExecuteContext, planBld *exec.PlanBuilder, planningStgIDs map[cdssdk.StorageID]bool) (map[string]exec.VarValue, error) {
func (t *CleanPinned) executePlans(ctx ExecuteContext, planBld *exec.PlanBuilder, planningStgIDs map[cdssdk.StorageID]bool) (map[string]exec.VarValue, error) {
// 统一加锁,有重复也没关系
lockBld := reqbuilder.NewBuilder()
for id := range planningStgIDs {
lockBld.Shard().Buzy(id)
}
lock, err := lockBld.MutexLock(execCtx.Args.DistLock)
lock, err := lockBld.MutexLock(ctx.Args.DistLock)
if err != nil {
return nil, fmt.Errorf("acquiring distlock: %w", err)
}
@@ -838,8 +838,9 @@ func (t *CleanPinned) executePlans(execCtx ExecuteContext, planBld *exec.PlanBui
go func() {
defer wg.Done()

// TODO 添加依赖
ret, err := planBld.Execute(exec.NewExecContext()).Wait(context.TODO())
execCtx := exec.NewExecContext()
exec.SetValueByType(execCtx, ctx.Args.StgMgr)
ret, err := planBld.Execute(execCtx).Wait(context.TODO())
if err != nil {
ioSwErr = fmt.Errorf("executing io switch plan: %w", err)
return


+ 4
- 1
scanner/internal/event/event.go View File

@@ -9,11 +9,13 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/typedispatcher"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

type ExecuteArgs struct {
DB *db2.DB
DistLock *distlock.Service
StgMgr *mgr.Manager
}

type Executor = event.Executor[ExecuteArgs]
@@ -24,10 +26,11 @@ type Event = event.Event[ExecuteArgs]

type ExecuteOption = event.ExecuteOption

func NewExecutor(db *db2.DB, distLock *distlock.Service) Executor {
func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *mgr.Manager) Executor {
return event.NewExecutor(ExecuteArgs{
DB: db,
DistLock: distLock,
StgMgr: stgMgr,
})
}



+ 7
- 1
scanner/main.go View File

@@ -10,6 +10,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/scanner/internal/config"
"gitlink.org.cn/cloudream/storage/scanner/internal/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/mq"
@@ -38,6 +39,7 @@ func main() {

stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})

// 启动分布式锁服务
distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Warnf("new distlock service failed, err: %s", err.Error())
@@ -45,7 +47,11 @@ func main() {
}
go serveDistLock(distlockSvc)

eventExecutor := event.NewExecutor(db, distlockSvc)
// 启动存储服务管理器
stgMgr := mgr.NewManager()

// 启动事件执行器
eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr)
go serveEventExecutor(&eventExecutor)

agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ)


Loading…
Cancel
Save