You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

object.go 13 kB

2 years ago
2 years ago

  1. package mq
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/mq"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/common/utils/sort2"
  12. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  13. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  14. )
  15. func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
  16. var objs []cdssdk.Object
  17. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  18. _, err := svc.db.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
  19. if err != nil {
  20. return fmt.Errorf("getting package by id: %w", err)
  21. }
  22. objs, err = svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID)
  23. if err != nil {
  24. return fmt.Errorf("getting package objects: %w", err)
  25. }
  26. return nil
  27. })
  28. if err != nil {
  29. logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID).
  30. Warn(err.Error())
  31. return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
  32. }
  33. return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs))
  34. }
  35. func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) {
  36. var details []stgmod.ObjectDetail
  37. // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性
  38. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  39. var err error
  40. _, err = svc.db.Package().GetByID(tx, msg.PackageID)
  41. if err != nil {
  42. return fmt.Errorf("getting package by id: %w", err)
  43. }
  44. details, err = svc.db.Object().GetPackageObjectDetails(tx, msg.PackageID)
  45. if err != nil {
  46. return fmt.Errorf("getting package block details: %w", err)
  47. }
  48. return nil
  49. })
  50. if err != nil {
  51. logger.WithField("PackageID", msg.PackageID).Warn(err.Error())
  52. return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
  53. }
  54. return mq.ReplyOK(coormq.RespPackageObjectDetails(details))
  55. }
  56. func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {
  57. details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs))
  58. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  59. var err error
  60. msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs)
  61. // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
  62. objs, err := svc.db.Object().BatchGet(tx, msg.ObjectIDs)
  63. if err != nil {
  64. return fmt.Errorf("batch get objects: %w", err)
  65. }
  66. objIDIdx := 0
  67. objIdx := 0
  68. for objIDIdx < len(msg.ObjectIDs) && objIdx < len(objs) {
  69. if msg.ObjectIDs[objIDIdx] < objs[objIdx].ObjectID {
  70. objIDIdx++
  71. continue
  72. }
  73. // 由于是使用msg.ObjectIDs去查询Object,因此不存在msg.ObjectIDs > Object.ObjectID的情况,
  74. // 下面同理
  75. obj := stgmod.ObjectDetail{
  76. Object: objs[objIDIdx],
  77. }
  78. details[objIDIdx] = &obj
  79. objIdx++
  80. }
  81. // 查询合并
  82. blocks, err := svc.db.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs)
  83. if err != nil {
  84. return fmt.Errorf("batch get object blocks: %w", err)
  85. }
  86. objIDIdx = 0
  87. blkIdx := 0
  88. for objIDIdx < len(msg.ObjectIDs) && blkIdx < len(blocks) {
  89. if details[objIDIdx] == nil {
  90. objIDIdx++
  91. continue
  92. }
  93. if msg.ObjectIDs[objIDIdx] < blocks[blkIdx].ObjectID {
  94. objIDIdx++
  95. continue
  96. }
  97. details[objIDIdx].Blocks = append(details[objIDIdx].Blocks, blocks[blkIdx])
  98. blkIdx++
  99. }
  100. // 查询合并
  101. pinneds, err := svc.db.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs)
  102. if err != nil {
  103. return fmt.Errorf("batch get pinned objects: %w", err)
  104. }
  105. objIDIdx = 0
  106. pinIdx := 0
  107. for objIDIdx < len(msg.ObjectIDs) && pinIdx < len(pinneds) {
  108. if details[objIDIdx] == nil {
  109. objIDIdx++
  110. continue
  111. }
  112. if msg.ObjectIDs[objIDIdx] < pinneds[pinIdx].ObjectID {
  113. objIDIdx++
  114. continue
  115. }
  116. details[objIDIdx].PinnedAt = append(details[objIDIdx].PinnedAt, pinneds[pinIdx].NodeID)
  117. pinIdx++
  118. }
  119. return nil
  120. })
  121. if err != nil {
  122. logger.Warn(err.Error())
  123. return nil, mq.Failed(errorcode.OperationFailed, "get object details failed")
  124. }
  125. return mq.ReplyOK(coormq.RespGetObjectDetails(details))
  126. }
  127. func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
  128. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  129. return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings)
  130. })
  131. if err != nil {
  132. logger.Warnf("batch updating redundancy: %s", err.Error())
  133. return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed")
  134. }
  135. return mq.ReplyOK(coormq.RespUpdateObjectRedundancy())
  136. }
  137. func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
  138. var sucs []cdssdk.ObjectID
  139. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  140. msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int {
  141. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  142. })
  143. objIDs := make([]cdssdk.ObjectID, len(msg.Updatings))
  144. for i, obj := range msg.Updatings {
  145. objIDs[i] = obj.ObjectID
  146. }
  147. oldObjs, err := svc.db.Object().BatchGet(tx, objIDs)
  148. if err != nil {
  149. return fmt.Errorf("batch getting objects: %w", err)
  150. }
  151. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  152. for i, obj := range oldObjs {
  153. oldObjIDs[i] = obj.ObjectID
  154. }
  155. avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdssdk.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID })
  156. if len(notExistsObjs) > 0 {
  157. // TODO 部分对象已经不存在
  158. }
  159. newObjs := make([]cdssdk.Object, len(avaiUpdatings))
  160. for i := range newObjs {
  161. newObjs[i] = oldObjs[i]
  162. avaiUpdatings[i].ApplyTo(&newObjs[i])
  163. }
  164. err = svc.db.Object().BatchUpsertByPackagePath(tx, newObjs)
  165. if err != nil {
  166. return fmt.Errorf("batch create or update: %w", err)
  167. }
  168. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  169. return nil
  170. })
  171. if err != nil {
  172. logger.Warnf("batch updating objects: %s", err.Error())
  173. return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
  174. }
  175. return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs))
  176. }
  177. // 根据objIDs从objs中挑选Object。
  178. // len(objs) >= len(objIDs)
  179. func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) {
  180. objIdx := 0
  181. idIdx := 0
  182. for idIdx < len(objIDs) && objIdx < len(objs) {
  183. if getID(objs[objIdx]) < objIDs[idIdx] {
  184. notFound = append(notFound, objs[objIdx])
  185. objIdx++
  186. continue
  187. }
  188. picked = append(picked, objs[objIdx])
  189. objIdx++
  190. idIdx++
  191. }
  192. return
  193. }
  194. func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) {
  195. var sucs []cdssdk.ObjectID
  196. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  197. msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdssdk.MovingObject) int {
  198. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  199. })
  200. objIDs := make([]cdssdk.ObjectID, len(msg.Movings))
  201. for i, obj := range msg.Movings {
  202. objIDs[i] = obj.ObjectID
  203. }
  204. oldObjs, err := svc.db.Object().BatchGet(tx, objIDs)
  205. if err != nil {
  206. return fmt.Errorf("batch getting objects: %w", err)
  207. }
  208. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  209. for i, obj := range oldObjs {
  210. oldObjIDs[i] = obj.ObjectID
  211. }
  212. avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdssdk.MovingObject) cdssdk.ObjectID { return obj.ObjectID })
  213. if len(notExistsObjs) > 0 {
  214. // TODO 部分对象已经不存在
  215. }
  216. // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
  217. var pkgIDChangedObjs []cdssdk.Object
  218. var pathChangedObjs []cdssdk.Object
  219. for i := range avaiMovings {
  220. if avaiMovings[i].PackageID != oldObjs[i].PackageID {
  221. newObj := oldObjs[i]
  222. avaiMovings[i].ApplyTo(&newObj)
  223. pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
  224. } else if avaiMovings[i].Path != oldObjs[i].Path {
  225. newObj := oldObjs[i]
  226. avaiMovings[i].ApplyTo(&newObj)
  227. pathChangedObjs = append(pathChangedObjs, newObj)
  228. }
  229. }
  230. var newObjs []cdssdk.Object
  231. // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
  232. ensuredObjs, err := svc.ensurePackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs)
  233. if err != nil {
  234. return err
  235. }
  236. newObjs = append(newObjs, ensuredObjs...)
  237. // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
  238. ensuredObjs, err = svc.ensurePathChangedObjects(tx, msg.UserID, pathChangedObjs)
  239. if err != nil {
  240. return err
  241. }
  242. newObjs = append(newObjs, ensuredObjs...)
  243. err = svc.db.Object().BatchUpert(tx, newObjs)
  244. if err != nil {
  245. return fmt.Errorf("batch create or update: %w", err)
  246. }
  247. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  248. return nil
  249. })
  250. if err != nil {
  251. logger.Warn(err.Error())
  252. return nil, mq.Failed(errorcode.OperationFailed, "move objects failed")
  253. }
  254. return mq.ReplyOK(coormq.RespMoveObjects(sucs))
  255. }
  256. func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  257. if len(objs) == 0 {
  258. return nil, nil
  259. }
  260. type PackageObjects struct {
  261. PackageID cdssdk.PackageID
  262. ObjectByPath map[string]*cdssdk.Object
  263. }
  264. packages := make(map[cdssdk.PackageID]*PackageObjects)
  265. for _, obj := range objs {
  266. pkg, ok := packages[obj.PackageID]
  267. if !ok {
  268. pkg = &PackageObjects{
  269. PackageID: obj.PackageID,
  270. ObjectByPath: make(map[string]*cdssdk.Object),
  271. }
  272. packages[obj.PackageID] = pkg
  273. }
  274. if pkg.ObjectByPath[obj.Path] == nil {
  275. o := obj
  276. pkg.ObjectByPath[obj.Path] = &o
  277. } else {
  278. // TODO 有两个对象移动到同一个路径,有冲突
  279. }
  280. }
  281. var willUpdateObjs []cdssdk.Object
  282. for _, pkg := range packages {
  283. _, err := svc.db.Package().GetUserPackage(tx, userID, pkg.PackageID)
  284. if err == sql.ErrNoRows {
  285. continue
  286. }
  287. if err != nil {
  288. return nil, fmt.Errorf("getting user package by id: %w", err)
  289. }
  290. existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
  291. if err != nil {
  292. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  293. }
  294. // 标记冲突的对象
  295. for _, obj := range existsObjs {
  296. pkg.ObjectByPath[obj.Path] = nil
  297. // TODO 目标Package内有冲突的对象
  298. }
  299. for _, obj := range pkg.ObjectByPath {
  300. if obj == nil {
  301. continue
  302. }
  303. willUpdateObjs = append(willUpdateObjs, *obj)
  304. }
  305. }
  306. return willUpdateObjs, nil
  307. }
  308. func (svc *Service) ensurePathChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  309. if len(objs) == 0 {
  310. return nil, nil
  311. }
  312. objByPath := make(map[string]*cdssdk.Object)
  313. for _, obj := range objs {
  314. if objByPath[obj.Path] == nil {
  315. o := obj
  316. objByPath[obj.Path] = &o
  317. } else {
  318. // TODO 有两个对象移动到同一个路径,有冲突
  319. }
  320. }
  321. _, err := svc.db.Package().GetUserPackage(tx, userID, objs[0].PackageID)
  322. if err == sql.ErrNoRows {
  323. return nil, nil
  324. }
  325. if err != nil {
  326. return nil, fmt.Errorf("getting user package by id: %w", err)
  327. }
  328. existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path }))
  329. if err != nil {
  330. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  331. }
  332. // 不支持两个对象交换位置的情况,因为数据库不支持
  333. for _, obj := range existsObjs {
  334. objByPath[obj.Path] = nil
  335. }
  336. var willMoveObjs []cdssdk.Object
  337. for _, obj := range objByPath {
  338. if obj == nil {
  339. continue
  340. }
  341. willMoveObjs = append(willMoveObjs, *obj)
  342. }
  343. return willMoveObjs, nil
  344. }
  345. func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
  346. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  347. err := svc.db.Object().BatchDelete(tx, msg.ObjectIDs)
  348. if err != nil {
  349. return fmt.Errorf("batch deleting objects: %w", err)
  350. }
  351. err = svc.db.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  352. if err != nil {
  353. return fmt.Errorf("batch deleting object blocks: %w", err)
  354. }
  355. err = svc.db.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  356. if err != nil {
  357. return fmt.Errorf("batch deleting pinned objects: %w", err)
  358. }
  359. err = svc.db.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  360. if err != nil {
  361. return fmt.Errorf("batch deleting object access stats: %w", err)
  362. }
  363. return nil
  364. })
  365. if err != nil {
  366. logger.Warnf("batch deleting objects: %s", err.Error())
  367. return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
  368. }
  369. return mq.ReplyOK(coormq.RespDeleteObjects())
  370. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。