You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage.go 8.3 kB

2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package mq
  2. import (
  3. "fmt"
  4. "io/fs"
  5. "os"
  6. "path/filepath"
  7. "strconv"
  8. "time"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/common/consts/errorcode"
  11. "gitlink.org.cn/cloudream/common/pkgs/logger"
  12. "gitlink.org.cn/cloudream/common/pkgs/mq"
  13. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  14. mytask "gitlink.org.cn/cloudream/storage/agent/internal/task"
  15. "gitlink.org.cn/cloudream/storage/common/consts"
  16. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
  19. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  20. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  21. "gitlink.org.cn/cloudream/storage/common/utils"
  22. )
  23. func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
  24. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  25. if err != nil {
  26. logger.Warnf("new coordinator client: %s", err.Error())
  27. return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
  28. }
  29. defer stgglb.CoordinatorMQPool.Release(coorCli)
  30. getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
  31. if err != nil {
  32. logger.WithField("StorageID", msg.StorageID).
  33. Warnf("getting storage info: %s", err.Error())
  34. return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
  35. }
  36. outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID)
  37. if err = os.MkdirAll(outputDirPath, 0755); err != nil {
  38. logger.WithField("StorageID", msg.StorageID).
  39. Warnf("creating output directory: %s", err.Error())
  40. return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
  41. }
  42. tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath))
  43. return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
  44. }
  45. func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*agtmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
  46. logger.WithField("TaskID", msg.TaskID).Debugf("wait loading package")
  47. tsk := svc.taskManager.FindByID(msg.TaskID)
  48. if tsk == nil {
  49. return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
  50. }
  51. if msg.WaitTimeoutMs == 0 {
  52. tsk.Wait()
  53. errMsg := ""
  54. if tsk.Error() != nil {
  55. errMsg = tsk.Error().Error()
  56. }
  57. loadTsk := tsk.Body().(*mytask.StorageLoadPackage)
  58. return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
  59. } else {
  60. if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
  61. errMsg := ""
  62. if tsk.Error() != nil {
  63. errMsg = tsk.Error().Error()
  64. }
  65. loadTsk := tsk.Body().(*mytask.StorageLoadPackage)
  66. return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
  67. }
  68. return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", ""))
  69. }
  70. }
  71. func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
  72. infos, err := os.ReadDir(msg.Directory)
  73. if err != nil {
  74. logger.Warnf("list storage directory failed, err: %s", err.Error())
  75. return mq.ReplyOK(agtmq.NewStorageCheckResp(
  76. err.Error(),
  77. nil,
  78. ))
  79. }
  80. var stgPkgs []model.StoragePackage
  81. userDirs := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() })
  82. for _, dir := range userDirs {
  83. userIDInt, err := strconv.ParseInt(dir.Name(), 10, 64)
  84. if err != nil {
  85. logger.Warnf("parsing user id %s: %s", dir.Name(), err.Error())
  86. continue
  87. }
  88. pkgDir := utils.MakeStorageLoadDirectory(msg.Directory, dir.Name())
  89. pkgDirs, err := os.ReadDir(pkgDir)
  90. if err != nil {
  91. logger.Warnf("reading package dir %s: %s", pkgDir, err.Error())
  92. continue
  93. }
  94. for _, pkg := range pkgDirs {
  95. pkgIDInt, err := strconv.ParseInt(pkg.Name(), 10, 64)
  96. if err != nil {
  97. logger.Warnf("parsing package dir %s: %s", pkg.Name(), err.Error())
  98. continue
  99. }
  100. stgPkgs = append(stgPkgs, model.StoragePackage{
  101. StorageID: msg.StorageID,
  102. PackageID: cdssdk.PackageID(pkgIDInt),
  103. UserID: cdssdk.UserID(userIDInt),
  104. })
  105. }
  106. }
  107. return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, stgPkgs))
  108. }
  109. func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.CodeMessage) {
  110. infos, err := os.ReadDir(msg.Directory)
  111. if err != nil {
  112. logger.Warnf("list storage directory failed, err: %s", err.Error())
  113. return nil, mq.Failed(errorcode.OperationFailed, "list directory files failed")
  114. }
  115. // userID->pkgID->pkg
  116. userPkgs := make(map[string]map[string]bool)
  117. for _, pkg := range msg.Packages {
  118. userIDStr := fmt.Sprintf("%d", pkg.UserID)
  119. pkgs, ok := userPkgs[userIDStr]
  120. if !ok {
  121. pkgs = make(map[string]bool)
  122. userPkgs[userIDStr] = pkgs
  123. }
  124. pkgIDStr := fmt.Sprintf("%d", pkg.PackageID)
  125. pkgs[pkgIDStr] = true
  126. }
  127. userDirs := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() })
  128. for _, dir := range userDirs {
  129. pkgMap, ok := userPkgs[dir.Name()]
  130. // 第一级目录名是UserID,先删除UserID在StoragePackage表里没出现过的文件夹
  131. if !ok {
  132. rmPath := filepath.Join(msg.Directory, dir.Name())
  133. err := os.RemoveAll(rmPath)
  134. if err != nil {
  135. logger.Warnf("removing user dir %s: %s", rmPath, err.Error())
  136. } else {
  137. logger.Debugf("user dir %s removed by gc", rmPath)
  138. }
  139. continue
  140. }
  141. pkgDir := utils.MakeStorageLoadDirectory(msg.Directory, dir.Name())
  142. // 遍历每个UserID目录的packages目录里的内容
  143. pkgs, err := os.ReadDir(pkgDir)
  144. if err != nil {
  145. logger.Warnf("reading package dir %s: %s", pkgDir, err.Error())
  146. continue
  147. }
  148. for _, pkg := range pkgs {
  149. if !pkgMap[pkg.Name()] {
  150. rmPath := filepath.Join(pkgDir, pkg.Name())
  151. err := os.RemoveAll(rmPath)
  152. if err != nil {
  153. logger.Warnf("removing package dir %s: %s", rmPath, err.Error())
  154. } else {
  155. logger.Debugf("package dir %s removed by gc", rmPath)
  156. }
  157. }
  158. }
  159. }
  160. return mq.ReplyOK(agtmq.RespStorageGC())
  161. }
  162. func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
  163. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  164. if err != nil {
  165. logger.Warnf("new coordinator client: %s", err.Error())
  166. return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
  167. }
  168. defer stgglb.CoordinatorMQPool.Release(coorCli)
  169. getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
  170. if err != nil {
  171. logger.WithField("StorageID", msg.StorageID).
  172. Warnf("getting storage info: %s", err.Error())
  173. return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
  174. }
  175. fullPath := filepath.Clean(filepath.Join(getStgResp.Directory, msg.Path))
  176. var uploadFilePathes []string
  177. err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {
  178. if err != nil {
  179. return nil
  180. }
  181. if !fi.IsDir() {
  182. uploadFilePathes = append(uploadFilePathes, fname)
  183. }
  184. return nil
  185. })
  186. if err != nil {
  187. logger.Warnf("opening directory %s: %s", fullPath, err.Error())
  188. return nil, mq.Failed(errorcode.OperationFailed, "read directory failed")
  189. }
  190. objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes)
  191. tsk := svc.taskManager.StartNew(mytask.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name, objIter, msg.NodeAffinity))
  192. return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
  193. }
  194. func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage) (*agtmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
  195. tsk := svc.taskManager.FindByID(msg.TaskID)
  196. if tsk == nil {
  197. return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
  198. }
  199. if msg.WaitTimeoutMs == 0 {
  200. tsk.Wait()
  201. } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
  202. return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0))
  203. }
  204. if tsk.Error() != nil {
  205. return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0))
  206. }
  207. taskBody := tsk.Body().(*mytask.CreatePackage)
  208. return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", taskBody.Result.PackageID))
  209. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。