Browse Source

拆分锁服务的代码

gitlink
Sydonian 2 years ago
parent
commit
a2f195322e
2 changed files with 32 additions and 34 deletions
  1. +29
    -31
      internal/services/mq/storage.go
  2. +3
    -3
      main.go

+ 29
- 31
internal/services/mq/storage.go View File

@@ -2,28 +2,26 @@ package mq


import ( import (
"io/fs" "io/fs"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" mytask "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/utils"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage-common/utils"
) )


func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) {
func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
coorCli, err := globals.CoordinatorMQPool.Acquire() coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {
logger.Warnf("new coordinator client: %s", err.Error()) logger.Warnf("new coordinator client: %s", err.Error())
@@ -40,7 +38,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage)
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
} }


outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageMovePackageDirName(msg.PackageID, msg.UserID))
outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID))
if err = os.MkdirAll(outputDirPath, 0755); err != nil { if err = os.MkdirAll(outputDirPath, 0755); err != nil {
logger.WithField("StorageID", msg.StorageID). logger.WithField("StorageID", msg.StorageID).
Warnf("creating output directory: %s", err.Error()) Warnf("creating output directory: %s", err.Error())
@@ -49,15 +47,15 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage)
} }


tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))
return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID()))
return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
} }


func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*agtmq.WaitStorageMovePackageResp, *mq.CodeMessage) {
logger.WithField("TaskID", msg.TaskID).Debugf("wait moving package")
func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*agtmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
logger.WithField("TaskID", msg.TaskID).Debugf("wait loading package")


tsk := svc.taskManager.FindByID(msg.TaskID) tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil { if tsk == nil {
return mq.ReplyFailed[agtmq.WaitStorageMovePackageResp](errorcode.TaskNotFound, "task not found")
return mq.ReplyFailed[agtmq.WaitStorageLoadPackageResp](errorcode.TaskNotFound, "task not found")
} }


if msg.WaitTimeoutMs == 0 { if msg.WaitTimeoutMs == 0 {
@@ -68,7 +66,7 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))


} else { } else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -78,17 +76,17 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*
errMsg = tsk.Error().Error() errMsg = tsk.Error().Error()
} }


return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))
} }


return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, ""))
} }
} }


func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)


infos, err := ioutil.ReadDir(dirFullPath)
infos, err := os.ReadDir(dirFullPath)
if err != nil { if err != nil {
logger.Warnf("list storage directory failed, err: %s", err.Error()) logger.Warnf("list storage directory failed, err: %s", err.Error())
return mq.ReplyOK(agtmq.NewStorageCheckResp( return mq.ReplyOK(agtmq.NewStorageCheckResp(
@@ -97,30 +95,30 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe
)) ))
} }


fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() })
dirInfos := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() })


if msg.IsComplete { if msg.IsComplete {
return svc.checkStorageComplete(msg, fileInfos)
return svc.checkStorageComplete(msg, dirInfos)
} else { } else {
return svc.checkStorageIncrement(msg, fileInfos)
return svc.checkStorageIncrement(msg, dirInfos)
} }
} }


func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
infosMap := make(map[string]fs.FileInfo)
for _, info := range fileInfos {
func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
infosMap := make(map[string]fs.DirEntry)
for _, info := range dirInfos {
infosMap[info.Name()] = info infosMap[info.Name()] = info
} }


var entries []agtmq.StorageCheckRespEntry var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages { for _, obj := range msg.Packages {
fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[fileName]
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[dirName]


if ok { if ok {
// 不需要做处理 // 不需要做处理
// 删除map中的记录,表示此记录已被检查过 // 删除map中的记录,表示此记录已被检查过
delete(infosMap, fileName)
delete(infosMap, dirName)


} else { } else {
// 只要文件不存在,就删除StoragePackage表中的记录 // 只要文件不存在,就删除StoragePackage表中的记录
@@ -133,22 +131,22 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []f
return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
} }


func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) {


infosMap := make(map[string]fs.FileInfo)
for _, info := range fileInfos {
infosMap := make(map[string]fs.DirEntry)
for _, info := range dirInfos {
infosMap[info.Name()] = info infosMap[info.Name()] = info
} }


var entries []agtmq.StorageCheckRespEntry var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages { for _, obj := range msg.Packages {
fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[fileName]
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[dirName]


if ok { if ok {
// 不需要做处理 // 不需要做处理
// 删除map中的记录,表示此记录已被检查过 // 删除map中的记录,表示此记录已被检查过
delete(infosMap, fileName)
delete(infosMap, dirName)


} else { } else {
// 只要文件不存在,就删除StoragePackage表中的记录 // 只要文件不存在,就删除StoragePackage表中的记录
@@ -176,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
} }


fullPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)
fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path))


var uploadFilePathes []string var uploadFilePathes []string
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {


+ 3
- 3
main.go View File

@@ -6,11 +6,11 @@ import (
"os" "os"
"sync" "sync"


distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"


"google.golang.org/grpc" "google.golang.org/grpc"
@@ -47,7 +47,7 @@ func main() {
}) })
globals.InitIPFSPool(&config.Cfg().IPFS) globals.InitIPFSPool(&config.Cfg().IPFS)


distlock, err := distsvc.NewService(&config.Cfg().DistLock)
distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil { if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error()) log.Fatalf("new ipfs failed, err: %s", err.Error())
} }
@@ -115,7 +115,7 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) {
wg.Done() wg.Done()
} }


func serveDistLock(svc *distsvc.Service) {
func serveDistLock(svc *distlock.Service) {
log.Info("start serving distlock") log.Info("start serving distlock")


err := svc.Serve() err := svc.Serve()


Loading…
Cancel
Save