|
|
@@ -10,7 +10,6 @@ import ( |
|
|
"gitlink.org.cn/cloudream/common/consts/errorcode" |
|
|
"gitlink.org.cn/cloudream/common/consts/errorcode" |
|
|
"gitlink.org.cn/cloudream/common/pkgs/logger" |
|
|
"gitlink.org.cn/cloudream/common/pkgs/logger" |
|
|
"gitlink.org.cn/cloudream/common/pkgs/mq" |
|
|
"gitlink.org.cn/cloudream/common/pkgs/mq" |
|
|
"gitlink.org.cn/cloudream/storage/agent/internal/config" |
|
|
|
|
|
mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" |
|
|
mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" |
|
|
"gitlink.org.cn/cloudream/storage/common/consts" |
|
|
"gitlink.org.cn/cloudream/storage/common/consts" |
|
|
stgglb "gitlink.org.cn/cloudream/storage/common/globals" |
|
|
stgglb "gitlink.org.cn/cloudream/storage/common/globals" |
|
|
@@ -37,7 +36,7 @@ func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) |
|
|
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") |
|
|
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID)) |
|
|
|
|
|
|
|
|
outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID) |
|
|
if err = os.MkdirAll(outputDirPath, 0755); err != nil { |
|
|
if err = os.MkdirAll(outputDirPath, 0755); err != nil { |
|
|
logger.WithField("StorageID", msg.StorageID). |
|
|
logger.WithField("StorageID", msg.StorageID). |
|
|
Warnf("creating output directory: %s", err.Error()) |
|
|
Warnf("creating output directory: %s", err.Error()) |
|
|
@@ -87,9 +86,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { |
|
|
func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { |
|
|
dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) |
|
|
|
|
|
|
|
|
|
|
|
infos, err := os.ReadDir(dirFullPath) |
|
|
|
|
|
|
|
|
infos, err := os.ReadDir(msg.Directory) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
logger.Warnf("list storage directory failed, err: %s", err.Error()) |
|
|
logger.Warnf("list storage directory failed, err: %s", err.Error()) |
|
|
return mq.ReplyOK(agtmq.NewStorageCheckResp( |
|
|
return mq.ReplyOK(agtmq.NewStorageCheckResp( |
|
|
@@ -115,7 +112,7 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs |
|
|
|
|
|
|
|
|
var entries []agtmq.StorageCheckRespEntry |
|
|
var entries []agtmq.StorageCheckRespEntry |
|
|
for _, obj := range msg.Packages { |
|
|
for _, obj := range msg.Packages { |
|
|
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) |
|
|
|
|
|
|
|
|
dirName := utils.MakeStorageLoadPackagePath(msg.Directory, obj.UserID, obj.PackageID) |
|
|
_, ok := infosMap[dirName] |
|
|
_, ok := infosMap[dirName] |
|
|
|
|
|
|
|
|
if ok { |
|
|
if ok { |
|
|
@@ -143,7 +140,7 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs. |
|
|
|
|
|
|
|
|
var entries []agtmq.StorageCheckRespEntry |
|
|
var entries []agtmq.StorageCheckRespEntry |
|
|
for _, obj := range msg.Packages { |
|
|
for _, obj := range msg.Packages { |
|
|
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) |
|
|
|
|
|
|
|
|
dirName := utils.MakeStorageLoadPackagePath(msg.Directory, obj.UserID, obj.PackageID) |
|
|
_, ok := infosMap[dirName] |
|
|
_, ok := infosMap[dirName] |
|
|
|
|
|
|
|
|
if ok { |
|
|
if ok { |
|
|
@@ -177,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka |
|
|
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") |
|
|
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)) |
|
|
|
|
|
|
|
|
fullPath := filepath.Clean(filepath.Join(getStgResp.Directory, msg.Path)) |
|
|
|
|
|
|
|
|
var uploadFilePathes []string |
|
|
var uploadFilePathes []string |
|
|
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { |
|
|
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { |
|
|
|