You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 26 kB

11 months ago
11 months ago
11 months ago
11 months ago
2 years ago
11 months ago
11 months ago

  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. "gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
  7. "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
  8. "gorm.io/gorm"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/common/consts/errorcode"
  11. "gitlink.org.cn/cloudream/common/pkgs/logger"
  12. "gitlink.org.cn/cloudream/common/pkgs/mq"
  13. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  14. "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
  15. "gitlink.org.cn/cloudream/common/utils/sort2"
  16. stgmod "gitlink.org.cn/cloudream/storage2/common/models"
  17. coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
  18. )
  19. func (svc *Service) GetObjects(msg *coormq.GetObjects) (*coormq.GetObjectsResp, *mq.CodeMessage) {
  20. var ret []*cdssdk.Object
  21. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  22. // TODO 应该检查用户是否有每一个Object所在Package的权限
  23. objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs)
  24. if err != nil {
  25. return err
  26. }
  27. objMp := make(map[cdssdk.ObjectID]cdssdk.Object)
  28. for _, obj := range objs {
  29. objMp[obj.ObjectID] = obj
  30. }
  31. for _, objID := range msg.ObjectIDs {
  32. o, ok := objMp[objID]
  33. if ok {
  34. ret = append(ret, &o)
  35. } else {
  36. ret = append(ret, nil)
  37. }
  38. }
  39. return err
  40. })
  41. if err != nil {
  42. logger.WithField("UserID", msg.UserID).
  43. Warn(err.Error())
  44. return nil, mq.Failed(errorcode.OperationFailed, "get objects failed")
  45. }
  46. return mq.ReplyOK(coormq.RespGetObjects(ret))
  47. }
  48. func (svc *Service) ListObjectsByPath(msg *coormq.ListObjectsByPath) (*coormq.ListObjectsByPathResp, *mq.CodeMessage) {
  49. var coms []string
  50. var objs []cdssdk.Object
  51. var conToken string
  52. maxKeys := 1000
  53. if msg.MaxKeys > 0 {
  54. maxKeys = msg.MaxKeys
  55. }
  56. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  57. var err error
  58. _, err = svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
  59. if err != nil {
  60. return fmt.Errorf("getting package by id: %w", err)
  61. }
  62. if !msg.IsPrefix {
  63. obj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
  64. if err != nil {
  65. return fmt.Errorf("getting object by path: %w", err)
  66. }
  67. objs = append(objs, obj)
  68. return nil
  69. }
  70. if !msg.NoRecursive {
  71. objs, err = svc.db2.Object().GetWithPathPrefixPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys)
  72. if err != nil {
  73. return fmt.Errorf("getting objects with prefix: %w", err)
  74. }
  75. if len(objs) > 0 {
  76. conToken = objs[len(objs)-1].Path
  77. }
  78. return nil
  79. }
  80. objs, coms, conToken, err = svc.db2.Object().GetByPrefixGroupedPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys)
  81. return err
  82. })
  83. if err != nil {
  84. logger.WithField("PathPrefix", msg.Path).Warn(err.Error())
  85. return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed")
  86. }
  87. return mq.ReplyOK(coormq.RespListObjectsByPath(cdsapi.ObjectListByPathResp{
  88. CommonPrefixes: coms,
  89. Objects: objs,
  90. IsTruncated: len(coms)+len(objs) >= maxKeys,
  91. NextContinuationToken: conToken,
  92. }))
  93. }
  94. func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
  95. var objs []cdssdk.Object
  96. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  97. _, err := svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
  98. if err != nil {
  99. return fmt.Errorf("getting package by id: %w", err)
  100. }
  101. objs, err = svc.db2.Object().GetPackageObjects(tx, msg.PackageID)
  102. if err != nil {
  103. return fmt.Errorf("getting package objects: %w", err)
  104. }
  105. return nil
  106. })
  107. if err != nil {
  108. logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID).
  109. Warn(err.Error())
  110. return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
  111. }
  112. return mq.ReplyOK(coormq.RespGetPackageObjects(objs))
  113. }
  114. func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) {
  115. var details []stgmod.ObjectDetail
  116. // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性
  117. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  118. var err error
  119. _, err = svc.db2.Package().GetByID(tx, msg.PackageID)
  120. if err != nil {
  121. return fmt.Errorf("getting package by id: %w", err)
  122. }
  123. details, err = svc.db2.Object().GetPackageObjectDetails(tx, msg.PackageID)
  124. if err != nil {
  125. return fmt.Errorf("getting package block details: %w", err)
  126. }
  127. return nil
  128. })
  129. if err != nil {
  130. logger.WithField("PackageID", msg.PackageID).Warn(err.Error())
  131. return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
  132. }
  133. return mq.ReplyOK(coormq.RespPackageObjectDetails(details))
  134. }
  135. func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {
  136. detailsMp := make(map[cdssdk.ObjectID]*stgmod.ObjectDetail)
  137. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  138. var err error
  139. msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs)
  140. // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
  141. objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs)
  142. if err != nil {
  143. return fmt.Errorf("batch get objects: %w", err)
  144. }
  145. for _, obj := range objs {
  146. detailsMp[obj.ObjectID] = &stgmod.ObjectDetail{
  147. Object: obj,
  148. }
  149. }
  150. // 查询合并
  151. blocks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs)
  152. if err != nil {
  153. return fmt.Errorf("batch get object blocks: %w", err)
  154. }
  155. for _, block := range blocks {
  156. d := detailsMp[block.ObjectID]
  157. d.Blocks = append(d.Blocks, block)
  158. }
  159. // 查询合并
  160. pinneds, err := svc.db2.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs)
  161. if err != nil {
  162. return fmt.Errorf("batch get pinned objects: %w", err)
  163. }
  164. for _, pinned := range pinneds {
  165. d := detailsMp[pinned.ObjectID]
  166. d.PinnedAt = append(d.PinnedAt, pinned.StorageID)
  167. }
  168. return nil
  169. })
  170. if err != nil {
  171. logger.Warn(err.Error())
  172. return nil, mq.Failed(errorcode.OperationFailed, "get object details failed")
  173. }
  174. details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs))
  175. for i, objID := range msg.ObjectIDs {
  176. details[i] = detailsMp[objID]
  177. }
  178. return mq.ReplyOK(coormq.RespGetObjectDetails(details))
  179. }
  180. func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
  181. err := svc.db2.DoTx(func(ctx db2.SQLContext) error {
  182. db := svc.db2
  183. objs := msg.Updatings
  184. nowTime := time.Now()
  185. objIDs := make([]cdssdk.ObjectID, 0, len(objs))
  186. for _, obj := range objs {
  187. objIDs = append(objIDs, obj.ObjectID)
  188. }
  189. avaiIDs, err := db.Object().BatchTestObjectID(ctx, objIDs)
  190. if err != nil {
  191. return fmt.Errorf("batch test object id: %w", err)
  192. }
  193. // 过滤掉已经不存在的对象。
  194. // 注意,objIDs没有被过滤,因为后续逻辑不过滤也不会出错
  195. objs = lo.Filter(objs, func(obj coormq.UpdatingObjectRedundancy, _ int) bool {
  196. return avaiIDs[obj.ObjectID]
  197. })
  198. dummyObjs := make([]cdssdk.Object, 0, len(objs))
  199. for _, obj := range objs {
  200. dummyObjs = append(dummyObjs, cdssdk.Object{
  201. ObjectID: obj.ObjectID,
  202. FileHash: obj.FileHash,
  203. Size: obj.Size,
  204. Redundancy: obj.Redundancy,
  205. CreateTime: nowTime, // 实际不会更新,只因为不能是0值
  206. UpdateTime: nowTime,
  207. })
  208. }
  209. err = db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"FileHash", "Size", "Redundancy", "UpdateTime"})
  210. if err != nil {
  211. return fmt.Errorf("batch update object redundancy: %w", err)
  212. }
  213. // 删除原本所有的编码块记录,重新添加
  214. err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
  215. if err != nil {
  216. return fmt.Errorf("batch delete object blocks: %w", err)
  217. }
  218. // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
  219. err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
  220. if err != nil {
  221. return fmt.Errorf("batch delete pinned object: %w", err)
  222. }
  223. blocks := make([]stgmod.ObjectBlock, 0, len(objs))
  224. for _, obj := range objs {
  225. blocks = append(blocks, obj.Blocks...)
  226. }
  227. err = db.ObjectBlock().BatchCreate(ctx, blocks)
  228. if err != nil {
  229. return fmt.Errorf("batch create object blocks: %w", err)
  230. }
  231. caches := make([]model.Cache, 0, len(objs))
  232. for _, obj := range objs {
  233. for _, blk := range obj.Blocks {
  234. caches = append(caches, model.Cache{
  235. FileHash: blk.FileHash,
  236. StorageID: blk.StorageID,
  237. CreateTime: nowTime,
  238. Priority: 0,
  239. })
  240. }
  241. }
  242. err = db.Cache().BatchCreate(ctx, caches)
  243. if err != nil {
  244. return fmt.Errorf("batch create object caches: %w", err)
  245. }
  246. pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
  247. for _, obj := range objs {
  248. for _, p := range obj.PinnedAt {
  249. pinneds = append(pinneds, cdssdk.PinnedObject{
  250. ObjectID: obj.ObjectID,
  251. StorageID: p,
  252. CreateTime: nowTime,
  253. })
  254. }
  255. }
  256. err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
  257. if err != nil {
  258. return fmt.Errorf("batch create pinned objects: %w", err)
  259. }
  260. return nil
  261. })
  262. if err != nil {
  263. logger.Warnf("batch updating redundancy: %s", err.Error())
  264. return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed")
  265. }
  266. return mq.ReplyOK(coormq.RespUpdateObjectRedundancy())
  267. }
  268. func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
  269. var sucs []cdssdk.ObjectID
  270. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  271. msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdsapi.UpdatingObject) int {
  272. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  273. })
  274. objIDs := make([]cdssdk.ObjectID, len(msg.Updatings))
  275. for i, obj := range msg.Updatings {
  276. objIDs[i] = obj.ObjectID
  277. }
  278. oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs)
  279. if err != nil {
  280. return fmt.Errorf("batch getting objects: %w", err)
  281. }
  282. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  283. for i, obj := range oldObjs {
  284. oldObjIDs[i] = obj.ObjectID
  285. }
  286. avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdsapi.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID })
  287. if len(notExistsObjs) > 0 {
  288. // TODO 部分对象已经不存在
  289. }
  290. newObjs := make([]cdssdk.Object, len(avaiUpdatings))
  291. for i := range newObjs {
  292. newObjs[i] = oldObjs[i]
  293. avaiUpdatings[i].ApplyTo(&newObjs[i])
  294. }
  295. err = svc.db2.Object().BatchUpdate(tx, newObjs)
  296. if err != nil {
  297. return fmt.Errorf("batch create or update: %w", err)
  298. }
  299. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  300. return nil
  301. })
  302. if err != nil {
  303. logger.Warnf("batch updating objects: %s", err.Error())
  304. return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
  305. }
  306. return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs))
  307. }
  308. // 根据objIDs从objs中挑选Object。
  309. // len(objs) >= len(objIDs)
  310. func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) {
  311. objIdx := 0
  312. idIdx := 0
  313. for idIdx < len(objIDs) && objIdx < len(objs) {
  314. if getID(objs[objIdx]) < objIDs[idIdx] {
  315. notFound = append(notFound, objs[objIdx])
  316. objIdx++
  317. continue
  318. }
  319. picked = append(picked, objs[objIdx])
  320. objIdx++
  321. idIdx++
  322. }
  323. return
  324. }
  325. func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) {
  326. var sucs []cdssdk.ObjectID
  327. var evt []*stgmod.BodyObjectInfoUpdated
  328. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  329. msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int {
  330. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  331. })
  332. objIDs := make([]cdssdk.ObjectID, len(msg.Movings))
  333. for i, obj := range msg.Movings {
  334. objIDs[i] = obj.ObjectID
  335. }
  336. oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs)
  337. if err != nil {
  338. return fmt.Errorf("batch getting objects: %w", err)
  339. }
  340. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  341. for i, obj := range oldObjs {
  342. oldObjIDs[i] = obj.ObjectID
  343. }
  344. // 找出仍在数据库的Object
  345. avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID })
  346. if len(notExistsObjs) > 0 {
  347. // TODO 部分对象已经不存在
  348. }
  349. // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
  350. var pkgIDChangedObjs []cdssdk.Object
  351. var pathChangedObjs []cdssdk.Object
  352. for i := range avaiMovings {
  353. if avaiMovings[i].PackageID != oldObjs[i].PackageID {
  354. newObj := oldObjs[i]
  355. avaiMovings[i].ApplyTo(&newObj)
  356. pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
  357. } else if avaiMovings[i].Path != oldObjs[i].Path {
  358. newObj := oldObjs[i]
  359. avaiMovings[i].ApplyTo(&newObj)
  360. pathChangedObjs = append(pathChangedObjs, newObj)
  361. }
  362. }
  363. var newObjs []cdssdk.Object
  364. // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
  365. checkedObjs, err := svc.checkPackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs)
  366. if err != nil {
  367. return err
  368. }
  369. newObjs = append(newObjs, checkedObjs...)
  370. // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
  371. checkedObjs, err = svc.checkPathChangedObjects(tx, msg.UserID, pathChangedObjs)
  372. if err != nil {
  373. return err
  374. }
  375. newObjs = append(newObjs, checkedObjs...)
  376. err = svc.db2.Object().BatchUpdate(tx, newObjs)
  377. if err != nil {
  378. return fmt.Errorf("batch create or update: %w", err)
  379. }
  380. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  381. evt = lo.Map(newObjs, func(obj cdssdk.Object, _ int) *stgmod.BodyObjectInfoUpdated {
  382. return &stgmod.BodyObjectInfoUpdated{
  383. Object: obj,
  384. }
  385. })
  386. return nil
  387. })
  388. if err != nil {
  389. logger.Warn(err.Error())
  390. return nil, mq.Failed(errorcode.OperationFailed, "move objects failed")
  391. }
  392. for _, e := range evt {
  393. svc.evtPub.Publish(e)
  394. }
  395. return mq.ReplyOK(coormq.RespMoveObjects(sucs))
  396. }
  397. func (svc *Service) checkPackageChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  398. if len(objs) == 0 {
  399. return nil, nil
  400. }
  401. type PackageObjects struct {
  402. PackageID cdssdk.PackageID
  403. ObjectByPath map[string]*cdssdk.Object
  404. }
  405. packages := make(map[cdssdk.PackageID]*PackageObjects)
  406. for _, obj := range objs {
  407. pkg, ok := packages[obj.PackageID]
  408. if !ok {
  409. pkg = &PackageObjects{
  410. PackageID: obj.PackageID,
  411. ObjectByPath: make(map[string]*cdssdk.Object),
  412. }
  413. packages[obj.PackageID] = pkg
  414. }
  415. if pkg.ObjectByPath[obj.Path] == nil {
  416. o := obj
  417. pkg.ObjectByPath[obj.Path] = &o
  418. } else {
  419. // TODO 有两个对象移动到同一个路径,有冲突
  420. }
  421. }
  422. var willUpdateObjs []cdssdk.Object
  423. for _, pkg := range packages {
  424. _, err := svc.db2.Package().GetUserPackage(tx, userID, pkg.PackageID)
  425. if errors.Is(err, gorm.ErrRecordNotFound) {
  426. continue
  427. }
  428. if err != nil {
  429. return nil, fmt.Errorf("getting user package by id: %w", err)
  430. }
  431. existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
  432. if err != nil {
  433. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  434. }
  435. // 标记冲突的对象
  436. for _, obj := range existsObjs {
  437. pkg.ObjectByPath[obj.Path] = nil
  438. // TODO 目标Package内有冲突的对象
  439. }
  440. for _, obj := range pkg.ObjectByPath {
  441. if obj == nil {
  442. continue
  443. }
  444. willUpdateObjs = append(willUpdateObjs, *obj)
  445. }
  446. }
  447. return willUpdateObjs, nil
  448. }
  449. func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  450. if len(objs) == 0 {
  451. return nil, nil
  452. }
  453. objByPath := make(map[string]*cdssdk.Object)
  454. for _, obj := range objs {
  455. if objByPath[obj.Path] == nil {
  456. o := obj
  457. objByPath[obj.Path] = &o
  458. } else {
  459. // TODO 有两个对象移动到同一个路径,有冲突
  460. }
  461. }
  462. _, err := svc.db2.Package().GetUserPackage(tx, userID, objs[0].PackageID)
  463. if errors.Is(err, gorm.ErrRecordNotFound) {
  464. return nil, nil
  465. }
  466. if err != nil {
  467. return nil, fmt.Errorf("getting user package by id: %w", err)
  468. }
  469. existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path }))
  470. if err != nil {
  471. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  472. }
  473. // 不支持两个对象交换位置的情况,因为数据库不支持
  474. for _, obj := range existsObjs {
  475. objByPath[obj.Path] = nil
  476. }
  477. var willMoveObjs []cdssdk.Object
  478. for _, obj := range objByPath {
  479. if obj == nil {
  480. continue
  481. }
  482. willMoveObjs = append(willMoveObjs, *obj)
  483. }
  484. return willMoveObjs, nil
  485. }
  486. func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
  487. var sucs []cdssdk.ObjectID
  488. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  489. avaiIDs, err := svc.db2.Object().BatchTestObjectID(tx, msg.ObjectIDs)
  490. if err != nil {
  491. return fmt.Errorf("batch testing object id: %w", err)
  492. }
  493. sucs = lo.Keys(avaiIDs)
  494. err = svc.db2.Object().BatchDelete(tx, msg.ObjectIDs)
  495. if err != nil {
  496. return fmt.Errorf("batch deleting objects: %w", err)
  497. }
  498. err = svc.db2.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  499. if err != nil {
  500. return fmt.Errorf("batch deleting object blocks: %w", err)
  501. }
  502. err = svc.db2.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  503. if err != nil {
  504. return fmt.Errorf("batch deleting pinned objects: %w", err)
  505. }
  506. err = svc.db2.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  507. if err != nil {
  508. return fmt.Errorf("batch deleting object access stats: %w", err)
  509. }
  510. return nil
  511. })
  512. if err != nil {
  513. logger.Warnf("batch deleting objects: %s", err.Error())
  514. return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
  515. }
  516. for _, objID := range sucs {
  517. svc.evtPub.Publish(&stgmod.BodyObjectDeleted{
  518. ObjectID: objID,
  519. })
  520. }
  521. return mq.ReplyOK(coormq.RespDeleteObjects(sucs))
  522. }
  523. func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjectsResp, *mq.CodeMessage) {
  524. type CloningObject struct {
  525. Cloning cdsapi.CloningObject
  526. OrgIndex int
  527. }
  528. type PackageClonings struct {
  529. PackageID cdssdk.PackageID
  530. Clonings map[string]CloningObject
  531. }
  532. var evt []*stgmod.BodyNewOrUpdateObject
  533. // TODO 要检查用户是否有Object、Package的权限
  534. clonings := make(map[cdssdk.PackageID]*PackageClonings)
  535. for i, cloning := range msg.Clonings {
  536. pkg, ok := clonings[cloning.NewPackageID]
  537. if !ok {
  538. pkg = &PackageClonings{
  539. PackageID: cloning.NewPackageID,
  540. Clonings: make(map[string]CloningObject),
  541. }
  542. clonings[cloning.NewPackageID] = pkg
  543. }
  544. pkg.Clonings[cloning.NewPath] = CloningObject{
  545. Cloning: cloning,
  546. OrgIndex: i,
  547. }
  548. }
  549. ret := make([]*cdssdk.Object, len(msg.Clonings))
  550. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  551. // 剔除掉新路径已经存在的对象
  552. for _, pkg := range clonings {
  553. exists, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.Clonings))
  554. if err != nil {
  555. return fmt.Errorf("batch getting objects by package path: %w", err)
  556. }
  557. for _, obj := range exists {
  558. delete(pkg.Clonings, obj.Path)
  559. }
  560. }
  561. // 删除目的Package不存在的对象
  562. newPkg, err := svc.db2.Package().BatchTestPackageID(tx, lo.Keys(clonings))
  563. if err != nil {
  564. return fmt.Errorf("batch testing package id: %w", err)
  565. }
  566. for _, pkg := range clonings {
  567. if !newPkg[pkg.PackageID] {
  568. delete(clonings, pkg.PackageID)
  569. }
  570. }
  571. var avaiClonings []CloningObject
  572. var avaiObjIDs []cdssdk.ObjectID
  573. for _, pkg := range clonings {
  574. for _, cloning := range pkg.Clonings {
  575. avaiClonings = append(avaiClonings, cloning)
  576. avaiObjIDs = append(avaiObjIDs, cloning.Cloning.ObjectID)
  577. }
  578. }
  579. avaiDetails, err := svc.db2.Object().BatchGetDetails(tx, avaiObjIDs)
  580. if err != nil {
  581. return fmt.Errorf("batch getting object details: %w", err)
  582. }
  583. avaiDetailsMap := make(map[cdssdk.ObjectID]stgmod.ObjectDetail)
  584. for _, detail := range avaiDetails {
  585. avaiDetailsMap[detail.Object.ObjectID] = detail
  586. }
  587. oldAvaiClonings := avaiClonings
  588. avaiClonings = nil
  589. var newObjs []cdssdk.Object
  590. for _, cloning := range oldAvaiClonings {
  591. // 进一步剔除原始对象不存在的情况
  592. detail, ok := avaiDetailsMap[cloning.Cloning.ObjectID]
  593. if !ok {
  594. continue
  595. }
  596. avaiClonings = append(avaiClonings, cloning)
  597. newObj := detail.Object
  598. newObj.ObjectID = 0
  599. newObj.Path = cloning.Cloning.NewPath
  600. newObj.PackageID = cloning.Cloning.NewPackageID
  601. newObjs = append(newObjs, newObj)
  602. }
  603. // 先创建出新对象
  604. err = svc.db2.Object().BatchCreate(tx, &newObjs)
  605. if err != nil {
  606. return fmt.Errorf("batch creating objects: %w", err)
  607. }
  608. // 创建了新对象就能拿到新对象ID,再创建新对象块
  609. var newBlks []stgmod.ObjectBlock
  610. for i, cloning := range avaiClonings {
  611. oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks
  612. for _, blk := range oldBlks {
  613. newBlk := blk
  614. newBlk.ObjectID = newObjs[i].ObjectID
  615. newBlks = append(newBlks, newBlk)
  616. }
  617. }
  618. err = svc.db2.ObjectBlock().BatchCreate(tx, newBlks)
  619. if err != nil {
  620. return fmt.Errorf("batch creating object blocks: %w", err)
  621. }
  622. for i, cloning := range avaiClonings {
  623. ret[cloning.OrgIndex] = &newObjs[i]
  624. }
  625. for i, cloning := range avaiClonings {
  626. var evtBlks []stgmod.BlockDistributionObjectInfo
  627. blkType := getBlockTypeFromRed(newObjs[i].Redundancy)
  628. oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks
  629. for _, blk := range oldBlks {
  630. evtBlks = append(evtBlks, stgmod.BlockDistributionObjectInfo{
  631. BlockType: blkType,
  632. Index: blk.Index,
  633. StorageID: blk.StorageID,
  634. })
  635. }
  636. evt = append(evt, &stgmod.BodyNewOrUpdateObject{
  637. Info: newObjs[i],
  638. BlockDistribution: evtBlks,
  639. })
  640. }
  641. return nil
  642. })
  643. if err != nil {
  644. logger.Warnf("cloning objects: %s", err.Error())
  645. return nil, mq.Failed(errorcode.OperationFailed, err.Error())
  646. }
  647. for _, e := range evt {
  648. svc.evtPub.Publish(e)
  649. }
  650. return mq.ReplyOK(coormq.RespCloneObjects(ret))
  651. }
  652. func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) {
  653. var obj cdssdk.Object
  654. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  655. oldObj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
  656. if err == nil {
  657. obj = oldObj
  658. err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID)
  659. if err != nil {
  660. return fmt.Errorf("delete object blocks: %w", err)
  661. }
  662. obj.FileHash = cdssdk.EmptyHash
  663. obj.Size = 0
  664. obj.Redundancy = cdssdk.NewMultipartUploadRedundancy()
  665. obj.UpdateTime = time.Now()
  666. err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj})
  667. if err != nil {
  668. return fmt.Errorf("update object: %w", err)
  669. }
  670. return nil
  671. }
  672. obj = cdssdk.Object{
  673. PackageID: msg.PackageID,
  674. Path: msg.Path,
  675. FileHash: cdssdk.EmptyHash,
  676. Size: 0,
  677. Redundancy: cdssdk.NewMultipartUploadRedundancy(),
  678. CreateTime: time.Now(),
  679. UpdateTime: time.Now(),
  680. }
  681. objID, err := svc.db2.Object().Create(tx, obj)
  682. if err != nil {
  683. return fmt.Errorf("create object: %w", err)
  684. }
  685. obj.ObjectID = objID
  686. return nil
  687. })
  688. if err != nil {
  689. logger.Warnf("new multipart upload object: %s", err.Error())
  690. return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("new multipart upload object: %v", err))
  691. }
  692. return mq.ReplyOK(coormq.RespNewMultipartUploadObject(obj))
  693. }
  694. func (svc *Service) AddMultipartUploadPart(msg *coormq.AddMultipartUploadPart) (*coormq.AddMultipartUploadPartResp, *mq.CodeMessage) {
  695. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  696. obj, err := svc.db2.Object().GetByID(tx, msg.ObjectID)
  697. if err != nil {
  698. return fmt.Errorf("getting object by id: %w", err)
  699. }
  700. _, ok := obj.Redundancy.(*cdssdk.MultipartUploadRedundancy)
  701. if !ok {
  702. return fmt.Errorf("object is not a multipart upload object")
  703. }
  704. blks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, []cdssdk.ObjectID{obj.ObjectID})
  705. if err != nil {
  706. return fmt.Errorf("batch getting object blocks: %w", err)
  707. }
  708. blks = lo.Reject(blks, func(blk stgmod.ObjectBlock, idx int) bool { return blk.Index == msg.Block.Index })
  709. blks = append(blks, msg.Block)
  710. blks = sort2.Sort(blks, func(a, b stgmod.ObjectBlock) int { return a.Index - b.Index })
  711. totalSize := int64(0)
  712. var hashes [][]byte
  713. for _, blk := range blks {
  714. totalSize += blk.Size
  715. hashes = append(hashes, blk.FileHash.GetHashBytes())
  716. }
  717. newObjHash := cdssdk.CalculateCompositeHash(hashes)
  718. obj.Size = totalSize
  719. obj.FileHash = newObjHash
  720. obj.UpdateTime = time.Now()
  721. err = svc.db2.ObjectBlock().DeleteByObjectIDIndex(tx, msg.ObjectID, msg.Block.Index)
  722. if err != nil {
  723. return fmt.Errorf("delete object block: %w", err)
  724. }
  725. err = svc.db2.ObjectBlock().Create(tx, msg.ObjectID, msg.Block.Index, msg.Block.StorageID, msg.Block.FileHash, msg.Block.Size)
  726. if err != nil {
  727. return fmt.Errorf("create object block: %w", err)
  728. }
  729. err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj})
  730. if err != nil {
  731. return fmt.Errorf("update object: %w", err)
  732. }
  733. return nil
  734. })
  735. if err != nil {
  736. logger.Warnf("add multipart upload part: %s", err.Error())
  737. code := errorcode.OperationFailed
  738. if errors.Is(err, gorm.ErrRecordNotFound) {
  739. code = errorcode.DataNotFound
  740. }
  741. return nil, mq.Failed(code, fmt.Sprintf("add multipart upload part: %v", err))
  742. }
  743. return mq.ReplyOK(coormq.RespAddMultipartUploadPart())
  744. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。