You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage.go 9.0 kB

2 years ago

  1. package cmd
  2. import (
  3. "fmt"
  4. "io/fs"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "time"
  10. "github.com/samber/lo"
  11. "gitlink.org.cn/cloudream/agent/internal/config"
  12. "gitlink.org.cn/cloudream/agent/internal/task"
  13. "gitlink.org.cn/cloudream/common/consts"
  14. "gitlink.org.cn/cloudream/common/pkg/logger"
  15. log "gitlink.org.cn/cloudream/common/pkg/logger"
  16. "gitlink.org.cn/cloudream/common/utils"
  17. "gitlink.org.cn/cloudream/common/utils/serder"
  18. "gitlink.org.cn/cloudream/ec"
  19. "gitlink.org.cn/cloudream/common/consts/errorcode"
  20. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  21. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  22. )
  23. func (service *Service) StartMovingObjectToStorage(msg *agtmsg.StartMovingObjectToStorage) (*agtmsg.StartMovingObjectToStorageResp, *ramsg.CodeMessage) {
  24. outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID)
  25. outFilePath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory, outFileName)
  26. if msg.Redundancy == consts.REDUNDANCY_REP {
  27. taskID, err := service.moveRepObject(msg, outFilePath)
  28. if err != nil {
  29. logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error())
  30. return ramsg.ReplyFailed[agtmsg.StartMovingObjectToStorageResp](errorcode.OPERATION_FAILED, "move rep object failed")
  31. }
  32. return ramsg.ReplyOK(agtmsg.NewStartMovingObjectToStorageResp(taskID))
  33. } else {
  34. // TODO 处理其他备份类型
  35. return ramsg.ReplyFailed[agtmsg.StartMovingObjectToStorageResp](errorcode.OPERATION_FAILED, "not implement yet!")
  36. }
  37. }
  38. func (svc *Service) moveRepObject(msg *agtmsg.StartMovingObjectToStorage, outFilePath string) (string, error) {
  39. var repInfo ramsg.ObjectRepInfo
  40. err := serder.MapToObject(msg.RedundancyData.(map[string]any), &repInfo)
  41. if err != nil {
  42. return "", fmt.Errorf("redundancy data to rep info failed, err: %w", err)
  43. }
  44. tsk := svc.taskManager.StartComparable(task.NewIPFSRead(repInfo.FileHash, outFilePath))
  45. return tsk.ID(), nil
  46. }
  47. func (svc *Service) WaitMovingObject(msg *agtmsg.WaitMovingObject) (*agtmsg.WaitMovingObjectResp, *ramsg.CodeMessage) {
  48. log.WithField("TaskID", msg.TaskID).Debugf("wait moving object")
  49. tsk := svc.taskManager.FindByID(msg.TaskID)
  50. if tsk == nil {
  51. return ramsg.ReplyFailed[agtmsg.WaitMovingObjectResp](errorcode.TASK_NOT_FOUND, "task not found")
  52. }
  53. if msg.WaitTimeoutMs == 0 {
  54. tsk.Wait()
  55. errMsg := ""
  56. if tsk.Error() != nil {
  57. errMsg = tsk.Error().Error()
  58. }
  59. return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(true, errMsg))
  60. } else {
  61. if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
  62. errMsg := ""
  63. if tsk.Error() != nil {
  64. errMsg = tsk.Error().Error()
  65. }
  66. return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(true, errMsg))
  67. }
  68. return ramsg.ReplyOK(agtmsg.NewWaitMovingObjectResp(false, ""))
  69. }
  70. }
  71. func (svc *Service) CheckStorage(msg *agtmsg.CheckStorage) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) {
  72. dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)
  73. infos, err := ioutil.ReadDir(dirFullPath)
  74. if err != nil {
  75. logger.Warnf("list storage directory failed, err: %s", err.Error())
  76. return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(
  77. err.Error(),
  78. nil,
  79. ))
  80. }
  81. fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() })
  82. if msg.IsComplete {
  83. return svc.checkStorageComplete(msg, fileInfos)
  84. } else {
  85. return svc.checkStorageIncrement(msg, fileInfos)
  86. }
  87. }
  88. func (svc *Service) checkStorageIncrement(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) {
  89. infosMap := make(map[string]fs.FileInfo)
  90. for _, info := range fileInfos {
  91. infosMap[info.Name()] = info
  92. }
  93. var entries []agtmsg.CheckStorageRespEntry
  94. for _, obj := range msg.Objects {
  95. fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
  96. _, ok := infosMap[fileName]
  97. if ok {
  98. // 不需要做处理
  99. // 删除map中的记录,表示此记录已被检查过
  100. delete(infosMap, fileName)
  101. } else {
  102. // 只要文件不存在,就删除StorageObject表中的记录
  103. entries = append(entries, agtmsg.NewCheckStorageRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
  104. }
  105. }
  106. // 增量情况下,不需要对infosMap中没检查的记录进行处理
  107. return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(consts.STORAGE_DIRECTORY_STATE_OK, entries))
  108. }
  109. func (svc *Service) checkStorageComplete(msg *agtmsg.CheckStorage, fileInfos []fs.FileInfo) (*agtmsg.CheckStorageResp, *ramsg.CodeMessage) {
  110. infosMap := make(map[string]fs.FileInfo)
  111. for _, info := range fileInfos {
  112. infosMap[info.Name()] = info
  113. }
  114. var entries []agtmsg.CheckStorageRespEntry
  115. for _, obj := range msg.Objects {
  116. fileName := utils.MakeMoveOperationFileName(obj.ObjectID, obj.UserID)
  117. _, ok := infosMap[fileName]
  118. if ok {
  119. // 不需要做处理
  120. // 删除map中的记录,表示此记录已被检查过
  121. delete(infosMap, fileName)
  122. } else {
  123. // 只要文件不存在,就删除StorageObject表中的记录
  124. entries = append(entries, agtmsg.NewCheckStorageRespEntry(obj.ObjectID, obj.UserID, agtmsg.CHECK_STORAGE_RESP_OP_DELETE))
  125. }
  126. }
  127. // Storage中多出来的文件不做处理
  128. return ramsg.ReplyOK(agtmsg.NewCheckStorageResp(consts.STORAGE_DIRECTORY_STATE_OK, entries))
  129. }
  130. /*
  131. func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.StartMovingObjectToStorageResp {
  132. panic("not implement yet!")
  133. wg := sync.WaitGroup{}
  134. fmt.Println("EcMove")
  135. fmt.Println(msg.Hashs)
  136. hashs := msg.Hashs
  137. fileSize := msg.FileSize
  138. blockIds := msg.IDs
  139. ecName := msg.ECName
  140. goalName := msg.BucketName + ":" + msg.ObjectName + ":" + strconv.Itoa(msg.UserID)
  141. ecPolicies := *utils.GetEcPolicy()
  142. ecPolicy := ecPolicies[ecName]
  143. ecK := ecPolicy.GetK()
  144. ecN := ecPolicy.GetN()
  145. numPacket := (fileSize + int64(ecK)*int64(config.Cfg().GRCPPacketSize) - 1) / (int64(ecK) * int64(config.Cfg().GRCPPacketSize))
  146. getBufs := make([]chan []byte, ecN)
  147. decodeBufs := make([]chan []byte, ecK)
  148. for i := 0; i < ecN; i++ {
  149. getBufs[i] = make(chan []byte)
  150. }
  151. for i := 0; i < ecK; i++ {
  152. decodeBufs[i] = make(chan []byte)
  153. }
  154. wg.Add(1)
  155. //执行调度操作
  156. // TODO 这一块需要改写以适配IPFS流式读取
  157. for i := 0; i < len(blockIds); i++ {
  158. go service.get(hashs[i], getBufs[blockIds[i]], numPacket)
  159. }
  160. go decode(getBufs[:], decodeBufs[:], blockIds, ecK, numPacket)
  161. // TODO 写入的文件路径需要带上msg中的Directory字段,参考RepMove
  162. go persist(decodeBufs[:], numPacket, goalName, &wg)
  163. wg.Wait()
  164. //向coor报告临时缓存hash
  165. coorClient, err := racli.NewCoordinatorClient()
  166. if err != nil {
  167. // TODO 日志
  168. return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed"))
  169. }
  170. defer coorClient.Close()
  171. coorClient.TempCacheReport(NodeID, hashs)
  172. return ramsg.NewAgentMoveRespOK()
  173. }
  174. */
  175. func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
  176. fmt.Println("decode ")
  177. var tmpIn [][]byte
  178. var zeroPkt []byte
  179. tmpIn = make([][]byte, len(inBufs))
  180. hasBlock := map[int]bool{}
  181. for j := 0; j < len(blockSeq); j++ {
  182. hasBlock[blockSeq[j]] = true
  183. }
  184. needRepair := false //检测是否传入了所有数据块
  185. for j := 0; j < len(outBufs); j++ {
  186. if blockSeq[j] != j {
  187. needRepair = true
  188. }
  189. }
  190. enc := ec.NewRsEnc(ecK, len(inBufs))
  191. for i := 0; int64(i) < numPacket; i++ {
  192. for j := 0; j < len(inBufs); j++ { //3
  193. if hasBlock[j] {
  194. tmpIn[j] = <-inBufs[j]
  195. } else {
  196. tmpIn[j] = zeroPkt
  197. }
  198. }
  199. fmt.Printf("%v", tmpIn)
  200. if needRepair {
  201. err := enc.Repair(tmpIn)
  202. print("&&&&&")
  203. if err != nil {
  204. fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
  205. }
  206. }
  207. //fmt.Printf("%v",tmpIn)
  208. for j := 0; j < len(outBufs); j++ { //1,2,3//示意,需要调用纠删码编解码引擎: tmp[k] = tmp[k]+(tmpIn[w][k]*coefs[w][j])
  209. outBufs[j] <- tmpIn[j]
  210. }
  211. }
  212. fmt.Println("decode over")
  213. for i := 0; i < len(outBufs); i++ {
  214. close(outBufs[i])
  215. }
  216. }
  217. func (service *Service) get(blockHash string, getBuf chan []byte, numPacket int64) {
  218. /*
  219. data := CatIPFS(blockHash)
  220. for i := 0; int64(i) < numPacket; i++ {
  221. buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize])
  222. getBuf <- buf
  223. }
  224. close(getBuf)
  225. */
  226. }
  227. func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
  228. //这里的localFilePath应该是要写入的filename
  229. fDir, err := os.Executable()
  230. if err != nil {
  231. panic(err)
  232. }
  233. fURL := filepath.Join(filepath.Dir(fDir), "assets3")
  234. _, err = os.Stat(fURL)
  235. if os.IsNotExist(err) {
  236. os.MkdirAll(fURL, os.ModePerm)
  237. }
  238. file, err := os.Create(filepath.Join(fURL, localFilePath))
  239. if err != nil {
  240. return
  241. }
  242. for i := 0; int64(i) < numPacket; i++ {
  243. for j := 0; j < len(inBuf); j++ {
  244. tmp := <-inBuf[j]
  245. fmt.Println(tmp)
  246. file.Write(tmp)
  247. }
  248. }
  249. file.Close()
  250. wg.Done()
  251. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。