Browse Source

加载Package时将使用到的Block缓存到IPFS

gitlink
Sydonian 1 year ago
parent
commit
d027969bf7
9 changed files with 308 additions and 158 deletions
  1. +3
    -27
      agent/internal/mq/storage.go
  2. +210
    -9
      agent/internal/task/storage_load_package.go
  3. +2
    -2
      client/internal/cmdline/storage.go
  4. +2
    -2
      client/internal/http/storage.go
  5. +46
    -10
      client/internal/services/storage.go
  6. +0
    -100
      client/internal/task/storage_load_package.go
  7. +9
    -1
      common/pkgs/db/object_block.go
  8. +10
    -7
      common/pkgs/mq/coordinator/storage.go
  9. +26
    -0
      coordinator/internal/mq/storage.go

+ 3
- 27
agent/internal/mq/storage.go View File

@@ -24,31 +24,7 @@ import (
) )


func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
}

outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("creating output directory: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
}

tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath))
tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID))
return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
} }


@@ -70,7 +46,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*


loadTsk := tsk.Body().(*mytask.StorageLoadPackage) loadTsk := tsk.Body().(*mytask.StorageLoadPackage)


return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath))


} else { } else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
@@ -82,7 +58,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*


loadTsk := tsk.Body().(*mytask.StorageLoadPackage) loadTsk := tsk.Body().(*mytask.StorageLoadPackage)


return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath))
} }


return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "")) return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", ""))


+ 210
- 9
agent/internal/task/storage_load_package.go View File

@@ -1,30 +1,231 @@
package task package task


import ( import (
"fmt"
"io"
"os"
"path/filepath"
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
myio "gitlink.org.cn/cloudream/common/utils/io"
myref "gitlink.org.cn/cloudream/common/utils/reflect"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/utils"
) )


type StorageLoadPackage struct { type StorageLoadPackage struct {
cmd *cmd.DownloadPackage
FullPath string
FullOutputPath string

userID cdssdk.UserID
packageID cdssdk.PackageID
storageID cdssdk.StorageID
pinnedBlocks []stgmod.ObjectBlock
} }


func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *StorageLoadPackage {
func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
return &StorageLoadPackage{ return &StorageLoadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
FullPath: outputPath,
userID: userID,
packageID: packageID,
storageID: storageID,
} }
} }
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})
err := t.do(task, ctx)


complete(err, CompleteOption{ complete(err, CompleteOption{
RemovingDelay: time.Minute, RemovingDelay: time.Minute,
}) })
} }

func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new IPFS client: %w", err)
}
defer stgglb.IPFSPool.Release(ipfsCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("request to coordinator: %w", err)
}

outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
return fmt.Errorf("creating output directory: %w", err)
}
t.FullOutputPath = outputDirPath

getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
if err != nil {
return fmt.Errorf("getting package object details: %w", err)
}

mutex, err := reqbuilder.NewBuilder().
// 提前占位
Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID).
// 保护在storage目录中下载的文件
Storage().Buzy(t.storageID).
// 保护下载文件时同时保存到IPFS的文件
IPFS().Buzy(getStgResp.NodeID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

for _, obj := range getObjectDetails.Objects {
err := t.downloadOne(ipfsCli, outputDirPath, obj)
if err != nil {
return err
}
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks))
if err != nil {
return fmt.Errorf("loading package to storage: %w", err)
}

return err
}

func (t *StorageLoadPackage) downloadOne(ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error {
var file io.ReadCloser

switch red := obj.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
reader, err := t.downloadNoneOrRepObject(ipfsCli, obj)
if err != nil {
return fmt.Errorf("downloading object: %w", err)
}
file = reader

case *cdssdk.RepRedundancy:
reader, err := t.downloadNoneOrRepObject(ipfsCli, obj)
if err != nil {
return fmt.Errorf("downloading rep object: %w", err)
}
file = reader

case *cdssdk.ECRedundancy:
reader, pinnedBlocks, err := t.downloadECObject(ipfsCli, obj, red)
if err != nil {
return fmt.Errorf("downloading ec object: %w", err)
}
file = reader
t.pinnedBlocks = append(t.pinnedBlocks, pinnedBlocks...)

default:
return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy))
}
defer file.Close()

fullPath := filepath.Join(dir, obj.Object.Path)

lastDirPath := filepath.Dir(fullPath)
if err := os.MkdirAll(lastDirPath, 0755); err != nil {
return fmt.Errorf("creating object last dir: %w", err)
}

outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()

if _, err := io.Copy(outputFile, file); err != nil {
return fmt.Errorf("writting object to file: %w", err)
}

return nil
}

func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail) (io.ReadCloser, error) {
if len(obj.Blocks) == 0 {
return nil, fmt.Errorf("no node has this object")
}

// 异步pin,不管实际有没有成功
go func() {
ipfsCli.Pin(obj.Object.FileHash)
}()

file, err := ipfsCli.OpenRead(obj.Object.FileHash)
if err != nil {
return nil, err
}

return file, nil
}

func (t *StorageLoadPackage) downloadECObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) {
var chosenBlocks []stgmod.GrouppedObjectBlock
grpBlocks := obj.GroupBlocks()
for i := range grpBlocks {
if len(chosenBlocks) == ecRed.K {
break
}

chosenBlocks = append(chosenBlocks, grpBlocks[i])
}

if len(chosenBlocks) < ecRed.K {
return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks))
}

var fileStrs []io.ReadCloser

rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, nil, fmt.Errorf("new rs: %w", err)
}

for i := range chosenBlocks {
// 异步pin,不管实际有没有成功
go func() {
ipfsCli.Pin(chosenBlocks[i].FileHash)
}()

str, err := ipfsCli.OpenRead(chosenBlocks[i].FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
}
return nil, nil, fmt.Errorf("donwloading file: %w", err)
}

fileStrs = append(fileStrs, str)
}

fileReaders, filesCloser := myio.ToReaders(fileStrs)

var indexes []int
var pinnedBlocks []stgmod.ObjectBlock
for _, b := range chosenBlocks {
indexes = append(indexes, b.Index)
pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{
ObjectID: b.ObjectID,
Index: b.Index,
NodeID: *stgglb.Local.NodeID,
FileHash: b.FileHash,
})
}

outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), pinnedBlocks, nil
}

+ 2
- 2
client/internal/cmdline/storage.go View File

@@ -8,13 +8,13 @@ import (
) )


func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error { func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
if err != nil { if err != nil {
return fmt.Errorf("start loading package to storage: %w", err) return fmt.Errorf("start loading package to storage: %w", err)
} }


for { for {
complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
return fmt.Errorf("moving complete with: %w", err) return fmt.Errorf("moving complete with: %w", err)


+ 2
- 2
client/internal/http/storage.go View File

@@ -40,7 +40,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
return return
} }


taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID)
nodeID, taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID)
if err != nil { if err != nil {
log.Warnf("start storage load package: %s", err.Error()) log.Warnf("start storage load package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
@@ -48,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
} }


for { for {
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
if complete { if complete {
if err != nil { if err != nil {
log.Warnf("loading complete with: %s", err.Error()) log.Warnf("loading complete with: %s", err.Error())


+ 46
- 10
client/internal/services/storage.go View File

@@ -6,7 +6,6 @@ import (


cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"


"gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -21,18 +20,55 @@ func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc} return &StorageService{Service: svc}
} }


func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) {
tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID))
return tsk.ID(), nil
func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (cdssdk.NodeID, string, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return 0, "", fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
}

agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

startResp, err := agentCli.StartStorageLoadPackage(agtmq.NewStartStorageLoadPackage(userID, packageID, storageID))
if err != nil {
return 0, "", fmt.Errorf("start storage load package: %w", err)
}

return stgResp.NodeID, startResp.TaskID, nil
} }


func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
loadTsk := tsk.Body().(*task.StorageLoadPackage)
return true, loadTsk.ResultFullPath, tsk.Error()
func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) {
agentCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
// TODO 失败是否要当做任务已经结束?
return true, "", fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
// TODO 请求失败是否要当做任务已经结束?
return true, "", fmt.Errorf("wait storage load package: %w", err)
}

if !waitResp.IsComplete {
return false, "", nil
} }
return false, "", nil

if waitResp.Error != "" {
return true, "", fmt.Errorf("%s", waitResp.Error)
}

return true, waitResp.FullPath, nil
} }


func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {


+ 0
- 100
client/internal/task/storage_load_package.go View File

@@ -1,100 +0,0 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

// TODO 可以考虑不用Task来实现这些逻辑
type StorageLoadPackage struct {
userID cdssdk.UserID
packageID cdssdk.PackageID
storageID cdssdk.StorageID

ResultFullPath string
}

func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
return &StorageLoadPackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageLoadPackage) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
// 提前占位
Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID).
// 保护在storage目录中下载的文件
Storage().Buzy(t.storageID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}

// 然后向代理端发送移动文件的请求
agentCli, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err)
}
defer stgglb.AgentMQPool.Release(agentCli)

agentMoveResp, err := agentCli.StartStorageLoadPackage(
agtmq.NewStartStorageLoadPackage(
t.userID,
t.packageID,
t.storageID,
))
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait loading package: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent loading package: %s", waitResp.Error)
}

t.ResultFullPath = waitResp.FullPath
break
}
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID))
if err != nil {
return fmt.Errorf("loading package to storage: %w", err)
}
return nil
}

+ 9
- 1
common/pkgs/db/object_block.go View File

@@ -31,6 +31,14 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index
return err return err
} }


func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error {
_, err := sqlx.NamedExec(ctx,
"insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)",
blocks,
)
return err
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
_, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID) _, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID)
return err return err
@@ -78,7 +86,7 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk
var blocks []stgmod.ObjectBlock var blocks []stgmod.ObjectBlock
err = sqlx.Select(ctx, err = sqlx.Select(ctx,
&blocks, &blocks,
"select * from ObjectBlock where ObjectID = ? order by Index",
"select * from ObjectBlock where ObjectID = ? order by `Index`",
obj.ObjectID, obj.ObjectID,
) )
if err != nil { if err != nil {


+ 10
- 7
common/pkgs/mq/coordinator/storage.go View File

@@ -5,6 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
) )


@@ -55,19 +56,21 @@ var _ = Register(Service.StoragePackageLoaded)


type StoragePackageLoaded struct { type StoragePackageLoaded struct {
mq.MessageBodyBase mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
StorageID cdssdk.StorageID `json:"storageID"`
PackageID cdssdk.PackageID `json:"packageID"`
UserID cdssdk.UserID `json:"userID"`
StorageID cdssdk.StorageID `json:"storageID"`
PackageID cdssdk.PackageID `json:"packageID"`
PinnedBlocks []stgmod.ObjectBlock `json:"pinnedBlocks"`
} }
type StoragePackageLoadedResp struct { type StoragePackageLoadedResp struct {
mq.MessageBodyBase mq.MessageBodyBase
} }


func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID) *StoragePackageLoaded {
func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID, pinnedBlocks []stgmod.ObjectBlock) *StoragePackageLoaded {
return &StoragePackageLoaded{ return &StoragePackageLoaded{
UserID: userID,
PackageID: packageID,
StorageID: stgID,
UserID: userID,
PackageID: packageID,
StorageID: stgID,
PinnedBlocks: pinnedBlocks,
} }
} }
func NewStoragePackageLoadedResp() *StoragePackageLoadedResp { func NewStoragePackageLoadedResp() *StoragePackageLoadedResp {


+ 26
- 0
coordinator/internal/mq/storage.go View File

@@ -25,6 +25,15 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora


func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
// 可以不用检查用户是否存在
if ok, _ := svc.db.Package().IsAvailable(tx, msg.UserID, msg.PackageID); !ok {
return fmt.Errorf("package is not available to user")
}

if ok, _ := svc.db.Storage().IsAvailable(tx, msg.UserID, msg.StorageID); !ok {
return fmt.Errorf("storage is not available to user")
}

err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID)
if err != nil { if err != nil {
return fmt.Errorf("creating storage package: %w", err) return fmt.Errorf("creating storage package: %w", err)
@@ -35,6 +44,23 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo
return fmt.Errorf("creating storage package log: %w", err) return fmt.Errorf("creating storage package log: %w", err)
} }


stg, err := svc.db.Storage().GetByID(tx, msg.StorageID)
if err != nil {
return fmt.Errorf("getting storage: %w", err)
}

err = svc.db.PinnedObject().CreateFromPackage(tx, msg.PackageID, stg.NodeID)
if err != nil {
return fmt.Errorf("creating pinned object from package: %w", err)
}

if len(msg.PinnedBlocks) > 0 {
err = svc.db.ObjectBlock().BatchCreate(tx, msg.PinnedBlocks)
if err != nil {
return fmt.Errorf("batch creating object block: %w", err)
}
}

return nil return nil
}) })
if err != nil { if err != nil {


Loading…
Cancel
Save