You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 14 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts"
  8. "gitlink.org.cn/cloudream/common/consts/errorcode"
  9. "gitlink.org.cn/cloudream/common/pkg/logger"
  10. "gitlink.org.cn/cloudream/db/model"
  11. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  12. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  13. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  14. )
  15. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *ramsg.CodeMessage) {
  16. // 查询文件对象
  17. object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  18. if err != nil {
  19. logger.WithField("ObjectID", msg.ObjectID).
  20. Warnf("query Object failed, err: %s", err.Error())
  21. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed")
  22. }
  23. // 查询客户端所属节点
  24. foundBelongNode := true
  25. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  26. if err == sql.ErrNoRows {
  27. foundBelongNode = false
  28. } else if err != nil {
  29. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  30. Warnf("query client belong node failed, err: %s", err.Error())
  31. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  32. }
  33. logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID)
  34. //-若redundancy是rep,查询对象副本表, 获得FileHash
  35. if object.Redundancy == consts.REDUNDANCY_REP {
  36. objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
  37. if err != nil {
  38. logger.WithField("ObjectID", object.ObjectID).
  39. Warnf("get ObjectRep failed, err: %s", err.Error())
  40. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query ObjectRep failed")
  41. }
  42. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  43. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash)
  44. if err != nil {
  45. logger.WithField("FileHash", objectRep.FileHash).
  46. Warnf("query Cache failed, err: %s", err.Error())
  47. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Cache failed")
  48. }
  49. var respNodes []ramsg.RespNode
  50. for _, node := range nodes {
  51. respNodes = append(respNodes, ramsg.NewRespNode(
  52. node.NodeID,
  53. node.ExternalIP,
  54. node.LocalIP,
  55. // LocationID 相同则认为是在同一个地域
  56. foundBelongNode && belongNode.LocationID == node.LocationID,
  57. ))
  58. }
  59. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectResp(
  60. object.Redundancy,
  61. object.FileSize,
  62. ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes),
  63. ))
  64. } else {
  65. // TODO 参考上面进行重写
  66. /*blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  67. if err != nil {
  68. log.WithField("ObjectID", object.ObjectID).
  69. Warnf("query Object Block failed, err: %s", err.Error())
  70. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Object Block failed")
  71. }
  72. ecPolicies := *utils.GetEcPolicy()
  73. ecPolicy := ecPolicies[*object.ECName]
  74. ecN := ecPolicy.GetN()
  75. ecK := ecPolicy.GetK()
  76. nodeIPs = make([]string, ecN)
  77. hashes = make([]string, ecN)
  78. for _, tt := range blocks {
  79. id := tt.InnerID
  80. hash := tt.BlockHash
  81. hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块
  82. nodes, err := svc.db.QueryCacheNodeByBlockHash(hash)
  83. if err != nil {
  84. log.WithField("BlockHash", hash).
  85. Warnf("query Cache failed, err: %s", err.Error())
  86. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Cache failed")
  87. }
  88. if len(nodes) == 0 {
  89. log.WithField("BlockHash", hash).
  90. Warnf("No node cache the block data for the BlockHash")
  91. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash")
  92. }
  93. nodeIPs[id] = nodes[0].IP
  94. }
  95. //这里也有和上面一样的问题
  96. for i := 1; i < ecK; i++ {
  97. blockIDs = append(blockIDs, i)
  98. }*/
  99. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "not implement yet!")
  100. }
  101. }
  102. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *ramsg.CodeMessage) {
  103. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  104. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  105. isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID)
  106. if err != nil {
  107. logger.WithField("BucketID", msg.BucketID).
  108. Warnf("check bucket available failed, err: %s", err.Error())
  109. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed")
  110. }
  111. if !isBucketAvai {
  112. logger.WithField("BucketID", msg.BucketID).
  113. Warnf("bucket is not available to user")
  114. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
  115. }
  116. _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName)
  117. if err == nil {
  118. logger.WithField("BucketID", msg.BucketID).
  119. WithField("ObjectName", msg.ObjectName).
  120. Warnf("object with given Name and BucketID already exists")
  121. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists")
  122. }
  123. if !errors.Is(err, sql.ErrNoRows) {
  124. logger.WithField("BucketID", msg.BucketID).
  125. WithField("ObjectName", msg.ObjectName).
  126. Warnf("get object by name failed, err: %s", err.Error())
  127. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed")
  128. }
  129. //查询用户可用的节点IP
  130. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  131. if err != nil {
  132. logger.WithField("UserID", msg.UserID).
  133. Warnf("query user nodes failed, err: %s", err.Error())
  134. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  135. }
  136. // 查询客户端所属节点
  137. foundBelongNode := true
  138. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  139. if err == sql.ErrNoRows {
  140. foundBelongNode = false
  141. } else if err != nil {
  142. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  143. Warnf("query client belong node failed, err: %s", err.Error())
  144. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  145. }
  146. var respNodes []ramsg.RespNode
  147. for _, node := range nodes {
  148. respNodes = append(respNodes, ramsg.NewRespNode(
  149. node.NodeID,
  150. node.ExternalIP,
  151. node.LocalIP,
  152. // LocationID 相同则认为是在同一个地域
  153. foundBelongNode && belongNode.LocationID == node.LocationID,
  154. ))
  155. }
  156. return ramsg.ReplyOK(coormsg.NewPreUploadResp(respNodes))
  157. }
  158. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) {
  159. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  160. _, err := svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash)
  161. return err
  162. })
  163. if err != nil {
  164. logger.WithField("BucketName", msg.BucketID).
  165. WithField("ObjectName", msg.ObjectName).
  166. Warnf("create rep object failed, err: %s", err.Error())
  167. return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed")
  168. }
  169. // 紧急任务
  170. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  171. if err != nil {
  172. logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
  173. }
  174. return ramsg.ReplyOK(coormsg.NewCreateObjectResp())
  175. }
  176. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *ramsg.CodeMessage) {
  177. // TODO 检查用户是否有Object的权限
  178. // 获取对象信息
  179. obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  180. if err != nil {
  181. logger.WithField("ObjectID", msg.ObjectID).
  182. Warnf("get object failed, err: %s", err.Error())
  183. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed")
  184. }
  185. if obj.Redundancy != consts.REDUNDANCY_REP {
  186. logger.WithField("ObjectID", msg.ObjectID).
  187. Warnf("this object is not a rep object")
  188. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object")
  189. }
  190. // 获取对象Rep信息
  191. objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  192. if err != nil {
  193. logger.WithField("ObjectID", msg.ObjectID).
  194. Warnf("get object rep failed, err: %s", err.Error())
  195. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed")
  196. }
  197. //查询用户可用的节点IP
  198. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  199. if err != nil {
  200. logger.WithField("UserID", msg.UserID).
  201. Warnf("query user nodes failed, err: %s", err.Error())
  202. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  203. }
  204. // 查询客户端所属节点
  205. foundBelongNode := true
  206. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  207. if err == sql.ErrNoRows {
  208. foundBelongNode = false
  209. } else if err != nil {
  210. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  211. Warnf("query client belong node failed, err: %s", err.Error())
  212. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  213. }
  214. // 查询保存了旧文件的节点信息
  215. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash)
  216. if err != nil {
  217. logger.Warnf("find caching file user nodes failed, err: %s", err.Error())
  218. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed")
  219. }
  220. var retNodes []coormsg.PreUpdateRepObjectRespNode
  221. for _, node := range nodes {
  222. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  223. node.NodeID,
  224. node.ExternalIP,
  225. node.LocalIP,
  226. // LocationID 相同则认为是在同一个地域
  227. foundBelongNode && belongNode.LocationID == node.LocationID,
  228. // 此节点存储了对象旧文件
  229. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  230. ))
  231. }
  232. return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes))
  233. }
  234. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *ramsg.CodeMessage) {
  235. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  236. return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash)
  237. })
  238. if err != nil {
  239. logger.WithField("ObjectID", msg.ObjectID).
  240. Warnf("update rep object failed, err: %s", err.Error())
  241. return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed")
  242. }
  243. // 紧急任务
  244. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  245. if err != nil {
  246. logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error())
  247. }
  248. return ramsg.ReplyOK(coormsg.NewUpdateRepObjectResp())
  249. }
  250. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *ramsg.CodeMessage) {
  251. isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  252. if err != nil {
  253. logger.WithField("UserID", msg.UserID).
  254. WithField("ObjectID", msg.ObjectID).
  255. Warnf("check object available failed, err: %s", err.Error())
  256. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "check object available failed")
  257. }
  258. if !isAva {
  259. logger.WithField("UserID", msg.UserID).
  260. WithField("ObjectID", msg.ObjectID).
  261. Warnf("object is not available to the user")
  262. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "object is not available to the user")
  263. }
  264. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  265. return svc.db.Object().SoftDelete(tx, msg.ObjectID)
  266. })
  267. if err != nil {
  268. logger.WithField("UserID", msg.UserID).
  269. WithField("ObjectID", msg.ObjectID).
  270. Warnf("set object deleted failed, err: %s", err.Error())
  271. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed")
  272. }
  273. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID)
  274. if err != nil {
  275. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  276. return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
  277. }
  278. // 不追求及时、准确
  279. if len(stgs) == 0 {
  280. // 如果没有被引用,直接投递CheckObject的任务
  281. err := svc.scanner.PostEvent(scevt.NewCheckObject([]int64{msg.ObjectID}), false, false)
  282. if err != nil {
  283. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  284. }
  285. logger.Debugf("post check object event")
  286. } else {
  287. // 有引用则让Agent去检查StorageObject
  288. for _, stg := range stgs {
  289. err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.ObjectID}), false, false)
  290. if err != nil {
  291. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  292. }
  293. }
  294. logger.Debugf("post agent check storage event")
  295. }
  296. return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
  297. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。