You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 14 kB

11 months ago
11 months ago
2 years ago
11 months ago
11 months ago
11 months ago
11 months ago
2 years ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package mq
  2. import (
  3. "errors"
  4. "fmt"
  5. "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
  6. "gorm.io/gorm"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/consts/errorcode"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/pkgs/mq"
  11. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  12. "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
  13. "gitlink.org.cn/cloudream/common/utils/sort2"
  14. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  15. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  16. )
  17. func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetObjectsByPathResp, *mq.CodeMessage) {
  18. var objs []cdssdk.Object
  19. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  20. var err error
  21. _, err = svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
  22. if err != nil {
  23. return fmt.Errorf("getting package by id: %w", err)
  24. }
  25. if msg.IsPrefix {
  26. objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.Path)
  27. if err != nil {
  28. return fmt.Errorf("getting objects with prefix: %w", err)
  29. }
  30. } else {
  31. objs, err = svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
  32. if err != nil {
  33. return fmt.Errorf("getting object by path: %w", err)
  34. }
  35. }
  36. return nil
  37. })
  38. if err != nil {
  39. logger.WithField("PathPrefix", msg.Path).Warn(err.Error())
  40. return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed")
  41. }
  42. return mq.ReplyOK(coormq.RespGetObjectsByPath(objs))
  43. }
  44. func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
  45. var objs []cdssdk.Object
  46. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  47. _, err := svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
  48. if err != nil {
  49. return fmt.Errorf("getting package by id: %w", err)
  50. }
  51. objs, err = svc.db2.Object().GetPackageObjects(tx, msg.PackageID)
  52. if err != nil {
  53. return fmt.Errorf("getting package objects: %w", err)
  54. }
  55. return nil
  56. })
  57. if err != nil {
  58. logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID).
  59. Warn(err.Error())
  60. return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
  61. }
  62. return mq.ReplyOK(coormq.RespGetPackageObjects(objs))
  63. }
  64. func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) {
  65. var details []stgmod.ObjectDetail
  66. // 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性
  67. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  68. var err error
  69. _, err = svc.db2.Package().GetByID(tx, msg.PackageID)
  70. if err != nil {
  71. return fmt.Errorf("getting package by id: %w", err)
  72. }
  73. details, err = svc.db2.Object().GetPackageObjectDetails(tx, msg.PackageID)
  74. if err != nil {
  75. return fmt.Errorf("getting package block details: %w", err)
  76. }
  77. return nil
  78. })
  79. if err != nil {
  80. logger.WithField("PackageID", msg.PackageID).Warn(err.Error())
  81. return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
  82. }
  83. return mq.ReplyOK(coormq.RespPackageObjectDetails(details))
  84. }
  85. func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {
  86. detailsMp := make(map[cdssdk.ObjectID]*stgmod.ObjectDetail)
  87. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  88. var err error
  89. msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs)
  90. // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
  91. objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs)
  92. if err != nil {
  93. return fmt.Errorf("batch get objects: %w", err)
  94. }
  95. for _, obj := range objs {
  96. detailsMp[obj.ObjectID] = &stgmod.ObjectDetail{
  97. Object: obj,
  98. }
  99. }
  100. // 查询合并
  101. blocks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs)
  102. if err != nil {
  103. return fmt.Errorf("batch get object blocks: %w", err)
  104. }
  105. for _, block := range blocks {
  106. d := detailsMp[block.ObjectID]
  107. d.Blocks = append(d.Blocks, block)
  108. }
  109. // 查询合并
  110. pinneds, err := svc.db2.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs)
  111. if err != nil {
  112. return fmt.Errorf("batch get pinned objects: %w", err)
  113. }
  114. for _, pinned := range pinneds {
  115. d := detailsMp[pinned.ObjectID]
  116. d.PinnedAt = append(d.PinnedAt, pinned.StorageID)
  117. }
  118. return nil
  119. })
  120. if err != nil {
  121. logger.Warn(err.Error())
  122. return nil, mq.Failed(errorcode.OperationFailed, "get object details failed")
  123. }
  124. details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs))
  125. for i, objID := range msg.ObjectIDs {
  126. details[i] = detailsMp[objID]
  127. }
  128. return mq.ReplyOK(coormq.RespGetObjectDetails(details))
  129. }
  130. func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
  131. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  132. return svc.db2.Object().BatchUpdateRedundancy(tx, msg.Updatings)
  133. })
  134. if err != nil {
  135. logger.Warnf("batch updating redundancy: %s", err.Error())
  136. return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed")
  137. }
  138. return mq.ReplyOK(coormq.RespUpdateObjectRedundancy())
  139. }
  140. func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
  141. var sucs []cdssdk.ObjectID
  142. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  143. msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdsapi.UpdatingObject) int {
  144. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  145. })
  146. objIDs := make([]cdssdk.ObjectID, len(msg.Updatings))
  147. for i, obj := range msg.Updatings {
  148. objIDs[i] = obj.ObjectID
  149. }
  150. oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs)
  151. if err != nil {
  152. return fmt.Errorf("batch getting objects: %w", err)
  153. }
  154. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  155. for i, obj := range oldObjs {
  156. oldObjIDs[i] = obj.ObjectID
  157. }
  158. avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdsapi.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID })
  159. if len(notExistsObjs) > 0 {
  160. // TODO 部分对象已经不存在
  161. }
  162. newObjs := make([]cdssdk.Object, len(avaiUpdatings))
  163. for i := range newObjs {
  164. newObjs[i] = oldObjs[i]
  165. avaiUpdatings[i].ApplyTo(&newObjs[i])
  166. }
  167. err = svc.db2.Object().BatchUpdate(tx, newObjs)
  168. if err != nil {
  169. return fmt.Errorf("batch create or update: %w", err)
  170. }
  171. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  172. return nil
  173. })
  174. if err != nil {
  175. logger.Warnf("batch updating objects: %s", err.Error())
  176. return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
  177. }
  178. return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs))
  179. }
  180. // 根据objIDs从objs中挑选Object。
  181. // len(objs) >= len(objIDs)
  182. func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) {
  183. objIdx := 0
  184. idIdx := 0
  185. for idIdx < len(objIDs) && objIdx < len(objs) {
  186. if getID(objs[objIdx]) < objIDs[idIdx] {
  187. notFound = append(notFound, objs[objIdx])
  188. objIdx++
  189. continue
  190. }
  191. picked = append(picked, objs[objIdx])
  192. objIdx++
  193. idIdx++
  194. }
  195. return
  196. }
  197. func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) {
  198. var sucs []cdssdk.ObjectID
  199. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  200. msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int {
  201. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  202. })
  203. objIDs := make([]cdssdk.ObjectID, len(msg.Movings))
  204. for i, obj := range msg.Movings {
  205. objIDs[i] = obj.ObjectID
  206. }
  207. oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs)
  208. if err != nil {
  209. return fmt.Errorf("batch getting objects: %w", err)
  210. }
  211. oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
  212. for i, obj := range oldObjs {
  213. oldObjIDs[i] = obj.ObjectID
  214. }
  215. // 找出仍在数据库的Object
  216. avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID })
  217. if len(notExistsObjs) > 0 {
  218. // TODO 部分对象已经不存在
  219. }
  220. // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
  221. var pkgIDChangedObjs []cdssdk.Object
  222. var pathChangedObjs []cdssdk.Object
  223. for i := range avaiMovings {
  224. if avaiMovings[i].PackageID != oldObjs[i].PackageID {
  225. newObj := oldObjs[i]
  226. avaiMovings[i].ApplyTo(&newObj)
  227. pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
  228. } else if avaiMovings[i].Path != oldObjs[i].Path {
  229. newObj := oldObjs[i]
  230. avaiMovings[i].ApplyTo(&newObj)
  231. pathChangedObjs = append(pathChangedObjs, newObj)
  232. }
  233. }
  234. var newObjs []cdssdk.Object
  235. // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
  236. checkedObjs, err := svc.checkPackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs)
  237. if err != nil {
  238. return err
  239. }
  240. newObjs = append(newObjs, checkedObjs...)
  241. // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
  242. checkedObjs, err = svc.checkPathChangedObjects(tx, msg.UserID, pathChangedObjs)
  243. if err != nil {
  244. return err
  245. }
  246. newObjs = append(newObjs, checkedObjs...)
  247. err = svc.db2.Object().BatchUpdate(tx, newObjs)
  248. if err != nil {
  249. return fmt.Errorf("batch create or update: %w", err)
  250. }
  251. sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
  252. return nil
  253. })
  254. if err != nil {
  255. logger.Warn(err.Error())
  256. return nil, mq.Failed(errorcode.OperationFailed, "move objects failed")
  257. }
  258. return mq.ReplyOK(coormq.RespMoveObjects(sucs))
  259. }
  260. func (svc *Service) checkPackageChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  261. if len(objs) == 0 {
  262. return nil, nil
  263. }
  264. type PackageObjects struct {
  265. PackageID cdssdk.PackageID
  266. ObjectByPath map[string]*cdssdk.Object
  267. }
  268. packages := make(map[cdssdk.PackageID]*PackageObjects)
  269. for _, obj := range objs {
  270. pkg, ok := packages[obj.PackageID]
  271. if !ok {
  272. pkg = &PackageObjects{
  273. PackageID: obj.PackageID,
  274. ObjectByPath: make(map[string]*cdssdk.Object),
  275. }
  276. packages[obj.PackageID] = pkg
  277. }
  278. if pkg.ObjectByPath[obj.Path] == nil {
  279. o := obj
  280. pkg.ObjectByPath[obj.Path] = &o
  281. } else {
  282. // TODO 有两个对象移动到同一个路径,有冲突
  283. }
  284. }
  285. var willUpdateObjs []cdssdk.Object
  286. for _, pkg := range packages {
  287. _, err := svc.db2.Package().GetUserPackage(tx, userID, pkg.PackageID)
  288. if errors.Is(err, gorm.ErrRecordNotFound) {
  289. continue
  290. }
  291. if err != nil {
  292. return nil, fmt.Errorf("getting user package by id: %w", err)
  293. }
  294. existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
  295. if err != nil {
  296. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  297. }
  298. // 标记冲突的对象
  299. for _, obj := range existsObjs {
  300. pkg.ObjectByPath[obj.Path] = nil
  301. // TODO 目标Package内有冲突的对象
  302. }
  303. for _, obj := range pkg.ObjectByPath {
  304. if obj == nil {
  305. continue
  306. }
  307. willUpdateObjs = append(willUpdateObjs, *obj)
  308. }
  309. }
  310. return willUpdateObjs, nil
  311. }
  312. func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
  313. if len(objs) == 0 {
  314. return nil, nil
  315. }
  316. objByPath := make(map[string]*cdssdk.Object)
  317. for _, obj := range objs {
  318. if objByPath[obj.Path] == nil {
  319. o := obj
  320. objByPath[obj.Path] = &o
  321. } else {
  322. // TODO 有两个对象移动到同一个路径,有冲突
  323. }
  324. }
  325. _, err := svc.db2.Package().GetUserPackage(tx, userID, objs[0].PackageID)
  326. if errors.Is(err, gorm.ErrRecordNotFound) {
  327. return nil, nil
  328. }
  329. if err != nil {
  330. return nil, fmt.Errorf("getting user package by id: %w", err)
  331. }
  332. existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path }))
  333. if err != nil {
  334. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  335. }
  336. // 不支持两个对象交换位置的情况,因为数据库不支持
  337. for _, obj := range existsObjs {
  338. objByPath[obj.Path] = nil
  339. }
  340. var willMoveObjs []cdssdk.Object
  341. for _, obj := range objByPath {
  342. if obj == nil {
  343. continue
  344. }
  345. willMoveObjs = append(willMoveObjs, *obj)
  346. }
  347. return willMoveObjs, nil
  348. }
  349. func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
  350. err := svc.db2.DoTx(func(tx db2.SQLContext) error {
  351. err := svc.db2.Object().BatchDelete(tx, msg.ObjectIDs)
  352. if err != nil {
  353. return fmt.Errorf("batch deleting objects: %w", err)
  354. }
  355. err = svc.db2.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  356. if err != nil {
  357. return fmt.Errorf("batch deleting object blocks: %w", err)
  358. }
  359. err = svc.db2.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  360. if err != nil {
  361. return fmt.Errorf("batch deleting pinned objects: %w", err)
  362. }
  363. err = svc.db2.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs)
  364. if err != nil {
  365. return fmt.Errorf("batch deleting object access stats: %w", err)
  366. }
  367. return nil
  368. })
  369. if err != nil {
  370. logger.Warnf("batch deleting objects: %s", err.Error())
  371. return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
  372. }
  373. return mq.ReplyOK(coormq.RespDeleteObjects())
  374. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。