You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 14 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts"
  8. "gitlink.org.cn/cloudream/common/consts/errorcode"
  9. "gitlink.org.cn/cloudream/common/pkg/logger"
  10. "gitlink.org.cn/cloudream/db/model"
  11. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  12. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  13. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  14. )
  15. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) *coormsg.PreDownloadObjectResp {
  16. // 查询文件对象
  17. object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
  18. if err != nil {
  19. logger.WithField("ObjectID", msg.Body.ObjectID).
  20. Warnf("query Object failed, err: %s", err.Error())
  21. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed")
  22. }
  23. // 查询客户端所属节点
  24. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
  25. if err != nil {
  26. logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  27. Warnf("query client belong node failed, err: %s", err.Error())
  28. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  29. }
  30. logger.Debugf("client address %s is at location %d", msg.Body.ClientExternalIP, belongNode.LocationID)
  31. //-若redundancy是rep,查询对象副本表, 获得FileHash
  32. if object.Redundancy == consts.REDUNDANCY_REP {
  33. objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
  34. if err != nil {
  35. logger.WithField("ObjectID", object.ObjectID).
  36. Warnf("get ObjectRep failed, err: %s", err.Error())
  37. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query ObjectRep failed")
  38. }
  39. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  40. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objectRep.FileHash)
  41. if err != nil {
  42. logger.WithField("FileHash", objectRep.FileHash).
  43. Warnf("query Cache failed, err: %s", err.Error())
  44. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Cache failed")
  45. }
  46. var respNodes []ramsg.RespNode
  47. for _, node := range nodes {
  48. respNodes = append(respNodes, ramsg.NewRespNode(
  49. node.NodeID,
  50. node.ExternalIP,
  51. node.LocalIP,
  52. // LocationID 相同则认为是在同一个地域
  53. belongNode.LocationID == node.LocationID,
  54. ))
  55. }
  56. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectRespBody(
  57. object.Redundancy,
  58. object.FileSize,
  59. ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes),
  60. ))
  61. } else {
  62. // TODO 参考上面进行重写
  63. /*blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  64. if err != nil {
  65. log.WithField("ObjectID", object.ObjectID).
  66. Warnf("query Object Block failed, err: %s", err.Error())
  67. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Object Block failed")
  68. }
  69. ecPolicies := *utils.GetEcPolicy()
  70. ecPolicy := ecPolicies[*object.ECName]
  71. ecN := ecPolicy.GetN()
  72. ecK := ecPolicy.GetK()
  73. nodeIPs = make([]string, ecN)
  74. hashes = make([]string, ecN)
  75. for _, tt := range blocks {
  76. id := tt.InnerID
  77. hash := tt.BlockHash
  78. hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块
  79. nodes, err := svc.db.QueryCacheNodeByBlockHash(hash)
  80. if err != nil {
  81. log.WithField("BlockHash", hash).
  82. Warnf("query Cache failed, err: %s", err.Error())
  83. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Cache failed")
  84. }
  85. if len(nodes) == 0 {
  86. log.WithField("BlockHash", hash).
  87. Warnf("No node cache the block data for the BlockHash")
  88. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash")
  89. }
  90. nodeIPs[id] = nodes[0].IP
  91. }
  92. //这里也有和上面一样的问题
  93. for i := 1; i < ecK; i++ {
  94. blockIDs = append(blockIDs, i)
  95. }*/
  96. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "not implement yet!")
  97. }
  98. }
  99. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) *coormsg.PreUploadResp {
  100. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  101. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  102. isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.UserID)
  103. if err != nil {
  104. logger.WithField("BucketID", msg.Body.BucketID).
  105. Warnf("check bucket available failed, err: %s", err.Error())
  106. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed")
  107. }
  108. if !isBucketAvai {
  109. logger.WithField("BucketID", msg.Body.BucketID).
  110. Warnf("bucket is not available to user")
  111. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
  112. }
  113. _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.Body.BucketID, msg.Body.ObjectName)
  114. if err == nil {
  115. logger.WithField("BucketID", msg.Body.BucketID).
  116. WithField("ObjectName", msg.Body.ObjectName).
  117. Warnf("object with given Name and BucketID already exists")
  118. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists")
  119. }
  120. if !errors.Is(err, sql.ErrNoRows) {
  121. logger.WithField("BucketID", msg.Body.BucketID).
  122. WithField("ObjectName", msg.Body.ObjectName).
  123. Warnf("get object by name failed, err: %s", err.Error())
  124. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed")
  125. }
  126. //查询用户可用的节点IP
  127. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID)
  128. if err != nil {
  129. logger.WithField("UserID", msg.Body.UserID).
  130. Warnf("query user nodes failed, err: %s", err.Error())
  131. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  132. }
  133. // 查询客户端所属节点
  134. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
  135. if err != nil {
  136. logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  137. Warnf("query client belong node failed, err: %s", err.Error())
  138. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  139. }
  140. var respNodes []ramsg.RespNode
  141. for _, node := range nodes {
  142. respNodes = append(respNodes, ramsg.NewRespNode(
  143. node.NodeID,
  144. node.ExternalIP,
  145. node.LocalIP,
  146. // LocationID 相同则认为是在同一个地域
  147. belongNode.LocationID == node.LocationID,
  148. ))
  149. }
  150. return ramsg.ReplyOK(coormsg.NewPreUploadRespBody(respNodes))
  151. }
  152. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) *coormsg.CreateObjectResp {
  153. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  154. _, err := svc.db.Object().CreateRepObject(tx, msg.Body.BucketID, msg.Body.ObjectName, msg.Body.FileSize, msg.Body.RepCount, msg.Body.NodeIDs, msg.Body.FileHash)
  155. return err
  156. })
  157. if err != nil {
  158. logger.WithField("BucketName", msg.Body.BucketID).
  159. WithField("ObjectName", msg.Body.ObjectName).
  160. Warnf("create rep object failed, err: %s", err.Error())
  161. return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed")
  162. }
  163. // 紧急任务
  164. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)
  165. if err != nil {
  166. logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
  167. }
  168. return ramsg.ReplyOK(coormsg.NewCreateObjectRespBody())
  169. }
  170. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) *coormsg.PreUpdateRepObjectResp {
  171. // TODO 检查用户是否有Object的权限
  172. // 获取对象信息
  173. obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID)
  174. if err != nil {
  175. logger.WithField("ObjectID", msg.Body.ObjectID).
  176. Warnf("get object failed, err: %s", err.Error())
  177. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed")
  178. }
  179. if obj.Redundancy != consts.REDUNDANCY_REP {
  180. logger.WithField("ObjectID", msg.Body.ObjectID).
  181. Warnf("this object is not a rep object")
  182. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object")
  183. }
  184. // 获取对象Rep信息
  185. objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.Body.ObjectID)
  186. if err != nil {
  187. logger.WithField("ObjectID", msg.Body.ObjectID).
  188. Warnf("get object rep failed, err: %s", err.Error())
  189. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed")
  190. }
  191. //查询用户可用的节点IP
  192. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.Body.UserID)
  193. if err != nil {
  194. logger.WithField("UserID", msg.Body.UserID).
  195. Warnf("query user nodes failed, err: %s", err.Error())
  196. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  197. }
  198. // 查询客户端所属节点
  199. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.Body.ClientExternalIP)
  200. if err != nil {
  201. logger.WithField("ClientExternalIP", msg.Body.ClientExternalIP).
  202. Warnf("query client belong node failed, err: %s", err.Error())
  203. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  204. }
  205. // 查询保存了旧文件的节点信息
  206. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.Body.UserID, objRep.FileHash)
  207. if err != nil {
  208. logger.Warnf("find caching file user nodes failed, err: %s", err.Error())
  209. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed")
  210. }
  211. var retNodes []coormsg.PreUpdateRepObjectRespNode
  212. for _, node := range nodes {
  213. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  214. node.NodeID,
  215. node.ExternalIP,
  216. node.LocalIP,
  217. // LocationID 相同则认为是在同一个地域
  218. belongNode.LocationID == node.LocationID,
  219. // 此节点存储了对象旧文件
  220. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  221. ))
  222. }
  223. return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectRespBody(retNodes))
  224. }
  225. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) *coormsg.UpdateRepObjectResp {
  226. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  227. return svc.db.Object().UpdateRepObject(tx, msg.Body.ObjectID, msg.Body.FileSize, msg.Body.NodeIDs, msg.Body.FileHash)
  228. })
  229. if err != nil {
  230. logger.WithField("ObjectID", msg.Body.ObjectID).
  231. Warnf("update rep object failed, err: %s", err.Error())
  232. return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed")
  233. }
  234. // 紧急任务
  235. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.Body.FileHash}), true, true)
  236. if err != nil {
  237. logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error())
  238. }
  239. return ramsg.ReplyOK(coormsg.NewUpdateRepObjectRespBody())
  240. }
  241. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) *coormsg.DeleteObjectResp {
  242. isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.Body.UserID, msg.Body.ObjectID)
  243. if err != nil {
  244. logger.WithField("UserID", msg.Body.UserID).
  245. WithField("ObjectID", msg.Body.ObjectID).
  246. Warnf("check object available failed, err: %s", err.Error())
  247. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "check object available failed")
  248. }
  249. if !isAva {
  250. logger.WithField("UserID", msg.Body.UserID).
  251. WithField("ObjectID", msg.Body.ObjectID).
  252. Warnf("object is not available to the user")
  253. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "object is not available to the user")
  254. }
  255. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  256. return svc.db.Object().SoftDelete(tx, msg.Body.ObjectID)
  257. })
  258. if err != nil {
  259. logger.WithField("UserID", msg.Body.UserID).
  260. WithField("ObjectID", msg.Body.ObjectID).
  261. Warnf("set object deleted failed, err: %s", err.Error())
  262. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed")
  263. }
  264. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.Body.ObjectID)
  265. if err != nil {
  266. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  267. return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
  268. }
  269. // 不追求及时、准确
  270. if len(stgs) == 0 {
  271. // 如果没有被引用,直接投递CheckObject的任务
  272. err := svc.scanner.PostEvent(scevt.NewCheckObject([]int{msg.Body.ObjectID}), false, false)
  273. if err != nil {
  274. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  275. }
  276. logger.Debugf("post check object event")
  277. } else {
  278. // 有引用则让Agent去检查StorageObject
  279. for _, stg := range stgs {
  280. err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.Body.ObjectID}), false, false)
  281. if err != nil {
  282. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  283. }
  284. }
  285. logger.Debugf("post agent check storage event")
  286. }
  287. return ramsg.ReplyOK(coormsg.NewDeleteObjectRespBody())
  288. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。