You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 14 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago

  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts"
  8. "gitlink.org.cn/cloudream/common/consts/errorcode"
  9. "gitlink.org.cn/cloudream/common/pkg/logger"
  10. "gitlink.org.cn/cloudream/db/model"
  11. ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
  12. coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
  13. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  14. )
  15. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *ramsg.CodeMessage) {
  16. // 查询文件对象
  17. object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  18. if err != nil {
  19. logger.WithField("ObjectID", msg.ObjectID).
  20. Warnf("query Object failed, err: %s", err.Error())
  21. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Object failed")
  22. }
  23. // 查询客户端所属节点
  24. foundBelongNode := true
  25. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  26. if err == sql.ErrNoRows {
  27. foundBelongNode = false
  28. } else if err != nil {
  29. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  30. Warnf("query client belong node failed, err: %s", err.Error())
  31. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  32. }
  33. logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID)
  34. //-若redundancy是rep,查询对象副本表, 获得FileHash
  35. if object.Redundancy == consts.REDUNDANCY_REP {
  36. objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
  37. if err != nil {
  38. logger.WithField("ObjectID", object.ObjectID).
  39. Warnf("get ObjectRep failed, err: %s", err.Error())
  40. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query ObjectRep failed")
  41. }
  42. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  43. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash)
  44. if err != nil {
  45. logger.WithField("FileHash", objectRep.FileHash).
  46. Warnf("query Cache failed, err: %s", err.Error())
  47. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "query Cache failed")
  48. }
  49. var respNodes []ramsg.RespNode
  50. for _, node := range nodes {
  51. respNodes = append(respNodes, ramsg.NewRespNode(
  52. node.NodeID,
  53. node.ExternalIP,
  54. node.LocalIP,
  55. // LocationID 相同则认为是在同一个地域
  56. foundBelongNode && belongNode.LocationID == node.LocationID,
  57. ))
  58. }
  59. return ramsg.ReplyOK(coormsg.NewPreDownloadObjectResp(
  60. object.Redundancy,
  61. object.FileSize,
  62. ramsg.NewRespObjectRepInfo(objectRep.FileHash, respNodes),
  63. ))
  64. } else {
  65. // TODO 参考上面进行重写
  66. /*blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  67. if err != nil {
  68. log.WithField("ObjectID", object.ObjectID).
  69. Warnf("query Object Block failed, err: %s", err.Error())
  70. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Object Block failed")
  71. }
  72. ecPolicies := *utils.GetEcPolicy()
  73. ecPolicy := ecPolicies[*object.ECName]
  74. ecN := ecPolicy.GetN()
  75. ecK := ecPolicy.GetK()
  76. nodeIPs = make([]string, ecN)
  77. hashes = make([]string, ecN)
  78. for _, tt := range blocks {
  79. id := tt.InnerID
  80. hash := tt.BlockHash
  81. hashes[id] = hash //这里有问题,采取的其实是直接顺序读的方式,等待加入自适应读模块
  82. nodes, err := svc.db.QueryCacheNodeByBlockHash(hash)
  83. if err != nil {
  84. log.WithField("BlockHash", hash).
  85. Warnf("query Cache failed, err: %s", err.Error())
  86. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "query Cache failed")
  87. }
  88. if len(nodes) == 0 {
  89. log.WithField("BlockHash", hash).
  90. Warnf("No node cache the block data for the BlockHash")
  91. return ramsg.ReplyFailed[coormsg.ReadResp](errorcode.OPERATION_FAILED, "No node cache the block data for the BlockHash")
  92. }
  93. nodeIPs[id] = nodes[0].IP
  94. }
  95. //这里也有和上面一样的问题
  96. for i := 1; i < ecK; i++ {
  97. blockIDs = append(blockIDs, i)
  98. }*/
  99. return ramsg.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OPERATION_FAILED, "not implement yet!")
  100. }
  101. }
  102. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *ramsg.CodeMessage) {
  103. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  104. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  105. isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID)
  106. if err != nil {
  107. logger.WithField("BucketID", msg.BucketID).
  108. Warnf("check bucket available failed, err: %s", err.Error())
  109. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "check bucket available failed")
  110. }
  111. if !isBucketAvai {
  112. logger.WithField("BucketID", msg.BucketID).
  113. Warnf("bucket is not available to user")
  114. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "bucket is not available to user")
  115. }
  116. _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName)
  117. if err == nil {
  118. logger.WithField("BucketID", msg.BucketID).
  119. WithField("ObjectName", msg.ObjectName).
  120. Warnf("object with given Name and BucketID already exists")
  121. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "object with given Name and BucketID already exists")
  122. }
  123. if !errors.Is(err, sql.ErrNoRows) {
  124. logger.WithField("BucketID", msg.BucketID).
  125. WithField("ObjectName", msg.ObjectName).
  126. Warnf("get object by name failed, err: %s", err.Error())
  127. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "get object by name failed")
  128. }
  129. //查询用户可用的节点IP
  130. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  131. if err != nil {
  132. logger.WithField("UserID", msg.UserID).
  133. Warnf("query user nodes failed, err: %s", err.Error())
  134. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  135. }
  136. // 查询客户端所属节点
  137. foundBelongNode := true
  138. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  139. if err == sql.ErrNoRows {
  140. foundBelongNode = false
  141. } else if err != nil {
  142. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  143. Warnf("query client belong node failed, err: %s", err.Error())
  144. return ramsg.ReplyFailed[coormsg.PreUploadResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  145. }
  146. var respNodes []ramsg.RespNode
  147. for _, node := range nodes {
  148. respNodes = append(respNodes, ramsg.NewRespNode(
  149. node.NodeID,
  150. node.ExternalIP,
  151. node.LocalIP,
  152. // LocationID 相同则认为是在同一个地域
  153. foundBelongNode && belongNode.LocationID == node.LocationID,
  154. ))
  155. }
  156. return ramsg.ReplyOK(coormsg.NewPreUploadResp(respNodes))
  157. }
  158. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *ramsg.CodeMessage) {
  159. var objID int64
  160. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  161. var err error
  162. objID, err = svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash)
  163. return err
  164. })
  165. if err != nil {
  166. logger.WithField("BucketName", msg.BucketID).
  167. WithField("ObjectName", msg.ObjectName).
  168. Warnf("create rep object failed, err: %s", err.Error())
  169. return ramsg.ReplyFailed[coormsg.CreateObjectResp](errorcode.OPERATION_FAILED, "create rep object failed")
  170. }
  171. // 紧急任务
  172. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  173. if err != nil {
  174. logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
  175. }
  176. return ramsg.ReplyOK(coormsg.NewCreateObjectResp(objID))
  177. }
  178. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *ramsg.CodeMessage) {
  179. // TODO 检查用户是否有Object的权限
  180. // 获取对象信息
  181. obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  182. if err != nil {
  183. logger.WithField("ObjectID", msg.ObjectID).
  184. Warnf("get object failed, err: %s", err.Error())
  185. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object failed")
  186. }
  187. if obj.Redundancy != consts.REDUNDANCY_REP {
  188. logger.WithField("ObjectID", msg.ObjectID).
  189. Warnf("this object is not a rep object")
  190. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "this object is not a rep object")
  191. }
  192. // 获取对象Rep信息
  193. objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  194. if err != nil {
  195. logger.WithField("ObjectID", msg.ObjectID).
  196. Warnf("get object rep failed, err: %s", err.Error())
  197. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "get object rep failed")
  198. }
  199. //查询用户可用的节点IP
  200. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  201. if err != nil {
  202. logger.WithField("UserID", msg.UserID).
  203. Warnf("query user nodes failed, err: %s", err.Error())
  204. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query user nodes failed")
  205. }
  206. // 查询客户端所属节点
  207. foundBelongNode := true
  208. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  209. if err == sql.ErrNoRows {
  210. foundBelongNode = false
  211. } else if err != nil {
  212. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  213. Warnf("query client belong node failed, err: %s", err.Error())
  214. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "query client belong node failed")
  215. }
  216. // 查询保存了旧文件的节点信息
  217. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash)
  218. if err != nil {
  219. logger.Warnf("find caching file user nodes failed, err: %s", err.Error())
  220. return ramsg.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OPERATION_FAILED, "find caching file user nodes failed")
  221. }
  222. var retNodes []coormsg.PreUpdateRepObjectRespNode
  223. for _, node := range nodes {
  224. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  225. node.NodeID,
  226. node.ExternalIP,
  227. node.LocalIP,
  228. // LocationID 相同则认为是在同一个地域
  229. foundBelongNode && belongNode.LocationID == node.LocationID,
  230. // 此节点存储了对象旧文件
  231. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  232. ))
  233. }
  234. return ramsg.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes))
  235. }
  236. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *ramsg.CodeMessage) {
  237. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  238. return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash)
  239. })
  240. if err != nil {
  241. logger.WithField("ObjectID", msg.ObjectID).
  242. Warnf("update rep object failed, err: %s", err.Error())
  243. return ramsg.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OPERATION_FAILED, "update rep object failed")
  244. }
  245. // 紧急任务
  246. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  247. if err != nil {
  248. logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error())
  249. }
  250. return ramsg.ReplyOK(coormsg.NewUpdateRepObjectResp())
  251. }
  252. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *ramsg.CodeMessage) {
  253. isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  254. if err != nil {
  255. logger.WithField("UserID", msg.UserID).
  256. WithField("ObjectID", msg.ObjectID).
  257. Warnf("check object available failed, err: %s", err.Error())
  258. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "check object available failed")
  259. }
  260. if !isAva {
  261. logger.WithField("UserID", msg.UserID).
  262. WithField("ObjectID", msg.ObjectID).
  263. Warnf("object is not available to the user")
  264. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "object is not available to the user")
  265. }
  266. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  267. return svc.db.Object().SoftDelete(tx, msg.ObjectID)
  268. })
  269. if err != nil {
  270. logger.WithField("UserID", msg.UserID).
  271. WithField("ObjectID", msg.ObjectID).
  272. Warnf("set object deleted failed, err: %s", err.Error())
  273. return ramsg.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OPERATION_FAILED, "set object deleted failed")
  274. }
  275. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID)
  276. if err != nil {
  277. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  278. return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
  279. }
  280. // 不追求及时、准确
  281. if len(stgs) == 0 {
  282. // 如果没有被引用,直接投递CheckObject的任务
  283. err := svc.scanner.PostEvent(scevt.NewCheckObject([]int{msg.ObjectID}), false, false)
  284. if err != nil {
  285. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  286. }
  287. logger.Debugf("post check object event")
  288. } else {
  289. // 有引用则让Agent去检查StorageObject
  290. for _, stg := range stgs {
  291. err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int{msg.ObjectID}), false, false)
  292. if err != nil {
  293. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  294. }
  295. }
  296. logger.Debugf("post agent check storage event")
  297. }
  298. return ramsg.ReplyOK(coormsg.NewDeleteObjectResp())
  299. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。