You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

object.go 20 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
1 year ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
1 year ago
7 months ago
1 year ago
7 months ago
1 year ago
7 months ago
1 year ago
7 months ago
1 year ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. package services
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/utils/sort2"
  11. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  12. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
  13. "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api"
  14. "gitlink.org.cn/cloudream/jcs-pub/client/types"
  15. "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  17. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/plans"
  18. "gorm.io/gorm"
  19. )
  20. // ObjectService 定义了对象服务,负责管理对象的上传、下载等操作。
  21. type ObjectService struct {
  22. *Service
  23. }
  24. // ObjectSvc 返回一个ObjectService的实例。
  25. func (svc *Service) ObjectSvc() *ObjectService {
  26. return &ObjectService{Service: svc}
  27. }
  28. func (svc *ObjectService) GetByPath(req api.ObjectListByPath) (api.ObjectListByPathResp, error) {
  29. var resp api.ObjectListByPathResp
  30. maxKeys := 1000
  31. if req.MaxKeys > 0 {
  32. maxKeys = req.MaxKeys
  33. }
  34. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  35. var err error
  36. _, err = svc.DB.Package().GetByID(tx, req.PackageID)
  37. if err != nil {
  38. return fmt.Errorf("getting package by id: %w", err)
  39. }
  40. if !req.IsPrefix {
  41. obj, err := svc.DB.Object().GetByPath(tx, req.PackageID, req.Path)
  42. if err != nil {
  43. return fmt.Errorf("getting object by path: %w", err)
  44. }
  45. resp.Objects = append(resp.Objects, obj)
  46. return nil
  47. }
  48. if !req.NoRecursive {
  49. resp.Objects, err = svc.DB.Object().GetWithPathPrefixPaged(tx, req.PackageID, req.Path, req.ContinuationToken, maxKeys)
  50. if err != nil {
  51. return fmt.Errorf("getting objects with prefix: %w", err)
  52. }
  53. if len(resp.Objects) > 0 {
  54. resp.NextContinuationToken = resp.Objects[len(resp.Objects)-1].Path
  55. }
  56. return nil
  57. }
  58. resp.Objects, resp.CommonPrefixes, resp.NextContinuationToken, err = svc.DB.Object().GetByPrefixGroupedPaged(tx, req.PackageID, req.Path, req.ContinuationToken, maxKeys)
  59. return err
  60. })
  61. return resp, err
  62. }
  63. func (svc *ObjectService) GetByIDs(objectIDs []types.ObjectID) ([]*types.Object, error) {
  64. var ret []*types.Object
  65. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  66. objs, err := svc.DB.Object().BatchGet(tx, objectIDs)
  67. if err != nil {
  68. return err
  69. }
  70. objMp := make(map[types.ObjectID]types.Object)
  71. for _, obj := range objs {
  72. objMp[obj.ObjectID] = obj
  73. }
  74. for _, objID := range objectIDs {
  75. o, ok := objMp[objID]
  76. if ok {
  77. ret = append(ret, &o)
  78. } else {
  79. ret = append(ret, nil)
  80. }
  81. }
  82. return err
  83. })
  84. return ret, err
  85. }
  86. func (svc *ObjectService) UpdateInfo(updatings []api.UpdatingObject) ([]types.ObjectID, error) {
  87. var sucs []types.ObjectID
  88. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  89. updatings = sort2.Sort(updatings, func(o1, o2 api.UpdatingObject) int {
  90. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  91. })
  92. objIDs := make([]types.ObjectID, len(updatings))
  93. for i, obj := range updatings {
  94. objIDs[i] = obj.ObjectID
  95. }
  96. oldObjs, err := svc.DB.Object().BatchGet(tx, objIDs)
  97. if err != nil {
  98. return fmt.Errorf("batch getting objects: %w", err)
  99. }
  100. oldObjIDs := make([]types.ObjectID, len(oldObjs))
  101. for i, obj := range oldObjs {
  102. oldObjIDs[i] = obj.ObjectID
  103. }
  104. avaiUpdatings, notExistsObjs := pickByObjectIDs(updatings, oldObjIDs, func(obj api.UpdatingObject) types.ObjectID { return obj.ObjectID })
  105. if len(notExistsObjs) > 0 {
  106. // TODO 部分对象已经不存在
  107. }
  108. newObjs := make([]types.Object, len(avaiUpdatings))
  109. for i := range newObjs {
  110. newObjs[i] = oldObjs[i]
  111. avaiUpdatings[i].ApplyTo(&newObjs[i])
  112. }
  113. err = svc.DB.Object().BatchUpdate(tx, newObjs)
  114. if err != nil {
  115. return fmt.Errorf("batch create or update: %w", err)
  116. }
  117. sucs = lo.Map(newObjs, func(obj types.Object, _ int) types.ObjectID { return obj.ObjectID })
  118. return nil
  119. })
  120. return sucs, err
  121. }
  122. // 根据objIDs从objs中挑选Object。
  123. // len(objs) >= len(objIDs)
  124. func pickByObjectIDs[T any](objs []T, objIDs []types.ObjectID, getID func(T) types.ObjectID) (picked []T, notFound []T) {
  125. objIdx := 0
  126. idIdx := 0
  127. for idIdx < len(objIDs) && objIdx < len(objs) {
  128. if getID(objs[objIdx]) < objIDs[idIdx] {
  129. notFound = append(notFound, objs[objIdx])
  130. objIdx++
  131. continue
  132. }
  133. picked = append(picked, objs[objIdx])
  134. objIdx++
  135. idIdx++
  136. }
  137. return
  138. }
  139. func (svc *ObjectService) Move(movings []api.MovingObject) ([]types.ObjectID, error) {
  140. var sucs []types.ObjectID
  141. var evt []*datamap.BodyObjectInfoUpdated
  142. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  143. movings = sort2.Sort(movings, func(o1, o2 api.MovingObject) int {
  144. return sort2.Cmp(o1.ObjectID, o2.ObjectID)
  145. })
  146. objIDs := make([]types.ObjectID, len(movings))
  147. for i, obj := range movings {
  148. objIDs[i] = obj.ObjectID
  149. }
  150. oldObjs, err := svc.DB.Object().BatchGet(tx, objIDs)
  151. if err != nil {
  152. return fmt.Errorf("batch getting objects: %w", err)
  153. }
  154. oldObjIDs := make([]types.ObjectID, len(oldObjs))
  155. for i, obj := range oldObjs {
  156. oldObjIDs[i] = obj.ObjectID
  157. }
  158. // 找出仍在数据库的Object
  159. avaiMovings, notExistsObjs := pickByObjectIDs(movings, oldObjIDs, func(obj api.MovingObject) types.ObjectID { return obj.ObjectID })
  160. if len(notExistsObjs) > 0 {
  161. // TODO 部分对象已经不存在
  162. }
  163. // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
  164. var pkgIDChangedObjs []types.Object
  165. var pathChangedObjs []types.Object
  166. for i := range avaiMovings {
  167. if avaiMovings[i].PackageID != oldObjs[i].PackageID {
  168. newObj := oldObjs[i]
  169. avaiMovings[i].ApplyTo(&newObj)
  170. pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
  171. } else if avaiMovings[i].Path != oldObjs[i].Path {
  172. newObj := oldObjs[i]
  173. avaiMovings[i].ApplyTo(&newObj)
  174. pathChangedObjs = append(pathChangedObjs, newObj)
  175. }
  176. }
  177. var newObjs []types.Object
  178. // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
  179. checkedObjs, err := svc.checkPackageChangedObjects(tx, pkgIDChangedObjs)
  180. if err != nil {
  181. return err
  182. }
  183. newObjs = append(newObjs, checkedObjs...)
  184. // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
  185. checkedObjs, err = svc.checkPathChangedObjects(tx, pathChangedObjs)
  186. if err != nil {
  187. return err
  188. }
  189. newObjs = append(newObjs, checkedObjs...)
  190. err = svc.DB.Object().BatchUpdate(tx, newObjs)
  191. if err != nil {
  192. return fmt.Errorf("batch create or update: %w", err)
  193. }
  194. sucs = lo.Map(newObjs, func(obj types.Object, _ int) types.ObjectID { return obj.ObjectID })
  195. evt = lo.Map(newObjs, func(obj types.Object, _ int) *datamap.BodyObjectInfoUpdated {
  196. return &datamap.BodyObjectInfoUpdated{
  197. Object: obj,
  198. }
  199. })
  200. return nil
  201. })
  202. if err != nil {
  203. logger.Warn(err.Error())
  204. return nil, err
  205. }
  206. for _, e := range evt {
  207. svc.EvtPub.Publish(e)
  208. }
  209. return sucs, nil
  210. }
  211. func (svc *ObjectService) Download(req downloader.DownloadReqeust) (*downloader.Downloading, error) {
  212. iter := svc.Downloader.DownloadObjects([]downloader.DownloadReqeust{req})
  213. // 初始化下载过程
  214. downloading, err := iter.MoveNext()
  215. if downloading == nil {
  216. return nil, fmt.Errorf("object %v not found", req.ObjectID)
  217. }
  218. if err != nil {
  219. return nil, err
  220. }
  221. return downloading, nil
  222. }
  223. func (svc *Service) checkPackageChangedObjects(tx db.SQLContext, objs []types.Object) ([]types.Object, error) {
  224. if len(objs) == 0 {
  225. return nil, nil
  226. }
  227. type PackageObjects struct {
  228. PackageID types.PackageID
  229. ObjectByPath map[string]*types.Object
  230. }
  231. packages := make(map[types.PackageID]*PackageObjects)
  232. for _, obj := range objs {
  233. pkg, ok := packages[obj.PackageID]
  234. if !ok {
  235. pkg = &PackageObjects{
  236. PackageID: obj.PackageID,
  237. ObjectByPath: make(map[string]*types.Object),
  238. }
  239. packages[obj.PackageID] = pkg
  240. }
  241. if pkg.ObjectByPath[obj.Path] == nil {
  242. o := obj
  243. pkg.ObjectByPath[obj.Path] = &o
  244. } else {
  245. // TODO 有两个对象移动到同一个路径,有冲突
  246. }
  247. }
  248. var willUpdateObjs []types.Object
  249. for _, pkg := range packages {
  250. _, err := svc.DB.Package().GetByID(tx, pkg.PackageID)
  251. if errors.Is(err, gorm.ErrRecordNotFound) {
  252. continue
  253. }
  254. if err != nil {
  255. return nil, fmt.Errorf("getting package by id: %w", err)
  256. }
  257. existsObjs, err := svc.DB.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
  258. if err != nil {
  259. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  260. }
  261. // 标记冲突的对象
  262. for _, obj := range existsObjs {
  263. pkg.ObjectByPath[obj.Path] = nil
  264. // TODO 目标Package内有冲突的对象
  265. }
  266. for _, obj := range pkg.ObjectByPath {
  267. if obj == nil {
  268. continue
  269. }
  270. willUpdateObjs = append(willUpdateObjs, *obj)
  271. }
  272. }
  273. return willUpdateObjs, nil
  274. }
  275. func (svc *Service) checkPathChangedObjects(tx db.SQLContext, objs []types.Object) ([]types.Object, error) {
  276. if len(objs) == 0 {
  277. return nil, nil
  278. }
  279. objByPath := make(map[string]*types.Object)
  280. for _, obj := range objs {
  281. if objByPath[obj.Path] == nil {
  282. o := obj
  283. objByPath[obj.Path] = &o
  284. } else {
  285. // TODO 有两个对象移动到同一个路径,有冲突
  286. }
  287. }
  288. _, err := svc.DB.Package().GetByID(tx, objs[0].PackageID)
  289. if errors.Is(err, gorm.ErrRecordNotFound) {
  290. return nil, nil
  291. }
  292. if err != nil {
  293. return nil, fmt.Errorf("getting package by id: %w", err)
  294. }
  295. existsObjs, err := svc.DB.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj types.Object, idx int) string { return obj.Path }))
  296. if err != nil {
  297. return nil, fmt.Errorf("batch getting objects by package path: %w", err)
  298. }
  299. // 不支持两个对象交换位置的情况,因为数据库不支持
  300. for _, obj := range existsObjs {
  301. objByPath[obj.Path] = nil
  302. }
  303. var willMoveObjs []types.Object
  304. for _, obj := range objByPath {
  305. if obj == nil {
  306. continue
  307. }
  308. willMoveObjs = append(willMoveObjs, *obj)
  309. }
  310. return willMoveObjs, nil
  311. }
  312. func (svc *ObjectService) Delete(objectIDs []types.ObjectID) error {
  313. var sucs []types.ObjectID
  314. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  315. avaiIDs, err := svc.DB.Object().BatchTestObjectID(tx, objectIDs)
  316. if err != nil {
  317. return fmt.Errorf("batch testing object id: %w", err)
  318. }
  319. sucs = lo.Keys(avaiIDs)
  320. return svc.DB.Object().BatchDeleteComplete(tx, sucs)
  321. })
  322. if err != nil {
  323. return err
  324. }
  325. for _, objID := range sucs {
  326. svc.EvtPub.Publish(&datamap.BodyObjectDeleted{
  327. ObjectID: objID,
  328. })
  329. }
  330. return nil
  331. }
  332. func (svc *ObjectService) Clone(clonings []api.CloningObject) ([]*types.Object, error) {
  333. type CloningObject struct {
  334. Cloning api.CloningObject
  335. OrgIndex int
  336. }
  337. type PackageClonings struct {
  338. PackageID types.PackageID
  339. Clonings map[string]CloningObject
  340. }
  341. var evt []*datamap.BodyNewOrUpdateObject
  342. cloningMap := make(map[types.PackageID]*PackageClonings)
  343. for i, cloning := range clonings {
  344. pkg, ok := cloningMap[cloning.NewPackageID]
  345. if !ok {
  346. pkg = &PackageClonings{
  347. PackageID: cloning.NewPackageID,
  348. Clonings: make(map[string]CloningObject),
  349. }
  350. cloningMap[cloning.NewPackageID] = pkg
  351. }
  352. pkg.Clonings[cloning.NewPath] = CloningObject{
  353. Cloning: cloning,
  354. OrgIndex: i,
  355. }
  356. }
  357. ret := make([]*types.Object, len(cloningMap))
  358. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  359. // 剔除掉新路径已经存在的对象
  360. for _, pkg := range cloningMap {
  361. exists, err := svc.DB.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.Clonings))
  362. if err != nil {
  363. return fmt.Errorf("batch getting objects by package path: %w", err)
  364. }
  365. for _, obj := range exists {
  366. delete(pkg.Clonings, obj.Path)
  367. }
  368. }
  369. // 删除目的Package不存在的对象
  370. newPkg, err := svc.DB.Package().BatchTestPackageID(tx, lo.Keys(cloningMap))
  371. if err != nil {
  372. return fmt.Errorf("batch testing package id: %w", err)
  373. }
  374. for _, pkg := range cloningMap {
  375. if !newPkg[pkg.PackageID] {
  376. delete(cloningMap, pkg.PackageID)
  377. }
  378. }
  379. var avaiClonings []CloningObject
  380. var avaiObjIDs []types.ObjectID
  381. for _, pkg := range cloningMap {
  382. for _, cloning := range pkg.Clonings {
  383. avaiClonings = append(avaiClonings, cloning)
  384. avaiObjIDs = append(avaiObjIDs, cloning.Cloning.ObjectID)
  385. }
  386. }
  387. avaiDetails, err := svc.DB.Object().BatchGetDetails(tx, avaiObjIDs)
  388. if err != nil {
  389. return fmt.Errorf("batch getting object details: %w", err)
  390. }
  391. avaiDetailsMap := make(map[types.ObjectID]types.ObjectDetail)
  392. for _, detail := range avaiDetails {
  393. avaiDetailsMap[detail.Object.ObjectID] = detail
  394. }
  395. oldAvaiClonings := avaiClonings
  396. avaiClonings = nil
  397. var newObjs []types.Object
  398. for _, cloning := range oldAvaiClonings {
  399. // 进一步剔除原始对象不存在的情况
  400. detail, ok := avaiDetailsMap[cloning.Cloning.ObjectID]
  401. if !ok {
  402. continue
  403. }
  404. avaiClonings = append(avaiClonings, cloning)
  405. newObj := detail.Object
  406. newObj.ObjectID = 0
  407. newObj.Path = cloning.Cloning.NewPath
  408. newObj.PackageID = cloning.Cloning.NewPackageID
  409. newObjs = append(newObjs, newObj)
  410. }
  411. // 先创建出新对象
  412. err = svc.DB.Object().BatchCreate(tx, &newObjs)
  413. if err != nil {
  414. return fmt.Errorf("batch creating objects: %w", err)
  415. }
  416. // 创建了新对象就能拿到新对象ID,再创建新对象块
  417. var newBlks []types.ObjectBlock
  418. for i, cloning := range avaiClonings {
  419. oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks
  420. for _, blk := range oldBlks {
  421. newBlk := blk
  422. newBlk.ObjectID = newObjs[i].ObjectID
  423. newBlks = append(newBlks, newBlk)
  424. }
  425. }
  426. err = svc.DB.ObjectBlock().BatchCreate(tx, newBlks)
  427. if err != nil {
  428. return fmt.Errorf("batch creating object blocks: %w", err)
  429. }
  430. for i, cloning := range avaiClonings {
  431. ret[cloning.OrgIndex] = &newObjs[i]
  432. }
  433. for i, cloning := range avaiClonings {
  434. var evtBlks []datamap.BlockDistributionObjectInfo
  435. blkType := getBlockTypeFromRed(newObjs[i].Redundancy)
  436. oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks
  437. for _, blk := range oldBlks {
  438. evtBlks = append(evtBlks, datamap.BlockDistributionObjectInfo{
  439. BlockType: blkType,
  440. Index: blk.Index,
  441. UserSpaceID: blk.UserSpaceID,
  442. })
  443. }
  444. evt = append(evt, &datamap.BodyNewOrUpdateObject{
  445. Info: newObjs[i],
  446. BlockDistribution: evtBlks,
  447. })
  448. }
  449. return nil
  450. })
  451. if err != nil {
  452. logger.Warnf("cloning objects: %s", err.Error())
  453. return nil, err
  454. }
  455. for _, e := range evt {
  456. svc.EvtPub.Publish(e)
  457. }
  458. return ret, nil
  459. }
  460. // GetPackageObjects 获取包中的对象列表。
  461. // userID: 用户ID。
  462. // packageID: 包ID。
  463. // 返回值: 对象列表和错误信息。
  464. func (svc *ObjectService) GetPackageObjects(packageID types.PackageID) ([]types.Object, error) {
  465. return svc.DB.Object().GetPackageObjects(svc.DB.DefCtx(), packageID)
  466. }
  467. func (svc *ObjectService) GetObjectDetails(objectIDs []types.ObjectID) ([]*types.ObjectDetail, error) {
  468. detailsMp := make(map[types.ObjectID]*types.ObjectDetail)
  469. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  470. var err error
  471. objectIDs = sort2.SortAsc(objectIDs)
  472. // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
  473. objs, err := svc.DB.Object().BatchGet(tx, objectIDs)
  474. if err != nil {
  475. return fmt.Errorf("batch get objects: %w", err)
  476. }
  477. for _, obj := range objs {
  478. detailsMp[obj.ObjectID] = &types.ObjectDetail{
  479. Object: obj,
  480. }
  481. }
  482. // 查询合并
  483. blocks, err := svc.DB.ObjectBlock().BatchGetByObjectID(tx, objectIDs)
  484. if err != nil {
  485. return fmt.Errorf("batch get object blocks: %w", err)
  486. }
  487. for _, block := range blocks {
  488. d := detailsMp[block.ObjectID]
  489. d.Blocks = append(d.Blocks, block)
  490. }
  491. // 查询合并
  492. pinneds, err := svc.DB.PinnedObject().BatchGetByObjectID(tx, objectIDs)
  493. if err != nil {
  494. return fmt.Errorf("batch get pinned objects: %w", err)
  495. }
  496. for _, pinned := range pinneds {
  497. d := detailsMp[pinned.ObjectID]
  498. d.PinnedAt = append(d.PinnedAt, pinned.UserSpaceID)
  499. }
  500. return nil
  501. })
  502. if err != nil {
  503. logger.Warn(err.Error())
  504. return nil, err
  505. }
  506. details := make([]*types.ObjectDetail, len(objectIDs))
  507. for i, objID := range objectIDs {
  508. details[i] = detailsMp[objID]
  509. }
  510. return details, nil
  511. }
  512. func (svc *ObjectService) NewMultipartUploadObject(packageID types.PackageID, path string) (types.Object, error) {
  513. var obj types.Object
  514. err := svc.DB.DoTx(func(tx db.SQLContext) error {
  515. oldObj, err := svc.DB.Object().GetByPath(tx, packageID, path)
  516. if err == nil {
  517. obj = oldObj
  518. err := svc.DB.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID)
  519. if err != nil {
  520. return fmt.Errorf("delete object blocks: %w", err)
  521. }
  522. obj.FileHash = types.EmptyHash
  523. obj.Size = 0
  524. obj.Redundancy = types.NewMultipartUploadRedundancy()
  525. obj.UpdateTime = time.Now()
  526. err = svc.DB.Object().BatchUpdate(tx, []types.Object{obj})
  527. if err != nil {
  528. return fmt.Errorf("update object: %w", err)
  529. }
  530. return nil
  531. }
  532. obj = types.Object{
  533. PackageID: packageID,
  534. Path: path,
  535. FileHash: types.EmptyHash,
  536. Size: 0,
  537. Redundancy: types.NewMultipartUploadRedundancy(),
  538. CreateTime: time.Now(),
  539. UpdateTime: time.Now(),
  540. }
  541. objID, err := svc.DB.Object().Create(tx, obj)
  542. if err != nil {
  543. return fmt.Errorf("create object: %w", err)
  544. }
  545. obj.ObjectID = objID
  546. return nil
  547. })
  548. if err != nil {
  549. logger.Warnf("new multipart upload object: %s", err.Error())
  550. return types.Object{}, err
  551. }
  552. return obj, nil
  553. }
  554. func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, indexes []int) (types.Object, error) {
  555. if len(indexes) == 0 {
  556. return types.Object{}, fmt.Errorf("no block indexes specified")
  557. }
  558. objDe, err := db.DoTx11(svc.DB, svc.DB.Object().GetDetail, objectID)
  559. if err != nil {
  560. return types.Object{}, err
  561. }
  562. _, ok := objDe.Object.Redundancy.(*types.MultipartUploadRedundancy)
  563. if !ok {
  564. return types.Object{}, fmt.Errorf("object %v is not a multipart upload", objectID)
  565. }
  566. if len(objDe.Blocks) == 0 {
  567. return types.Object{}, fmt.Errorf("object %v has no blocks", objectID)
  568. }
  569. objBlkMap := make(map[int]types.ObjectBlock)
  570. for _, blk := range objDe.Blocks {
  571. objBlkMap[blk.Index] = blk
  572. }
  573. var compBlks []types.ObjectBlock
  574. var compBlkSpaces []types.UserSpaceDetail
  575. var targetSpace types.UserSpaceDetail
  576. for i, idx := range indexes {
  577. blk, ok := objBlkMap[idx]
  578. if !ok {
  579. return types.Object{}, fmt.Errorf("block %d not found in object %v", idx, objectID)
  580. }
  581. stg := svc.UserSpaceMeta.Get(blk.UserSpaceID)
  582. if stg == nil {
  583. return types.Object{}, fmt.Errorf("storage of user space %d not found", blk.UserSpaceID)
  584. }
  585. compBlks = append(compBlks, blk)
  586. compBlkSpaces = append(compBlkSpaces, *stg)
  587. if i == 0 {
  588. targetSpace = *stg
  589. }
  590. }
  591. bld := exec.NewPlanBuilder()
  592. err = plans.CompleteMultipart(compBlks, compBlkSpaces, targetSpace, "shard", bld)
  593. if err != nil {
  594. return types.Object{}, err
  595. }
  596. exeCtx := exec.NewExecContext()
  597. ret, err := bld.Execute(exeCtx).Wait(context.Background())
  598. if err != nil {
  599. return types.Object{}, err
  600. }
  601. shardInfo := ret["shard"].(*ops2.ShardInfoValue)
  602. err = db.DoTx10(svc.DB, svc.DB.Object().BatchUpdateRedundancy, []db.UpdatingObjectRedundancy{
  603. {
  604. ObjectID: objectID,
  605. FileHash: shardInfo.Hash,
  606. Size: shardInfo.Size,
  607. Redundancy: types.NewNoneRedundancy(),
  608. Blocks: []types.ObjectBlock{{
  609. ObjectID: objectID,
  610. Index: 0,
  611. UserSpaceID: targetSpace.UserSpace.UserSpaceID,
  612. FileHash: shardInfo.Hash,
  613. Size: shardInfo.Size,
  614. }},
  615. },
  616. })
  617. if err != nil {
  618. return types.Object{}, err
  619. }
  620. obj, err := svc.DB.Object().GetByID(svc.DB.DefCtx(), objectID)
  621. if err != nil {
  622. return types.Object{}, err
  623. }
  624. return obj, nil
  625. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。