|
- package opt
-
- import (
- "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
- "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
- "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state"
- "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory"
- )
-
- // 将直接从一个存储服务传到另一个存储服务的过程换成S2S传输
- func UseS2STransfer(ctx *state.GenerateState) {
- // S2S传输暂不支持只传文件的一部分
- if ctx.StreamRange.Offset != 0 || ctx.StreamRange.Length != nil {
- return
- }
-
- for fr, frNode := range ctx.FromNodes {
- switch fr := fr.(type) {
- case *ioswitch2.FromShardStore:
- s2sFromShardStore(ctx, fr, frNode)
- case *ioswitch2.FromBaseStore:
- s2sFromBaseStore(ctx, fr, frNode)
- }
- }
- }
-
- func s2sFromShardStore(ctx *state.GenerateState, fromShard *ioswitch2.FromShardStore, frNode ops2.FromNode) {
- fromStgBld := factory.GetBuilder(&fromShard.UserSpace)
- s2s, err := fromStgBld.CreateS2STransfer(true)
- if err != nil {
- return
- }
-
- // 此输出流的所有目的地都要能支持S2S传输
- outVar := frNode.Output().Var()
- if outVar.Dst.Len() == 0 {
- return
- }
-
- failed := false
- var toBases []*ops2.BaseWriteNode
-
- loop:
- for i := 0; i < outVar.Dst.Len(); i++ {
- dstNode := outVar.Dst.Get(i)
-
- switch dstNode := dstNode.(type) {
- case *ops2.BaseWriteNode:
- if !s2s.CanTransfer(&fromShard.UserSpace, &dstNode.UserSpace) {
- failed = true
- break
- }
-
- toBases = append(toBases, dstNode)
-
- default:
- failed = true
- break loop
- }
- }
- if failed {
- return
- }
-
- for _, toBase := range toBases {
- s2sNode := ctx.DAG.NewS2STransferDyn(fromShard.UserSpace, toBase.UserSpace, toBase.Path)
- // 直传指令在目的地Hub上执行
- s2sNode.Env().CopyFrom(toBase.Env())
-
- // 先获取文件路径,送到S2S节点
- gsNode := ctx.DAG.NewGetShardInfo(fromShard.UserSpace, fromShard.FileHash)
- gsNode.Env().ToEnvDriver(true)
- gsNode.FileInfoVar().ToSlot(s2sNode.SrcFileInfoSlot())
-
- // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar
- for _, dstSlot := range toBase.FileInfoVar().ListDstSlots() {
- s2sNode.FileInfoVar().ToSlot(dstSlot)
- }
-
- // 从计划中删除目标节点
- ctx.DAG.RemoveNode(toBase)
- delete(ctx.ToNodes, toBase.To)
- }
-
- // 从计划中删除源节点
- ctx.DAG.RemoveNode(frNode)
- delete(ctx.FromNodes, frNode.GetFrom())
- }
-
- func s2sFromBaseStore(ctx *state.GenerateState, fromBase *ioswitch2.FromBaseStore, frNode ops2.FromNode) {
- fromStgBld := factory.GetBuilder(&fromBase.UserSpace)
- s2s, err := fromStgBld.CreateS2STransfer(true)
- if err != nil {
- return
- }
-
- // 此输出流的所有目的地都要能支持S2S传输
- outVar := frNode.Output().Var()
- if outVar.Dst.Len() == 0 {
- return
- }
-
- failed := false
- var toBases []*ops2.BaseWriteNode
-
- loop:
- for i := 0; i < outVar.Dst.Len(); i++ {
- dstNode := outVar.Dst.Get(i)
-
- switch dstNode := dstNode.(type) {
- case *ops2.BaseWriteNode:
- if !s2s.CanTransfer(&fromBase.UserSpace, &dstNode.UserSpace) {
- failed = true
- break
- }
-
- toBases = append(toBases, dstNode)
-
- default:
- failed = true
- break loop
- }
- }
- if failed {
- return
- }
-
- for _, toBase := range toBases {
- s2sNode := ctx.DAG.NewS2STransfer(fromBase.UserSpace, fromBase.Path, toBase.UserSpace, toBase.Path)
- // 直传指令在目的地Hub上执行
- s2sNode.Env().CopyFrom(toBase.Env())
-
- // 原本BaseWriteNode的FileInfoVar被替换成S2SNode的FileInfoVar
- for _, dstSlot := range toBase.FileInfoVar().ListDstSlots() {
- s2sNode.FileInfoVar().ToSlot(dstSlot)
- }
-
- // 从计划中删除目标节点
- ctx.DAG.RemoveNode(toBase)
- delete(ctx.ToNodes, toBase.To)
- }
-
- // 从计划中删除源节点
- ctx.DAG.RemoveNode(frNode)
- delete(ctx.FromNodes, frNode.GetFrom())
- }
|