You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 13 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/samber/lo"
  6. "gitlink.org.cn/cloudream/common/consts"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/utils/logger"
  9. log "gitlink.org.cn/cloudream/common/utils/logger"
  10. "gitlink.org.cn/cloudream/db/model"
  11. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  12. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  13. scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
  14. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  15. )
  16. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp {
  17. // 查询文件对象
  18. object, err := svc.db.Object().GetUserObject(msg.Body.UserID, msg.Body.ObjectID)
  19. if err != nil {
  20. log.WithField("ObjectID", msg.Body.ObjectID).
  21. Warnf("query Object failed, err: %s", err.Error())
  22. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed")
  23. }
  24. // 查询客户端所属节点
  25. belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
  26. if err != nil {
  27. log.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  28. Warnf("query client belong node failed, err: %s", err.Error())
  29. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  30. }
  31. log.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID)
  32. //-若redundancy是rep,查询对象副本表, 获得FileHash
  33. if object.Redundancy == consts.REDUNDANCY_REP {
  34. objectRep, err := svc.db.ObjectRep().GetObjectRep(object.ObjectID)
  35. if err != nil {
  36. log.WithField("ObjectID", object.ObjectID).
  37. Warnf("get ObjectRep failed, err: %s", err.Error())
  38. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query ObjectRep failed")
  39. }
  40. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  41. nodes, err := svc.db.Cache().FindCachingFileUserNodes(msg.Body.UserID, objectRep.FileHash)
  42. if err != nil {
  43. log.WithField("FileHash", objectRep.FileHash).
  44. Warnf("query Cache failed, err: %s", err.Error())
  45. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Cache failed")
  46. }
  47. var respNodes []ramsg.RespNode
  48. for _, node := range nodes {
  49. respNodes = append(respNodes, ramsg.NewRespNode(
  50. node.NodeID,
  51. node.ExternalIP,
  52. node.LocalIP,
  53. // LocationID 相同则认为是在同一个地域
  54. belongNode.LocationID == node.LocationID,
  55. ))
  56. }
  57. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectRespBody(
  58. object.Redundancy,
  59. object.FileSize,
  60. ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes),
  61. ))
  62. } else {
  63. // TODO 参考上面进行重写
  64. /*blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  65. if err != nil {
  66. log.WithField("ObjectID", object.ObjectID).
  67. Warnf("query Object Block failed, err: %s", err.Error())
  68. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Object Block failed")
  69. }
  70. ecPolicies := *utils.GetEcPolicy()
  71. ecPolicy := ecPolicies[*object.ECName]
  72. ecN := ecPolicy.GetN()
  73. ecK := ecPolicy.GetK()
  74. nodeIPs = make([]string, ecN)
  75. hashes = make([]string, ecN)
  76. for _, tt := range blocks {
  77. id := tt.InnerID
  78. hash := tt.BlockHash
  79. hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块
  80. nodes, err := svc.db.QueryCacheNodeByBlockHash(hash)
  81. if err != nil {
  82. log.WithField("BlockHash", hash).
  83. Warnf("query Cache failed, err: %s", err.Error())
  84. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Cache failed")
  85. }
  86. if len(nodes) == 0 {
  87. log.WithField("BlockHash", hash).
  88. Warnf("No node cache the block data for the BlockHash")
  89. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash")
  90. }
  91. nodeIPs[id] = nodes[0].IP
  92. }
  93. //这里也有和上面一样的问题
  94. for i := 1; i < ecK; i++ {
  95. blockIDs = append(blockIDs, i)
  96. }*/
  97. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "not implement yet!")
  98. }
  99. }
  100. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg.PreUploadResp {
  101. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  102. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  103. isBucketAvai, err := svc.db.Bucket().IsAvailable(msg.Body.BucketID, msg.Body.UserID)
  104. if err != nil {
  105. log.WithField("BucketID", msg.Body.BucketID).
  106. Warnf("check bucket available failed, err: %s", err.Error())
  107. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed")
  108. }
  109. if !isBucketAvai {
  110. log.WithField("BucketID", msg.Body.BucketID).
  111. Warnf("bucket is not available to user")
  112. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
  113. }
  114. _, err = svc.db.Object().GetByName(msg.Body.BucketID, msg.Body.ObjectName)
  115. if err == nil {
  116. log.WithField("BucketID", msg.Body.BucketID).
  117. WithField("ObjectName", msg.Body.ObjectName).
  118. Warnf("object with given Name and BucketID already exists")
  119. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists")
  120. }
  121. if !errors.Is(err, sql.ErrNoRows) {
  122. log.WithField("BucketID", msg.Body.BucketID).
  123. WithField("ObjectName", msg.Body.ObjectName).
  124. Warnf("get object by name failed, err: %s", err.Error())
  125. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed")
  126. }
  127. //查询用户可用的节点IP
  128. nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID)
  129. if err != nil {
  130. log.WithField("UserID", msg.Body.UserID).
  131. Warnf("query user nodes failed, err: %s", err.Error())
  132. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  133. }
  134. // 查询客户端所属节点
  135. belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
  136. if err != nil {
  137. log.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  138. Warnf("query client belong node failed, err: %s", err.Error())
  139. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  140. }
  141. var respNodes []ramsg.RespNode
  142. for _, node := range nodes {
  143. respNodes = append(respNodes, ramsg.NewRespNode(
  144. node.NodeID,
  145. node.ExternalIP,
  146. node.LocalIP,
  147. // LocationID 相同则认为是在同一个地域
  148. belongNode.LocationID == node.LocationID,
  149. ))
  150. }
  151. return ramsg.ReplyOK(coormsg.NewPreUploadRespBody(respNodes))
  152. }
  153. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp {
  154. _, err := svc.db.Object().CreateRepObject(msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash)
  155. if err != nil {
  156. log.WithField("BucketName", msg.Body.BucketID).
  157. WithField("ObjectName", msg.Body.ObjectName).
  158. Warnf("create rep object failed, err: %s", err.Error())
  159. return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed")
  160. }
  161. // 紧急任务
  162. evtmsg, err := scmsg.NewPostEventBody(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)
  163. if err == nil {
  164. svc.scanner.PostEvent(evtmsg)
  165. } else {
  166. logger.Warnf("new post event body failed, but this will not affect creating, err: %s", err.Error())
  167. }
  168. return ramsg.ReplyOK(coormsg.NewCreateObjectRespBody())
  169. }
  170. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp {
  171. // 获取对象信息
  172. obj, err := svc.db.Object().GetByID(msg.Body.ObjectID)
  173. if err != nil {
  174. log.WithField("ObjectID", msg.Body.ObjectID).
  175. Warnf("get object failed, err: %s", err.Error())
  176. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed")
  177. }
  178. if obj.Redundancy != consts.REDUNDANCY_REP {
  179. log.WithField("ObjectID", msg.Body.ObjectID).
  180. Warnf("this object is not a rep object")
  181. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object")
  182. }
  183. // 获取对象Rep信息
  184. objRep, err := svc.db.ObjectRep().GetObjectRep(msg.Body.ObjectID)
  185. if err != nil {
  186. log.WithField("ObjectID", msg.Body.ObjectID).
  187. Warnf("get object rep failed, err: %s", err.Error())
  188. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed")
  189. }
  190. //查询用户可用的节点IP
  191. nodes, err := svc.db.Node().GetUserNodes(msg.Body.UserID)
  192. if err != nil {
  193. log.WithField("UserID", msg.Body.UserID).
  194. Warnf("query user nodes failed, err: %s", err.Error())
  195. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  196. }
  197. // 查询客户端所属节点
  198. belongNode, err := svc.db.Node().GetByExternalIP(msg.Body.ClientExternalIP)
  199. if err != nil {
  200. log.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  201. Warnf("query client belong node failed, err: %s", err.Error())
  202. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  203. }
  204. // 查询保存了旧文件的节点信息
  205. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(msg.Body.UserID, objRep.FileHash)
  206. if err != nil {
  207. log.Warnf("find caching file user nodes failed, err: %s", err.Error())
  208. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed")
  209. }
  210. var retNodes []coormsg.PreUpdateRepObjectRespNode
  211. for _, node := range nodes {
  212. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  213. node.NodeID,
  214. node.ExternalIP,
  215. node.LocalIP,
  216. // LocationID 相同则认为是在同一个地域
  217. belongNode.LocationID == node.LocationID,
  218. // 此节点存储了对象旧文件
  219. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  220. ))
  221. }
  222. return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectRespBody(retNodes))
  223. }
  224. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp {
  225. err := svc.db.Object().UpdateRepObject(msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash)
  226. if err != nil {
  227. log.WithField("ObjectID", msg.Body.ObjectID).
  228. Warnf("update rep object failed, err: %s", err.Error())
  229. return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed")
  230. }
  231. // 紧急任务
  232. evtmsg, err := scmsg.NewPostEventBody(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)
  233. if err == nil {
  234. svc.scanner.PostEvent(evtmsg)
  235. } else {
  236. logger.Warnf("new post event body failed, but this will not affect updating, err: %s", err.Error())
  237. }
  238. return ramsg.ReplyOK(coormsg.NewUpdateRepObjectRespBody())
  239. }
  240. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp {
  241. err := svc.db.Object().SoftDelete(msg.Body.UserID, msg.Body.ObjectID)
  242. if err != nil {
  243. log.WithField("UserID", msg.Body.UserID).
  244. WithField("ObjectID", msg.Body.ObjectID).
  245. Warnf("set object deleted failed, err: %s", err.Error())
  246. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed")
  247. }
  248. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.Body.ObjectID)
  249. if err != nil {
  250. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  251. return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
  252. }
  253. // 不追求及时、准确
  254. if len(stgs) == 0 {
  255. // 如果没有被引用,直接投递CheckObject的任务
  256. evtmsg, err := scmsg.NewPostEventBody(scevt.NewCheckObject([]int{msg.Body.ObjectID}), false, false)
  257. if err == nil {
  258. svc.scanner.PostEvent(evtmsg)
  259. } else {
  260. logger.Warnf("new post event body failed, but this will not affect deleting, err: %s", err.Error())
  261. }
  262. } else {
  263. // 有引用则让Agent去检查StorageObject
  264. for _, stg := range stgs {
  265. evtmsg, err := scmsg.NewPostEventBody(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.Body.ObjectID}), false, false)
  266. if err == nil {
  267. svc.scanner.PostEvent(evtmsg)
  268. } else {
  269. logger.Warnf("new post event body failed, but this will not affect deleting, err: %s", err.Error())
  270. }
  271. }
  272. }
  273. return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
  274. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。