You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 12 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/samber/lo"
  6. "gitlink.org.cn/cloudream/common/consts"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/utils/logger"
  9. log "gitlink.org.cn/cloudream/common/utils/logger"
  10. "gitlink.org.cn/cloudream/db/model"
  11. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  12. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  13. scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
  14. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  15. )
  16. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp {
  17. // 查询文件对象
  18. object, err := svc.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID)
  19. if err != nil {
  20. log.WithField("ObjectID", msg.Body.ObjectID).
  21. Warnf("query Object failed, err: %s", err.Error())
  22. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed")
  23. }
  24. // 查询客户端所属节点
  25. belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
  26. if err != nil {
  27. log.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  28. Warnf("query client belong node failed, err: %s", err.Error())
  29. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  30. }
  31. log.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID)
  32. //-若redundancy是rep,查询对象副本表, 获得FileHash
  33. if object.Redundancy == consts.REDUNDANCY_REP {
  34. objectRep, err := svc.db.ObjectRep().GetObjectRep(object.ObjectID)
  35. if err != nil {
  36. log.WithField("ObjectID", object.ObjectID).
  37. Warnf("get ObjectRep failed, err: %s", err.Error())
  38. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query ObjectRep failed")
  39. }
  40. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  41. nodes, err := svc.db.Cache().FindCachingFileUserNodes(msg.Body.UserID, objectRep.FileHash)
  42. if err != nil {
  43. log.WithField("FileHash", objectRep.FileHash).
  44. Warnf("query Cache failed, err: %s", err.Error())
  45. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Cache failed")
  46. }
  47. var respNodes []ramsg.RespNode
  48. for _, node := range nodes {
  49. respNodes = append(respNodes, ramsg.NewRespNode(
  50. node.NodeID,
  51. node.ExternalIP,
  52. node.LocalIP,
  53. // LocationID 相同则认为是在同一个地域
  54. belongNode.LocationID == node.LocationID,
  55. ))
  56. }
  57. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectRespBody(
  58. object.Redundancy,
  59. object.FileSize,
  60. ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes),
  61. ))
  62. } else {
  63. // TODO 参考上面进行重写
  64. /*blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  65. if err != nil {
  66. log.WithField("ObjectID", object.ObjectID).
  67. Warnf("query Object Block failed, err: %s", err.Error())
  68. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Object Block failed")
  69. }
  70. ecPolicies := *utils.GetEcPolicy()
  71. ecPolicy := ecPolicies[*object.ECName]
  72. ecN := ecPolicy.GetN()
  73. ecK := ecPolicy.GetK()
  74. nodeIPs = make([]string, ecN)
  75. hashes = make([]string, ecN)
  76. for _, tt := range blocks {
  77. id := tt.InnerID
  78. hash := tt.BlockHash
  79. hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块
  80. nodes, err := svc.db.QueryCacheNodeByBlockHash(hash)
  81. if err != nil {
  82. log.WithField("BlockHash", hash).
  83. Warnf("query Cache failed, err: %s", err.Error())
  84. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Cache failed")
  85. }
  86. if len(nodes) == 0 {
  87. log.WithField("BlockHash", hash).
  88. Warnf("No node cache the block data for the BlockHash")
  89. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash")
  90. }
  91. nodeIPs[id] = nodes[0].IP
  92. }
  93. //这里也有和上面一样的问题
  94. for i := 1; i < ecK; i++ {
  95. blockIDs = append(blockIDs, i)
  96. }*/
  97. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "not implement yet!")
  98. }
  99. }
  100. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg.PreUploadResp {
  101. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  102. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  103. isBucketAvai, err := svc.db.Bucket().IsAvailable(msg.Body.BucketID, msg.Body.UserID)
  104. if err != nil {
  105. log.WithField("BucketID", msg.Body.BucketID).
  106. Warnf("check bucket available failed, err: %s", err.Error())
  107. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed")
  108. }
  109. if !isBucketAvai {
  110. log.WithField("BucketID", msg.Body.BucketID).
  111. Warnf("bucket is not available to user")
  112. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
  113. }
  114. _, err = svc.db.Object().GetByName(msg.Body.BucketID, msg.Body.ObjectName)
  115. if err == nil {
  116. log.WithField("BucketID", msg.Body.BucketID).
  117. WithField("ObjectName", msg.Body.ObjectName).
  118. Warnf("object with given Name and BucketID already exists")
  119. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists")
  120. }
  121. if !errors.Is(err, sql.ErrNoRows) {
  122. log.WithField("BucketID", msg.Body.BucketID).
  123. WithField("ObjectName", msg.Body.ObjectName).
  124. Warnf("get object by name failed, err: %s", err.Error())
  125. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed")
  126. }
  127. //查询用户可用的节点IP
  128. nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID)
  129. if err != nil {
  130. log.WithField("UserID", msg.Body.UserID).
  131. Warnf("query user nodes failed, err: %s", err.Error())
  132. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  133. }
  134. // 查询客户端所属节点
  135. belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
  136. if err != nil {
  137. log.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  138. Warnf("query client belong node failed, err: %s", err.Error())
  139. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  140. }
  141. var respNodes []ramsg.RespNode
  142. for _, node := range nodes {
  143. respNodes = append(respNodes, ramsg.NewRespNode(
  144. node.NodeID,
  145. node.ExternalIP,
  146. node.LocalIP,
  147. // LocationID 相同则认为是在同一个地域
  148. belongNode.LocationID == node.LocationID,
  149. ))
  150. }
  151. return ramsg.ReplyOK(coormsg.NewPreUploadRespBody(respNodes))
  152. }
  153. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp {
  154. _, err := svc.db.Object().CreateRepObject(msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash)
  155. if err != nil {
  156. log.WithField("BucketName", msg.Body.BucketID).
  157. WithField("ObjectName", msg.Body.ObjectName).
  158. Warnf("create rep object failed, err: %s", err.Error())
  159. return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed")
  160. }
  161. // 紧急任务
  162. svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true))
  163. return ramsg.ReplyOK(coormsg.NewCreateObjectRespBody())
  164. }
  165. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp {
  166. // 获取对象信息
  167. obj, err := svc.db.Object().GetByID(msg.Body.ObjectID)
  168. if err != nil {
  169. log.WithField("ObjectID", msg.Body.ObjectID).
  170. Warnf("get object failed, err: %s", err.Error())
  171. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed")
  172. }
  173. if obj.Redundancy != consts.REDUNDANCY_REP {
  174. log.WithField("ObjectID", msg.Body.ObjectID).
  175. Warnf("this object is not a rep object")
  176. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object")
  177. }
  178. // 获取对象Rep信息
  179. objRep, err := svc.db.ObjectRep().GetObjectRep(msg.Body.ObjectID)
  180. if err != nil {
  181. log.WithField("ObjectID", msg.Body.ObjectID).
  182. Warnf("get object rep failed, err: %s", err.Error())
  183. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed")
  184. }
  185. //查询用户可用的节点IP
  186. nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID)
  187. if err != nil {
  188. log.WithField("UserID", msg.Body.UserID).
  189. Warnf("query user nodes failed, err: %s", err.Error())
  190. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  191. }
  192. // 查询客户端所属节点
  193. belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
  194. if err != nil {
  195. log.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  196. Warnf("query client belong node failed, err: %s", err.Error())
  197. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  198. }
  199. // 查询保存了旧文件的节点信息
  200. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(msg.Body.UserID, objRep.FileHash)
  201. if err != nil {
  202. log.Warnf("find caching file user nodes failed, err: %s", err.Error())
  203. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed")
  204. }
  205. var retNodes []coormsg.PreUpdateRepObjectRespNode
  206. for _, node := range nodes {
  207. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  208. node.NodeID,
  209. node.ExternalIP,
  210. node.LocalIP,
  211. // LocationID 相同则认为是在同一个地域
  212. belongNode.LocationID == node.LocationID,
  213. // 此节点存储了对象旧文件
  214. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  215. ))
  216. }
  217. return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectRespBody(retNodes))
  218. }
  219. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp {
  220. err := svc.db.Object().UpdateRepObject(msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash)
  221. if err != nil {
  222. log.WithField("ObjectID", msg.Body.ObjectID).
  223. Warnf("update rep object failed, err: %s", err.Error())
  224. return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed")
  225. }
  226. // 紧急任务
  227. svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true))
  228. return ramsg.ReplyOK(coormsg.NewUpdateRepObjectRespBody())
  229. }
  230. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp {
  231. err := svc.db.Object().SoftDelete(msg.Body.UserID, msg.Body.ObjectID)
  232. if err != nil {
  233. log.WithField("UserID", msg.Body.UserID).
  234. WithField("ObjectID", msg.Body.ObjectID).
  235. Warnf("set object deleted failed, err: %s", err.Error())
  236. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed")
  237. }
  238. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.Body.ObjectID)
  239. if err != nil {
  240. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  241. return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
  242. }
  243. // 不追求及时、准确
  244. if len(stgs) == 0 {
  245. // 如果没有被引用,直接投递CheckObject的任务
  246. svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewCheckObject([]int{msg.Body.ObjectID}), false, false))
  247. } else {
  248. // 有引用则让Agent去检查StorageObject
  249. for _, stg := range stgs {
  250. svc.scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.Body.ObjectID}), false, false))
  251. }
  252. }
  253. return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
  254. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。