You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

object.go 13 kB

11 months ago
1 year ago
1 year ago

  1. package db2
  2. import (
  3. "fmt"
  4. "time"
  5. "gorm.io/gorm/clause"
  6. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  7. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  8. "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model"
  9. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  10. )
  11. type ObjectDB struct {
  12. *DB
  13. }
  14. func (db *DB) Object() *ObjectDB {
  15. return &ObjectDB{DB: db}
  16. }
  17. func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (cdssdk.Object, error) {
  18. var ret cdssdk.Object
  19. err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error
  20. return ret, err
  21. }
  22. func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) ([]cdssdk.Object, error) {
  23. var ret []cdssdk.Object
  24. err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Find(&ret).Error
  25. return ret, err
  26. }
  27. func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
  28. var ret []cdssdk.Object
  29. err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error
  30. return ret, err
  31. }
  32. func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
  33. if len(objectIDs) == 0 {
  34. return make(map[cdssdk.ObjectID]bool), nil
  35. }
  36. var avaiIDs []cdssdk.ObjectID
  37. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Pluck("ObjectID", &avaiIDs).Error
  38. if err != nil {
  39. return nil, err
  40. }
  41. avaiIDMap := make(map[cdssdk.ObjectID]bool)
  42. for _, pkgID := range avaiIDs {
  43. avaiIDMap[pkgID] = true
  44. }
  45. return avaiIDMap, nil
  46. }
  47. func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.Object, error) {
  48. if len(objectIDs) == 0 {
  49. return nil, nil
  50. }
  51. var objs []cdssdk.Object
  52. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
  53. if err != nil {
  54. return nil, err
  55. }
  56. return objs, nil
  57. }
  58. func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
  59. if len(pathes) == 0 {
  60. return nil, nil
  61. }
  62. var objs []cdssdk.Object
  63. err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error
  64. if err != nil {
  65. return nil, err
  66. }
  67. return objs, nil
  68. }
  69. // 仅返回查询到的对象
  70. func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) {
  71. var objs []cdssdk.Object
  72. err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
  73. if err != nil {
  74. return nil, err
  75. }
  76. // 获取所有的 ObjectBlock
  77. var allBlocks []stgmod.ObjectBlock
  78. err = ctx.Table("ObjectBlock").Where("ObjectID IN ?", objectIDs).Order("ObjectID, `Index` ASC").Find(&allBlocks).Error
  79. if err != nil {
  80. return nil, err
  81. }
  82. // 获取所有的 PinnedObject
  83. var allPinnedObjs []cdssdk.PinnedObject
  84. err = ctx.Table("PinnedObject").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&allPinnedObjs).Error
  85. if err != nil {
  86. return nil, err
  87. }
  88. details := make([]stgmod.ObjectDetail, len(objs))
  89. for i, obj := range objs {
  90. details[i] = stgmod.ObjectDetail{
  91. Object: obj,
  92. }
  93. }
  94. stgmod.DetailsFillObjectBlocks(details, allBlocks)
  95. stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
  96. return details, nil
  97. }
  98. func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
  99. err := ctx.Table("Object").Create(&obj).Error
  100. if err != nil {
  101. return 0, fmt.Errorf("insert object failed, err: %w", err)
  102. }
  103. return obj.ObjectID, nil
  104. }
  105. // 批量创建对象,创建完成后会填充ObjectID。
  106. func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error {
  107. if len(*objs) == 0 {
  108. return nil
  109. }
  110. return ctx.Table("Object").Create(objs).Error
  111. }
  112. // 批量更新对象所有属性,objs中的对象必须包含ObjectID
  113. func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error {
  114. if len(objs) == 0 {
  115. return nil
  116. }
  117. return ctx.Clauses(clause.OnConflict{
  118. Columns: []clause.Column{{Name: "ObjectID"}},
  119. UpdateAll: true,
  120. }).Create(objs).Error
  121. }
  122. // 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但:
  123. // 1. 必须包含ObjectID
  124. // 2. 日期类型属性不能设置为0值
  125. func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error {
  126. if len(objs) == 0 {
  127. return nil
  128. }
  129. return ctx.Clauses(clause.OnConflict{
  130. Columns: []clause.Column{{Name: "ObjectID"}},
  131. DoUpdates: clause.AssignmentColumns(columns),
  132. }).Create(objs).Error
  133. }
  134. func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]cdssdk.Object, error) {
  135. var ret []cdssdk.Object
  136. err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&ret).Error
  137. return ret, err
  138. }
  139. func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
  140. var objs []cdssdk.Object
  141. err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&objs).Error
  142. if err != nil {
  143. return nil, fmt.Errorf("getting objects: %w", err)
  144. }
  145. // 获取所有的 ObjectBlock
  146. var allBlocks []stgmod.ObjectBlock
  147. err = ctx.Table("ObjectBlock").
  148. Select("ObjectBlock.*").
  149. Joins("JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
  150. Where("Object.PackageID = ?", packageID).
  151. Order("ObjectBlock.ObjectID, `Index` ASC").
  152. Find(&allBlocks).Error
  153. if err != nil {
  154. return nil, fmt.Errorf("getting all object blocks: %w", err)
  155. }
  156. // 获取所有的 PinnedObject
  157. var allPinnedObjs []cdssdk.PinnedObject
  158. err = ctx.Table("PinnedObject").
  159. Select("PinnedObject.*").
  160. Joins("JOIN Object ON PinnedObject.ObjectID = Object.ObjectID").
  161. Where("Object.PackageID = ?", packageID).
  162. Order("PinnedObject.ObjectID").
  163. Find(&allPinnedObjs).Error
  164. if err != nil {
  165. return nil, fmt.Errorf("getting all pinned objects: %w", err)
  166. }
  167. details := make([]stgmod.ObjectDetail, len(objs))
  168. for i, obj := range objs {
  169. details[i] = stgmod.ObjectDetail{
  170. Object: obj,
  171. }
  172. }
  173. stgmod.DetailsFillObjectBlocks(details, allBlocks)
  174. stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
  175. return details, nil
  176. }
  177. func (db *ObjectDB) GetObjectsIfAnyBlockOnStorage(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
  178. var objs []cdssdk.Object
  179. err := ctx.Table("Object").Where("ObjectID IN (SELECT ObjectID FROM ObjectBlock WHERE StorageID = ?)", stgID).Order("ObjectID ASC").Find(&objs).Error
  180. if err != nil {
  181. return nil, fmt.Errorf("getting objects: %w", err)
  182. }
  183. return objs, nil
  184. }
  185. func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
  186. if len(adds) == 0 {
  187. return nil, nil
  188. }
  189. // 收集所有路径
  190. pathes := make([]string, 0, len(adds))
  191. for _, add := range adds {
  192. pathes = append(pathes, add.Path)
  193. }
  194. // 先查询要更新的对象,不存在也没关系
  195. existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
  196. if err != nil {
  197. return nil, fmt.Errorf("batch get object by path: %w", err)
  198. }
  199. existsObjsMap := make(map[string]cdssdk.Object)
  200. for _, obj := range existsObjs {
  201. existsObjsMap[obj.Path] = obj
  202. }
  203. var updatingObjs []cdssdk.Object
  204. var addingObjs []cdssdk.Object
  205. for i := range adds {
  206. o := cdssdk.Object{
  207. PackageID: packageID,
  208. Path: adds[i].Path,
  209. Size: adds[i].Size,
  210. FileHash: adds[i].FileHash,
  211. Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
  212. CreateTime: adds[i].UploadTime,
  213. UpdateTime: adds[i].UploadTime,
  214. }
  215. e, ok := existsObjsMap[adds[i].Path]
  216. if ok {
  217. o.ObjectID = e.ObjectID
  218. o.CreateTime = e.CreateTime
  219. updatingObjs = append(updatingObjs, o)
  220. } else {
  221. addingObjs = append(addingObjs, o)
  222. }
  223. }
  224. // 先进行更新
  225. err = db.BatchUpdate(ctx, updatingObjs)
  226. if err != nil {
  227. return nil, fmt.Errorf("batch update objects: %w", err)
  228. }
  229. // 再执行插入,Create函数插入后会填充ObjectID
  230. err = db.BatchCreate(ctx, &addingObjs)
  231. if err != nil {
  232. return nil, fmt.Errorf("batch create objects: %w", err)
  233. }
  234. // 按照add参数的顺序返回结果
  235. affectedObjsMp := make(map[string]cdssdk.Object)
  236. for _, o := range updatingObjs {
  237. affectedObjsMp[o.Path] = o
  238. }
  239. for _, o := range addingObjs {
  240. affectedObjsMp[o.Path] = o
  241. }
  242. affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp))
  243. affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp))
  244. for i := range adds {
  245. obj := affectedObjsMp[adds[i].Path]
  246. affectedObjs = append(affectedObjs, obj)
  247. affectedObjIDs = append(affectedObjIDs, obj.ObjectID)
  248. }
  249. if len(affectedObjIDs) > 0 {
  250. // 批量删除 ObjectBlock
  251. if err := ctx.Table("ObjectBlock").Where("ObjectID IN ?", affectedObjIDs).Delete(&stgmod.ObjectBlock{}).Error; err != nil {
  252. return nil, fmt.Errorf("batch delete object blocks: %w", err)
  253. }
  254. // 批量删除 PinnedObject
  255. if err := ctx.Table("PinnedObject").Where("ObjectID IN ?", affectedObjIDs).Delete(&cdssdk.PinnedObject{}).Error; err != nil {
  256. return nil, fmt.Errorf("batch delete pinned objects: %w", err)
  257. }
  258. }
  259. // 创建 ObjectBlock
  260. objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
  261. for i, add := range adds {
  262. for _, stgID := range add.StorageIDs {
  263. objBlocks = append(objBlocks, stgmod.ObjectBlock{
  264. ObjectID: affectedObjIDs[i],
  265. Index: 0,
  266. StorageID: stgID,
  267. FileHash: add.FileHash,
  268. })
  269. }
  270. }
  271. if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
  272. return nil, fmt.Errorf("batch create object blocks: %w", err)
  273. }
  274. // 创建 Cache
  275. caches := make([]model.Cache, 0, len(adds))
  276. for _, add := range adds {
  277. for _, stgID := range add.StorageIDs {
  278. caches = append(caches, model.Cache{
  279. FileHash: add.FileHash,
  280. StorageID: stgID,
  281. CreateTime: time.Now(),
  282. Priority: 0,
  283. })
  284. }
  285. }
  286. if err := db.Cache().BatchCreate(ctx, caches); err != nil {
  287. return nil, fmt.Errorf("batch create caches: %w", err)
  288. }
  289. return affectedObjs, nil
  290. }
  291. func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error {
  292. if len(objs) == 0 {
  293. return nil
  294. }
  295. nowTime := time.Now()
  296. objIDs := make([]cdssdk.ObjectID, 0, len(objs))
  297. dummyObjs := make([]cdssdk.Object, 0, len(objs))
  298. for _, obj := range objs {
  299. objIDs = append(objIDs, obj.ObjectID)
  300. dummyObjs = append(dummyObjs, cdssdk.Object{
  301. ObjectID: obj.ObjectID,
  302. Redundancy: obj.Redundancy,
  303. CreateTime: nowTime, // 实际不会更新,只因为不能是0值
  304. UpdateTime: nowTime,
  305. })
  306. }
  307. err := db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"Redundancy", "UpdateTime"})
  308. if err != nil {
  309. return fmt.Errorf("batch update object redundancy: %w", err)
  310. }
  311. // 删除原本所有的编码块记录,重新添加
  312. err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
  313. if err != nil {
  314. return fmt.Errorf("batch delete object blocks: %w", err)
  315. }
  316. // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
  317. err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
  318. if err != nil {
  319. return fmt.Errorf("batch delete pinned object: %w", err)
  320. }
  321. blocks := make([]stgmod.ObjectBlock, 0, len(objs))
  322. for _, obj := range objs {
  323. blocks = append(blocks, obj.Blocks...)
  324. }
  325. err = db.ObjectBlock().BatchCreate(ctx, blocks)
  326. if err != nil {
  327. return fmt.Errorf("batch create object blocks: %w", err)
  328. }
  329. caches := make([]model.Cache, 0, len(objs))
  330. for _, obj := range objs {
  331. for _, blk := range obj.Blocks {
  332. caches = append(caches, model.Cache{
  333. FileHash: blk.FileHash,
  334. StorageID: blk.StorageID,
  335. CreateTime: nowTime,
  336. Priority: 0,
  337. })
  338. }
  339. }
  340. err = db.Cache().BatchCreate(ctx, caches)
  341. if err != nil {
  342. return fmt.Errorf("batch create object caches: %w", err)
  343. }
  344. pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
  345. for _, obj := range objs {
  346. for _, p := range obj.PinnedAt {
  347. pinneds = append(pinneds, cdssdk.PinnedObject{
  348. ObjectID: obj.ObjectID,
  349. StorageID: p,
  350. CreateTime: nowTime,
  351. })
  352. }
  353. }
  354. err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
  355. if err != nil {
  356. return fmt.Errorf("batch create pinned objects: %w", err)
  357. }
  358. return nil
  359. }
  360. func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
  361. if len(ids) == 0 {
  362. return nil
  363. }
  364. return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error
  365. }
  366. func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
  367. return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
  368. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。