You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 13 kB

2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package mq
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "github.com/jmoiron/sqlx"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/mq"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  14. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  15. )
  16. func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
  17. var objs []cdssdk.Object
  18. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  19. _, err := svc.db.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
  20. if err != nil {
  21. return fmt.Errorf("getting package by id: %w", err)
  22. }
  23. objs, err = svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID)
  24. if err != nil {
  25. return fmt.Errorf("getting package objects: %w", err)
  26. }
  27. return nil
  28. })
  29. if err != nil {
  30. logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID).
  31. Warn(err.Error())
  32. return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
  33. }
  34. return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs))
  35. }
  36. func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) {
  37. var details []stgmod.ObjectDetail
  38. // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性
  39. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  40. var err error
  41. _, err = svc.db.Package().GetByID(tx, msg.PackageID)
  42. if err != nil {
  43. return fmt.Errorf("getting package by id: %w", err)
  44. }
  45. details, err = svc.db.Object().GetPackageObjectDetails(tx, msg.PackageID)
  46. if err != nil {
  47. return fmt.Errorf("getting package block details: %w", err)
  48. }
  49. return nil
  50. })
  51. if err != nil {
  52. logger.WithField("PackageID", msg.PackageID).Warn(err.Error())
  53. return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
  54. }
  55. return mq.ReplyOK(coormq.RespPackageObjectDetails(details))
  56. }
  57. func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {
  58. details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs))
  59. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  60. var err error
  61. msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs)
  62. // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
  63. objs, err := svc.db.Object().BatchGet(tx, msg.ObjectIDs)
  64. if err != nil {
  65. return fmt.Errorf("batch get objects: %w", err)
  66. }
  67. objIDIdx := 0
  68. objIdx := 0
  69. for objIDIdx < len(msg.ObjectIDs) && objIdx < len(objs) {
  70. if msg.ObjectIDs[objIDIdx] < objs[objIdx].ObjectID {
  71. objIDIdx++
  72. continue
  73. }
  74. // 由于是使用msg.ObjectIDs去查询Object,因此不存在msg.ObjectIDs > Object.ObjectID的情况,
  75. // 下面同理
  76. obj := stgmod.ObjectDetail{
  77. Object: objs[objIDIdx],
  78. }
  79. details[objIDIdx] = &obj
  80. objIdx++
  81. }
  82. // 查询合并
  83. blocks, err := svc.db.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs)
  84. if err != nil {
  85. return fmt.Errorf("batch get object blocks: %w", err)
  86. }
  87. objIDIdx = 0
  88. blkIdx := 0
  89. for objIDIdx < len(msg.ObjectIDs) && blkIdx < len(blocks) {
  90. if details[objIDIdx] == nil {
  91. objIDIdx++
  92. continue
  93. }
  94. if msg.ObjectIDs[objIDIdx] < blocks[blkIdx].ObjectID {
  95. objIDIdx++
  96. continue
  97. }
  98. details[objIDIdx].Blocks = append(details[objIDIdx].Blocks, blocks[blkIdx])
  99. blkIdx++
  100. }
  101. // 查询合并
  102. pinneds, err := svc.db.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs)
  103. if err != nil {
  104. return fmt.Errorf("batch get pinned objects: %w", err)
  105. }
  106. objIDIdx = 0
  107. pinIdx := 0
  108. for objIDIdx < len(msg.ObjectIDs) && pinIdx < len(pinneds) {
  109. if details[objIDIdx] == nil {
  110. objIDIdx++
  111. continue
  112. }
  113. if msg.ObjectIDs[objIDIdx] < pinneds[pinIdx].ObjectID {
  114. objIDIdx++
  115. continue
  116. }
  117. details[objIDIdx].PinnedAt = append(details[objIDIdx].PinnedAt, pinneds[pinIdx].NodeID)
  118. pinIdx++
  119. }
  120. return nil
  121. })
  122. if err != nil {
  123. logger.Warn(err.Error())
  124. return nil, mq.Failed(errorcode.OperationFailed, "get object details failed")
  125. }
  126. return mq.ReplyOK(coormq.RespGetObjectDetails(details))
  127. }
  128. func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
  129. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  130. return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings)
  131. })
  132. if err != nil {
  133. logger.Warnf("batch updating redundancy: %s", err.Error())
  134. return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed")
  135. }
  136. return mq.ReplyOK(coormq.RespUpdateObjectRedundancy())
  137. }
  138. func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
  139. var sucs []cdssdk.ObjectID
  140. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  141. msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdsapi.UpdatingObject) int {
  142. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  143. })
  144. objIDs := make([]cdssdk.ObjectID, len(msg.Updatings))
  145. for i, obj := range msg.Updatings {
  146. objIDs[i] = obj.ObjectID
  147. }
  148. oldObjs, err := svc.db.Object().BatchGet(tx, objIDs)
  149. if err != nil {
  150. return fmt.Errorf("batch getting objects: %w", err)
  151. }
  152. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  153. for i, obj := range oldObjs {
  154. oldObjIDs[i] = obj.ObjectID
  155. }
  156. avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdsapi.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID })
  157. if len(notExistsObjs) > 0 {
  158. // TODO 部分对象已经不存在
  159. }
  160. newObjs := make([]cdssdk.Object, len(avaiUpdatings))
  161. for i := range newObjs {
  162. newObjs[i] = oldObjs[i]
  163. avaiUpdatings[i].ApplyTo(&newObjs[i])
  164. }
  165. err = svc.db.Object().BatchUpsertByPackagePath(tx, newObjs)
  166. if err != nil {
  167. return fmt.Errorf("batch create or update: %w", err)
  168. }
  169. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  170. return nil
  171. })
  172. if err != nil {
  173. logger.Warnf("batch updating objects: %s", err.Error())
  174. return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
  175. }
  176. return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs))
  177. }
  178. // 根据objIDs从objs中挑选Object。
  179. // len(objs) >= len(objIDs)
  180. func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) {
  181. objIdx := 0
  182. idIdx := 0
  183. for idIdx < len(objIDs) && objIdx < len(objs) {
  184. if getID(objs[objIdx]) < objIDs[idIdx] {
  185. notFound = append(notFound, objs[objIdx])
  186. objIdx++
  187. continue
  188. }
  189. picked = append(picked, objs[objIdx])
  190. objIdx++
  191. idIdx++
  192. }
  193. return
  194. }
  195. func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) {
  196. var sucs []cdssdk.ObjectID
  197. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  198. msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int {
  199. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  200. })
  201. objIDs := make([]cdssdk.ObjectID, len(msg.Movings))
  202. for i, obj := range msg.Movings {
  203. objIDs[i] = obj.ObjectID
  204. }
  205. oldObjs, err := svc.db.Object().BatchGet(tx, objIDs)
  206. if err != nil {
  207. return fmt.Errorf("batch getting objects: %w", err)
  208. }
  209. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  210. for i, obj := range oldObjs {
  211. oldObjIDs[i] = obj.ObjectID
  212. }
  213. avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID })
  214. if len(notExistsObjs) > 0 {
  215. // TODO 部分对象已经不存在
  216. }
  217. // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
  218. var pkgIDChangedObjs []cdssdk.Object
  219. var pathChangedObjs []cdssdk.Object
  220. for i := range avaiMovings {
  221. if avaiMovings[i].PackageID != oldObjs[i].PackageID {
  222. newObj := oldObjs[i]
  223. avaiMovings[i].ApplyTo(&newObj)
  224. pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
  225. } else if avaiMovings[i].Path != oldObjs[i].Path {
  226. newObj := oldObjs[i]
  227. avaiMovings[i].ApplyTo(&newObj)
  228. pathChangedObjs = append(pathChangedObjs, newObj)
  229. }
  230. }
  231. var newObjs []cdssdk.Object
  232. // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
  233. ensuredObjs, err := svc.ensurePackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs)
  234. if err != nil {
  235. return err
  236. }
  237. newObjs = append(newObjs, ensuredObjs...)
  238. // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
  239. ensuredObjs, err = svc.ensurePathChangedObjects(tx, msg.UserID, pathChangedObjs)
  240. if err != nil {
  241. return err
  242. }
  243. newObjs = append(newObjs, ensuredObjs...)
  244. err = svc.db.Object().BatchUpert(tx, newObjs)
  245. if err != nil {
  246. return fmt.Errorf("batch create or update: %w", err)
  247. }
  248. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  249. return nil
  250. })
  251. if err != nil {
  252. logger.Warn(err.Error())
  253. return nil, mq.Failed(errorcode.OperationFailed, "move objects failed")
  254. }
  255. return mq.ReplyOK(coormq.RespMoveObjects(sucs))
  256. }
  257. func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  258. if len(objs) == 0 {
  259. return nil, nil
  260. }
  261. type PackageObjects struct {
  262. PackageID cdssdk.PackageID
  263. ObjectByPath map[string]*cdssdk.Object
  264. }
  265. packages := make(map[cdssdk.PackageID]*PackageObjects)
  266. for _, obj := range objs {
  267. pkg, ok := packages[obj.PackageID]
  268. if !ok {
  269. pkg = &PackageObjects{
  270. PackageID: obj.PackageID,
  271. ObjectByPath: make(map[string]*cdssdk.Object),
  272. }
  273. packages[obj.PackageID] = pkg
  274. }
  275. if pkg.ObjectByPath[obj.Path] == nil {
  276. o := obj
  277. pkg.ObjectByPath[obj.Path] = &o
  278. } else {
  279. // TODO 有两个对象移动到同一个路径,有冲突
  280. }
  281. }
  282. var willUpdateObjs []cdssdk.Object
  283. for _, pkg := range packages {
  284. _, err := svc.db.Package().GetUserPackage(tx, userID, pkg.PackageID)
  285. if err == sql.ErrNoRows {
  286. continue
  287. }
  288. if err != nil {
  289. return nil, fmt.Errorf("getting user package by id: %w", err)
  290. }
  291. existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
  292. if err != nil {
  293. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  294. }
  295. // 标记冲突的对象
  296. for _, obj := range existsObjs {
  297. pkg.ObjectByPath[obj.Path] = nil
  298. // TODO 目标Package内有冲突的对象
  299. }
  300. for _, obj := range pkg.ObjectByPath {
  301. if obj == nil {
  302. continue
  303. }
  304. willUpdateObjs = append(willUpdateObjs, *obj)
  305. }
  306. }
  307. return willUpdateObjs, nil
  308. }
  309. func (svc *Service) ensurePathChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  310. if len(objs) == 0 {
  311. return nil, nil
  312. }
  313. objByPath := make(map[string]*cdssdk.Object)
  314. for _, obj := range objs {
  315. if objByPath[obj.Path] == nil {
  316. o := obj
  317. objByPath[obj.Path] = &o
  318. } else {
  319. // TODO 有两个对象移动到同一个路径,有冲突
  320. }
  321. }
  322. _, err := svc.db.Package().GetUserPackage(tx, userID, objs[0].PackageID)
  323. if err == sql.ErrNoRows {
  324. return nil, nil
  325. }
  326. if err != nil {
  327. return nil, fmt.Errorf("getting user package by id: %w", err)
  328. }
  329. existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path }))
  330. if err != nil {
  331. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  332. }
  333. // 不支持两个对象交换位置的情况,因为数据库不支持
  334. for _, obj := range existsObjs {
  335. objByPath[obj.Path] = nil
  336. }
  337. var willMoveObjs []cdssdk.Object
  338. for _, obj := range objByPath {
  339. if obj == nil {
  340. continue
  341. }
  342. willMoveObjs = append(willMoveObjs, *obj)
  343. }
  344. return willMoveObjs, nil
  345. }
  346. func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
  347. err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  348. err := svc.db.Object().BatchDelete(tx, msg.ObjectIDs)
  349. if err != nil {
  350. return fmt.Errorf("batch deleting objects: %w", err)
  351. }
  352. err = svc.db.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  353. if err != nil {
  354. return fmt.Errorf("batch deleting object blocks: %w", err)
  355. }
  356. err = svc.db.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  357. if err != nil {
  358. return fmt.Errorf("batch deleting pinned objects: %w", err)
  359. }
  360. err = svc.db.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  361. if err != nil {
  362. return fmt.Errorf("batch deleting object access stats: %w", err)
  363. }
  364. return nil
  365. })
  366. if err != nil {
  367. logger.Warnf("batch deleting objects: %s", err.Error())
  368. return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
  369. }
  370. return mq.ReplyOK(coormq.RespDeleteObjects())
  371. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。