You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

check_package_redundancy.go 43 kB


  1. package event
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/common/utils/math2"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  14. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
  19. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
  20. lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser"
  21. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  22. scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
  23. "gitlink.org.cn/cloudream/storage/scanner/internal/config"
  24. )
  25. type CheckPackageRedundancy struct {
  26. *scevt.CheckPackageRedundancy
  27. }
  28. func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageRedundancy {
  29. return &CheckPackageRedundancy{
  30. CheckPackageRedundancy: evt,
  31. }
  32. }
  33. type StorageLoadInfo struct {
  34. Storage stgmod.StorageDetail
  35. AccessAmount float64
  36. }
  37. func (t *CheckPackageRedundancy) TryMerge(other Event) bool {
  38. event, ok := other.(*CheckPackageRedundancy)
  39. if !ok {
  40. return false
  41. }
  42. return event.PackageID == t.PackageID
  43. }
  44. func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
  45. log := logger.WithType[CheckPackageRedundancy]("Event")
  46. startTime := time.Now()
  47. log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy))
  48. defer func() {
  49. log.Debugf("end, time: %v", time.Since(startTime))
  50. }()
  51. // TODO 应该像其他event一样直接读取数据库
  52. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  53. if err != nil {
  54. log.Warnf("new coordinator client: %s", err.Error())
  55. return
  56. }
  57. defer stgglb.CoordinatorMQPool.Release(coorCli)
  58. getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID))
  59. if err != nil {
  60. log.Warnf("getting package objects: %s", err.Error())
  61. return
  62. }
  63. stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.DefCtx(), t.PackageID)
  64. if err != nil {
  65. log.Warnf("getting package access stats: %s", err.Error())
  66. return
  67. }
  68. // TODO UserID
  69. getStgs, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(1))
  70. if err != nil {
  71. log.Warnf("getting all storages: %s", err.Error())
  72. return
  73. }
  74. if len(getStgs.Storages) == 0 {
  75. log.Warnf("no available storages")
  76. return
  77. }
  78. userAllStorages := make(map[cdssdk.StorageID]*StorageLoadInfo)
  79. for _, stg := range getStgs.Storages {
  80. userAllStorages[stg.Storage.StorageID] = &StorageLoadInfo{
  81. Storage: stg,
  82. }
  83. }
  84. for _, stat := range stats {
  85. info, ok := userAllStorages[stat.StorageID]
  86. if !ok {
  87. continue
  88. }
  89. info.AccessAmount = stat.Amount
  90. }
  91. var changedObjects []coormq.UpdatingObjectRedundancy
  92. defRep := cdssdk.DefaultRepRedundancy
  93. defEC := cdssdk.DefaultECRedundancy
  94. // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点
  95. // TODO 放到chooseRedundancy函数中
  96. mostBlockStgIDs := t.summaryRepObjectBlockStorages(getObjs.Objects, 2)
  97. newRepStgs := t.chooseNewStoragesForRep(&defRep, userAllStorages)
  98. rechoosedRepStgs := t.rechooseStoragesForRep(mostBlockStgIDs, &defRep, userAllStorages)
  99. newECStgs := t.chooseNewStoragesForEC(&defEC, userAllStorages)
  100. // 加锁
  101. builder := reqbuilder.NewBuilder()
  102. for _, storage := range newRepStgs {
  103. builder.Shard().Buzy(storage.Storage.Storage.StorageID)
  104. }
  105. for _, storage := range newECStgs {
  106. builder.Shard().Buzy(storage.Storage.Storage.StorageID)
  107. }
  108. mutex, err := builder.MutexLock(execCtx.Args.DistLock)
  109. if err != nil {
  110. log.Warnf("acquiring dist lock: %s", err.Error())
  111. return
  112. }
  113. defer mutex.Unlock()
  114. for _, obj := range getObjs.Objects {
  115. var updating *coormq.UpdatingObjectRedundancy
  116. var err error
  117. newRed, selectedStorages := t.chooseRedundancy(obj, userAllStorages)
  118. switch srcRed := obj.Object.Redundancy.(type) {
  119. case *cdssdk.NoneRedundancy:
  120. switch newRed := newRed.(type) {
  121. case *cdssdk.RepRedundancy:
  122. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
  123. updating, err = t.noneToRep(execCtx, obj, newRed, newRepStgs, userAllStorages)
  124. case *cdssdk.ECRedundancy:
  125. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
  126. updating, err = t.noneToEC(execCtx, obj, newRed, newECStgs, userAllStorages)
  127. case *cdssdk.LRCRedundancy:
  128. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
  129. updating, err = t.noneToLRC(execCtx, obj, newRed, selectedStorages, userAllStorages)
  130. case *cdssdk.SegmentRedundancy:
  131. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> segment")
  132. updating, err = t.noneToSeg(execCtx, obj, newRed, selectedStorages, userAllStorages)
  133. }
  134. case *cdssdk.RepRedundancy:
  135. switch newRed := newRed.(type) {
  136. case *cdssdk.RepRedundancy:
  137. updating, err = t.repToRep(execCtx, obj, srcRed, rechoosedRepStgs, userAllStorages)
  138. case *cdssdk.ECRedundancy:
  139. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
  140. updating, err = t.repToEC(execCtx, obj, newRed, newECStgs, userAllStorages)
  141. }
  142. case *cdssdk.ECRedundancy:
  143. switch newRed := newRed.(type) {
  144. case *cdssdk.RepRedundancy:
  145. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
  146. updating, err = t.ecToRep(execCtx, obj, srcRed, newRed, newRepStgs, userAllStorages)
  147. case *cdssdk.ECRedundancy:
  148. uploadStorages := t.rechooseStoragesForEC(obj, srcRed, userAllStorages)
  149. updating, err = t.ecToEC(execCtx, obj, srcRed, newRed, uploadStorages, userAllStorages)
  150. }
  151. case *cdssdk.LRCRedundancy:
  152. switch newRed := newRed.(type) {
  153. case *cdssdk.LRCRedundancy:
  154. uploadStorages := t.rechooseStoragesForLRC(obj, srcRed, userAllStorages)
  155. updating, err = t.lrcToLRC(execCtx, obj, srcRed, newRed, uploadStorages, userAllStorages)
  156. }
  157. }
  158. if updating != nil {
  159. changedObjects = append(changedObjects, *updating)
  160. }
  161. if err != nil {
  162. log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error())
  163. }
  164. }
  165. if len(changedObjects) == 0 {
  166. return
  167. }
  168. _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects))
  169. if err != nil {
  170. log.Warnf("requesting to change object redundancy: %s", err.Error())
  171. return
  172. }
  173. }
  174. func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllStgs map[cdssdk.StorageID]*StorageLoadInfo) (cdssdk.Redundancy, []*StorageLoadInfo) {
  175. switch obj.Object.Redundancy.(type) {
  176. case *cdssdk.NoneRedundancy:
  177. if obj.Object.Size > config.Cfg().ECFileSizeThreshold {
  178. newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs)
  179. return &cdssdk.DefaultECRedundancy, newStgs
  180. }
  181. return &cdssdk.DefaultRepRedundancy, t.chooseNewStoragesForRep(&cdssdk.DefaultRepRedundancy, userAllStgs)
  182. case *cdssdk.RepRedundancy:
  183. if obj.Object.Size > config.Cfg().ECFileSizeThreshold {
  184. newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs)
  185. return &cdssdk.DefaultECRedundancy, newStgs
  186. }
  187. case *cdssdk.ECRedundancy:
  188. if obj.Object.Size <= config.Cfg().ECFileSizeThreshold {
  189. return &cdssdk.DefaultRepRedundancy, t.chooseNewStoragesForRep(&cdssdk.DefaultRepRedundancy, userAllStgs)
  190. }
  191. case *cdssdk.LRCRedundancy:
  192. newLRCStgs := t.rechooseStoragesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllStgs)
  193. return &cdssdk.DefaultLRCRedundancy, newLRCStgs
  194. }
  195. return nil, nil
  196. }
  197. // 统计每个对象块所在的节点,选出块最多的不超过storageCnt个节点
  198. func (t *CheckPackageRedundancy) summaryRepObjectBlockStorages(objs []stgmod.ObjectDetail, storageCnt int) []cdssdk.StorageID {
  199. type stgBlocks struct {
  200. StorageID cdssdk.StorageID
  201. Count int
  202. }
  203. stgBlocksMap := make(map[cdssdk.StorageID]*stgBlocks)
  204. for _, obj := range objs {
  205. shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
  206. if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC {
  207. for _, block := range obj.Blocks {
  208. if _, ok := stgBlocksMap[block.StorageID]; !ok {
  209. stgBlocksMap[block.StorageID] = &stgBlocks{
  210. StorageID: block.StorageID,
  211. Count: 0,
  212. }
  213. }
  214. stgBlocksMap[block.StorageID].Count++
  215. }
  216. }
  217. }
  218. storages := lo.Values(stgBlocksMap)
  219. sort2.Sort(storages, func(left *stgBlocks, right *stgBlocks) int {
  220. return right.Count - left.Count
  221. })
  222. ids := lo.Map(storages, func(item *stgBlocks, idx int) cdssdk.StorageID { return item.StorageID })
  223. if len(ids) > storageCnt {
  224. ids = ids[:storageCnt]
  225. }
  226. return ids
  227. }
  228. func (t *CheckPackageRedundancy) chooseNewStoragesForRep(red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  229. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  230. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  231. })
  232. return t.chooseSoManyStorages(red.RepCount, sortedStorages)
  233. }
  234. func (t *CheckPackageRedundancy) chooseNewStoragesForEC(red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  235. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  236. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  237. })
  238. return t.chooseSoManyStorages(red.N, sortedStorages)
  239. }
  240. func (t *CheckPackageRedundancy) chooseNewStoragesForLRC(red *cdssdk.LRCRedundancy, allStorages map[cdssdk.HubID]*StorageLoadInfo) []*StorageLoadInfo {
  241. sortedStorages := sort2.Sort(lo.Values(allStorages), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  242. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  243. })
  244. return t.chooseSoManyStorages(red.N, sortedStorages)
  245. }
  246. func (t *CheckPackageRedundancy) chooseNewStoragesForSeg(segCount int, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  247. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  248. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  249. })
  250. return t.chooseSoManyStorages(segCount, sortedStorages)
  251. }
  252. func (t *CheckPackageRedundancy) rechooseStoragesForRep(mostBlockStgIDs []cdssdk.StorageID, red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  253. type rechooseStorage struct {
  254. *StorageLoadInfo
  255. HasBlock bool
  256. }
  257. var rechooseStgs []*rechooseStorage
  258. for _, stg := range allStgs {
  259. hasBlock := false
  260. for _, id := range mostBlockStgIDs {
  261. if id == stg.Storage.Storage.StorageID {
  262. hasBlock = true
  263. break
  264. }
  265. }
  266. rechooseStgs = append(rechooseStgs, &rechooseStorage{
  267. StorageLoadInfo: stg,
  268. HasBlock: hasBlock,
  269. })
  270. }
  271. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStorage, right *rechooseStorage) int {
  272. // 已经缓存了文件块的节点优先选择
  273. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  274. if v != 0 {
  275. return v
  276. }
  277. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  278. })
  279. return t.chooseSoManyStorages(red.RepCount, lo.Map(sortedStgs, func(storage *rechooseStorage, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  280. }
  281. func (t *CheckPackageRedundancy) rechooseStoragesForEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  282. type rechooseStg struct {
  283. *StorageLoadInfo
  284. CachedBlockIndex int
  285. }
  286. var rechooseStgs []*rechooseStg
  287. for _, stg := range allStgs {
  288. cachedBlockIndex := -1
  289. for _, block := range obj.Blocks {
  290. if block.StorageID == stg.Storage.Storage.StorageID {
  291. cachedBlockIndex = block.Index
  292. break
  293. }
  294. }
  295. rechooseStgs = append(rechooseStgs, &rechooseStg{
  296. StorageLoadInfo: stg,
  297. CachedBlockIndex: cachedBlockIndex,
  298. })
  299. }
  300. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  301. // 已经缓存了文件块的节点优先选择
  302. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  303. if v != 0 {
  304. return v
  305. }
  306. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  307. })
  308. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  309. return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  310. }
  311. func (t *CheckPackageRedundancy) rechooseStoragesForLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  312. type rechooseStg struct {
  313. *StorageLoadInfo
  314. CachedBlockIndex int
  315. }
  316. var rechooseStgs []*rechooseStg
  317. for _, stg := range allStgs {
  318. cachedBlockIndex := -1
  319. for _, block := range obj.Blocks {
  320. if block.StorageID == stg.Storage.Storage.StorageID {
  321. cachedBlockIndex = block.Index
  322. break
  323. }
  324. }
  325. rechooseStgs = append(rechooseStgs, &rechooseStg{
  326. StorageLoadInfo: stg,
  327. CachedBlockIndex: cachedBlockIndex,
  328. })
  329. }
  330. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  331. // 已经缓存了文件块的节点优先选择
  332. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  333. if v != 0 {
  334. return v
  335. }
  336. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  337. })
  338. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  339. return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  340. }
  341. func (t *CheckPackageRedundancy) chooseSoManyStorages(count int, stgs []*StorageLoadInfo) []*StorageLoadInfo {
  342. repeateCount := (count + len(stgs) - 1) / len(stgs)
  343. extendStgs := make([]*StorageLoadInfo, repeateCount*len(stgs))
  344. // 使用复制的方式将节点数扩充到要求的数量
  345. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  346. for p := 0; p < repeateCount; p++ {
  347. for i, storage := range stgs {
  348. putIdx := i*repeateCount + p
  349. extendStgs[putIdx] = storage
  350. }
  351. }
  352. extendStgs = extendStgs[:count]
  353. var chosen []*StorageLoadInfo
  354. for len(chosen) < count {
  355. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  356. chosenLocations := make(map[cdssdk.LocationID]bool)
  357. for i, stg := range extendStgs {
  358. if stg == nil {
  359. continue
  360. }
  361. if chosenLocations[stg.Storage.MasterHub.LocationID] {
  362. continue
  363. }
  364. chosen = append(chosen, stg)
  365. chosenLocations[stg.Storage.MasterHub.LocationID] = true
  366. extendStgs[i] = nil
  367. }
  368. }
  369. return chosen
  370. }
  371. func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  372. if len(obj.Blocks) == 0 {
  373. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  374. }
  375. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  376. if !ok {
  377. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  378. }
  379. if srcStg.Storage.MasterHub == nil {
  380. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  381. }
  382. // 如果选择的备份节点都是同一个,那么就只要上传一次
  383. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  384. ft := ioswitch2.NewFromTo()
  385. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  386. for i, stg := range uploadStgs {
  387. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  388. }
  389. plans := exec.NewPlanBuilder()
  390. err := parser.Parse(ft, plans)
  391. if err != nil {
  392. return nil, fmt.Errorf("parsing plan: %w", err)
  393. }
  394. // TODO 添加依赖
  395. execCtx := exec.NewExecContext()
  396. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  397. ret, err := plans.Execute(execCtx).Wait(context.Background())
  398. if err != nil {
  399. return nil, fmt.Errorf("executing io plan: %w", err)
  400. }
  401. var blocks []stgmod.ObjectBlock
  402. var blockChgs []stgmod.BlockChange
  403. for i, stg := range uploadStgs {
  404. blocks = append(blocks, stgmod.ObjectBlock{
  405. ObjectID: obj.Object.ObjectID,
  406. Index: 0,
  407. StorageID: stg.Storage.Storage.StorageID,
  408. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  409. })
  410. blockChgs = append(blockChgs, &stgmod.BlockChangeClone{
  411. BlockType: stgmod.BlockTypeRaw,
  412. SourceStorageID: obj.Blocks[0].StorageID,
  413. TargetStorageID: stg.Storage.Storage.StorageID,
  414. TransferBytes: 1,
  415. })
  416. }
  417. // 删除原本的文件块
  418. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  419. Index: 0,
  420. StorageID: obj.Blocks[0].StorageID,
  421. })
  422. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  423. ObjectID: obj.Object.ObjectID,
  424. PackageID: obj.Object.PackageID,
  425. BlockChanges: blockChgs,
  426. })
  427. return &coormq.UpdatingObjectRedundancy{
  428. ObjectID: obj.Object.ObjectID,
  429. Redundancy: red,
  430. Blocks: blocks,
  431. }, nil
  432. }
  433. func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  434. if len(obj.Blocks) == 0 {
  435. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec")
  436. }
  437. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  438. if !ok {
  439. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  440. }
  441. if srcStg.Storage.MasterHub == nil {
  442. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  443. }
  444. ft := ioswitch2.NewFromTo()
  445. ft.ECParam = red
  446. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  447. for i := 0; i < red.N; i++ {
  448. ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  449. }
  450. plans := exec.NewPlanBuilder()
  451. err := parser.Parse(ft, plans)
  452. if err != nil {
  453. return nil, fmt.Errorf("parsing plan: %w", err)
  454. }
  455. execCtx := exec.NewExecContext()
  456. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  457. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  458. if err != nil {
  459. return nil, fmt.Errorf("executing io plan: %w", err)
  460. }
  461. var blocks []stgmod.ObjectBlock
  462. var evtTargetBlocks []stgmod.Block
  463. var evtBlockTrans []stgmod.DataTransfer
  464. for i := 0; i < red.N; i++ {
  465. blocks = append(blocks, stgmod.ObjectBlock{
  466. ObjectID: obj.Object.ObjectID,
  467. Index: i,
  468. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  469. FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  470. })
  471. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  472. BlockType: stgmod.BlockTypeEC,
  473. Index: i,
  474. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  475. })
  476. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  477. SourceStorageID: obj.Blocks[0].StorageID,
  478. TargetStorageID: uploadStgs[i].Storage.Storage.StorageID,
  479. TransferBytes: 1,
  480. })
  481. }
  482. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  483. ObjectID: obj.Object.ObjectID,
  484. PackageID: obj.Object.PackageID,
  485. BlockChanges: []stgmod.BlockChange{
  486. &stgmod.BlockChangeEnDecode{
  487. SourceBlocks: []stgmod.Block{{
  488. BlockType: stgmod.BlockTypeRaw,
  489. StorageID: obj.Blocks[0].StorageID,
  490. }},
  491. TargetBlocks: evtTargetBlocks,
  492. DataTransfers: evtBlockTrans,
  493. },
  494. // 删除原本的文件块
  495. &stgmod.BlockChangeDeleted{
  496. Index: 0,
  497. StorageID: obj.Blocks[0].StorageID,
  498. },
  499. },
  500. })
  501. return &coormq.UpdatingObjectRedundancy{
  502. ObjectID: obj.Object.ObjectID,
  503. Redundancy: red,
  504. Blocks: blocks,
  505. }, nil
  506. }
  507. func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  508. if len(obj.Blocks) == 0 {
  509. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec")
  510. }
  511. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  512. if !ok {
  513. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  514. }
  515. if srcStg.Storage.MasterHub == nil {
  516. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  517. }
  518. var toes []ioswitchlrc.To
  519. for i := 0; i < red.N; i++ {
  520. toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, i, fmt.Sprintf("%d", i)))
  521. }
  522. plans := exec.NewPlanBuilder()
  523. err := lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage.Storage, -1), toes, plans)
  524. if err != nil {
  525. return nil, fmt.Errorf("parsing plan: %w", err)
  526. }
  527. execCtx := exec.NewExecContext()
  528. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  529. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  530. if err != nil {
  531. return nil, fmt.Errorf("executing io plan: %w", err)
  532. }
  533. var blocks []stgmod.ObjectBlock
  534. var evtTargetBlocks []stgmod.Block
  535. var evtBlockTrans []stgmod.DataTransfer
  536. for i := 0; i < red.N; i++ {
  537. blocks = append(blocks, stgmod.ObjectBlock{
  538. ObjectID: obj.Object.ObjectID,
  539. Index: i,
  540. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  541. FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  542. })
  543. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  544. BlockType: stgmod.BlockTypeEC,
  545. Index: i,
  546. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  547. })
  548. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  549. SourceStorageID: obj.Blocks[0].StorageID,
  550. TargetStorageID: uploadStgs[i].Storage.Storage.StorageID,
  551. TransferBytes: 1,
  552. })
  553. }
  554. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  555. ObjectID: obj.Object.ObjectID,
  556. PackageID: obj.Object.PackageID,
  557. BlockChanges: []stgmod.BlockChange{
  558. &stgmod.BlockChangeEnDecode{
  559. SourceBlocks: []stgmod.Block{{
  560. BlockType: stgmod.BlockTypeRaw,
  561. StorageID: obj.Blocks[0].StorageID,
  562. }},
  563. TargetBlocks: evtTargetBlocks,
  564. DataTransfers: evtBlockTrans,
  565. },
  566. // 删除原本的文件块
  567. &stgmod.BlockChangeDeleted{
  568. Index: 0,
  569. StorageID: obj.Blocks[0].StorageID,
  570. },
  571. },
  572. })
  573. return &coormq.UpdatingObjectRedundancy{
  574. ObjectID: obj.Object.ObjectID,
  575. Redundancy: red,
  576. Blocks: blocks,
  577. }, nil
  578. }
  579. func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.SegmentRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  580. if len(obj.Blocks) == 0 {
  581. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  582. }
  583. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  584. if !ok {
  585. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  586. }
  587. if srcStg.Storage.MasterHub == nil {
  588. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  589. }
  590. // 如果选择的备份节点都是同一个,那么就只要上传一次
  591. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  592. ft := ioswitch2.NewFromTo()
  593. ft.SegmentParam = red
  594. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  595. for i, stg := range uploadStgs {
  596. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
  597. }
  598. plans := exec.NewPlanBuilder()
  599. err := parser.Parse(ft, plans)
  600. if err != nil {
  601. return nil, fmt.Errorf("parsing plan: %w", err)
  602. }
  603. // TODO 添加依赖
  604. execCtx := exec.NewExecContext()
  605. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  606. ret, err := plans.Execute(execCtx).Wait(context.Background())
  607. if err != nil {
  608. return nil, fmt.Errorf("executing io plan: %w", err)
  609. }
  610. var blocks []stgmod.ObjectBlock
  611. var evtTargetBlocks []stgmod.Block
  612. var evtBlockTrans []stgmod.DataTransfer
  613. for i, stg := range uploadStgs {
  614. blocks = append(blocks, stgmod.ObjectBlock{
  615. ObjectID: obj.Object.ObjectID,
  616. Index: i,
  617. StorageID: stg.Storage.Storage.StorageID,
  618. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  619. })
  620. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  621. BlockType: stgmod.BlockTypeSegment,
  622. Index: i,
  623. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  624. })
  625. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  626. SourceStorageID: obj.Blocks[0].StorageID,
  627. TargetStorageID: uploadStgs[i].Storage.Storage.StorageID,
  628. TransferBytes: 1,
  629. })
  630. }
  631. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  632. ObjectID: obj.Object.ObjectID,
  633. PackageID: obj.Object.PackageID,
  634. BlockChanges: []stgmod.BlockChange{
  635. &stgmod.BlockChangeEnDecode{
  636. SourceBlocks: []stgmod.Block{{
  637. BlockType: stgmod.BlockTypeRaw,
  638. StorageID: obj.Blocks[0].StorageID,
  639. }},
  640. TargetBlocks: evtTargetBlocks,
  641. DataTransfers: evtBlockTrans,
  642. },
  643. // 删除原本的文件块
  644. &stgmod.BlockChangeDeleted{
  645. Index: 0,
  646. StorageID: obj.Blocks[0].StorageID,
  647. },
  648. },
  649. })
  650. return &coormq.UpdatingObjectRedundancy{
  651. ObjectID: obj.Object.ObjectID,
  652. Redundancy: red,
  653. Blocks: blocks,
  654. }, nil
  655. }
  656. func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  657. if len(obj.Blocks) == 0 {
  658. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  659. }
  660. srcStg, ok := allStgs[obj.Blocks[0].StorageID]
  661. if !ok {
  662. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  663. }
  664. if srcStg.Storage.MasterHub == nil {
  665. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  666. }
  667. // 如果选择的备份节点都是同一个,那么就只要上传一次
  668. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  669. ft := ioswitch2.NewFromTo()
  670. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.Storage.MasterHub, srcStg.Storage, ioswitch2.RawStream()))
  671. for i, stg := range uploadStgs {
  672. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  673. }
  674. plans := exec.NewPlanBuilder()
  675. err := parser.Parse(ft, plans)
  676. if err != nil {
  677. return nil, fmt.Errorf("parsing plan: %w", err)
  678. }
  679. // TODO 添加依赖
  680. execCtx := exec.NewExecContext()
  681. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  682. ret, err := plans.Execute(execCtx).Wait(context.Background())
  683. if err != nil {
  684. return nil, fmt.Errorf("executing io plan: %w", err)
  685. }
  686. var blocks []stgmod.ObjectBlock
  687. var blockChgs []stgmod.BlockChange
  688. for i, stg := range uploadStgs {
  689. blocks = append(blocks, stgmod.ObjectBlock{
  690. ObjectID: obj.Object.ObjectID,
  691. Index: 0,
  692. StorageID: stg.Storage.Storage.StorageID,
  693. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  694. })
  695. blockChgs = append(blockChgs, &stgmod.BlockChangeClone{
  696. BlockType: stgmod.BlockTypeRaw,
  697. SourceStorageID: obj.Blocks[0].StorageID,
  698. TargetStorageID: stg.Storage.Storage.StorageID,
  699. TransferBytes: 1,
  700. })
  701. }
  702. // 删除原本的文件块
  703. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  704. Index: 0,
  705. StorageID: obj.Blocks[0].StorageID,
  706. })
  707. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  708. ObjectID: obj.Object.ObjectID,
  709. PackageID: obj.Object.PackageID,
  710. BlockChanges: blockChgs,
  711. })
  712. return &coormq.UpdatingObjectRedundancy{
  713. ObjectID: obj.Object.ObjectID,
  714. Redundancy: red,
  715. Blocks: blocks,
  716. }, nil
  717. }
  718. func (t *CheckPackageRedundancy) repToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  719. return t.noneToEC(ctx, obj, red, uploadStorages, allStgs)
  720. }
  721. func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  722. var chosenBlocks []stgmod.GrouppedObjectBlock
  723. var chosenBlockIndexes []int
  724. var chosenBlockStg []stgmod.StorageDetail
  725. for _, block := range obj.GroupBlocks() {
  726. if len(block.StorageIDs) > 0 {
  727. // TODO 考虑选择最优的节点
  728. stg, ok := allStgs[block.StorageIDs[0]]
  729. if !ok {
  730. continue
  731. }
  732. if stg.Storage.MasterHub == nil {
  733. continue
  734. }
  735. chosenBlocks = append(chosenBlocks, block)
  736. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  737. chosenBlockStg = append(chosenBlockStg, stg.Storage)
  738. }
  739. if len(chosenBlocks) == srcRed.K {
  740. break
  741. }
  742. }
  743. if len(chosenBlocks) < srcRed.K {
  744. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  745. }
  746. // 如果选择的备份节点都是同一个,那么就只要上传一次
  747. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  748. planBlder := exec.NewPlanBuilder()
  749. ft := ioswitch2.NewFromTo()
  750. ft.ECParam = srcRed
  751. for i, block := range chosenBlocks {
  752. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  753. }
  754. for i := range uploadStgs {
  755. ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size)))
  756. }
  757. err := parser.Parse(ft, planBlder)
  758. if err != nil {
  759. return nil, fmt.Errorf("parsing plan: %w", err)
  760. }
  761. // TODO 添加依赖
  762. execCtx := exec.NewExecContext()
  763. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  764. ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
  765. if err != nil {
  766. return nil, fmt.Errorf("executing io plan: %w", err)
  767. }
  768. var blocks []stgmod.ObjectBlock
  769. for i := range uploadStgs {
  770. blocks = append(blocks, stgmod.ObjectBlock{
  771. ObjectID: obj.Object.ObjectID,
  772. Index: 0,
  773. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  774. FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  775. })
  776. }
  777. var evtSrcBlocks []stgmod.Block
  778. var evtTargetBlocks []stgmod.Block
  779. for i2, block := range chosenBlocks {
  780. evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{
  781. BlockType: stgmod.BlockTypeEC,
  782. Index: block.Index,
  783. StorageID: chosenBlockStg[i2].Storage.StorageID,
  784. })
  785. }
  786. for _, stg := range uploadStgs {
  787. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  788. BlockType: stgmod.BlockTypeRaw,
  789. Index: 0,
  790. StorageID: stg.Storage.Storage.StorageID,
  791. })
  792. }
  793. var evtBlockTrans []stgmod.DataTransfer
  794. for _, stg := range uploadStgs {
  795. for i2 := range chosenBlocks {
  796. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  797. SourceStorageID: chosenBlockStg[i2].Storage.StorageID,
  798. TargetStorageID: stg.Storage.Storage.StorageID,
  799. TransferBytes: 1,
  800. })
  801. }
  802. }
  803. var blockChgs []stgmod.BlockChange
  804. blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{
  805. SourceBlocks: evtSrcBlocks,
  806. TargetBlocks: evtTargetBlocks,
  807. DataTransfers: evtBlockTrans,
  808. })
  809. for _, block := range obj.Blocks {
  810. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  811. Index: block.Index,
  812. StorageID: block.StorageID,
  813. })
  814. }
  815. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  816. ObjectID: obj.Object.ObjectID,
  817. PackageID: obj.Object.PackageID,
  818. BlockChanges: blockChgs,
  819. })
  820. return &coormq.UpdatingObjectRedundancy{
  821. ObjectID: obj.Object.ObjectID,
  822. Redundancy: tarRed,
  823. Blocks: blocks,
  824. }, nil
  825. }
  826. func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  827. grpBlocks := obj.GroupBlocks()
  828. var chosenBlocks []stgmod.GrouppedObjectBlock
  829. var chosenBlockStg []stgmod.StorageDetail
  830. for _, block := range grpBlocks {
  831. if len(block.StorageIDs) > 0 {
  832. stg, ok := allStgs[block.StorageIDs[0]]
  833. if !ok {
  834. continue
  835. }
  836. if stg.Storage.MasterHub == nil {
  837. continue
  838. }
  839. chosenBlocks = append(chosenBlocks, block)
  840. chosenBlockStg = append(chosenBlockStg, stg.Storage)
  841. }
  842. if len(chosenBlocks) == srcRed.K {
  843. break
  844. }
  845. }
  846. if len(chosenBlocks) < srcRed.K {
  847. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  848. }
  849. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  850. planBlder := exec.NewPlanBuilder()
  851. var evtSrcBlocks []stgmod.Block
  852. var evtTargetBlocks []stgmod.Block
  853. ft := ioswitch2.NewFromTo()
  854. ft.ECParam = srcRed
  855. for i, block := range chosenBlocks {
  856. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  857. evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{
  858. BlockType: stgmod.BlockTypeEC,
  859. Index: block.Index,
  860. StorageID: chosenBlockStg[i].Storage.StorageID,
  861. })
  862. }
  863. var newBlocks []stgmod.ObjectBlock
  864. shouldUpdateBlocks := false
  865. for i, stg := range uploadStorages {
  866. newBlock := stgmod.ObjectBlock{
  867. ObjectID: obj.Object.ObjectID,
  868. Index: i,
  869. StorageID: stg.Storage.Storage.StorageID,
  870. }
  871. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  872. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  873. if ok && lo.Contains(grp.StorageIDs, stg.Storage.Storage.StorageID) {
  874. newBlock.FileHash = grp.FileHash
  875. newBlocks = append(newBlocks, newBlock)
  876. continue
  877. }
  878. shouldUpdateBlocks = true
  879. // 否则就要重建出这个节点需要的块
  880. // 输出只需要自己要保存的那一块
  881. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  882. evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
  883. BlockType: stgmod.BlockTypeEC,
  884. Index: i,
  885. StorageID: stg.Storage.Storage.StorageID,
  886. })
  887. newBlocks = append(newBlocks, newBlock)
  888. }
  889. err := parser.Parse(ft, planBlder)
  890. if err != nil {
  891. return nil, fmt.Errorf("parsing plan: %w", err)
  892. }
  893. // 如果没有任何Plan,Wait会直接返回成功
  894. execCtx := exec.NewExecContext()
  895. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  896. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  897. if err != nil {
  898. return nil, fmt.Errorf("executing io plan: %w", err)
  899. }
  900. if !shouldUpdateBlocks {
  901. return nil, nil
  902. }
  903. for k, v := range ret {
  904. idx, err := strconv.ParseInt(k, 10, 64)
  905. if err != nil {
  906. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  907. }
  908. newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash
  909. }
  910. var evtBlockTrans []stgmod.DataTransfer
  911. for _, src := range evtSrcBlocks {
  912. for _, tar := range evtTargetBlocks {
  913. evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{
  914. SourceStorageID: src.StorageID,
  915. TargetStorageID: tar.StorageID,
  916. TransferBytes: 1,
  917. })
  918. }
  919. }
  920. var blockChgs []stgmod.BlockChange
  921. for _, block := range obj.Blocks {
  922. keep := lo.ContainsBy(newBlocks, func(newBlock stgmod.ObjectBlock) bool {
  923. return newBlock.Index == block.Index && newBlock.StorageID == block.StorageID
  924. })
  925. if !keep {
  926. blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{
  927. Index: block.Index,
  928. StorageID: block.StorageID,
  929. })
  930. }
  931. }
  932. blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{
  933. SourceBlocks: evtSrcBlocks,
  934. TargetBlocks: evtTargetBlocks,
  935. DataTransfers: evtBlockTrans,
  936. })
  937. ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{
  938. ObjectID: obj.Object.ObjectID,
  939. PackageID: obj.Object.PackageID,
  940. BlockChanges: blockChgs,
  941. })
  942. return &coormq.UpdatingObjectRedundancy{
  943. ObjectID: obj.Object.ObjectID,
  944. Redundancy: tarRed,
  945. Blocks: newBlocks,
  946. }, nil
  947. }
  948. func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  949. blocksGrpByIndex := obj.GroupBlocks()
  950. var lostBlocks []int
  951. var lostBlockGrps []int
  952. canGroupReconstruct := true
  953. allBlockFlags := make([]bool, srcRed.N)
  954. for _, block := range blocksGrpByIndex {
  955. allBlockFlags[block.Index] = true
  956. }
  957. for i, ok := range allBlockFlags {
  958. grpID := srcRed.FindGroup(i)
  959. if !ok {
  960. if grpID == -1 {
  961. canGroupReconstruct = false
  962. break
  963. }
  964. if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
  965. canGroupReconstruct = false
  966. break
  967. }
  968. lostBlocks = append(lostBlocks, i)
  969. lostBlockGrps = append(lostBlockGrps, grpID)
  970. }
  971. }
  972. // TODO 产生BlockTransfer事件
  973. if canGroupReconstruct {
  974. // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadStorages)
  975. }
  976. return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadStorages, allStgs)
  977. }
  978. /*
  979. TODO2 修复这一块的代码
  980. func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  981. grped := make(map[int]stgmod.GrouppedObjectBlock)
  982. for _, b := range grpedBlocks {
  983. grped[b.Index] = b
  984. }
  985. plans := exec.NewPlanBuilder()
  986. for i := 0; i < len(lostBlocks); i++ {
  987. var froms []ioswitchlrc.From
  988. grpEles := red.GetGroupElements(lostBlockGrps[i])
  989. for _, ele := range grpEles {
  990. if ele == lostBlocks[i] {
  991. continue
  992. }
  993. froms = append(froms, ioswitchlrc.NewFromStorage(grped[ele].FileHash, nil, ele))
  994. }
  995. err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
  996. ioswitchlrc.NewToStorage(uploadStorages[i].Storage, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
  997. }, plans)
  998. if err != nil {
  999. return nil, fmt.Errorf("parsing plan: %w", err)
  1000. }
  1001. }
  1002. fmt.Printf("plans: %v\n", plans)
  1003. // 如果没有任何Plan,Wait会直接返回成功
  1004. // TODO 添加依赖
  1005. ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
  1006. if err != nil {
  1007. return nil, fmt.Errorf("executing io plan: %w", err)
  1008. }
  1009. var newBlocks []stgmod.ObjectBlock
  1010. for _, i := range lostBlocks {
  1011. newBlocks = append(newBlocks, stgmod.ObjectBlock{
  1012. ObjectID: obj.Object.ObjectID,
  1013. Index: i,
  1014. StorageID: uploadStorages[i].Storage.Storage.StorageID,
  1015. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  1016. })
  1017. }
  1018. for _, b := range grpedBlocks {
  1019. for _, hubID := range b.StorageIDs {
  1020. newBlocks = append(newBlocks, stgmod.ObjectBlock{
  1021. ObjectID: obj.Object.ObjectID,
  1022. Index: b.Index,
  1023. StorageID: hubID,
  1024. FileHash: b.FileHash,
  1025. })
  1026. }
  1027. }
  1028. return &coormq.UpdatingObjectRedundancy{
  1029. ObjectID: obj.Object.ObjectID,
  1030. Redundancy: red,
  1031. Blocks: newBlocks,
  1032. }, nil
  1033. }
  1034. */
  1035. func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  1036. var chosenBlocks []stgmod.GrouppedObjectBlock
  1037. var chosenBlockStg []stgmod.StorageDetail
  1038. for _, block := range grpBlocks {
  1039. if len(block.StorageIDs) > 0 && block.Index < red.M() {
  1040. stg, ok := allStgs[block.StorageIDs[0]]
  1041. if !ok {
  1042. continue
  1043. }
  1044. if stg.Storage.MasterHub == nil {
  1045. continue
  1046. }
  1047. chosenBlocks = append(chosenBlocks, block)
  1048. chosenBlockStg = append(chosenBlockStg, stg.Storage)
  1049. }
  1050. if len(chosenBlocks) == red.K {
  1051. break
  1052. }
  1053. }
  1054. if len(chosenBlocks) < red.K {
  1055. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  1056. }
  1057. // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  1058. planBlder := exec.NewPlanBuilder()
  1059. var froms []ioswitchlrc.From
  1060. var toes []ioswitchlrc.To
  1061. var newBlocks []stgmod.ObjectBlock
  1062. shouldUpdateBlocks := false
  1063. for i, storage := range uploadStorages {
  1064. newBlock := stgmod.ObjectBlock{
  1065. ObjectID: obj.Object.ObjectID,
  1066. Index: i,
  1067. StorageID: storage.Storage.Storage.StorageID,
  1068. }
  1069. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  1070. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  1071. if ok && lo.Contains(grp.StorageIDs, storage.Storage.Storage.StorageID) {
  1072. newBlock.FileHash = grp.FileHash
  1073. newBlocks = append(newBlocks, newBlock)
  1074. continue
  1075. }
  1076. shouldUpdateBlocks = true
  1077. // 否则就要重建出这个节点需要的块
  1078. for i2, block := range chosenBlocks {
  1079. froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2].Storage, block.Index))
  1080. }
  1081. // 输出只需要自己要保存的那一块
  1082. toes = append(toes, ioswitchlrc.NewToStorage(*storage.Storage.MasterHub, storage.Storage.Storage, i, fmt.Sprintf("%d", i)))
  1083. newBlocks = append(newBlocks, newBlock)
  1084. }
  1085. err := lrcparser.ReconstructAny(froms, toes, planBlder)
  1086. if err != nil {
  1087. return nil, fmt.Errorf("parsing plan: %w", err)
  1088. }
  1089. fmt.Printf("plans: %v\n", planBlder)
  1090. // 如果没有任何Plan,Wait会直接返回成功
  1091. execCtx := exec.NewExecContext()
  1092. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  1093. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  1094. if err != nil {
  1095. return nil, fmt.Errorf("executing io plan: %w", err)
  1096. }
  1097. if !shouldUpdateBlocks {
  1098. return nil, nil
  1099. }
  1100. for k, v := range ret {
  1101. idx, err := strconv.ParseInt(k, 10, 64)
  1102. if err != nil {
  1103. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  1104. }
  1105. newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash
  1106. }
  1107. return &coormq.UpdatingObjectRedundancy{
  1108. ObjectID: obj.Object.ObjectID,
  1109. Redundancy: red,
  1110. Blocks: newBlocks,
  1111. }, nil
  1112. }
  1113. // func (t *CheckPackageRedundancy) pinObject(hubID cdssdk.HubID, fileHash string) error {
  1114. // agtCli, err := stgglb.AgentMQPool.Acquire(hubID)
  1115. // if err != nil {
  1116. // return fmt.Errorf("new agent client: %w", err)
  1117. // }
  1118. // defer stgglb.AgentMQPool.Release(agtCli)
  1119. // _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false))
  1120. // if err != nil {
  1121. // return fmt.Errorf("start pinning object: %w", err)
  1122. // }
  1123. // return nil
  1124. // }
  1125. func init() {
  1126. RegisterMessageConvertor(NewCheckPackageRedundancy)
  1127. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。