You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

upload_rep_objects.go 8.4 kB


  1. package task
  2. import (
  3. "fmt"
  4. "io"
  5. "math/rand"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/agent/internal/config"
  9. "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
  10. "gitlink.org.cn/cloudream/common/pkg/logger"
  11. "gitlink.org.cn/cloudream/common/utils"
  12. mygrpc "gitlink.org.cn/cloudream/common/utils/grpc"
  13. "gitlink.org.cn/cloudream/common/utils/ipfs"
  14. agentcaller "gitlink.org.cn/cloudream/proto"
  15. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  16. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  17. "google.golang.org/grpc"
  18. "google.golang.org/grpc/credentials/insecure"
  19. )
  20. // UploadObjects和UploadRepResults为一一对应关系
  21. type UploadRepObjects struct {
  22. userID int64
  23. bucketID int64
  24. repCount int
  25. Objects []UploadObject
  26. Results []UploadSingleRepObjectResult
  27. IsUploading bool
  28. }
  29. type UploadRepObjectsResult struct {
  30. Objects []UploadObject
  31. Results []UploadSingleRepObjectResult
  32. IsUploading bool
  33. }
  34. type UploadObject struct {
  35. ObjectName string
  36. File io.ReadCloser
  37. FileSize int64
  38. }
  39. type UploadSingleRepObjectResult struct {
  40. Error error
  41. FileHash string
  42. ObjectID int64
  43. }
  44. func NewUploadRepObjects(userID int64, bucketID int64, uploadObjects []UploadObject, repCount int) *UploadRepObjects {
  45. return &UploadRepObjects{
  46. userID: userID,
  47. bucketID: bucketID,
  48. Objects: uploadObjects,
  49. repCount: repCount,
  50. }
  51. }
  52. func (t *UploadRepObjects) Execute(ctx TaskContext, complete CompleteFn) {
  53. log := logger.WithType[UploadRepObjects]("Task")
  54. log.Debugf("begin with %v", logger.FormatStruct(t))
  55. defer log.Debugf("end")
  56. err := t.do(ctx)
  57. complete(err, CompleteOption{
  58. RemovingDelay: time.Minute,
  59. })
  60. for _, obj := range t.Objects {
  61. obj.File.Close()
  62. }
  63. }
  64. func (t *UploadRepObjects) do(ctx TaskContext) error {
  65. reqBlder := reqbuilder.NewBuilder()
  66. for _, uploadObject := range t.Objects {
  67. reqBlder.Metadata().
  68. // 用于防止创建了多个同名对象
  69. Object().CreateOne(t.bucketID, uploadObject.ObjectName)
  70. }
  71. mutex, err := reqBlder.
  72. Metadata().
  73. // 用于判断用户是否有桶的权限
  74. UserBucket().ReadOne(t.userID, t.bucketID).
  75. // 用于查询可用的上传节点
  76. Node().ReadAny().
  77. // 用于设置Rep配置
  78. ObjectRep().CreateAny().
  79. // 用于创建Cache记录
  80. Cache().CreateAny().
  81. MutexLock(ctx.DistLock)
  82. if err != nil {
  83. return fmt.Errorf("acquire locks failed, err: %w", err)
  84. }
  85. defer mutex.Unlock()
  86. var repWriteResps []*coormsg.PreUploadResp
  87. //判断是否所有文件都符合上传条件
  88. hasFailure := true
  89. for i := 0; i < len(t.Objects); i++ {
  90. repWriteResp, err := t.preUploadSingleObject(ctx, t.Objects[i])
  91. if err != nil {
  92. hasFailure = false
  93. t.Results = append(t.Results,
  94. UploadSingleRepObjectResult{
  95. Error: err,
  96. FileHash: "",
  97. ObjectID: 0,
  98. })
  99. continue
  100. }
  101. t.Results = append(t.Results, UploadSingleRepObjectResult{})
  102. repWriteResps = append(repWriteResps, repWriteResp)
  103. }
  104. // 不满足上传条件,返回各文件检查结果
  105. if !hasFailure {
  106. return nil
  107. }
  108. //上传文件夹
  109. t.IsUploading = true
  110. for i := 0; i < len(repWriteResps); i++ {
  111. objectID, fileHash, err := t.uploadSingleObject(ctx, t.Objects[i], repWriteResps[i])
  112. // 记录文件上传结果
  113. t.Results[i] = UploadSingleRepObjectResult{
  114. Error: err,
  115. FileHash: fileHash,
  116. ObjectID: objectID,
  117. }
  118. }
  119. return nil
  120. }
  121. // 检查单个文件是否能够上传
  122. func (t *UploadRepObjects) preUploadSingleObject(ctx TaskContext, uploadObject UploadObject) (*coormsg.PreUploadResp, error) {
  123. //发送写请求,请求Coor分配写入节点Ip
  124. repWriteResp, err := ctx.Coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.userID, config.Cfg().ExternalIP))
  125. if err != nil {
  126. return nil, fmt.Errorf("pre upload rep object: %w", err)
  127. }
  128. if len(repWriteResp.Nodes) == 0 {
  129. return nil, fmt.Errorf("no node to upload file")
  130. }
  131. return repWriteResp, nil
  132. }
  133. // 上传文件
  134. func (t *UploadRepObjects) uploadSingleObject(ctx TaskContext, uploadObject UploadObject, preResp *coormsg.PreUploadResp) (int64, string, error) {
  135. var fileHash string
  136. uploadedNodeIDs := []int64{}
  137. willUploadToNode := true
  138. // 因为本地的IPFS属于调度系统的一部分,所以需要加锁
  139. mutex, err := reqbuilder.NewBuilder().
  140. IPFS().CreateAnyRep(config.Cfg().ID).
  141. MutexLock(ctx.DistLock)
  142. if err != nil {
  143. return 0, "", fmt.Errorf("acquiring locks: %w", err)
  144. }
  145. fileHash, err = uploadToLocalIPFS(ctx.IPFS, uploadObject.File)
  146. if err != nil {
  147. // 上传失败,则立刻解锁
  148. mutex.Unlock()
  149. logger.Warnf("uploading to local IPFS: %s, will select a node to upload", err.Error())
  150. } else {
  151. willUploadToNode = false
  152. uploadedNodeIDs = append(uploadedNodeIDs, config.Cfg().ID)
  153. // 上传成功,则等到所有操作结束后才能解锁
  154. defer mutex.Unlock()
  155. }
  156. // 本地IPFS失败,则发送到agent上传
  157. if willUploadToNode {
  158. // 本地IPFS已经失败,所以不要再选择当前节点了
  159. uploadNode := t.chooseUploadNode(lo.Reject(preResp.Nodes, func(item ramsg.RespNode, index int) bool { return item.ID == config.Cfg().ID }))
  160. // 如果客户端与节点在同一个地域,则使用内网地址连接节点
  161. nodeIP := uploadNode.ExternalIP
  162. if uploadNode.IsSameLocation {
  163. nodeIP = uploadNode.LocalIP
  164. logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.ID)
  165. }
  166. mutex, err := reqbuilder.NewBuilder().
  167. // 防止上传的副本被清除
  168. IPFS().CreateAnyRep(uploadNode.ID).
  169. MutexLock(ctx.DistLock)
  170. if err != nil {
  171. return 0, "", fmt.Errorf("acquire locks failed, err: %w", err)
  172. }
  173. defer mutex.Unlock()
  174. fileHash, err = uploadToNode(uploadObject.File, nodeIP)
  175. if err != nil {
  176. return 0, "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
  177. }
  178. uploadedNodeIDs = append(uploadedNodeIDs, uploadNode.ID)
  179. }
  180. dirName := utils.GetDirectoryName(uploadObject.ObjectName)
  181. // 记录写入的文件的Hash
  182. createResp, err := ctx.Coordinator.CreateRepObject(coormsg.NewCreateRepObject(t.bucketID, uploadObject.ObjectName, uploadObject.FileSize, t.repCount, t.userID, uploadedNodeIDs, fileHash, dirName))
  183. if err != nil {
  184. return 0, "", fmt.Errorf("creating rep object: %w", err)
  185. }
  186. return createResp.ObjectID, fileHash, nil
  187. }
  188. // chooseUploadNode 选择一个上传文件的节点
  189. // 1. 从与当前客户端相同地域的节点中随机选一个
  190. // 2. 没有用的话从所有节点中随机选一个
  191. func (t *UploadRepObjects) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
  192. sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
  193. if len(sameLocationNodes) > 0 {
  194. return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
  195. }
  196. return nodes[rand.Intn(len(nodes))]
  197. }
  198. func uploadToNode(file io.ReadCloser, nodeIP string) (string, error) {
  199. // 建立grpc连接,发送请求
  200. grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort)
  201. grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
  202. if err != nil {
  203. return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
  204. }
  205. defer grpcCon.Close()
  206. client := agentcaller.NewFileTransportClient(grpcCon)
  207. upload, err := mygrpc.SendFileAsStream(client)
  208. if err != nil {
  209. return "", fmt.Errorf("request to send file failed, err: %w", err)
  210. }
  211. // 发送文件数据
  212. _, err = io.Copy(upload, file)
  213. if err != nil {
  214. // 发生错误则关闭连接
  215. upload.Abort(io.ErrClosedPipe)
  216. return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
  217. }
  218. // 发送EOF消息,并获得FileHash
  219. fileHash, err := upload.Finish()
  220. if err != nil {
  221. upload.Abort(io.ErrClosedPipe)
  222. return "", fmt.Errorf("send EOF failed, err: %w", err)
  223. }
  224. return fileHash, nil
  225. }
  226. func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser) (string, error) {
  227. // 从本地IPFS上传文件
  228. writer, err := ipfs.CreateFile()
  229. if err != nil {
  230. return "", fmt.Errorf("create IPFS file failed, err: %w", err)
  231. }
  232. _, err = io.Copy(writer, file)
  233. if err != nil {
  234. return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
  235. }
  236. fileHash, err := writer.Finish()
  237. if err != nil {
  238. return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
  239. }
  240. return fileHash, nil
  241. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。