You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

check_package_redundancy.go 34 kB


  1. package event
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/common/utils/sort2"
  12. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  13. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  17. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
  19. lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser"
  20. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  21. scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
  22. "gitlink.org.cn/cloudream/storage/scanner/internal/config"
  23. )
  24. type CheckPackageRedundancy struct {
  25. *scevt.CheckPackageRedundancy
  26. }
  27. func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageRedundancy {
  28. return &CheckPackageRedundancy{
  29. CheckPackageRedundancy: evt,
  30. }
  31. }
  32. type StorageLoadInfo struct {
  33. Storage stgmod.StorageDetail
  34. AccessAmount float64
  35. }
  36. func (t *CheckPackageRedundancy) TryMerge(other Event) bool {
  37. event, ok := other.(*CheckPackageRedundancy)
  38. if !ok {
  39. return false
  40. }
  41. return event.PackageID == t.PackageID
  42. }
  43. func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
  44. log := logger.WithType[CheckPackageRedundancy]("Event")
  45. startTime := time.Now()
  46. log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy))
  47. defer func() {
  48. log.Debugf("end, time: %v", time.Since(startTime))
  49. }()
  50. // TODO 应该像其他event一样直接读取数据库
  51. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  52. if err != nil {
  53. log.Warnf("new coordinator client: %s", err.Error())
  54. return
  55. }
  56. defer stgglb.CoordinatorMQPool.Release(coorCli)
  57. getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID))
  58. if err != nil {
  59. log.Warnf("getting package objects: %s", err.Error())
  60. return
  61. }
  62. stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.DefCtx(), t.PackageID)
  63. if err != nil {
  64. log.Warnf("getting package access stats: %s", err.Error())
  65. return
  66. }
  67. // TODO UserID
  68. getStgs, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(1))
  69. if err != nil {
  70. log.Warnf("getting all storages: %s", err.Error())
  71. return
  72. }
  73. if len(getStgs.Storages) == 0 {
  74. log.Warnf("no available storages")
  75. return
  76. }
  77. userAllStorages := make(map[cdssdk.StorageID]*StorageLoadInfo)
  78. for _, stg := range getStgs.Storages {
  79. userAllStorages[stg.Storage.StorageID] = &StorageLoadInfo{
  80. Storage: stg,
  81. }
  82. }
  83. for _, stat := range stats {
  84. info, ok := userAllStorages[stat.StorageID]
  85. if !ok {
  86. continue
  87. }
  88. info.AccessAmount = stat.Amount
  89. }
  90. var changedObjects []coormq.UpdatingObjectRedundancy
  91. defRep := cdssdk.DefaultRepRedundancy
  92. defEC := cdssdk.DefaultECRedundancy
  93. // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点
  94. // TODO 放到chooseRedundancy函数中
  95. mostBlockStgIDs := t.summaryRepObjectBlockStorages(getObjs.Objects, 2)
  96. newRepStgs := t.chooseNewStoragesForRep(&defRep, userAllStorages)
  97. rechoosedRepStgs := t.rechooseStoragesForRep(mostBlockStgIDs, &defRep, userAllStorages)
  98. newECStgs := t.chooseNewStoragesForEC(&defEC, userAllStorages)
  99. // 加锁
  100. builder := reqbuilder.NewBuilder()
  101. for _, storage := range newRepStgs {
  102. builder.Shard().Buzy(storage.Storage.Storage.StorageID)
  103. }
  104. for _, storage := range newECStgs {
  105. builder.Shard().Buzy(storage.Storage.Storage.StorageID)
  106. }
  107. mutex, err := builder.MutexLock(execCtx.Args.DistLock)
  108. if err != nil {
  109. log.Warnf("acquiring dist lock: %s", err.Error())
  110. return
  111. }
  112. defer mutex.Unlock()
  113. for _, obj := range getObjs.Objects {
  114. var updating *coormq.UpdatingObjectRedundancy
  115. var err error
  116. newRed, selectedStorages := t.chooseRedundancy(obj, userAllStorages)
  117. switch srcRed := obj.Object.Redundancy.(type) {
  118. case *cdssdk.NoneRedundancy:
  119. switch newRed := newRed.(type) {
  120. case *cdssdk.RepRedundancy:
  121. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
  122. updating, err = t.noneToRep(execCtx, obj, newRed, newRepStgs)
  123. case *cdssdk.ECRedundancy:
  124. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
  125. updating, err = t.noneToEC(execCtx, obj, newRed, newECStgs)
  126. case *cdssdk.LRCRedundancy:
  127. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
  128. updating, err = t.noneToLRC(execCtx, obj, newRed, selectedStorages)
  129. }
  130. case *cdssdk.RepRedundancy:
  131. switch newRed := newRed.(type) {
  132. case *cdssdk.RepRedundancy:
  133. updating, err = t.repToRep(execCtx, obj, srcRed, rechoosedRepStgs)
  134. case *cdssdk.ECRedundancy:
  135. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
  136. updating, err = t.repToEC(execCtx, obj, newRed, newECStgs)
  137. }
  138. case *cdssdk.ECRedundancy:
  139. switch newRed := newRed.(type) {
  140. case *cdssdk.RepRedundancy:
  141. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
  142. updating, err = t.ecToRep(execCtx, obj, srcRed, newRed, newRepStgs)
  143. case *cdssdk.ECRedundancy:
  144. uploadStorages := t.rechooseStoragesForEC(obj, srcRed, userAllStorages)
  145. updating, err = t.ecToEC(execCtx, obj, srcRed, newRed, uploadStorages)
  146. }
  147. case *cdssdk.LRCRedundancy:
  148. switch newRed := newRed.(type) {
  149. case *cdssdk.LRCRedundancy:
  150. uploadStorages := t.rechooseStoragesForLRC(obj, srcRed, userAllStorages)
  151. updating, err = t.lrcToLRC(execCtx, obj, srcRed, newRed, uploadStorages)
  152. }
  153. }
  154. if updating != nil {
  155. changedObjects = append(changedObjects, *updating)
  156. }
  157. if err != nil {
  158. log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error())
  159. }
  160. }
  161. if len(changedObjects) == 0 {
  162. return
  163. }
  164. _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects))
  165. if err != nil {
  166. log.Warnf("requesting to change object redundancy: %s", err.Error())
  167. return
  168. }
  169. }
  170. func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllStgs map[cdssdk.StorageID]*StorageLoadInfo) (cdssdk.Redundancy, []*StorageLoadInfo) {
  171. switch obj.Object.Redundancy.(type) {
  172. case *cdssdk.NoneRedundancy:
  173. newStgs := t.chooseNewStoragesForEC(&cdssdk.DefaultECRedundancy, userAllStgs)
  174. return &cdssdk.DefaultECRedundancy, newStgs
  175. // newLRCStorages := t.chooseNewStoragesForLRC(&cdssdk.DefaultLRCRedundancy, userAllStorages)
  176. // return &cdssdk.DefaultLRCRedundancy, newLRCStorages
  177. case *cdssdk.LRCRedundancy:
  178. newLRCStgs := t.rechooseStoragesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllStgs)
  179. return &cdssdk.DefaultLRCRedundancy, newLRCStgs
  180. }
  181. return nil, nil
  182. }
  183. // 统计每个对象块所在的节点,选出块最多的不超过storageCnt个节点
  184. func (t *CheckPackageRedundancy) summaryRepObjectBlockStorages(objs []stgmod.ObjectDetail, storageCnt int) []cdssdk.StorageID {
  185. type stgBlocks struct {
  186. StorageID cdssdk.StorageID
  187. Count int
  188. }
  189. stgBlocksMap := make(map[cdssdk.StorageID]*stgBlocks)
  190. for _, obj := range objs {
  191. shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold
  192. if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok && !shouldUseEC {
  193. for _, block := range obj.Blocks {
  194. if _, ok := stgBlocksMap[block.StorageID]; !ok {
  195. stgBlocksMap[block.StorageID] = &stgBlocks{
  196. StorageID: block.StorageID,
  197. Count: 0,
  198. }
  199. }
  200. stgBlocksMap[block.StorageID].Count++
  201. }
  202. }
  203. }
  204. storages := lo.Values(stgBlocksMap)
  205. sort2.Sort(storages, func(left *stgBlocks, right *stgBlocks) int {
  206. return right.Count - left.Count
  207. })
  208. ids := lo.Map(storages, func(item *stgBlocks, idx int) cdssdk.StorageID { return item.StorageID })
  209. if len(ids) > storageCnt {
  210. ids = ids[:storageCnt]
  211. }
  212. return ids
  213. }
  214. func (t *CheckPackageRedundancy) chooseNewStoragesForRep(red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  215. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  216. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  217. })
  218. return t.chooseSoManyStorages(red.RepCount, sortedStorages)
  219. }
  220. func (t *CheckPackageRedundancy) chooseNewStoragesForEC(red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  221. sortedStorages := sort2.Sort(lo.Values(allStgs), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  222. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  223. })
  224. return t.chooseSoManyStorages(red.N, sortedStorages)
  225. }
  226. func (t *CheckPackageRedundancy) chooseNewStoragesForLRC(red *cdssdk.LRCRedundancy, allStorages map[cdssdk.HubID]*StorageLoadInfo) []*StorageLoadInfo {
  227. sortedStorages := sort2.Sort(lo.Values(allStorages), func(left *StorageLoadInfo, right *StorageLoadInfo) int {
  228. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  229. })
  230. return t.chooseSoManyStorages(red.N, sortedStorages)
  231. }
  232. func (t *CheckPackageRedundancy) rechooseStoragesForRep(mostBlockStgIDs []cdssdk.StorageID, red *cdssdk.RepRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  233. type rechooseStorage struct {
  234. *StorageLoadInfo
  235. HasBlock bool
  236. }
  237. var rechooseStgs []*rechooseStorage
  238. for _, stg := range allStgs {
  239. hasBlock := false
  240. for _, id := range mostBlockStgIDs {
  241. if id == stg.Storage.Storage.StorageID {
  242. hasBlock = true
  243. break
  244. }
  245. }
  246. rechooseStgs = append(rechooseStgs, &rechooseStorage{
  247. StorageLoadInfo: stg,
  248. HasBlock: hasBlock,
  249. })
  250. }
  251. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStorage, right *rechooseStorage) int {
  252. // 已经缓存了文件块的节点优先选择
  253. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  254. if v != 0 {
  255. return v
  256. }
  257. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  258. })
  259. return t.chooseSoManyStorages(red.RepCount, lo.Map(sortedStgs, func(storage *rechooseStorage, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  260. }
  261. func (t *CheckPackageRedundancy) rechooseStoragesForEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  262. type rechooseStg struct {
  263. *StorageLoadInfo
  264. CachedBlockIndex int
  265. }
  266. var rechooseStgs []*rechooseStg
  267. for _, stg := range allStgs {
  268. cachedBlockIndex := -1
  269. for _, block := range obj.Blocks {
  270. if block.StorageID == stg.Storage.Storage.StorageID {
  271. cachedBlockIndex = block.Index
  272. break
  273. }
  274. }
  275. rechooseStgs = append(rechooseStgs, &rechooseStg{
  276. StorageLoadInfo: stg,
  277. CachedBlockIndex: cachedBlockIndex,
  278. })
  279. }
  280. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  281. // 已经缓存了文件块的节点优先选择
  282. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  283. if v != 0 {
  284. return v
  285. }
  286. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  287. })
  288. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  289. return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  290. }
  291. func (t *CheckPackageRedundancy) rechooseStoragesForLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, allStgs map[cdssdk.StorageID]*StorageLoadInfo) []*StorageLoadInfo {
  292. type rechooseStg struct {
  293. *StorageLoadInfo
  294. CachedBlockIndex int
  295. }
  296. var rechooseStgs []*rechooseStg
  297. for _, stg := range allStgs {
  298. cachedBlockIndex := -1
  299. for _, block := range obj.Blocks {
  300. if block.StorageID == stg.Storage.Storage.StorageID {
  301. cachedBlockIndex = block.Index
  302. break
  303. }
  304. }
  305. rechooseStgs = append(rechooseStgs, &rechooseStg{
  306. StorageLoadInfo: stg,
  307. CachedBlockIndex: cachedBlockIndex,
  308. })
  309. }
  310. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  311. // 已经缓存了文件块的节点优先选择
  312. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  313. if v != 0 {
  314. return v
  315. }
  316. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  317. })
  318. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  319. return t.chooseSoManyStorages(red.N, lo.Map(sortedStgs, func(storage *rechooseStg, idx int) *StorageLoadInfo { return storage.StorageLoadInfo }))
  320. }
  321. func (t *CheckPackageRedundancy) chooseSoManyStorages(count int, stgs []*StorageLoadInfo) []*StorageLoadInfo {
  322. repeateCount := (count + len(stgs) - 1) / len(stgs)
  323. extendStgs := make([]*StorageLoadInfo, repeateCount*len(stgs))
  324. // 使用复制的方式将节点数扩充到要求的数量
  325. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  326. for p := 0; p < repeateCount; p++ {
  327. for i, storage := range stgs {
  328. putIdx := i*repeateCount + p
  329. extendStgs[putIdx] = storage
  330. }
  331. }
  332. extendStgs = extendStgs[:count]
  333. var chosen []*StorageLoadInfo
  334. for len(chosen) < count {
  335. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  336. chosenLocations := make(map[cdssdk.LocationID]bool)
  337. for i, stg := range extendStgs {
  338. if stg == nil {
  339. continue
  340. }
  341. if chosenLocations[stg.Storage.MasterHub.LocationID] {
  342. continue
  343. }
  344. chosen = append(chosen, stg)
  345. chosenLocations[stg.Storage.MasterHub.LocationID] = true
  346. extendStgs[i] = nil
  347. }
  348. }
  349. return chosen
  350. }
  351. func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  352. if len(obj.Blocks) == 0 {
  353. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  354. }
  355. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  356. if err != nil {
  357. return nil, fmt.Errorf("new coordinator client: %w", err)
  358. }
  359. defer stgglb.CoordinatorMQPool.Release(coorCli)
  360. getStgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{obj.Blocks[0].StorageID}))
  361. if err != nil {
  362. return nil, fmt.Errorf("requesting to get storages: %w", err)
  363. }
  364. if getStgs.Storages[0] == nil {
  365. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  366. }
  367. if getStgs.Storages[0].MasterHub == nil {
  368. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  369. }
  370. // 如果选择的备份节点都是同一个,那么就只要上传一次
  371. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  372. ft := ioswitch2.NewFromTo()
  373. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, ioswitch2.RawStream()))
  374. for i, stg := range uploadStgs {
  375. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  376. }
  377. plans := exec.NewPlanBuilder()
  378. err = parser.Parse(ft, plans, cdssdk.DefaultECRedundancy)
  379. if err != nil {
  380. return nil, fmt.Errorf("parsing plan: %w", err)
  381. }
  382. // TODO 添加依赖
  383. execCtx := exec.NewExecContext()
  384. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  385. ret, err := plans.Execute(execCtx).Wait(context.Background())
  386. if err != nil {
  387. return nil, fmt.Errorf("executing io plan: %w", err)
  388. }
  389. var blocks []stgmod.ObjectBlock
  390. for i, stg := range uploadStgs {
  391. blocks = append(blocks, stgmod.ObjectBlock{
  392. ObjectID: obj.Object.ObjectID,
  393. Index: 0,
  394. StorageID: stg.Storage.Storage.StorageID,
  395. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  396. })
  397. }
  398. return &coormq.UpdatingObjectRedundancy{
  399. ObjectID: obj.Object.ObjectID,
  400. Redundancy: red,
  401. Blocks: blocks,
  402. }, nil
  403. }
  404. func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  405. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  406. if err != nil {
  407. return nil, fmt.Errorf("new coordinator client: %w", err)
  408. }
  409. defer stgglb.CoordinatorMQPool.Release(coorCli)
  410. if len(obj.Blocks) == 0 {
  411. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec")
  412. }
  413. getStgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{obj.Blocks[0].StorageID}))
  414. if err != nil {
  415. return nil, fmt.Errorf("requesting to get storages: %w", err)
  416. }
  417. if getStgs.Storages[0] == nil {
  418. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  419. }
  420. if getStgs.Storages[0].MasterHub == nil {
  421. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  422. }
  423. ft := ioswitch2.NewFromTo()
  424. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, ioswitch2.RawStream()))
  425. for i := 0; i < red.N; i++ {
  426. ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i)))
  427. }
  428. plans := exec.NewPlanBuilder()
  429. err = parser.Parse(ft, plans, *red)
  430. if err != nil {
  431. return nil, fmt.Errorf("parsing plan: %w", err)
  432. }
  433. execCtx := exec.NewExecContext()
  434. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  435. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  436. if err != nil {
  437. return nil, fmt.Errorf("executing io plan: %w", err)
  438. }
  439. var blocks []stgmod.ObjectBlock
  440. for i := 0; i < red.N; i++ {
  441. blocks = append(blocks, stgmod.ObjectBlock{
  442. ObjectID: obj.Object.ObjectID,
  443. Index: i,
  444. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  445. FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  446. })
  447. }
  448. return &coormq.UpdatingObjectRedundancy{
  449. ObjectID: obj.Object.ObjectID,
  450. Redundancy: red,
  451. Blocks: blocks,
  452. }, nil
  453. }
  454. func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  455. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  456. if err != nil {
  457. return nil, fmt.Errorf("new coordinator client: %w", err)
  458. }
  459. defer stgglb.CoordinatorMQPool.Release(coorCli)
  460. if len(obj.Blocks) == 0 {
  461. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec")
  462. }
  463. getStgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{obj.Blocks[0].StorageID}))
  464. if err != nil {
  465. return nil, fmt.Errorf("requesting to get storages: %w", err)
  466. }
  467. if getStgs.Storages[0] == nil {
  468. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  469. }
  470. if getStgs.Storages[0].MasterHub == nil {
  471. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  472. }
  473. var toes []ioswitchlrc.To
  474. for i := 0; i < red.N; i++ {
  475. toes = append(toes, ioswitchlrc.NewToStorage(*uploadStorages[i].Storage.MasterHub, uploadStorages[i].Storage.Storage, i, fmt.Sprintf("%d", i)))
  476. }
  477. plans := exec.NewPlanBuilder()
  478. err = lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, -1), toes, plans)
  479. if err != nil {
  480. return nil, fmt.Errorf("parsing plan: %w", err)
  481. }
  482. execCtx := exec.NewExecContext()
  483. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  484. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  485. if err != nil {
  486. return nil, fmt.Errorf("executing io plan: %w", err)
  487. }
  488. var blocks []stgmod.ObjectBlock
  489. for i := 0; i < red.N; i++ {
  490. blocks = append(blocks, stgmod.ObjectBlock{
  491. ObjectID: obj.Object.ObjectID,
  492. Index: i,
  493. StorageID: uploadStorages[i].Storage.Storage.StorageID,
  494. FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  495. })
  496. }
  497. return &coormq.UpdatingObjectRedundancy{
  498. ObjectID: obj.Object.ObjectID,
  499. Redundancy: red,
  500. Blocks: blocks,
  501. }, nil
  502. }
  503. func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  504. if len(obj.Blocks) == 0 {
  505. return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to rep")
  506. }
  507. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  508. if err != nil {
  509. return nil, fmt.Errorf("new coordinator client: %w", err)
  510. }
  511. defer stgglb.CoordinatorMQPool.Release(coorCli)
  512. getStgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{obj.Blocks[0].StorageID}))
  513. if err != nil {
  514. return nil, fmt.Errorf("requesting to get storages: %w", err)
  515. }
  516. if getStgs.Storages[0] == nil {
  517. return nil, fmt.Errorf("storage %v not found", obj.Blocks[0].StorageID)
  518. }
  519. if getStgs.Storages[0].MasterHub == nil {
  520. return nil, fmt.Errorf("storage %v has no master hub", obj.Blocks[0].StorageID)
  521. }
  522. // 如果选择的备份节点都是同一个,那么就只要上传一次
  523. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  524. ft := ioswitch2.NewFromTo()
  525. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *getStgs.Storages[0].MasterHub, getStgs.Storages[0].Storage, ioswitch2.RawStream()))
  526. for i, stg := range uploadStgs {
  527. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  528. }
  529. plans := exec.NewPlanBuilder()
  530. err = parser.Parse(ft, plans, cdssdk.DefaultECRedundancy)
  531. if err != nil {
  532. return nil, fmt.Errorf("parsing plan: %w", err)
  533. }
  534. // TODO 添加依赖
  535. execCtx := exec.NewExecContext()
  536. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  537. ret, err := plans.Execute(execCtx).Wait(context.Background())
  538. if err != nil {
  539. return nil, fmt.Errorf("executing io plan: %w", err)
  540. }
  541. var blocks []stgmod.ObjectBlock
  542. for i, stg := range uploadStgs {
  543. blocks = append(blocks, stgmod.ObjectBlock{
  544. ObjectID: obj.Object.ObjectID,
  545. Index: 0,
  546. StorageID: stg.Storage.Storage.StorageID,
  547. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  548. })
  549. }
  550. return &coormq.UpdatingObjectRedundancy{
  551. ObjectID: obj.Object.ObjectID,
  552. Redundancy: red,
  553. Blocks: blocks,
  554. }, nil
  555. }
  556. func (t *CheckPackageRedundancy) repToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  557. return t.noneToEC(ctx, obj, red, uploadStorages)
  558. }
  559. func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  560. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  561. if err != nil {
  562. return nil, fmt.Errorf("new coordinator client: %w", err)
  563. }
  564. defer stgglb.CoordinatorMQPool.Release(coorCli)
  565. var chosenBlocks []stgmod.GrouppedObjectBlock
  566. var chosenBlockIndexes []int
  567. for _, block := range obj.GroupBlocks() {
  568. if len(block.StorageIDs) > 0 {
  569. chosenBlocks = append(chosenBlocks, block)
  570. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  571. }
  572. if len(chosenBlocks) == srcRed.K {
  573. break
  574. }
  575. }
  576. if len(chosenBlocks) < srcRed.K {
  577. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  578. }
  579. // 如果选择的备份节点都是同一个,那么就只要上传一次
  580. uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID })
  581. // 每个被选节点都在自己节点上重建原始数据
  582. planBlder := exec.NewPlanBuilder()
  583. for i := range uploadStgs {
  584. ft := ioswitch2.NewFromTo()
  585. for _, block := range chosenBlocks {
  586. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.ECSrteam(block.Index)))
  587. }
  588. len := obj.Object.Size
  589. ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), exec.Range{
  590. Offset: 0,
  591. Length: &len,
  592. }))
  593. err := parser.Parse(ft, planBlder, *srcRed)
  594. if err != nil {
  595. return nil, fmt.Errorf("parsing plan: %w", err)
  596. }
  597. }
  598. // TODO 添加依赖
  599. execCtx := exec.NewExecContext()
  600. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  601. ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
  602. if err != nil {
  603. return nil, fmt.Errorf("executing io plan: %w", err)
  604. }
  605. var blocks []stgmod.ObjectBlock
  606. for i := range uploadStgs {
  607. blocks = append(blocks, stgmod.ObjectBlock{
  608. ObjectID: obj.Object.ObjectID,
  609. Index: 0,
  610. StorageID: uploadStgs[i].Storage.Storage.StorageID,
  611. FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  612. })
  613. }
  614. return &coormq.UpdatingObjectRedundancy{
  615. ObjectID: obj.Object.ObjectID,
  616. Redundancy: tarRed,
  617. Blocks: blocks,
  618. }, nil
  619. }
  620. func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  621. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  622. if err != nil {
  623. return nil, fmt.Errorf("new coordinator client: %w", err)
  624. }
  625. defer stgglb.CoordinatorMQPool.Release(coorCli)
  626. grpBlocks := obj.GroupBlocks()
  627. var chosenBlocks []stgmod.GrouppedObjectBlock
  628. for _, block := range grpBlocks {
  629. if len(block.StorageIDs) > 0 {
  630. chosenBlocks = append(chosenBlocks, block)
  631. }
  632. if len(chosenBlocks) == srcRed.K {
  633. break
  634. }
  635. }
  636. if len(chosenBlocks) < srcRed.K {
  637. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  638. }
  639. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  640. planBlder := exec.NewPlanBuilder()
  641. var newBlocks []stgmod.ObjectBlock
  642. shouldUpdateBlocks := false
  643. for i, stg := range uploadStorages {
  644. newBlock := stgmod.ObjectBlock{
  645. ObjectID: obj.Object.ObjectID,
  646. Index: i,
  647. StorageID: stg.Storage.Storage.StorageID,
  648. }
  649. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  650. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  651. if ok && lo.Contains(grp.StorageIDs, stg.Storage.Storage.StorageID) {
  652. newBlock.FileHash = grp.FileHash
  653. newBlocks = append(newBlocks, newBlock)
  654. continue
  655. }
  656. shouldUpdateBlocks = true
  657. // 否则就要重建出这个节点需要的块
  658. ft := ioswitch2.NewFromTo()
  659. for _, block := range chosenBlocks {
  660. stg := stg.Storage
  661. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *stg.MasterHub, stg.Storage, ioswitch2.ECSrteam(block.Index)))
  662. }
  663. // 输出只需要自己要保存的那一块
  664. ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage.Storage, ioswitch2.ECSrteam(i), fmt.Sprintf("%d", i)))
  665. err := parser.Parse(ft, planBlder, *srcRed)
  666. if err != nil {
  667. return nil, fmt.Errorf("parsing plan: %w", err)
  668. }
  669. newBlocks = append(newBlocks, newBlock)
  670. }
  671. // 如果没有任何Plan,Wait会直接返回成功
  672. execCtx := exec.NewExecContext()
  673. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  674. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  675. if err != nil {
  676. return nil, fmt.Errorf("executing io plan: %w", err)
  677. }
  678. if !shouldUpdateBlocks {
  679. return nil, nil
  680. }
  681. for k, v := range ret {
  682. idx, err := strconv.ParseInt(k, 10, 64)
  683. if err != nil {
  684. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  685. }
  686. newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash
  687. }
  688. return &coormq.UpdatingObjectRedundancy{
  689. ObjectID: obj.Object.ObjectID,
  690. Redundancy: tarRed,
  691. Blocks: newBlocks,
  692. }, nil
  693. }
  694. func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  695. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  696. if err != nil {
  697. return nil, fmt.Errorf("new coordinator client: %w", err)
  698. }
  699. defer stgglb.CoordinatorMQPool.Release(coorCli)
  700. blocksGrpByIndex := obj.GroupBlocks()
  701. var lostBlocks []int
  702. var lostBlockGrps []int
  703. canGroupReconstruct := true
  704. allBlockFlags := make([]bool, srcRed.N)
  705. for _, block := range blocksGrpByIndex {
  706. allBlockFlags[block.Index] = true
  707. }
  708. for i, ok := range allBlockFlags {
  709. grpID := srcRed.FindGroup(i)
  710. if !ok {
  711. if grpID == -1 {
  712. canGroupReconstruct = false
  713. break
  714. }
  715. if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
  716. canGroupReconstruct = false
  717. break
  718. }
  719. lostBlocks = append(lostBlocks, i)
  720. lostBlockGrps = append(lostBlockGrps, grpID)
  721. }
  722. }
  723. if canGroupReconstruct {
  724. // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadStorages)
  725. }
  726. return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadStorages)
  727. }
  728. /*
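  729. TODO2: fix this code. groupReconstructLRC below is disabled until it is repaired; in the meantime lrcToLRC always falls back to reconstructLRC.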
  730. func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  731. grped := make(map[int]stgmod.GrouppedObjectBlock)
  732. for _, b := range grpedBlocks {
  733. grped[b.Index] = b
  734. }
  735. plans := exec.NewPlanBuilder()
  736. for i := 0; i < len(lostBlocks); i++ {
  737. var froms []ioswitchlrc.From
  738. grpEles := red.GetGroupElements(lostBlockGrps[i])
  739. for _, ele := range grpEles {
  740. if ele == lostBlocks[i] {
  741. continue
  742. }
  743. froms = append(froms, ioswitchlrc.NewFromStorage(grped[ele].FileHash, nil, ele))
  744. }
  745. err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
  746. ioswitchlrc.NewToStorage(uploadStorages[i].Storage, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
  747. }, plans)
  748. if err != nil {
  749. return nil, fmt.Errorf("parsing plan: %w", err)
  750. }
  751. }
  752. fmt.Printf("plans: %v\n", plans)
  753. // 如果没有任何Plan,Wait会直接返回成功
  754. // TODO 添加依赖
  755. ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
  756. if err != nil {
  757. return nil, fmt.Errorf("executing io plan: %w", err)
  758. }
  759. var newBlocks []stgmod.ObjectBlock
  760. for _, i := range lostBlocks {
  761. newBlocks = append(newBlocks, stgmod.ObjectBlock{
  762. ObjectID: obj.Object.ObjectID,
  763. Index: i,
  764. StorageID: uploadStorages[i].Storage.Storage.StorageID,
  765. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  766. })
  767. }
  768. for _, b := range grpedBlocks {
  769. for _, hubID := range b.StorageIDs {
  770. newBlocks = append(newBlocks, stgmod.ObjectBlock{
  771. ObjectID: obj.Object.ObjectID,
  772. Index: b.Index,
  773. StorageID: hubID,
  774. FileHash: b.FileHash,
  775. })
  776. }
  777. }
  778. return &coormq.UpdatingObjectRedundancy{
  779. ObjectID: obj.Object.ObjectID,
  780. Redundancy: red,
  781. Blocks: newBlocks,
  782. }, nil
  783. }
  784. */
  785. func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
  786. var chosenBlocks []stgmod.GrouppedObjectBlock
  787. for _, block := range grpBlocks {
  788. if len(block.StorageIDs) > 0 && block.Index < red.M() {
  789. chosenBlocks = append(chosenBlocks, block)
  790. }
  791. if len(chosenBlocks) == red.K {
  792. break
  793. }
  794. }
  795. if len(chosenBlocks) < red.K {
  796. return nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  797. }
  798. // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  799. planBlder := exec.NewPlanBuilder()
  800. var froms []ioswitchlrc.From
  801. var toes []ioswitchlrc.To
  802. var newBlocks []stgmod.ObjectBlock
  803. shouldUpdateBlocks := false
  804. for i, storage := range uploadStorages {
  805. newBlock := stgmod.ObjectBlock{
  806. ObjectID: obj.Object.ObjectID,
  807. Index: i,
  808. StorageID: storage.Storage.Storage.StorageID,
  809. }
  810. grp, ok := lo.Find(grpBlocks, func(grp stgmod.GrouppedObjectBlock) bool { return grp.Index == i })
  811. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  812. if ok && lo.Contains(grp.StorageIDs, storage.Storage.Storage.StorageID) {
  813. newBlock.FileHash = grp.FileHash
  814. newBlocks = append(newBlocks, newBlock)
  815. continue
  816. }
  817. shouldUpdateBlocks = true
  818. // 否则就要重建出这个节点需要的块
  819. for _, block := range chosenBlocks {
  820. fmt.Printf("b: %v\n", block.Index)
  821. stg := storage.Storage
  822. froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, *stg.MasterHub, stg.Storage, block.Index))
  823. }
  824. // 输出只需要自己要保存的那一块
  825. toes = append(toes, ioswitchlrc.NewToStorage(*storage.Storage.MasterHub, storage.Storage.Storage, i, fmt.Sprintf("%d", i)))
  826. newBlocks = append(newBlocks, newBlock)
  827. }
  828. err := lrcparser.ReconstructAny(froms, toes, planBlder)
  829. if err != nil {
  830. return nil, fmt.Errorf("parsing plan: %w", err)
  831. }
  832. fmt.Printf("plans: %v\n", planBlder)
  833. // 如果没有任何Plan,Wait会直接返回成功
  834. execCtx := exec.NewExecContext()
  835. exec.SetValueByType(execCtx, ctx.Args.StgMgr)
  836. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  837. if err != nil {
  838. return nil, fmt.Errorf("executing io plan: %w", err)
  839. }
  840. if !shouldUpdateBlocks {
  841. return nil, nil
  842. }
  843. for k, v := range ret {
  844. idx, err := strconv.ParseInt(k, 10, 64)
  845. if err != nil {
  846. return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  847. }
  848. newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash
  849. }
  850. return &coormq.UpdatingObjectRedundancy{
  851. ObjectID: obj.Object.ObjectID,
  852. Redundancy: red,
  853. Blocks: newBlocks,
  854. }, nil
  855. }
  856. // func (t *CheckPackageRedundancy) pinObject(hubID cdssdk.HubID, fileHash string) error {
  857. // agtCli, err := stgglb.AgentMQPool.Acquire(hubID)
  858. // if err != nil {
  859. // return fmt.Errorf("new agent client: %w", err)
  860. // }
  861. // defer stgglb.AgentMQPool.Release(agtCli)
  862. // _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false))
  863. // if err != nil {
  864. // return fmt.Errorf("start pinning object: %w", err)
  865. // }
  866. // return nil
  867. // }
  868. func init() {
  869. RegisterMessageConvertor(NewCheckPackageRedundancy)
  870. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。