Browse Source

统一单文件和多文件逻辑;优化程序结构

gitlink
Sydonian 2 years ago
parent
commit
e077f64edd
12 changed files with 153 additions and 650 deletions
  1. +2
    -2
      internal/config/config.go
  2. +3
    -3
      internal/services/cmd/agent.go
  3. +12
    -12
      internal/services/cmd/ipfs.go
  4. +9
    -9
      internal/services/cmd/object.go
  5. +4
    -1
      internal/services/cmd/service.go
  6. +107
    -128
      internal/services/cmd/storage.go
  7. +0
    -194
      internal/task/ec_read.go
  8. +1
    -1
      internal/task/ipfs_pin.go
  9. +1
    -1
      internal/task/ipfs_read.go
  10. +8
    -8
      internal/task/task.go
  11. +0
    -285
      internal/task/upload_rep_objects.go
  12. +6
    -6
      main.go

+ 2
- 2
internal/config/config.go View File

@@ -5,7 +5,7 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config" c "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/common/utils/ipfs"
racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
) )


type Config struct { type Config struct {
@@ -18,7 +18,7 @@ type Config struct {
StorageBaseDir string `json:"storageBaseDir"` StorageBaseDir string `json:"storageBaseDir"`
TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒
Logger log.Config `json:"logger"` Logger log.Config `json:"logger"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"` IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"` DistLock distlock.Config `json:"distlock"`
} }


+ 3
- 3
internal/services/cmd/agent.go View File

@@ -3,10 +3,10 @@ package cmd
import ( import (
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
) )


func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *mq.CodeMessage) {
func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) {
var ipfsState string var ipfsState string


if svc.ipfs.IsUp() { if svc.ipfs.IsUp() {
@@ -15,5 +15,5 @@ func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *mq.Co
ipfsState = consts.IPFSStateOK ipfsState = consts.IPFSStateOK
} }


return mq.ReplyOK(agtmsg.NewGetStateRespBody(ipfsState))
return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState))
} }

+ 12
- 12
internal/services/cmd/ipfs.go View File

@@ -10,14 +10,14 @@ import (
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
) )


func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
filesMap, err := svc.ipfs.GetPinnedFiles() filesMap, err := svc.ipfs.GetPinnedFiles()
if err != nil { if err != nil {
logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error())
return mq.ReplyFailed[agtmsg.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
} }


// TODO 根据锁定清单过滤被锁定的文件的记录 // TODO 根据锁定清单过滤被锁定的文件的记录
@@ -28,8 +28,8 @@ func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *mq
} }
} }


func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *mq.CodeMessage) {
var entries []agtmsg.CheckIPFSRespEntry
func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches { for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash] _, ok := filesMap[cache.FileHash]
if ok { if ok {
@@ -52,7 +52,7 @@ func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]sh


} else if cache.State == consts.CacheStateTemp { } else if cache.State == consts.CacheStateTemp {
if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
entries = append(entries, agtmsg.NewCheckIPFSRespEntry(cache.FileHash, agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP))
entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
} }
} }
} }
@@ -60,11 +60,11 @@ func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]sh


// 增量情况下,不需要对filesMap中没检查的记录进行处理 // 增量情况下,不需要对filesMap中没检查的记录进行处理


return mq.ReplyOK(agtmsg.NewCheckIPFSResp(entries))
return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
} }


func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *mq.CodeMessage) {
var entries []agtmsg.CheckIPFSRespEntry
func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches { for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash] _, ok := filesMap[cache.FileHash]
if ok { if ok {
@@ -87,7 +87,7 @@ func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]she


} else if cache.State == consts.CacheStateTemp { } else if cache.State == consts.CacheStateTemp {
if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
entries = append(entries, agtmsg.NewCheckIPFSRespEntry(cache.FileHash, agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP))
entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
} }
} }
} }
@@ -100,8 +100,8 @@ func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]she
if err != nil { if err != nil {
logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error())
} }
entries = append(entries, agtmsg.NewCheckIPFSRespEntry(hash, agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP))
entries = append(entries, agtmq.NewCheckIPFSRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP))
} }


return mq.ReplyOK(agtmsg.NewCheckIPFSResp(entries))
return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
} }

+ 9
- 9
internal/services/cmd/object.go View File

@@ -7,10 +7,10 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-agent/internal/task"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
) )


func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg.StartPinningObjectResp, *mq.CodeMessage) {
func (svc *Service) StartPinningObject(msg *agtmq.StartPinningObject) (*agtmq.StartPinningObjectResp, *mq.CodeMessage) {
log.WithField("FileHash", msg.FileHash).Debugf("pin object") log.WithField("FileHash", msg.FileHash).Debugf("pin object")


tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash))
@@ -18,18 +18,18 @@ func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg.
if tsk.Error() != nil { if tsk.Error() != nil {
log.WithField("FileHash", msg.FileHash). log.WithField("FileHash", msg.FileHash).
Warnf("pin object failed, err: %s", tsk.Error().Error()) Warnf("pin object failed, err: %s", tsk.Error().Error())
return mq.ReplyFailed[agtmsg.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed")
return mq.ReplyFailed[agtmq.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed")
} }


return mq.ReplyOK(agtmsg.NewStartPinningObjectResp(tsk.ID()))
return mq.ReplyOK(agtmq.NewStartPinningObjectResp(tsk.ID()))
} }


func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.WaitPinningObjectResp, *mq.CodeMessage) {
func (svc *Service) WaitPinningObject(msg *agtmq.WaitPinningObject) (*agtmq.WaitPinningObjectResp, *mq.CodeMessage) {
log.WithField("TaskID", msg.TaskID).Debugf("wait pinning object") log.WithField("TaskID", msg.TaskID).Debugf("wait pinning object")


tsk := svc.taskManager.FindByID(msg.TaskID) tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil { if tsk == nil {
return mq.ReplyFailed[agtmsg.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found")
return mq.ReplyFailed[agtmq.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found")
} }


if msg.WaitTimeoutMs == 0 { if msg.WaitTimeoutMs == 0 {
@@ -40,7 +40,7 @@ func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.Wa
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(true, errMsg))


} else { } else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -50,9 +50,9 @@ func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.Wa
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(true, errMsg))
} }


return mq.ReplyOK(agtmsg.NewWaitPinningObjectResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(false, ""))
} }
} }

+ 4
- 1
internal/services/cmd/service.go View File

@@ -3,16 +3,19 @@ package cmd
import ( import (
"gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-agent/internal/task"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type Service struct { type Service struct {
ipfs *ipfs.IPFS ipfs *ipfs.IPFS
taskManager *task.Manager taskManager *task.Manager
coordinator *coormq.Client
} }


func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager) *Service {
func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager, coordinator *coormq.Client) *Service {
return &Service{ return &Service{
ipfs: ipfs, ipfs: ipfs,
taskManager: taskMgr, taskManager: taskMgr,
coordinator: coordinator,
} }
} }

+ 107
- 128
internal/services/cmd/storage.go View File

@@ -1,7 +1,6 @@
package cmd package cmd


import ( import (
"fmt"
"io/fs" "io/fs"
"io/ioutil" "io/ioutil"
"os" "os"
@@ -9,70 +8,48 @@ import (
"time" "time"


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task"
mytask "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/ec"
"gitlink.org.cn/cloudream/storage-common/utils" "gitlink.org.cn/cloudream/storage-common/utils"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent"
stgcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
myos "gitlink.org.cn/cloudream/storage-common/utils/os"
) )


func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *mq.CodeMessage) {
// TODO 修改文件名,可用objectname
outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID)
objectDir := filepath.Dir(msg.ObjectName)
outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, objectDir, outFileName)

if repRed, ok := msg.Redundancy.(models.RepRedundancyData); ok {
taskID, err := service.moveRepObject(repRed, outFilePath)
if err != nil {
logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error())
return mq.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move rep object failed")
}

return mq.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID))

} else if repRed, ok := msg.Redundancy.(models.ECRedundancyData); ok {
taskID, err := service.moveEcObject(msg.ObjectID, msg.FileSize, repRed, outFilePath)
if err != nil {
logger.Warnf("move ec object as %s failed, err: %s", outFilePath, err.Error())
return mq.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move ec object failed")
}
func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) {
getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())


return mq.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID))
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
} }


return mq.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "not rep or ec object???")
}

func (svc *Service) moveRepObject(repData models.RepRedundancyData, outFilePath string) (string, error) {
tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repData.FileHash, outFilePath))
return tsk.ID(), nil
}
outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageMovePackageDirName(msg.PackageID, msg.UserID))
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("creating output directory: %s", err.Error())


func (svc *Service) moveEcObject(objID int64, fileSize int64, ecData models.ECRedundancyData, outFilePath string) (string, error) {
ecK := ecData.Ec.EcK
blockIDs := make([]int, ecK)
hashs := make([]string, ecK)
for i := 0; i < ecK; i++ {
blockIDs[i] = i
hashs[i] = ecData.Blocks[i].FileHash
return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
} }
tsk := svc.taskManager.StartComparable(task.NewEcRead(objID, fileSize, ecData.Ec, blockIDs, hashs, outFilePath))
return tsk.ID(), nil

tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](stgcmd.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)))
return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID()))
} }


func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *mq.CodeMessage) {
logger.WithField("TaskID", msg.TaskID).Debugf("wait moving object")
func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*agtmq.WaitStorageMovePackageResp, *mq.CodeMessage) {
logger.WithField("TaskID", msg.TaskID).Debugf("wait moving package")


tsk := svc.taskManager.FindByID(msg.TaskID) tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil { if tsk == nil {
return mq.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TaskNotFound, "task not found")
return mq.ReplyFailed[agtmq.WaitStorageMovePackageResp](errorcode.TaskNotFound, "task not found")
} }


if msg.WaitTimeoutMs == 0 { if msg.WaitTimeoutMs == 0 {
@@ -83,7 +60,7 @@ func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*a
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg))


} else { } else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -93,20 +70,20 @@ func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*a
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg))
} }


return mq.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(false, ""))
} }
} }


func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *mq.CodeMessage) {
func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)


infos, err := ioutil.ReadDir(dirFullPath) infos, err := ioutil.ReadDir(dirFullPath)
if err != nil { if err != nil {
logger.Warnf("list storage directory failed, err: %s", err.Error()) logger.Warnf("list storage directory failed, err: %s", err.Error())
return mq.ReplyOK(agtmsg.NewStorageCheckResp(
return mq.ReplyOK(agtmq.NewStorageCheckResp(
err.Error(), err.Error(),
nil, nil,
)) ))
@@ -121,15 +98,15 @@ func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheck
} }
} }


func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *mq.CodeMessage) {
func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
infosMap := make(map[string]fs.FileInfo) infosMap := make(map[string]fs.FileInfo)
for _, info := range fileInfos { for _, info := range fileInfos {
infosMap[info.Name()] = info infosMap[info.Name()] = info
} }


var entries []agtmsg.StorageCheckRespEntry
for _, obj := range msg.Objects {
fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages {
fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[fileName] _, ok := infosMap[fileName]


if ok { if ok {
@@ -138,26 +115,26 @@ func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []
delete(infosMap, fileName) delete(infosMap, fileName)


} else { } else {
// 只要文件不存在,就删除StorageObject表中的记录
entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
// 只要文件不存在,就删除StoragePackage表中的记录
entries = append(entries, agtmq.NewStorageCheckRespEntry(obj.PackageID, obj.UserID, agtmq.CHECK_STORAGE_RESP_OP_DELETE))
} }
} }


// 增量情况下,不需要对infosMap中没检查的记录进行处理 // 增量情况下,不需要对infosMap中没检查的记录进行处理


return mq.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
} }


func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *mq.CodeMessage) {
func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) {


infosMap := make(map[string]fs.FileInfo) infosMap := make(map[string]fs.FileInfo)
for _, info := range fileInfos { for _, info := range fileInfos {
infosMap[info.Name()] = info infosMap[info.Name()] = info
} }


var entries []agtmsg.StorageCheckRespEntry
for _, obj := range msg.Objects {
fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages {
fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[fileName] _, ok := infosMap[fileName]


if ok { if ok {
@@ -166,83 +143,83 @@ func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []f
delete(infosMap, fileName) delete(infosMap, fileName)


} else { } else {
// 只要文件不存在,就删除StorageObject表中的记录
entries = append(entries, agtmsg.NewStorageCheckRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
// 只要文件不存在,就删除StoragePackage表中的记录
entries = append(entries, agtmq.NewStorageCheckRespEntry(obj.PackageID, obj.UserID, agtmq.CHECK_STORAGE_RESP_OP_DELETE))
} }
} }


return mq.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
} }


func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
fmt.Println("decode ")
var tmpIn [][]byte
var zeroPkt []byte
tmpIn = make([][]byte, len(inBufs))
hasBlock := map[int]bool{}
for j := 0; j < len(blockSeq); j++ {
hasBlock[blockSeq[j]] = true
}
needRepair := false //检测是否传入了所有数据块
for j := 0; j < len(outBufs); j++ {
if blockSeq[j] != j {
needRepair = true
}
func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
} }
enc := ec.NewRsEnc(ecK, len(inBufs))
for i := 0; int64(i) < numPacket; i++ {
for j := 0; j < len(inBufs); j++ { //3
if hasBlock[j] {
tmpIn[j] = <-inBufs[j]
} else {
tmpIn[j] = zeroPkt
}
}
if needRepair {
err := enc.Repair(tmpIn)
if err != nil {
fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
}

fullPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)

var uploadFilePathes []string
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {
if err != nil {
return nil
} }
for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
outBufs[j] <- tmpIn[j]

if !fi.IsDir() {
uploadFilePathes = append(uploadFilePathes, fname)
} }

return nil
})
if err != nil {
logger.Warnf("opening directory %s: %s", fullPath, err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "read directory failed")
} }
for i := 0; i < len(outBufs); i++ {
close(outBufs[i])
}
}


func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *mq.CodeMessage) {
fullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.StorageDirectory, msg.FilePath)
objIter := myos.NewUploadingObjectIterator(fullPath, uploadFilePathes)


file, err := os.Open(fullPath)
if err != nil {
logger.Warnf("opening file %s: %s", fullPath, err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "open file failed")
if msg.Redundancy.Type == models.RedundancyRep {
repInfo, err := msg.Redundancy.ToRepInfo()
if err != nil {
logger.Warnf("getting rep redundancy info: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed")
}

tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](
stgcmd.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, stgcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: &config.Cfg().ID,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
} }


fileInfo, err := file.Stat()
ecInfo, err := msg.Redundancy.ToECInfo()
if err != nil { if err != nil {
file.Close()
logger.Warnf("getting file %s state: %s", fullPath, err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get file info failed")
}
fileSize := fileInfo.Size()
logger.Warnf("getting ec redundancy info: %s", err.Error())


uploadObject := task.UploadObject{
ObjectName: msg.ObjectName,
File: file,
FileSize: fileSize,
return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed")
} }
uploadObjects := []task.UploadObject{uploadObject}


// Task会关闭文件流
tsk := svc.taskManager.StartNew(task.NewUploadRepObjects(msg.UserID, msg.BucketID, uploadObjects, msg.RepCount))
return mq.ReplyOK(agtmsg.NewStartStorageUploadRepObjectResp(tsk.ID()))
tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](
stgcmd.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, config.Cfg().ECPacketSize, stgcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: &config.Cfg().ID,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
} }


func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *mq.CodeMessage) {
func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage) (*agtmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID) tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil { if tsk == nil {
return nil, mq.Failed(errorcode.TaskNotFound, "task not found") return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
@@ -251,20 +228,22 @@ func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepO
if msg.WaitTimeoutMs == 0 { if msg.WaitTimeoutMs == 0 {
tsk.Wait() tsk.Wait()
} else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
return mq.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(false, "", 0, ""))
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0))
} }


uploadTask := tsk.Body().(*task.UploadRepObjects)
uploadRet := uploadTask.Results[0]
wrapTask := tsk.Body().(*stgcmd.TaskWrapper[mytask.TaskContext])


errMsg := ""
if tsk.Error() != nil { if tsk.Error() != nil {
errMsg = tsk.Error().Error()
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0))
}

if repTask, ok := wrapTask.InnerTask().(*stgcmd.CreateRepPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID))
} }


if uploadRet.Error != nil {
errMsg = uploadRet.Error.Error()
if ecTask, ok := wrapTask.InnerTask().(*stgcmd.CreateECPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID))
} }


return mq.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(true, errMsg, uploadRet.ObjectID, uploadRet.FileHash))
return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
} }

+ 0
- 194
internal/task/ec_read.go View File

@@ -1,194 +0,0 @@
package task

import (
"fmt"
"io"
"os"
"path/filepath"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/ec"
)

type EcRead struct {
objID int64
FileSize int64
Ec models.EC
BlockIDs []int
BlockHashs []string
LocalPath string
}

func NewEcRead(objID int64, fileSize int64, ec models.EC, blockIDs []int, blockHashs []string, localPath string) *EcRead {
return &EcRead{
objID: objID,
FileSize: fileSize,
Ec: ec,
BlockIDs: blockIDs,
BlockHashs: blockHashs,
LocalPath: localPath,
}
}

func (t *EcRead) Compare(other *Task) bool {
tsk, ok := other.Body().(*EcRead)
if !ok {
return false
}

return t.objID == tsk.objID && t.LocalPath == tsk.LocalPath
}

func (t *EcRead) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[EcRead]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

outputFileDir := filepath.Dir(t.LocalPath)

err := os.MkdirAll(outputFileDir, os.ModePerm)
if err != nil {
err := fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err)
log.WithField("LocalPath", t.LocalPath).Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}

outputFile, err := os.Create(t.LocalPath)
if err != nil {
err := fmt.Errorf("create output file %s failed, err: %w", t.LocalPath, err)
log.WithField("LocalPath", t.LocalPath).Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer outputFile.Close()
//这里引入ecread
ecK := t.Ec.EcK
ecN := t.Ec.EcN
rd, err := readObject(ctx, t.FileSize, ecK, ecN, t.BlockIDs, t.BlockHashs)
if err != nil {
err := fmt.Errorf("read ipfs block failed, err: %w", err)
log.WithField("FileHash1", t.BlockHashs[0]).Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}

_, err = io.Copy(outputFile, rd)
if err != nil {
err := fmt.Errorf("copy ipfs file to local file failed, err: %w", err)
log.WithField("LocalPath", t.LocalPath).Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}

complete(nil, CompleteOption{
RemovingDelay: time.Minute,
})
}

func readObject(ctx TaskContext, fileSize int64, ecK int, ecN int, blockIDs []int, hashs []string) (io.ReadCloser, error) {
// TODO zkx 先使用同步方式实现读取多个block并解码数据的逻辑,做好错误处理

numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize)
getBufs := make([]chan []byte, ecN)
decodeBufs := make([]chan []byte, ecK)
for i := 0; i < ecN; i++ {
getBufs[i] = make(chan []byte)
}
for i := 0; i < ecK; i++ {
decodeBufs[i] = make(chan []byte)
}
for i := 0; i < len(blockIDs); i++ {
go get(ctx, hashs[i], getBufs[blockIDs[i]], numPacket)
}
//print(numPacket)
go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket)
r, w := io.Pipe()
//这个就是persist函数
go func() {
for i := 0; int64(i) < numPacket; i++ {
for j := 0; j < len(decodeBufs); j++ {
tmp := <-decodeBufs[j]
_, err := w.Write(tmp)
if err != nil {
fmt.Errorf("persist file falied, err:%w", err)
}
}
}
w.Close()
}()
return r, nil
}

func get(ctx TaskContext, blockHash string, getBuf chan []byte, numPacket int64) error {
//使用本地IPFS获取
//获取IPFS的reader
reader, err := ctx.IPFS.OpenRead(blockHash)
if err != nil {
return fmt.Errorf("read ipfs block failed, err: %w", err)
}
defer reader.Close()
for i := 0; int64(i) < numPacket; i++ {
buf := make([]byte, config.Cfg().ECPacketSize)
_, err := io.ReadFull(reader, buf)
if err != nil {
return fmt.Errorf("read file falied, err:%w", err)
}
getBuf <- buf
}
close(getBuf)
return nil
}

func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
var tmpIn [][]byte
var zeroPkt []byte
tmpIn = make([][]byte, len(inBufs))
hasBlock := map[int]bool{}
for j := 0; j < len(blockSeq); j++ {
hasBlock[blockSeq[j]] = true
}
needRepair := false //检测是否传入了所有数据块
for j := 0; j < len(outBufs); j++ {
if blockSeq[j] != j {
needRepair = true
}
}
enc := ec.NewRsEnc(ecK, len(inBufs))
for i := 0; int64(i) < numPacket; i++ {
for j := 0; j < len(inBufs); j++ { //3
if hasBlock[j] {
tmpIn[j] = <-inBufs[j]
} else {
tmpIn[j] = zeroPkt
}
}
if needRepair {
err := enc.Repair(tmpIn)
if err != nil {
fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
}
}
for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
outBufs[j] <- tmpIn[j]
}
}
for i := 0; i < len(outBufs); i++ {
close(outBufs[i])
}
}

+ 1
- 1
internal/task/ipfs_pin.go View File

@@ -31,7 +31,7 @@ func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) {
log.Debugf("begin with %v", logger.FormatStruct(t)) log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end") defer log.Debugf("end")


err := ctx.IPFS.Pin(t.FileHash)
err := ctx.ipfs.Pin(t.FileHash)
if err != nil { if err != nil {
err := fmt.Errorf("pin file failed, err: %w", err) err := fmt.Errorf("pin file failed, err: %w", err)
log.WithField("FileHash", t.FileHash).Warn(err.Error()) log.WithField("FileHash", t.FileHash).Warn(err.Error())


+ 1
- 1
internal/task/ipfs_read.go View File

@@ -61,7 +61,7 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) {
} }
defer outputFile.Close() defer outputFile.Close()


rd, err := ctx.IPFS.OpenRead(t.FileHash)
rd, err := ctx.ipfs.OpenRead(t.FileHash)
if err != nil { if err != nil {
err := fmt.Errorf("read ipfs file failed, err: %w", err) err := fmt.Errorf("read ipfs file failed, err: %w", err)
log.WithField("FileHash", t.FileHash).Warn(err.Error()) log.WithField("FileHash", t.FileHash).Warn(err.Error())


+ 8
- 8
internal/task/task.go View File

@@ -4,13 +4,13 @@ import (
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/common/utils/ipfs"
coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type TaskContext struct { type TaskContext struct {
IPFS *ipfs.IPFS
Coordinator *coorcli.Client
DistLock *distsvc.Service
ipfs *ipfs.IPFS
coordinator *coormq.Client
distlock *distsvc.Service
} }


// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -25,10 +25,10 @@ type Task = task.Task[TaskContext]


type CompleteOption = task.CompleteOption type CompleteOption = task.CompleteOption


func NewManager(ipfs *ipfs.IPFS, coorCli *coorcli.Client, distLock *distsvc.Service) Manager {
func NewManager(ipfs *ipfs.IPFS, coorCli *coormq.Client, distLock *distsvc.Service) Manager {
return task.NewManager(TaskContext{ return task.NewManager(TaskContext{
IPFS: ipfs,
Coordinator: coorCli,
DistLock: distLock,
ipfs: ipfs,
coordinator: coorCli,
distlock: distLock,
}) })
} }

+ 0
- 285
internal/task/upload_rep_objects.go View File

@@ -1,285 +0,0 @@
package task

import (
"fmt"
"io"
"math/rand"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/utils"
mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"

ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// UploadRepObjects is a task that uploads a batch of objects with REP
// redundancy. Objects and Results correspond one-to-one by index.
type UploadRepObjects struct {
	userID      int64                         // owner of the upload
	bucketID    int64                         // destination bucket
	repCount    int                           // number of replicas requested per object
	Objects     []UploadObject                // files to upload
	Results     []UploadSingleRepObjectResult // per-object outcome, same order as Objects
	IsUploading bool                          // true once the actual upload phase has started
}

// UploadRepObjectsResult is a snapshot of the task's outcome.
type UploadRepObjectsResult struct {
	Objects     []UploadObject
	Results     []UploadSingleRepObjectResult
	IsUploading bool
}

// UploadObject describes one file to be uploaded.
type UploadObject struct {
	ObjectName string        // object name within the bucket
	File       io.ReadCloser // file content; closed by Execute when the task ends
	FileSize   int64         // size of File in bytes
}

// UploadSingleRepObjectResult is the outcome of uploading a single object.
type UploadSingleRepObjectResult struct {
	Error    error  // non-nil when this object failed its check or upload
	FileHash string // content hash of the uploaded file
	ObjectID int64  // object ID assigned by the coordinator
}

// NewUploadRepObjects creates a task that uploads every entry of uploadObjects
// into the given bucket with repCount replicas on behalf of userID.
func NewUploadRepObjects(userID int64, bucketID int64, uploadObjects []UploadObject, repCount int) *UploadRepObjects {
	task := UploadRepObjects{
		userID:   userID,
		bucketID: bucketID,
		repCount: repCount,
		Objects:  uploadObjects,
	}
	return &task
}

// Execute runs the upload, reports the result through complete, and finally
// closes every uploaded file handle.
func (t *UploadRepObjects) Execute(ctx TaskContext, complete CompleteFn) {
	taskLog := logger.WithType[UploadRepObjects]("Task")
	taskLog.Debugf("begin with %v", logger.FormatStruct(t))
	defer taskLog.Debugf("end")

	// Release every file handle once the task has fully finished (runs after
	// complete has been invoked, matching the original ordering).
	defer func() {
		for i := range t.Objects {
			t.Objects[i].File.Close()
		}
	}()

	complete(t.do(ctx), CompleteOption{
		RemovingDelay: time.Minute,
	})
}

// do performs the batched REP upload: acquire all required distributed locks,
// pre-check every object with the coordinator, and — only when every object
// passes — upload them one by one, recording a per-object result in t.Results.
// A pre-check failure aborts the whole batch (the per-object errors are still
// reported via t.Results) and is not treated as a task-level error.
func (t *UploadRepObjects) do(ctx TaskContext) error {

	reqBlder := reqbuilder.NewBuilder()
	for _, uploadObject := range t.Objects {
		reqBlder.Metadata().
			// guards against creating multiple objects with the same name
			Object().CreateOne(t.bucketID, uploadObject.ObjectName)
	}
	mutex, err := reqBlder.
		Metadata().
		// used to check that the user has permission on the bucket
		UserBucket().ReadOne(t.userID, t.bucketID).
		// used to query the nodes available for uploading
		Node().ReadAny().
		// used to write the Rep configuration
		ObjectRep().CreateAny().
		// used to create Cache records
		Cache().CreateAny().
		MutexLock(ctx.DistLock)
	if err != nil {
		return fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	var repWriteResps []*coormsg.PreUploadResp

	// Pre-check phase: ask the coordinator whether each object can be uploaded.
	// allPassed replaces the original, confusingly inverted `hasFailure` flag:
	// it is true only while every object has passed its pre-check.
	allPassed := true
	for i := 0; i < len(t.Objects); i++ {
		repWriteResp, err := t.preUploadSingleObject(ctx, t.Objects[i])
		if err != nil {
			allPassed = false
			t.Results = append(t.Results,
				UploadSingleRepObjectResult{
					Error:    err,
					FileHash: "",
					ObjectID: 0,
				})
			continue
		}
		t.Results = append(t.Results, UploadSingleRepObjectResult{})
		repWriteResps = append(repWriteResps, repWriteResp)
	}

	// At least one object is not eligible: return the per-object check results
	// without uploading anything.
	if !allPassed {
		return nil
	}

	// Upload phase. Because no object was skipped, repWriteResps[i],
	// t.Objects[i] and t.Results[i] all refer to the same object.
	t.IsUploading = true
	for i := 0; i < len(repWriteResps); i++ {
		objectID, fileHash, err := t.uploadSingleObject(ctx, t.Objects[i], repWriteResps[i])
		// record this object's upload outcome
		t.Results[i] = UploadSingleRepObjectResult{
			Error:    err,
			FileHash: fileHash,
			ObjectID: objectID,
		}
	}
	return nil
}

// preUploadSingleObject asks the coordinator whether uploadObject may be
// uploaded and which nodes can receive it. It fails when the coordinator
// rejects the request or offers no candidate node.
func (t *UploadRepObjects) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) {
	// Ask the coordinator to assign upload node(s) for this object.
	body := coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP)
	resp, err := ctx.Coordinator.PreUploadRepObject(body)
	if err != nil {
		return nil, fmt.Errorf("pre upload rep object: %w", err)
	}
	if len(resp.Nodes) == 0 {
		return nil, fmt.Errorf("no node to upload file")
	}
	return resp, nil
}

// uploadSingleObject stores one object: it first tries the local IPFS daemon,
// and only if that fails does it stream the file to a remote agent chosen from
// preResp.Nodes. On success it registers the file with the coordinator and
// returns the new object ID together with the file hash.
//
// NOTE(review): when the local-IPFS attempt fails partway, uploadObject.File
// may already have been partially consumed, yet the same reader is re-sent to
// the remote node without rewinding — confirm the stream is reusable.
func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, preResp *coormsg.PreUploadResp) (int64, string, error) {

	var fileHash string
	uploadedNodeIDs := []int64{}
	willUploadToNode := true

	// The local IPFS daemon is part of the scheduling system, so using it must
	// be guarded by the distributed lock.
	mutex, err := reqbuilder.NewBuilder().
		IPFS().CreateAnyRep(config.Cfg().ID).
		MutexLock(ctx.DistLock)
	if err != nil {
		return 0, "", fmt.Errorf("acquiring locks: %w", err)
	}

	fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File)
	if err != nil {
		// Local upload failed: release the lock immediately and fall back to a
		// remote node.
		mutex.Unlock()

		logger.Warnf("uploading to local IPFS: %s, will select a node to upload", err.Error())

	} else {
		willUploadToNode = false
		uploadedNodeIDs = append(uploadedNodeIDs, config.Cfg().ID)

		// Local upload succeeded: hold the lock until everything else is done.
		defer mutex.Unlock()
	}

	// The local IPFS attempt failed, so send the file to a remote agent instead.
	if willUploadToNode {
		// The local node already failed, so exclude it from the candidates.
		uploadNode := t.chooseUploadNode(lo.Reject(preResp.Nodes, func(item ramsg.RespNode, index int) bool { return item.ID == config.Cfg().ID }))

		// If the client and the node are in the same region, connect through
		// the node's internal address.
		nodeIP := uploadNode.ExternalIP
		if uploadNode.IsSameLocation {
			nodeIP = uploadNode.LocalIP

			logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
		}

		mutex, err := reqbuilder.NewBuilder().
			// keeps the uploaded replica from being cleaned up
			IPFS().CreateAnyRep(uploadNode.ID).
			MutexLock(ctx.DistLock)
		if err != nil {
			return 0, "", fmt.Errorf("acquire locks failed, err: %w", err)
		}
		defer mutex.Unlock()

		fileHash, err = uploadToNode(uploadObject.File, nodeIP)
		if err != nil {
			return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
		}
		uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
	}

	dirName := utils.GetDirectoryName(uploadObject.ObjectName)

	// Record the hash of the written file with the coordinator.
	createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash, dirName))
	if err != nil {
		return 0, "", fmt.Errorf("creating rep object: %w", err)
	}

	return createResp.ObjectID, fileHash, nil
}

// chooseUploadNode picks the node to upload to:
//  1. prefer a random node located in the same region as this client;
//  2. otherwise fall back to a random node from the full list.
func (t *UploadRepObjects) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
	var local []ramsg.RespNode
	for _, node := range nodes {
		if node.IsSameLocation {
			local = append(local, node)
		}
	}
	if len(local) > 0 {
		return local[rand.Intn(len(local))]
	}

	return nodes[rand.Intn(len(nodes))]
}

// uploadToNode streams file to the agent at nodeIP over gRPC and returns the
// file hash reported by the remote side once the stream is finished.
func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
	// Establish the gRPC connection for the transfer.
	grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
	grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
	}
	defer grpcCon.Close()

	client := agentcaller.NewFileTransportClient(grpcCon)
	upload, err := mygrpc.SendFileAsStream(client)
	if err != nil {
		return "", fmt.Errorf("request to send file failed, err: %w", err)
	}

	// Push the file content through the stream.
	_, err = io.Copy(upload, file)
	if err != nil {
		// Abort the stream on failure so the remote side can discard the data.
		upload.Abort(io.ErrClosedPipe)
		// NOTE(review): message says "file date" — likely a typo for "data".
		return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
	}

	// Send EOF and obtain the FileHash computed by the remote agent.
	fileHash, err := upload.Finish()
	if err != nil {
		upload.Abort(io.ErrClosedPipe)
		return "", fmt.Errorf("send EOF failed, err: %w", err)
	}

	return fileHash, nil
}

// uploadToLocalIPFS streams the whole of file into the local IPFS daemon and
// returns the resulting content hash.
func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser) (string, error) {
	// Open a new IPFS write stream.
	w, err := ipfs.CreateFile()
	if err != nil {
		return "", fmt.Errorf("create IPFS file failed, err: %w", err)
	}

	// Pump the file content into the stream.
	if _, err := io.Copy(w, file); err != nil {
		return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
	}

	// Finalize the write and obtain the content hash.
	hash, err := w.Finish()
	if err != nil {
		return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
	}

	return hash, nil
}

+ 6
- 6
main.go View File

@@ -15,8 +15,8 @@ import (


"google.golang.org/grpc" "google.golang.org/grpc"


"gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
rasvr "gitlink.org.cn/cloudream/storage-common/pkgs/mq/server/agent"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"


cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd" cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd"
grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc"
@@ -46,7 +46,7 @@ func main() {
log.Fatalf("new ipfs failed, err: %s", err.Error()) log.Fatalf("new ipfs failed, err: %s", err.Error())
} }


coor, err := coordinator.NewClient(&config.Cfg().RabbitMQ)
coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error()) log.Fatalf("new ipfs failed, err: %s", err.Error())
} }
@@ -60,11 +60,11 @@ func main() {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(5) wg.Add(5)


taskMgr := task.NewManager(ipfs, coor, distlock)
taskMgr := task.NewManager(ipfs, coorCli, distlock)


// 启动命令服务器 // 启动命令服务器
// TODO 需要设计AgentID持久化机制 // TODO 需要设计AgentID持久化机制
agtSvr, err := rasvr.NewServer(cmdsvc.NewService(ipfs, &taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ)
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(ipfs, &taskMgr, coorCli), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error()) log.Fatalf("new agent server failed, err: %s", err.Error())
} }
@@ -91,7 +91,7 @@ func main() {
wg.Wait() wg.Wait()
} }


func serveAgentServer(server *rasvr.Server, wg *sync.WaitGroup) {
func serveAgentServer(server *agtmq.Server, wg *sync.WaitGroup) {
log.Info("start serving command server") log.Info("start serving command server")


err := server.Serve() err := server.Serve()


Loading…
Cancel
Save