You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

check_package_redundancy.go 44 kB


  1. package event
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/common/utils/math2"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  14. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
  19. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
  20. lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser"
  21. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  22. scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
  23. "gitlink.org.cn/cloudream/storage/scanner/internal/config"
  24. )
  // CheckPackageRedundancy is the handler for the scevt.CheckPackageRedundancy
  // event. The event message is embedded, so its fields (for example PackageID)
  // are reachable directly on the handler.
  type CheckPackageRedundancy struct {
      *scevt.CheckPackageRedundancy
  }
  28. func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageRedundancy {
  29. return &CheckPackageRedundancy{
  30. CheckPackageRedundancy: evt,
  31. }
  32. }
  // StorageLoadInfo pairs a storage with the access amount recorded for it.
  type StorageLoadInfo struct {
      // Storage is the storage's detail record.
      Storage stgmod.StorageDetail
      // AccessAmount is the access amount of the package on this storage; it stays
      // at the zero value unless a caller fills it in.
      AccessAmount float64
  }
  37. func (t *CheckPackageRedundancy) TryMerge(other Event) bool {
  38. event, ok := other.(*CheckPackageRedundancy)
  39. if !ok {
  40. return false
  41. }
  42. return event.PackageID == t.PackageID
  43. }
  // Execute re-evaluates the redundancy of every object in the package and
  // converts the objects whose redundancy should change.
  //
  // It fetches the package's objects and the user's storages from the
  // coordinator, reads the package access stats from the database, pre-selects
  // upload storages, takes a distributed lock, runs the per-object conversion and
  // finally reports all changed objects to the coordinator in a single request.
  // Nothing is returned: a failure in the setup steps is logged and aborts the
  // run, a failure on one object is logged and that object is left unchanged.
  func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
      log := logger.WithType[CheckPackageRedundancy]("Event")
      startTime := time.Now()
      log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy))
      defer func() {
          log.Debugf("end, time: %v", time.Since(startTime))
      }()
      // TODO: should read the database directly, like the other events do
      coorCli, err := stgglb.CoordinatorMQPool.Acquire()
      if err != nil {
          log.Warnf("new coordinator client: %s", err.Error())
          return
      }
      defer stgglb.CoordinatorMQPool.Release(coorCli)
      getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID))
      if err != nil {
          log.Warnf("getting package objects: %s", err.Error())
          return
      }
      stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.DefCtx(), t.PackageID)
      if err != nil {
          log.Warnf("getting package access stats: %s", err.Error())
          return
      }
      // TODO UserID
      getStgs, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(1))
      if err != nil {
          log.Warnf("getting all storages: %s", err.Error())
          return
      }
      if len(getStgs.Storages) == 0 {
          log.Warnf("no available storages")
          return
      }
      // Index the storages by ID, then attach the access amount of each storage
      // that has a stat entry; storages without one keep AccessAmount == 0.
      userAllStorages := make(map[cdssdk.StorageID]*StorageLoadInfo)
      for _, stg := range getStgs.Storages {
          userAllStorages[stg.Storage.StorageID] = &StorageLoadInfo{
              Storage: stg,
          }
      }
      for _, stat := range stats {
          info, ok := userAllStorages[stat.StorageID]
          if !ok {
              continue
          }
          info.AccessAmount = stat.Amount
      }
      var changedObjects []coormq.UpdatingObjectRedundancy
      defRep := cdssdk.DefaultRepRedundancy
      defEC := cdssdk.DefaultECRedundancy
      // TODO: the rep replica count is currently fixed at 2, so two storages are picked directly here
      // TODO: move this into chooseRedundancy
      mostBlockStgIDs := t.summaryRepObjectBlockStorages(getObjs.Objects, 2)
      newRepStgs := t.chooseNewStoragesForRep(&defRep, userAllStorages)
      rechoosedRepStgs := t.rechooseStoragesForRep(mostBlockStgIDs, &defRep, userAllStorages)
      newECStgs := t.chooseNewStoragesForEC(&defEC, userAllStorages)
      // Acquire the locks. Only the storages in newRepStgs and newECStgs are
      // registered with the lock builder here.
      builder := reqbuilder.NewBuilder()
      for _, storage := range newRepStgs {
          builder.Shard().Buzy(storage.Storage.Storage.StorageID)
      }
      for _, storage := range newECStgs {
          builder.Shard().Buzy(storage.Storage.Storage.StorageID)
      }
      mutex, err := builder.MutexLock(execCtx.Args.DistLock)
      if err != nil {
          log.Warnf("acquiring dist lock: %s", err.Error())
          return
      }
      defer mutex.Unlock()
      // Dispatch on the (current redundancy, new redundancy) pair of each object.
      // Pairs without a matching case leave the object untouched.
      // NOTE(review): chooseRedundancy returns nil for a rep object that stays rep
      // and for an EC object that stays EC, so the repToRep and ecToEC cases below
      // look unreachable -- confirm whether that is intended.
      for _, obj := range getObjs.Objects {
          var updating *coormq.UpdatingObjectRedundancy
          var err error
          newRed, selectedStorages := t.chooseRedundancy(obj, userAllStorages)
          switch srcRed := obj.Object.Redundancy.(type) {
          case *cdssdk.NoneRedundancy:
              switch newRed := newRed.(type) {
              case *cdssdk.RepRedundancy:
                  log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
                  updating, err = t.noneToRep(execCtx, obj, newRed, newRepStgs, userAllStorages)
              case *cdssdk.ECRedundancy:
                  log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
                  updating, err = t.noneToEC(execCtx, obj, newRed, newECStgs, userAllStorages)
              case *cdssdk.LRCRedundancy:
                  log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
                  updating, err = t.noneToLRC(execCtx, obj, newRed, selectedStorages, userAllStorages)
              case *cdssdk.SegmentRedundancy:
                  log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> segment")
                  updating, err = t.noneToSeg(execCtx, obj, newRed, selectedStorages, userAllStorages)
              }
          case *cdssdk.RepRedundancy:
              switch newRed := newRed.(type) {
              case *cdssdk.RepRedundancy:
                  updating, err = t.repToRep(execCtx, obj, srcRed, rechoosedRepStgs, userAllStorages)
              case *cdssdk.ECRedundancy:
                  log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
                  updating, err = t.repToEC(execCtx, obj, newRed, newECStgs, userAllStorages)
              }
          case *cdssdk.ECRedundancy:
              switch newRed := newRed.(type) {
              case *cdssdk.RepRedundancy:
                  log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
                  updating, err = t.ecToRep(execCtx, obj, srcRed, newRed, newRepStgs, userAllStorages)
              case *cdssdk.ECRedundancy:
                  uploadStorages := t.rechooseStoragesForEC(obj, srcRed, userAllStorages)
                  updating, err = t.ecToEC(execCtx, obj, srcRed, newRed, uploadStorages, userAllStorages)
              }
          case *cdssdk.LRCRedundancy:
              switch newRed := newRed.(type) {
              case *cdssdk.LRCRedundancy:
                  uploadStorages := t.rechooseStoragesForLRC(obj, srcRed, userAllStorages)
                  updating, err = t.lrcToLRC(execCtx, obj, srcRed, newRed, uploadStorages, userAllStorages)
              }
          }
          if updating != nil {
              changedObjects = append(changedObjects, *updating)
          }
          if err != nil {
              log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error())
          }
      }
      if len(changedObjects) == 0 {
          return
      }
      // Report every changed object to the coordinator in one request.
      _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects))
      if err != nil {
          log.Warnf("requesting to change object redundancy: %s", err.Error())
          return
      }
  }
  174. func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllStgs map[cdssdk.StorageID]*StorageLoadInfo) (cdssdk.Redundancy, []*StorageLoadInfo) {
  175. switch obj.Object.Redundancy.(type) {
  176. case *cdssdk.NoneRedundancy:
  177. if obj.Object.Size > config.Cfg().ECFileSizeThreshold {
  178. newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs)
  179. return &cdssdk.DefaultECRedundancy, newStgs
  180. }
  181. return &cdssdk.DefaultRepRedundancy, t.chooseNewStoragesForRep(&cdssdk.DefaultRepRedundancy, userAllStgs)
  182. case *cdssdk.RepRedundancy:
  183. if obj.Object.Size > config.Cfg().ECFileSizeThreshold {
  184. newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs)
  185. return &cdssdk.DefaultECRedundancy, newStgs
  186. }
  187. case *cdssdk.ECRedundancy:
  188. if obj.Object.Size <= config.Cfg().ECFileSizeThreshold {
  189. return &cdssdk.DefaultRepRedundancy, t.chooseNewStoragesForRep(&cdssdk.DefaultRepRedundancy, userAllStgs)
  190. }
  191. case *cdssdk.LRCRedundancy:
  192. newLRCStgs := t.rechooseStoragesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllStgs)
  193. return &cdssdk.DefaultLRCRedundancy, newLRCStgs
  194. }
  195. return nil, nil
  196. }
  197. // 统计每个对象块所在的节点,选出块最多的不超过storageCnt个节点
  198. func (t *CheckPackageRedundancy) summaryRepObjectBlockStorages(objs []stgmod.ObjectDetail, storageCnt int) []cdssdk.StorageID {
  199. type stgBlocks struct {
  200. StorageID cdssdk.StorageID
  201. Count int
  202. }
  203. stgBlocksMap := make(map[cdssdk.StorageID]*stgBlocks)
  204. for _, obj := range objs {
  205. shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
  206. if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC {
  207. for _, block := range obj.Blocks {
  208. if _, ok := stgBlocksMap[block.StorageID]; !ok {
  209. stgBlocksMap[block.StorageID] = &stgBlocks{
  210. StorageID: block.StorageID,
  211. Count: 0,
  212. }
  213. }
  214. stgBlocksMap[block.StorageID].Count++
  215. }
  216. }
  217. }
  218. storages := lo.Values(stgBlocksMap)
  219. sort2.Sort(storages, func(left *stgBlocks, right *stgBlocks) int {
  220. return right.Count - left.Count
  221. })
  222. ids := lo.Map(storages, func(item *stgBlocks, idx int) cdssdk.StorageID { return item.StorageID })
  223. if len(ids) > storageCnt {
  224. ids = ids[:storageCnt]
  225. }
  226. return ids
  227. }
  228. func (t *CheckPackageRedundancy) chooseNewStoragesForRep(red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  229. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  230. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  231. })
  232. return t.chooseSoManyStorages(red.RepCount, sortedStorages)
  233. }
  234. func (t *CheckPackageRedundancy) chooseNewStoragesForEC(red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  235. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  236. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  237. })
  238. return t.chooseSoManyStorages(red.N, sortedStorages)
  239. }
  240. func (t *CheckPackageRedundancy) chooseNewStoragesForLRC(red *cdssdk.LRCRedundancy, allStorages map[cdssdk.HubID]*StorageLoadInfo) []*StorageLoadInfo {
  241. sortedStorages := sort2.Sort(lo.Values(allStorages), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  242. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  243. })
  244. return t.chooseSoManyStorages(red.N, sortedStorages)
  245. }
  246. func (t *CheckPackageRedundancy) chooseNewStoragesForSeg(segCount int, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  247. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  248. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  249. })
  250. return t.chooseSoManyStorages(segCount, sortedStorages)
  251. }
  252. func (t *CheckPackageRedundancy) rechooseStoragesForRep(mostBlockStgIDs []cdssdk.StorageID, red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  253. type rechooseStorage struct {
  254. *StorageLoadInfo
  255. HasBlock bool
  256. }
  257. var rechooseStgs []*rechooseStorage
  258. for _, stg := range allStgs {
  259. hasBlock := false
  260. for _, id := range mostBlockStgIDs {
  261. if id == stg.Storage.Storage.StorageID {
  262. hasBlock = true
  263. break
  264. }
  265. }
  266. rechooseStgs = append(rechooseStgs, &rechooseStorage{
  267. StorageLoadInfo: stg,
  268. HasBlock: hasBlock,
  269. })
  270. }
  271. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStorage, right *rechooseStorage) int {
  272. // 已经缓存了文件块的节点优先选择
  273. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  274. if v != 0 {
  275. return v
  276. }
  277. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  278. })
  279. return t.chooseSoManyStorages(red.RepCount, lo.Map(sortedStgs, func(storage *rechooseStorage, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  280. }
  281. func (t *CheckPackageRedundancy) rechooseStoragesForEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  282. type rechooseStg struct {
  283. *StorageLoadInfo
  284. CachedBlockIndex int
  285. }
  286. var rechooseStgs []*rechooseStg
  287. for _, stg := range allStgs {
  288. cachedBlockIndex := -1
  289. for _, block := range obj.Blocks {
  290. if block.StorageID == stg.Storage.Storage.StorageID {
  291. cachedBlockIndex = block.Index
  292. break
  293. }
  294. }
  295. rechooseStgs = append(rechooseStgs, &rechooseStg{
  296. StorageLoadInfo: stg,
  297. CachedBlockIndex: cachedBlockIndex,
  298. })
  299. }
  300. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  301. // 已经缓存了文件块的节点优先选择
  302. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  303. if v != 0 {
  304. return v
  305. }
  306. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  307. })
  308. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  309. return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  310. }
  311. func (t *CheckPackageRedundancy) rechooseStoragesForLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  312. type rechooseStg struct {
  313. *StorageLoadInfo
  314. CachedBlockIndex int
  315. }
  316. var rechooseStgs []*rechooseStg
  317. for _, stg := range allStgs {
  318. cachedBlockIndex := -1
  319. for _, block := range obj.Blocks {
  320. if block.StorageID == stg.Storage.Storage.StorageID {
  321. cachedBlockIndex = block.Index
  322. break
  323. }
  324. }
  325. rechooseStgs = append(rechooseStgs, &rechooseStg{
  326. StorageLoadInfo: stg,
  327. CachedBlockIndex: cachedBlockIndex,
  328. })
  329. }
  330. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  331. // 已经缓存了文件块的节点优先选择
  332. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  333. if v != 0 {
  334. return v
  335. }
  336. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  337. })
  338. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  339. return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  340. }
  341. func (t *CheckPackageRedundancy) chooseSoManyStorages(count int, stgs []*StorageLoadInfo) []*StorageLoadInfo {
  342. repeateCount := (count + len(stgs) - 1) / len(stgs)
  343. extendStgs := make([]*StorageLoadInfo, repeateCount*len(stgs))
  344. // 使用复制的方式将节点数扩充到要求的数量
  345. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  346. for p := 0; p < repeateCount; p++ {
  347. for i, storage := range stgs {
  348. putIdx := i*repeateCount + p
  349. extendStgs[putIdx] = storage
  350. }
  351. }
  352. extendStgs = extendStgs[:count]
  353. var chosen []*StorageLoadInfo
  354. for len(chosen) < count {
  355. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  356. chosenLocations := make(map[cdssdk.LocationID]bool)
  357. for i, stg := range extendStgs {
  358. if stg == nil {
  359. continue
  360. }
  361. if chosenLocations[stg.Storage.MasterHub.LocationID] {
  362. continue
  363. }
  364. chosen = append(chosen, stg)
  365. chosenLocations[stg.Storage.MasterHub.LocationID] = true
  366. extendStgs[i] = nil
  367. }
  368. }
  369. return chosen
  370. }
  371. func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  372. if len(obj.Blocks) == 0 {
  373. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  374. }
  375. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  376. if !ok {
  377. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  378. }
  379. if srcStg.Storage.MasterHub == nil {
  380. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  381. }
  382. // 如果选择的备份节点都是同一个,那么就只要上传一次
  383. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  384. ft := ioswitch2.NewFromTo()
  385. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  386. for i, stg := range uploadStgs {
  387. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  388. }
  389. plans := exec.NewPlanBuilder()
  390. err := parser.Parse(ft, plans)
  391. if err != nil {
  392. return nil, fmt.Errorf("parsing plan: %w", err)
  393. }
  394. // TODO 添加依赖
  395. execCtx := exec.NewExecContext()
  396. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  397. ret, err := plans.Execute(execCtx).Wait(context.Background())
  398. if err != nil {
  399. return nil, fmt.Errorf("executing io plan: %w", err)
  400. }
  401. var blocks []stgmod.ObjectBlock
  402. var blockChgs []stgmod.BlockChange
  403. for i, stg := range uploadStgs {
  404. r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
  405. blocks = append(blocks, stgmod.ObjectBlock{
  406. ObjectID: obj.Object.ObjectID,
  407. Index: 0,
  408. StorageID: stg.Storage.Storage.StorageID,
  409. FileHash: r.Hash,
  410. Size: r.Size,
  411. })
  412. blockChgs = append(blockChgs, &stgmod.BlockChangeClone{
  413. BlockType: stgmod.BlockTypeRaw,
  414. SourceStorageID: obj.Blocks[0].StorageID,
  415. TargetStorageID: stg.Storage.Storage.StorageID,
  416. TransferBytes: 1,
  417. })
  418. }
  419. // 删除原本的文件块
  420. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  421. Index: 0,
  422. StorageID: obj.Blocks[0].StorageID,
  423. })
  424. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  425. ObjectID: obj.Object.ObjectID,
  426. PackageID: obj.Object.PackageID,
  427. BlockChanges: blockChgs,
  428. })
  429. return &coormq.UpdatingObjectRedundancy{
  430. ObjectID: obj.Object.ObjectID,
  431. FileHash: obj.Object.FileHash,
  432. Size: obj.Object.Size,
  433. Redundancy: red,
  434. Blocks: blocks,
  435. }, nil
  436. }
  437. func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  438. if len(obj.Blocks) == 0 {
  439. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec")
  440. }
  441. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  442. if !ok {
  443. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  444. }
  445. if srcStg.Storage.MasterHub == nil {
  446. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  447. }
  448. ft := ioswitch2.NewFromTo()
  449. ft.ECParam = red
  450. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  451. for i := 0; i < red.N; i++ {
  452. ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  453. }
  454. plans := exec.NewPlanBuilder()
  455. err := parser.Parse(ft, plans)
  456. if err != nil {
  457. return nil, fmt.Errorf("parsing plan: %w", err)
  458. }
  459. execCtx := exec.NewExecContext()
  460. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  461. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  462. if err != nil {
  463. return nil, fmt.Errorf("executing io plan: %w", err)
  464. }
  465. var blocks []stgmod.ObjectBlock
  466. var evtTargetBlocks []stgmod.Block
  467. var evtBlockTrans []stgmod.DataTransfer
  468. for i := 0; i < red.N; i++ {
  469. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
  470. blocks = append(blocks, stgmod.ObjectBlock{
  471. ObjectID: obj.Object.ObjectID,
  472. Index: i,
  473. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  474. FileHash: r.Hash,
  475. Size: r.Size,
  476. })
  477. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  478. BlockType: stgmod.BlockTypeEC,
  479. Index: i,
  480. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  481. })
  482. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  483. SourceStorageID: obj.Blocks[0].StorageID,
  484. TargetStorageID: uploadStgs[i].Storage.Storage.StorageID,
  485. TransferBytes: 1,
  486. })
  487. }
  488. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  489. ObjectID: obj.Object.ObjectID,
  490. PackageID: obj.Object.PackageID,
  491. BlockChanges: []stgmod.BlockChange{
  492. &stgmod.BlockChangeEnDecode{
  493. SourceBlocks: []stgmod.Block{{
  494. BlockType: stgmod.BlockTypeRaw,
  495. StorageID: obj.Blocks[0].StorageID,
  496. }},
  497. TargetBlocks: evtTargetBlocks,
  498. DataTransfers: evtBlockTrans,
  499. },
  500. // 删除原本的文件块
  501. &stgmod.BlockChangeDeleted{
  502. Index: 0,
  503. StorageID: obj.Blocks[0].StorageID,
  504. },
  505. },
  506. })
  507. return &coormq.UpdatingObjectRedundancy{
  508. ObjectID: obj.Object.ObjectID,
  509. FileHash: obj.Object.FileHash,
  510. Size: obj.Object.Size,
  511. Redundancy: red,
  512. Blocks: blocks,
  513. }, nil
  514. }
  515. func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  516. if len(obj.Blocks) == 0 {
  517. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec")
  518. }
  519. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  520. if !ok {
  521. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  522. }
  523. if srcStg.Storage.MasterHub == nil {
  524. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  525. }
  526. var toes []ioswitchlrc.To
  527. for i := 0; i < red.N; i++ {
  528. toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, i, fmt.Sprintf("%d", i)))
  529. }
  530. plans := exec.NewPlanBuilder()
  531. err := lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, -1), toes, plans)
  532. if err != nil {
  533. return nil, fmt.Errorf("parsing plan: %w", err)
  534. }
  535. execCtx := exec.NewExecContext()
  536. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  537. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  538. if err != nil {
  539. return nil, fmt.Errorf("executing io plan: %w", err)
  540. }
  541. var blocks []stgmod.ObjectBlock
  542. var evtTargetBlocks []stgmod.Block
  543. var evtBlockTrans []stgmod.DataTransfer
  544. for i := 0; i < red.N; i++ {
  545. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
  546. blocks = append(blocks, stgmod.ObjectBlock{
  547. ObjectID: obj.Object.ObjectID,
  548. Index: i,
  549. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  550. FileHash: r.Hash,
  551. Size: r.Size,
  552. })
  553. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  554. BlockType: stgmod.BlockTypeEC,
  555. Index: i,
  556. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  557. })
  558. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  559. SourceStorageID: obj.Blocks[0].StorageID,
  560. TargetStorageID: uploadStgs[i].Storage.Storage.StorageID,
  561. TransferBytes: 1,
  562. })
  563. }
  564. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  565. ObjectID: obj.Object.ObjectID,
  566. PackageID: obj.Object.PackageID,
  567. BlockChanges: []stgmod.BlockChange{
  568. &stgmod.BlockChangeEnDecode{
  569. SourceBlocks: []stgmod.Block{{
  570. BlockType: stgmod.BlockTypeRaw,
  571. StorageID: obj.Blocks[0].StorageID,
  572. }},
  573. TargetBlocks: evtTargetBlocks,
  574. DataTransfers: evtBlockTrans,
  575. },
  576. // 删除原本的文件块
  577. &stgmod.BlockChangeDeleted{
  578. Index: 0,
  579. StorageID: obj.Blocks[0].StorageID,
  580. },
  581. },
  582. })
  583. return &coormq.UpdatingObjectRedundancy{
  584. ObjectID: obj.Object.ObjectID,
  585. FileHash: obj.Object.FileHash,
  586. Size: obj.Object.Size,
  587. Redundancy: red,
  588. Blocks: blocks,
  589. }, nil
  590. }
  591. func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.SegmentRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  592. if len(obj.Blocks) == 0 {
  593. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  594. }
  595. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  596. if !ok {
  597. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  598. }
  599. if srcStg.Storage.MasterHub == nil {
  600. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  601. }
  602. // 如果选择的备份节点都是同一个,那么就只要上传一次
  603. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  604. ft := ioswitch2.NewFromTo()
  605. ft.SegmentParam = red
  606. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  607. for i, stg := range uploadStgs {
  608. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
  609. }
  610. plans := exec.NewPlanBuilder()
  611. err := parser.Parse(ft, plans)
  612. if err != nil {
  613. return nil, fmt.Errorf("parsing plan: %w", err)
  614. }
  615. // TODO 添加依赖
  616. execCtx := exec.NewExecContext()
  617. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  618. ret, err := plans.Execute(execCtx).Wait(context.Background())
  619. if err != nil {
  620. return nil, fmt.Errorf("executing io plan: %w", err)
  621. }
  622. var blocks []stgmod.ObjectBlock
  623. var evtTargetBlocks []stgmod.Block
  624. var evtBlockTrans []stgmod.DataTransfer
  625. for i, stg := range uploadStgs {
  626. r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
  627. blocks = append(blocks, stgmod.ObjectBlock{
  628. ObjectID: obj.Object.ObjectID,
  629. Index: i,
  630. StorageID: stg.Storage.Storage.StorageID,
  631. FileHash: r.Hash,
  632. Size: r.Size,
  633. })
  634. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  635. BlockType: stgmod.BlockTypeSegment,
  636. Index: i,
  637. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  638. })
  639. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  640. SourceStorageID: obj.Blocks[0].StorageID,
  641. TargetStorageID: uploadStgs[i].Storage.Storage.StorageID,
  642. TransferBytes: 1,
  643. })
  644. }
  645. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  646. ObjectID: obj.Object.ObjectID,
  647. PackageID: obj.Object.PackageID,
  648. BlockChanges: []stgmod.BlockChange{
  649. &stgmod.BlockChangeEnDecode{
  650. SourceBlocks: []stgmod.Block{{
  651. BlockType: stgmod.BlockTypeRaw,
  652. StorageID: obj.Blocks[0].StorageID,
  653. }},
  654. TargetBlocks: evtTargetBlocks,
  655. DataTransfers: evtBlockTrans,
  656. },
  657. // 删除原本的文件块
  658. &stgmod.BlockChangeDeleted{
  659. Index: 0,
  660. StorageID: obj.Blocks[0].StorageID,
  661. },
  662. },
  663. })
  664. return &coormq.UpdatingObjectRedundancy{
  665. ObjectID: obj.Object.ObjectID,
  666. FileHash: obj.Object.FileHash,
  667. Size: obj.Object.Size,
  668. Redundancy: red,
  669. Blocks: blocks,
  670. }, nil
  671. }
  672. func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  673. if len(obj.Blocks) == 0 {
  674. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  675. }
  676. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  677. if !ok {
  678. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  679. }
  680. if srcStg.Storage.MasterHub == nil {
  681. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  682. }
  683. // 如果选择的备份节点都是同一个,那么就只要上传一次
  684. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  685. ft := ioswitch2.NewFromTo()
  686. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  687. for i, stg := range uploadStgs {
  688. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  689. }
  690. plans := exec.NewPlanBuilder()
  691. err := parser.Parse(ft, plans)
  692. if err != nil {
  693. return nil, fmt.Errorf("parsing plan: %w", err)
  694. }
  695. // TODO 添加依赖
  696. execCtx := exec.NewExecContext()
  697. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  698. ret, err := plans.Execute(execCtx).Wait(context.Background())
  699. if err != nil {
  700. return nil, fmt.Errorf("executing io plan: %w", err)
  701. }
  702. var blocks []stgmod.ObjectBlock
  703. var blockChgs []stgmod.BlockChange
  704. for i, stg := range uploadStgs {
  705. r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
  706. blocks = append(blocks, stgmod.ObjectBlock{
  707. ObjectID: obj.Object.ObjectID,
  708. Index: 0,
  709. StorageID: stg.Storage.Storage.StorageID,
  710. FileHash: r.Hash,
  711. Size: r.Size,
  712. })
  713. blockChgs = append(blockChgs, &stgmod.BlockChangeClone{
  714. BlockType: stgmod.BlockTypeRaw,
  715. SourceStorageID: obj.Blocks[0].StorageID,
  716. TargetStorageID: stg.Storage.Storage.StorageID,
  717. TransferBytes: 1,
  718. })
  719. }
  720. // 删除原本的文件块
  721. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  722. Index: 0,
  723. StorageID: obj.Blocks[0].StorageID,
  724. })
  725. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  726. ObjectID: obj.Object.ObjectID,
  727. PackageID: obj.Object.PackageID,
  728. BlockChanges: blockChgs,
  729. })
  730. return &coormq.UpdatingObjectRedundancy{
  731. ObjectID: obj.Object.ObjectID,
  732. FileHash: obj.Object.FileHash,
  733. Size: obj.Object.Size,
  734. Redundancy: red,
  735. Blocks: blocks,
  736. }, nil
  737. }
  738. func (t *CheckPackageRedundancy) repToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  739. return t.noneToEC(ctx, obj, red, uploadStorages, allStgs)
  740. }
  741. func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  742. var chosenBlocks []stgmod.GrouppedObjectBlock
  743. var chosenBlockIndexes []int
  744. var chosenBlockStg []stgmod.StorageDetail
  745. for _, block := range obj.GroupBlocks() {
  746. if len(block.StorageIDs) > 0 {
  747. // TODO 考虑选择最优的节点
  748. stg, ok := allStgs[block.StorageIDs[0]]
  749. if !ok {
  750. continue
  751. }
  752. if stg.Storage.MasterHub == nil {
  753. continue
  754. }
  755. chosenBlocks = append(chosenBlocks, block)
  756. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  757. chosenBlockStg = append(chosenBlockStg, stg.Storage)
  758. }
  759. if len(chosenBlocks) == srcRed.K {
  760. break
  761. }
  762. }
  763. if len(chosenBlocks) < srcRed.K {
  764. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  765. }
  766. // 如果选择的备份节点都是同一个,那么就只要上传一次
  767. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  768. planBlder := exec.NewPlanBuilder()
  769. ft := ioswitch2.NewFromTo()
  770. ft.ECParam = srcRed
  771. for i, block := range chosenBlocks {
  772. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  773. }
  774. for i := range uploadStgs {
  775. ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size)))
  776. }
  777. err := parser.Parse(ft, planBlder)
  778. if err != nil {
  779. return nil, fmt.Errorf("parsing plan: %w", err)
  780. }
  781. // TODO 添加依赖
  782. execCtx := exec.NewExecContext()
  783. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  784. ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
  785. if err != nil {
  786. return nil, fmt.Errorf("executing io plan: %w", err)
  787. }
  788. var blocks []stgmod.ObjectBlock
  789. for i := range uploadStgs {
  790. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
  791. blocks = append(blocks, stgmod.ObjectBlock{
  792. ObjectID: obj.Object.ObjectID,
  793. Index: 0,
  794. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  795. FileHash: r.Hash,
  796. Size: r.Size,
  797. })
  798. }
  799. var evtSrcBlocks []stgmod.Block
  800. var evtTargetBlocks []stgmod.Block
  801. for i2, block := range chosenBlocks {
  802. evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{
  803. BlockType: stgmod.BlockTypeEC,
  804. Index: block.Index,
  805. StorageID: chosenBlockStg[i2].Storage.StorageID,
  806. })
  807. }
  808. for _, stg := range uploadStgs {
  809. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  810. BlockType: stgmod.BlockTypeRaw,
  811. Index: 0,
  812. StorageID: stg.Storage.Storage.StorageID,
  813. })
  814. }
  815. var evtBlockTrans []stgmod.DataTransfer
  816. for _, stg := range uploadStgs {
  817. for i2 := range chosenBlocks {
  818. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  819. SourceStorageID: chosenBlockStg[i2].Storage.StorageID,
  820. TargetStorageID: stg.Storage.Storage.StorageID,
  821. TransferBytes: 1,
  822. })
  823. }
  824. }
  825. var blockChgs []stgmod.BlockChange
  826. blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{
  827. SourceBlocks: evtSrcBlocks,
  828. TargetBlocks: evtTargetBlocks,
  829. DataTransfers: evtBlockTrans,
  830. })
  831. for _, block := range obj.Blocks {
  832. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  833. Index: block.Index,
  834. StorageID: block.StorageID,
  835. })
  836. }
  837. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  838. ObjectID: obj.Object.ObjectID,
  839. PackageID: obj.Object.PackageID,
  840. BlockChanges: blockChgs,
  841. })
  842. return &coormq.UpdatingObjectRedundancy{
  843. ObjectID: obj.Object.ObjectID,
  844. FileHash: obj.Object.FileHash,
  845. Size: obj.Object.Size,
  846. Redundancy: tarRed,
  847. Blocks: blocks,
  848. }, nil
  849. }
  850. func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  851. grpBlocks := obj.GroupBlocks()
  852. var chosenBlocks []stgmod.GrouppedObjectBlock
  853. var chosenBlockStg []stgmod.StorageDetail
  854. for _, block := range grpBlocks {
  855. if len(block.StorageIDs) > 0 {
  856. stg, ok := allStgs[block.StorageIDs[0]]
  857. if !ok {
  858. continue
  859. }
  860. if stg.Storage.MasterHub == nil {
  861. continue
  862. }
  863. chosenBlocks = append(chosenBlocks, block)
  864. chosenBlockStg = append(chosenBlockStg, stg.Storage)
  865. }
  866. if len(chosenBlocks) == srcRed.K {
  867. break
  868. }
  869. }
  870. if len(chosenBlocks) < srcRed.K {
  871. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  872. }
  873. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  874. planBlder := exec.NewPlanBuilder()
  875. var evtSrcBlocks []stgmod.Block
  876. var evtTargetBlocks []stgmod.Block
  877. ft := ioswitch2.NewFromTo()
  878. ft.ECParam = srcRed
  879. for i, block := range chosenBlocks {
  880. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  881. evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{
  882. BlockType: stgmod.BlockTypeEC,
  883. Index: block.Index,
  884. StorageID: chosenBlockStg[i].Storage.StorageID,
  885. })
  886. }
  887. var newBlocks []stgmod.ObjectBlock
  888. shouldUpdateBlocks := false
  889. for i, stg := range uploadStorages {
  890. newBlock := stgmod.ObjectBlock{
  891. ObjectID: obj.Object.ObjectID,
  892. Index: i,
  893. StorageID: stg.Storage.Storage.StorageID,
  894. }
  895. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  896. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  897. if ok && lo.Contains(grp.StorageIDs, stg.Storage.Storage.StorageID) {
  898. newBlock.FileHash = grp.FileHash
  899. newBlock.Size = grp.Size
  900. newBlocks = append(newBlocks, newBlock)
  901. continue
  902. }
  903. shouldUpdateBlocks = true
  904. // 否则就要重建出这个节点需要的块
  905. // 输出只需要自己要保存的那一块
  906. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  907. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  908. BlockType: stgmod.BlockTypeEC,
  909. Index: i,
  910. StorageID: stg.Storage.Storage.StorageID,
  911. })
  912. newBlocks = append(newBlocks, newBlock)
  913. }
  914. err := parser.Parse(ft, planBlder)
  915. if err != nil {
  916. return nil, fmt.Errorf("parsing plan: %w", err)
  917. }
  918. // 如果没有任何Plan,Wait会直接返回成功
  919. execCtx := exec.NewExecContext()
  920. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  921. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  922. if err != nil {
  923. return nil, fmt.Errorf("executing io plan: %w", err)
  924. }
  925. if !shouldUpdateBlocks {
  926. return nil, nil
  927. }
  928. for k, v := range ret {
  929. idx, err := strconv.ParseInt(k, 10, 64)
  930. if err != nil {
  931. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  932. }
  933. r := v.(*ops2.ShardInfoValue)
  934. newBlocks[idx].FileHash = r.Hash
  935. newBlocks[idx].Size = r.Size
  936. }
  937. var evtBlockTrans []stgmod.DataTransfer
  938. for _, src := range evtSrcBlocks {
  939. for _, tar := range evtTargetBlocks {
  940. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  941. SourceStorageID: src.StorageID,
  942. TargetStorageID: tar.StorageID,
  943. TransferBytes: 1,
  944. })
  945. }
  946. }
  947. var blockChgs []stgmod.BlockChange
  948. for _, block := range obj.Blocks {
  949. keep := lo.ContainsBy(newBlocks, func(newBlock stgmod.ObjectBlock) bool {
  950. return newBlock.Index == block.Index && newBlock.StorageID == block.StorageID
  951. })
  952. if !keep {
  953. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  954. Index: block.Index,
  955. StorageID: block.StorageID,
  956. })
  957. }
  958. }
  959. blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{
  960. SourceBlocks: evtSrcBlocks,
  961. TargetBlocks: evtTargetBlocks,
  962. DataTransfers: evtBlockTrans,
  963. })
  964. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  965. ObjectID: obj.Object.ObjectID,
  966. PackageID: obj.Object.PackageID,
  967. BlockChanges: blockChgs,
  968. })
  969. return &coormq.UpdatingObjectRedundancy{
  970. ObjectID: obj.Object.ObjectID,
  971. FileHash: obj.Object.FileHash,
  972. Size: obj.Object.Size,
  973. Redundancy: tarRed,
  974. Blocks: newBlocks,
  975. }, nil
  976. }
  977. func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  978. blocksGrpByIndex := obj.GroupBlocks()
  979. var lostBlocks []int
  980. var lostBlockGrps []int
  981. canGroupReconstruct := true
  982. allBlockFlags := make([]bool, srcRed.N)
  983. for _, block := range blocksGrpByIndex {
  984. allBlockFlags[block.Index] = true
  985. }
  986. for i, ok := range allBlockFlags {
  987. grpID := srcRed.FindGroup(i)
  988. if !ok {
  989. if grpID == -1 {
  990. canGroupReconstruct = false
  991. break
  992. }
  993. if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
  994. canGroupReconstruct = false
  995. break
  996. }
  997. lostBlocks = append(lostBlocks, i)
  998. lostBlockGrps = append(lostBlockGrps, grpID)
  999. }
  1000. }
  1001. // TODO 产生BlockTransfer事件
  1002. if canGroupReconstruct {
  1003. // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadStorages)
  1004. }
  1005. return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadStorages, allStgs)
  1006. }
  1007. /*
  1008. TODO2 修复这一块的代码
  1009. func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  1010. grped := make(map[int]stgmod.GrouppedObjectBlock)
  1011. for _, b := range grpedBlocks {
  1012. grped[b.Index] = b
  1013. }
  1014. plans := exec.NewPlanBuilder()
  1015. for i := 0; i < len(lostBlocks); i++ {
  1016. var froms []ioswitchlrc.From
  1017. grpEles := red.GetGroupElements(lostBlockGrps[i])
  1018. for _, ele := range grpEles {
  1019. if ele == lostBlocks[i] {
  1020. continue
  1021. }
  1022. froms = append(froms, ioswitchlrc.NewFromStorage(grped[ele].FileHash, nil, ele))
  1023. }
  1024. err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
  1025. ioswitchlrc.NewToStorage(uploadStorages[i].Storage, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
  1026. }, plans)
  1027. if err != nil {
  1028. return nil, fmt.Errorf("parsing plan: %w", err)
  1029. }
  1030. }
  1031. fmt.Printf("plans: %v\n", plans)
  1032. // 如果没有任何Plan,Wait会直接返回成功
  1033. // TODO 添加依赖
  1034. ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
  1035. if err != nil {
  1036. return nil, fmt.Errorf("executing io plan: %w", err)
  1037. }
  1038. var newBlocks []stgmod.ObjectBlock
  1039. for _, i := range lostBlocks {
  1040. newBlocks = append(newBlocks, stgmod.ObjectBlock{
  1041. ObjectID: obj.Object.ObjectID,
  1042. Index: i,
  1043. StorageID: uploadStorages[i].Storage.Storage.StorageID,
  1044. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  1045. })
  1046. }
  1047. for _, b := range grpedBlocks {
  1048. for _, hubID := range b.StorageIDs {
  1049. newBlocks = append(newBlocks, stgmod.ObjectBlock{
  1050. ObjectID: obj.Object.ObjectID,
  1051. Index: b.Index,
  1052. StorageID: hubID,
  1053. FileHash: b.FileHash,
  1054. })
  1055. }
  1056. }
  1057. return &coormq.UpdatingObjectRedundancy{
  1058. ObjectID: obj.Object.ObjectID,
  1059. Redundancy: red,
  1060. Blocks: newBlocks,
  1061. }, nil
  1062. }
  1063. */
  1064. func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  1065. var chosenBlocks []stgmod.GrouppedObjectBlock
  1066. var chosenBlockStg []stgmod.StorageDetail
  1067. for _, block := range grpBlocks {
  1068. if len(block.StorageIDs) > 0 && block.Index < red.M() {
  1069. stg, ok := allStgs[block.StorageIDs[0]]
  1070. if !ok {
  1071. continue
  1072. }
  1073. if stg.Storage.MasterHub == nil {
  1074. continue
  1075. }
  1076. chosenBlocks = append(chosenBlocks, block)
  1077. chosenBlockStg = append(chosenBlockStg, stg.Storage)
  1078. }
  1079. if len(chosenBlocks) == red.K {
  1080. break
  1081. }
  1082. }
  1083. if len(chosenBlocks) < red.K {
  1084. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  1085. }
  1086. // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  1087. planBlder := exec.NewPlanBuilder()
  1088. var froms []ioswitchlrc.From
  1089. var toes []ioswitchlrc.To
  1090. var newBlocks []stgmod.ObjectBlock
  1091. shouldUpdateBlocks := false
  1092. for i, storage := range uploadStorages {
  1093. newBlock := stgmod.ObjectBlock{
  1094. ObjectID: obj.Object.ObjectID,
  1095. Index: i,
  1096. StorageID: storage.Storage.Storage.StorageID,
  1097. }
  1098. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  1099. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  1100. if ok && lo.Contains(grp.StorageIDs, storage.Storage.Storage.StorageID) {
  1101. newBlock.FileHash = grp.FileHash
  1102. newBlock.Size = grp.Size
  1103. newBlocks = append(newBlocks, newBlock)
  1104. continue
  1105. }
  1106. shouldUpdateBlocks = true
  1107. // 否则就要重建出这个节点需要的块
  1108. for i2, block := range chosenBlocks {
  1109. froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2].Storage, block.Index))
  1110. }
  1111. // 输出只需要自己要保存的那一块
  1112. toes = append(toes, ioswitchlrc.NewToStorage(*storage.Storage.MasterHub, storage.Storage.Storage, i, fmt.Sprintf("%d", i)))
  1113. newBlocks = append(newBlocks, newBlock)
  1114. }
  1115. err := lrcparser.ReconstructAny(froms, toes, planBlder)
  1116. if err != nil {
  1117. return nil, fmt.Errorf("parsing plan: %w", err)
  1118. }
  1119. fmt.Printf("plans: %v\n", planBlder)
  1120. // 如果没有任何Plan,Wait会直接返回成功
  1121. execCtx := exec.NewExecContext()
  1122. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  1123. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  1124. if err != nil {
  1125. return nil, fmt.Errorf("executing io plan: %w", err)
  1126. }
  1127. if !shouldUpdateBlocks {
  1128. return nil, nil
  1129. }
  1130. for k, v := range ret {
  1131. idx, err := strconv.ParseInt(k, 10, 64)
  1132. if err != nil {
  1133. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  1134. }
  1135. r := v.(*ops2.ShardInfoValue)
  1136. newBlocks[idx].FileHash = r.Hash
  1137. newBlocks[idx].Size = r.Size
  1138. }
  1139. return &coormq.UpdatingObjectRedundancy{
  1140. ObjectID: obj.Object.ObjectID,
  1141. FileHash: obj.Object.FileHash,
  1142. Size: obj.Object.Size,
  1143. Redundancy: red,
  1144. Blocks: newBlocks,
  1145. }, nil
  1146. }
  1147. // func (t *CheckPackageRedundancy) pinObject(hubID cdssdk.HubID, fileHash string) error {
  1148. // agtCli, err := stgglb.AgentMQPool.Acquire(hubID)
  1149. // if err != nil {
  1150. // return fmt.Errorf("new agent client: %w", err)
  1151. // }
  1152. // defer stgglb.AgentMQPool.Release(agtCli)
  1153. // _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false))
  1154. // if err != nil {
  1155. // return fmt.Errorf("start pinning object: %w", err)
  1156. // }
  1157. // return nil
  1158. // }
  1159. func init() {
  1160. RegisterMessageConvertor(NewCheckPackageRedundancy)
  1161. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。