You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 14 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package services
  2. import (
  3. "database/sql"
  4. "errors"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/models"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/pkgs/mq"
  11. "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
  12. mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message"
  13. coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
  14. scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event"
  15. )
  16. func (svc *Service) GetObjectsByDirName(msg *coormsg.GetObjectsByDirName) (*coormsg.GetObjectsResp, *mq.CodeMessage) {
  17. //查询dirName下所有文件
  18. objects, err := svc.db.Object().GetByDirName(svc.db.SQLCtx(), msg.DirName)
  19. if err != nil {
  20. logger.WithField("DirName", msg.DirName).
  21. Warnf("query dirname failed, err: %s", err.Error())
  22. return mq.ReplyFailed[coormsg.GetObjectsResp](errorcode.OperationFailed, "get objects failed")
  23. }
  24. return mq.ReplyOK(coormsg.NewGetObjectsResp(objects))
  25. }
  26. func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *mq.CodeMessage) {
  27. // 查询文件对象
  28. object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  29. if err != nil {
  30. logger.WithField("ObjectID", msg.ObjectID).
  31. Warnf("query Object failed, err: %s", err.Error())
  32. return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Object failed")
  33. }
  34. // 查询客户端所属节点
  35. foundBelongNode := true
  36. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  37. if err == sql.ErrNoRows {
  38. foundBelongNode = false
  39. } else if err != nil {
  40. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  41. Warnf("query client belong node failed, err: %s", err.Error())
  42. return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query client belong node failed")
  43. }
  44. logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID)
  45. //-若redundancy是rep,查询对象副本表, 获得FileHash
  46. if object.Redundancy == models.RedundancyRep {
  47. objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID)
  48. if err != nil {
  49. logger.WithField("ObjectID", object.ObjectID).
  50. Warnf("get ObjectRep failed, err: %s", err.Error())
  51. return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query ObjectRep failed")
  52. }
  53. // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的
  54. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash)
  55. if err != nil {
  56. logger.WithField("FileHash", objectRep.FileHash).
  57. Warnf("query Cache failed, err: %s", err.Error())
  58. return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed")
  59. }
  60. var respNodes []mymq.RespNode
  61. for _, node := range nodes {
  62. respNodes = append(respNodes, mymq.NewRespNode(
  63. node.NodeID,
  64. node.ExternalIP,
  65. node.LocalIP,
  66. // LocationID 相同则认为是在同一个地域
  67. foundBelongNode && belongNode.LocationID == node.LocationID,
  68. ))
  69. }
  70. return mq.ReplyOK(coormsg.NewPreDownloadObjectResp(
  71. object.FileSize,
  72. mymq.NewRespRepRedundancyData(objectRep.FileHash, respNodes),
  73. ))
  74. } else {
  75. // TODO 参考上面进行重写
  76. ecName := object.Redundancy
  77. blocks, err := svc.db.QueryObjectBlock(object.ObjectID)
  78. if err != nil {
  79. logger.WithField("ObjectID", object.ObjectID).
  80. Warnf("query Blocks failed, err: %s", err.Error())
  81. return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Blocks failed")
  82. }
  83. logger.Debugf(blocks[4].BlockHash)
  84. //查询纠删码参数
  85. ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName)
  86. ecc := mymq.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN)
  87. //查询每个编码块存放的所有节点
  88. respNodes := make([][]mymq.RespNode, len(blocks))
  89. for i := 0; i < len(blocks); i++ {
  90. nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, blocks[i].BlockHash)
  91. if err != nil {
  92. logger.WithField("FileHash", blocks[i].BlockHash).
  93. Warnf("query Cache failed, err: %s", err.Error())
  94. return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed")
  95. }
  96. var nd []mymq.RespNode
  97. for _, node := range nodes {
  98. nd = append(nd, mymq.NewRespNode(
  99. node.NodeID,
  100. node.ExternalIP,
  101. node.LocalIP,
  102. // LocationID 相同则认为是在同一个地域
  103. foundBelongNode && belongNode.LocationID == node.LocationID,
  104. ))
  105. }
  106. respNodes[i] = nd
  107. logger.Debugf("##%d\n", i)
  108. }
  109. var blockss []mymq.RespObjectBlock
  110. for i := 0; i < len(blocks); i++ {
  111. blockss = append(blockss, mymq.NewRespObjectBlock(
  112. blocks[i].InnerID,
  113. blocks[i].BlockHash,
  114. ))
  115. }
  116. return mq.ReplyOK(coormsg.NewPreDownloadObjectResp(
  117. object.FileSize,
  118. mymq.NewRespEcRedundancyData(ecc, blockss, respNodes),
  119. ))
  120. }
  121. }
  122. func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *mq.CodeMessage) {
  123. // 判断同名对象是否存在。等到UploadRepObject时再判断一次。
  124. // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果
  125. isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID)
  126. if err != nil {
  127. logger.WithField("BucketID", msg.BucketID).
  128. Warnf("check bucket available failed, err: %s", err.Error())
  129. return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "check bucket available failed")
  130. }
  131. if !isBucketAvai {
  132. logger.WithField("BucketID", msg.BucketID).
  133. Warnf("bucket is not available to user")
  134. return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "bucket is not available to user")
  135. }
  136. _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName)
  137. if err == nil {
  138. logger.WithField("BucketID", msg.BucketID).
  139. WithField("ObjectName", msg.ObjectName).
  140. Warnf("object with given Name and BucketID already exists")
  141. return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "object with given Name and BucketID already exists")
  142. }
  143. if !errors.Is(err, sql.ErrNoRows) {
  144. logger.WithField("BucketID", msg.BucketID).
  145. WithField("ObjectName", msg.ObjectName).
  146. Warnf("get object by name failed, err: %s", err.Error())
  147. return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "get object by name failed")
  148. }
  149. //查询用户可用的节点IP
  150. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  151. if err != nil {
  152. logger.WithField("UserID", msg.UserID).
  153. Warnf("query user nodes failed, err: %s", err.Error())
  154. return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query user nodes failed")
  155. }
  156. // 查询客户端所属节点
  157. foundBelongNode := true
  158. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  159. if err == sql.ErrNoRows {
  160. foundBelongNode = false
  161. } else if err != nil {
  162. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  163. Warnf("query client belong node failed, err: %s", err.Error())
  164. return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query client belong node failed")
  165. }
  166. var respNodes []mymq.RespNode
  167. for _, node := range nodes {
  168. respNodes = append(respNodes, mymq.NewRespNode(
  169. node.NodeID,
  170. node.ExternalIP,
  171. node.LocalIP,
  172. // LocationID 相同则认为是在同一个地域
  173. foundBelongNode && belongNode.LocationID == node.LocationID,
  174. ))
  175. }
  176. return mq.ReplyOK(coormsg.NewPreUploadResp(respNodes))
  177. }
  178. func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *mq.CodeMessage) {
  179. var objID int64
  180. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  181. var err error
  182. objID, err = svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash, msg.DirName)
  183. return err
  184. })
  185. if err != nil {
  186. logger.WithField("BucketName", msg.BucketID).
  187. WithField("ObjectName", msg.ObjectName).
  188. Warnf("create rep object failed, err: %s", err.Error())
  189. return mq.ReplyFailed[coormsg.CreateObjectResp](errorcode.OperationFailed, "create rep object failed")
  190. }
  191. // 紧急任务
  192. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  193. if err != nil {
  194. logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
  195. }
  196. return mq.ReplyOK(coormsg.NewCreateObjectResp(objID))
  197. }
  198. func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *mq.CodeMessage) {
  199. // TODO 检查用户是否有Object的权限
  200. // 获取对象信息
  201. obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  202. if err != nil {
  203. logger.WithField("ObjectID", msg.ObjectID).
  204. Warnf("get object failed, err: %s", err.Error())
  205. return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object failed")
  206. }
  207. if obj.Redundancy != models.RedundancyRep {
  208. logger.WithField("ObjectID", msg.ObjectID).
  209. Warnf("this object is not a rep object")
  210. return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "this object is not a rep object")
  211. }
  212. // 获取对象Rep信息
  213. objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID)
  214. if err != nil {
  215. logger.WithField("ObjectID", msg.ObjectID).
  216. Warnf("get object rep failed, err: %s", err.Error())
  217. return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object rep failed")
  218. }
  219. //查询用户可用的节点IP
  220. nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID)
  221. if err != nil {
  222. logger.WithField("UserID", msg.UserID).
  223. Warnf("query user nodes failed, err: %s", err.Error())
  224. return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query user nodes failed")
  225. }
  226. // 查询客户端所属节点
  227. foundBelongNode := true
  228. belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP)
  229. if err == sql.ErrNoRows {
  230. foundBelongNode = false
  231. } else if err != nil {
  232. logger.WithField("ClientExternalIP", msg.ClientExternalIP).
  233. Warnf("query client belong node failed, err: %s", err.Error())
  234. return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query client belong node failed")
  235. }
  236. // 查询保存了旧文件的节点信息
  237. cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash)
  238. if err != nil {
  239. logger.Warnf("find caching file user nodes failed, err: %s", err.Error())
  240. return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "find caching file user nodes failed")
  241. }
  242. var retNodes []coormsg.PreUpdateRepObjectRespNode
  243. for _, node := range nodes {
  244. retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode(
  245. node.NodeID,
  246. node.ExternalIP,
  247. node.LocalIP,
  248. // LocationID 相同则认为是在同一个地域
  249. foundBelongNode && belongNode.LocationID == node.LocationID,
  250. // 此节点存储了对象旧文件
  251. lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }),
  252. ))
  253. }
  254. return mq.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes))
  255. }
  256. func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *mq.CodeMessage) {
  257. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  258. return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash)
  259. })
  260. if err != nil {
  261. logger.WithField("ObjectID", msg.ObjectID).
  262. Warnf("update rep object failed, err: %s", err.Error())
  263. return mq.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OperationFailed, "update rep object failed")
  264. }
  265. // 紧急任务
  266. err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true)
  267. if err != nil {
  268. logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error())
  269. }
  270. return mq.ReplyOK(coormsg.NewUpdateRepObjectResp())
  271. }
  272. func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *mq.CodeMessage) {
  273. isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID)
  274. if err != nil {
  275. logger.WithField("UserID", msg.UserID).
  276. WithField("ObjectID", msg.ObjectID).
  277. Warnf("check object available failed, err: %s", err.Error())
  278. return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "check object available failed")
  279. }
  280. if !isAva {
  281. logger.WithField("UserID", msg.UserID).
  282. WithField("ObjectID", msg.ObjectID).
  283. Warnf("object is not available to the user")
  284. return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "object is not available to the user")
  285. }
  286. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  287. return svc.db.Object().SoftDelete(tx, msg.ObjectID)
  288. })
  289. if err != nil {
  290. logger.WithField("UserID", msg.UserID).
  291. WithField("ObjectID", msg.ObjectID).
  292. Warnf("set object deleted failed, err: %s", err.Error())
  293. return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "set object deleted failed")
  294. }
  295. stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID)
  296. if err != nil {
  297. logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error())
  298. return mq.ReplyOK(coormsg.NewDeleteObjectResp())
  299. }
  300. // 不追求及时、准确
  301. if len(stgs) == 0 {
  302. // 如果没有被引用,直接投递CheckObject的任务
  303. err := svc.scanner.PostEvent(scevt.NewCheckObject([]int64{msg.ObjectID}), false, false)
  304. if err != nil {
  305. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  306. }
  307. logger.Debugf("post check object event")
  308. } else {
  309. // 有引用则让Agent去检查StorageObject
  310. for _, stg := range stgs {
  311. err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.ObjectID}), false, false)
  312. if err != nil {
  313. logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
  314. }
  315. }
  316. logger.Debugf("post agent check storage event")
  317. }
  318. return mq.ReplyOK(coormsg.NewDeleteObjectResp())
  319. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。