You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

passes.go 7.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package parser
  2. import (
  3. "fmt"
  4. "math"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  6. "gitlink.org.cn/cloudream/common/utils/math2"
  7. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc"
  8. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/ops2"
  9. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  10. cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
  11. )
  12. // 计算输入流的打开范围。会把流的范围按条带大小取整
  13. func calcStreamRange(ctx *GenerateContext) {
  14. stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)
  15. rng := math2.Range{
  16. Offset: math.MaxInt64,
  17. }
  18. for _, to := range ctx.To {
  19. if to.GetDataIndex() == -1 {
  20. toRng := to.GetRange()
  21. rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
  22. if toRng.Length != nil {
  23. rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize))
  24. } else {
  25. rng.Length = nil
  26. }
  27. } else {
  28. toRng := to.GetRange()
  29. blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.LRC.ChunkSize))
  30. rng.ExtendStart(blkStartIndex * stripSize)
  31. if toRng.Length != nil {
  32. blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.LRC.ChunkSize))
  33. rng.ExtendEnd(blkEndIndex * stripSize)
  34. } else {
  35. rng.Length = nil
  36. }
  37. }
  38. }
  39. ctx.StreamRange = rng
  40. }
  41. func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, error) {
  42. var repRange math2.Range
  43. var blkRange math2.Range
  44. repRange.Offset = ctx.StreamRange.Offset
  45. blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize)
  46. if ctx.StreamRange.Length != nil {
  47. repRngLen := *ctx.StreamRange.Length
  48. repRange.Length = &repRngLen
  49. blkRngLen := *ctx.StreamRange.Length / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize)
  50. blkRange.Length = &blkRngLen
  51. }
  52. switch f := f.(type) {
  53. case *ioswitchlrc.FromNode:
  54. t := ctx.DAG.NewShardRead(f, f.Space, types.NewOpen(f.FileHash))
  55. if f.DataIndex == -1 {
  56. t.Open.WithNullableLength(repRange.Offset, repRange.Length)
  57. } else {
  58. t.Open.WithNullableLength(blkRange.Offset, blkRange.Length)
  59. }
  60. // TODO2 支持HTTP协议
  61. t.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: f.Space.RecommendHub, Address: *f.Space.RecommendHub.Address.(*cortypes.GRPCAddressInfo)})
  62. t.Env().Pinned = true
  63. return t, nil
  64. case *ioswitchlrc.FromDriver:
  65. n := ctx.DAG.NewFromDriver(f.Handle)
  66. n.Env().ToEnvDriver()
  67. n.Env().Pinned = true
  68. if f.DataIndex == -1 {
  69. f.Handle.RangeHint.Offset = repRange.Offset
  70. f.Handle.RangeHint.Length = repRange.Length
  71. } else {
  72. f.Handle.RangeHint.Offset = blkRange.Offset
  73. f.Handle.RangeHint.Length = blkRange.Length
  74. }
  75. return n, nil
  76. default:
  77. return nil, fmt.Errorf("unsupported from type %T", f)
  78. }
  79. }
  80. func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) {
  81. switch t := t.(type) {
  82. case *ioswitchlrc.ToNode:
  83. n := ctx.DAG.NewShardWrite(t, t.Space, t.FileHashStoreKey)
  84. switch addr := t.Space.RecommendHub.Address.(type) {
  85. // case *cdssdk.HttpAddressInfo:
  86. // n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub})
  87. // TODO2 支持HTTP协议
  88. case *cortypes.GRPCAddressInfo:
  89. n.Env().ToEnvWorker(&ioswitchlrc.HubWorker{Hub: t.Space.RecommendHub, Address: *addr})
  90. default:
  91. return nil, fmt.Errorf("unsupported node address type %T", addr)
  92. }
  93. n.Env().Pinned = true
  94. return n, nil
  95. case *ioswitchlrc.ToDriver:
  96. n := ctx.DAG.NewToDriver(t.Handle)
  97. n.Env().ToEnvDriver()
  98. n.Env().Pinned = true
  99. return n, nil
  100. default:
  101. return nil, fmt.Errorf("unsupported to type %T", t)
  102. }
  103. }
  104. // 通过流的输入输出位置来确定指令的执行位置。
  105. // To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG,
  106. // 所以理论上不会出现有指令的位置始终无法确定的情况。
  107. func pin(ctx *GenerateContext) bool {
  108. changed := false
  109. ctx.DAG.Walk(func(node dag.Node) bool {
  110. if node.Env().Pinned {
  111. return true
  112. }
  113. var toEnv *dag.NodeEnv
  114. for _, out := range node.OutputStreams().Slots.RawArray() {
  115. for _, to := range out.Dst.RawArray() {
  116. if to.Env().Type == dag.EnvUnknown {
  117. continue
  118. }
  119. if toEnv == nil {
  120. toEnv = to.Env()
  121. } else if !toEnv.Equals(to.Env()) {
  122. toEnv = nil
  123. break
  124. }
  125. }
  126. }
  127. if toEnv != nil {
  128. if !node.Env().Equals(toEnv) {
  129. changed = true
  130. }
  131. *node.Env() = *toEnv
  132. return true
  133. }
  134. // 否则根据输入流的始发地来固定
  135. var fromEnv *dag.NodeEnv
  136. for _, in := range node.InputStreams().Slots.RawArray() {
  137. if in.Src.Env().Type == dag.EnvUnknown {
  138. continue
  139. }
  140. if fromEnv == nil {
  141. fromEnv = in.Src.Env()
  142. } else if !fromEnv.Equals(in.Src.Env()) {
  143. fromEnv = nil
  144. break
  145. }
  146. }
  147. if fromEnv != nil {
  148. if !node.Env().Equals(fromEnv) {
  149. changed = true
  150. }
  151. *node.Env() = *fromEnv
  152. }
  153. return true
  154. })
  155. return changed
  156. }
  157. // 对于所有未使用的流,增加Drop指令
  158. func dropUnused(ctx *GenerateContext) {
  159. ctx.DAG.Walk(func(node dag.Node) bool {
  160. for _, out := range node.OutputStreams().Slots.RawArray() {
  161. if out.Dst.Len() == 0 {
  162. n := ctx.DAG.NewDropStream()
  163. *n.Env() = *node.Env()
  164. n.SetInput(out)
  165. }
  166. }
  167. return true
  168. })
  169. }
  170. // 为IPFS写入指令存储结果
  171. func storeIPFSWriteResult(ctx *GenerateContext) {
  172. dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {
  173. if n.FileHashStoreKey == "" {
  174. return true
  175. }
  176. storeNode := ctx.DAG.NewStore()
  177. storeNode.Env().ToEnvDriver()
  178. storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
  179. return true
  180. })
  181. }
  182. // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
  183. func generateRange(ctx *GenerateContext) {
  184. for i := 0; i < len(ctx.To); i++ {
  185. to := ctx.To[i]
  186. toNode := ctx.ToNodes[to]
  187. toDataIdx := to.GetDataIndex()
  188. toRng := to.GetRange()
  189. if toDataIdx == -1 {
  190. n := ctx.DAG.NewRange()
  191. toInput := toNode.Input()
  192. *n.Env() = *toInput.Var().Src.Env()
  193. rnged := n.RangeStream(toInput.Var(), math2.Range{
  194. Offset: toRng.Offset - ctx.StreamRange.Offset,
  195. Length: toRng.Length,
  196. })
  197. toInput.Var().NotTo(toNode)
  198. toNode.SetInput(rnged)
  199. } else {
  200. stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)
  201. blkStartIdx := ctx.StreamRange.Offset / stripSize
  202. blkStart := blkStartIdx * int64(ctx.LRC.ChunkSize)
  203. n := ctx.DAG.NewRange()
  204. toInput := toNode.Input()
  205. *n.Env() = *toInput.Var().Src.Env()
  206. rnged := n.RangeStream(toInput.Var(), math2.Range{
  207. Offset: toRng.Offset - blkStart,
  208. Length: toRng.Length,
  209. })
  210. toInput.Var().NotTo(toNode)
  211. toNode.SetInput(rnged)
  212. }
  213. }
  214. }
  215. // 生成Clone指令
  216. func generateClone(ctx *GenerateContext) {
  217. ctx.DAG.Walk(func(node dag.Node) bool {
  218. for _, outVar := range node.OutputStreams().Slots.RawArray() {
  219. if outVar.Dst.Len() <= 1 {
  220. continue
  221. }
  222. t := ctx.DAG.NewCloneStream()
  223. *t.Env() = *node.Env()
  224. for _, to := range outVar.Dst.RawArray() {
  225. t.NewOutput().To(to, to.InputStreams().IndexOf(outVar))
  226. }
  227. outVar.Dst.Resize(0)
  228. t.SetInput(outVar)
  229. }
  230. for _, outVar := range node.OutputValues().Slots.RawArray() {
  231. if outVar.Dst.Len() <= 1 {
  232. continue
  233. }
  234. t := ctx.DAG.NewCloneValue()
  235. *t.Env() = *node.Env()
  236. for _, to := range outVar.Dst.RawArray() {
  237. t.NewOutput().To(to, to.InputValues().IndexOf(outVar))
  238. }
  239. outVar.Dst.Resize(0)
  240. t.SetInput(outVar)
  241. }
  242. return true
  243. })
  244. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。