
Add multipart upload Op

gitlink Sydonian · 11 months ago
parent · commit a5f34619ae
24 changed files with 440 additions and 202 deletions
  1. +111 -0 common/pkgs/ioswitch2/ops2/bypass.go
  2. +2 -2 common/pkgs/ioswitch2/ops2/clone.go
  3. +6 -6 common/pkgs/ioswitch2/ops2/driver.go
  4. +1 -1 common/pkgs/ioswitch2/ops2/ec.go
  5. +6 -6 common/pkgs/ioswitch2/ops2/file.go
  6. +180 -105 common/pkgs/ioswitch2/ops2/multipart.go
  7. +2 -2 common/pkgs/ioswitch2/ops2/ops.go
  8. +6 -6 common/pkgs/ioswitch2/ops2/shard_store.go
  9. +3 -3 common/pkgs/ioswitch2/ops2/shared_store.go
  10. +13 -13 common/pkgs/ioswitch2/parser/parser.go
  11. +2 -2 common/pkgs/ioswitchlrc/ops2/clone.go
  12. +1 -1 common/pkgs/ioswitchlrc/ops2/ec.go
  13. +2 -2 common/pkgs/ioswitchlrc/ops2/ops.go
  14. +6 -6 common/pkgs/ioswitchlrc/ops2/shard_store.go
  15. +6 -6 common/pkgs/ioswitchlrc/parser/generator.go
  16. +6 -6 common/pkgs/ioswitchlrc/parser/passes.go
  17. +10 -11 common/pkgs/storage/cos/multiPartUploader.go
  18. +7 -6 common/pkgs/storage/obs/multiPartUploader.go
  19. +9 -8 common/pkgs/storage/oss/multiPartUploader.go
  20. +12 -0 common/pkgs/storage/svcmgr/mgr.go
  21. +14 -3 common/pkgs/storage/types/bypass.go
  22. +20 -5 common/pkgs/storage/types/s3_client.go
  23. +2 -2 common/pkgs/storage/types/temp_store.go
  24. +13 -0 common/pkgs/storage/utils/utils.go

+111 -0 common/pkgs/ioswitch2/ops2/bypass.go

@@ -0,0 +1,111 @@
package ops2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func init() {
exec.UseOp[*BypassToShardStore]()
exec.UseVarValue[*BypassFileInfoValue]()
}

type BypassFileInfoValue struct {
types.BypassFileInfo
}

func (v *BypassFileInfoValue) Clone() exec.VarValue {
return &BypassFileInfoValue{
BypassFileInfo: v.BypassFileInfo,
}
}

type BypassHandleResultValue struct {
Commited bool
}

func (r *BypassHandleResultValue) Clone() exec.VarValue {
return &BypassHandleResultValue{
Commited: r.Commited,
}
}

type BypassToShardStore struct {
StorageID cdssdk.StorageID
BypassFileInfo exec.VarID
BypassCallback exec.VarID
}

func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
svcMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return err
}

shardStore, err := svcMgr.GetShardStore(o.StorageID)
if err != nil {
return err
}

notifier, ok := shardStore.(types.BypassNotifier)
if !ok {
return fmt.Errorf("shard store %v not support bypass", o.StorageID)
}

fileInfo, err := exec.BindVar[*BypassFileInfoValue](e, ctx.Context, o.BypassFileInfo)
if err != nil {
return err
}

err = notifier.BypassUploaded(fileInfo.BypassFileInfo)
if err != nil {
return err
}

e.PutVar(o.BypassCallback, &BypassHandleResultValue{Commited: true})
return nil
}

func (o *BypassToShardStore) String() string {
return fmt.Sprintf("BypassToShardStore[StorageID:%v] Info: %v, Callback: %v", o.StorageID, o.BypassFileInfo, o.BypassCallback)
}

type BypassToShardStoreNode struct {
dag.NodeBase
StorageID cdssdk.StorageID
}

func (b *GraphNodeBuilder) NewBypassToShardStore(storageID cdssdk.StorageID) *BypassToShardStoreNode {
node := &BypassToShardStoreNode{
StorageID: storageID,
}
b.AddNode(node)

node.InputValues().Init(1)
node.OutputValues().Init(node, 1)
return node
}

func (n *BypassToShardStoreNode) BypassFileInfoSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
Node: n,
Index: 0,
}
}

func (n *BypassToShardStoreNode) BypassCallbackVar() *dag.ValueVar {
return n.OutputValues().Get(0)
}

func (t *BypassToShardStoreNode) GenerateOp() (exec.Op, error) {
return &BypassToShardStore{
StorageID: t.StorageID,
BypassFileInfo: t.BypassFileInfoSlot().Var().VarID,
BypassCallback: t.BypassCallbackVar().VarID,
}, nil
}
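
For context, a minimal sketch (not part of this commit) of what a shard store needs to provide to satisfy the types.BypassNotifier contract that BypassToShardStore calls into. The package name, type name, and directory layout are hypothetical, and error handling is simplified.

package example // hypothetical helper, for illustration only

import (
	"fmt"
	"os"
	"path/filepath"

	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// localBypassStore sketches a shard store that accepts files uploaded on its
// behalf: BypassUploaded takes ownership of the temp file by moving it into
// the store's own directory, keyed by the reported file hash.
type localBypassStore struct {
	root string
}

func (s *localBypassStore) BypassUploaded(info types.BypassFileInfo) error {
	dst := filepath.Join(s.root, fmt.Sprintf("%v", info.FileHash))
	if err := os.Rename(info.TempFilePath, dst); err != nil {
		return fmt.Errorf("taking over bypass file: %w", err)
	}
	return nil
}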

+2 -2 common/pkgs/ioswitch2/ops2/clone.go

@@ -89,7 +89,7 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) {
}

func (t *CloneStreamType) NewOutput() *dag.StreamVar {
return t.OutputStreams().AppendNew(t).Var
return t.OutputStreams().AppendNew(t).Var()
}

func (t *CloneStreamType) GenerateOp() (exec.Op, error) {
@@ -119,7 +119,7 @@ func (t *CloneVarType) SetInput(raw *dag.ValueVar) {
}

func (t *CloneVarType) NewOutput() *dag.ValueVar {
return t.OutputValues().AppendNew(t).Var
return t.OutputValues().AppendNew(t).Var()
}

func (t *CloneVarType) GenerateOp() (exec.Op, error) {


+6 -6 common/pkgs/ioswitch2/ops2/driver.go

@@ -29,9 +29,9 @@ func (f *FromDriverNode) GetFrom() ioswitch2.From {
return f.From
}

func (t *FromDriverNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
func (t *FromDriverNode) Output() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}
@@ -67,9 +67,9 @@ func (t *ToDriverNode) SetInput(v *dag.StreamVar) {
v.To(t, 0)
}

func (t *ToDriverNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
func (t *ToDriverNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}


+1 -1 common/pkgs/ioswitch2/ops2/ec.go

@@ -142,7 +142,7 @@ func (t *ECMultiplyNode) RemoveAllInputs() {

func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
return t.OutputStreams().AppendNew(t).Var
return t.OutputStreams().AppendNew(t).Var()
}

func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) {


+6 -6 common/pkgs/ioswitch2/ops2/file.go

@@ -93,9 +93,9 @@ func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode {
return node
}

func (t *FileReadNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
func (t *FileReadNode) Output() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}
@@ -126,9 +126,9 @@ func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode {
return node
}

func (t *FileWriteNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
func (t *FileWriteNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}


+180 -105 common/pkgs/ioswitch2/ops2/multipart.go

@@ -1,190 +1,265 @@
package ops2

/*
import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/cos"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
"time"
)

func init() {
exec.UseOp[*MultipartManage]()
exec.UseOp[*MultipartInitiator]()
exec.UseOp[*MultipartUpload]()
exec.UseVarValue[*InitUploadValue]()
exec.UseVarValue[*MultipartUploadArgsValue]()
exec.UseVarValue[*UploadedPartInfoValue]()
}

type InitUploadValue struct {
Key string `xml:"Key"` // Object name to upload
UploadID string `xml:"UploadId"` // Generated UploadId
type MultipartUploadArgsValue struct {
Key string
InitState types.MultipartInitState
}

func (v *InitUploadValue) Clone() exec.VarValue {
return &*v
func (v *MultipartUploadArgsValue) Clone() exec.VarValue {
return &MultipartUploadArgsValue{
InitState: v.InitState,
}
}

type MultipartManage struct {
Address cdssdk.StorageAddress `json:"address"`
UploadArgs exec.VarID `json:"uploadArgs"`
UploadOutput exec.VarID `json:"uploadOutput"`
StorageID cdssdk.StorageID `json:"storageID"`
type UploadedPartInfoValue struct {
types.UploadedPartInfo
}

func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
manager, err := exec.GetValueByType[*mgr.Manager](ctx)
if err != nil {
return err
func (v *UploadedPartInfoValue) Clone() exec.VarValue {
return &UploadedPartInfoValue{
UploadedPartInfo: v.UploadedPartInfo,
}
}

var client types.MultipartUploader
switch addr := o.Address.(type) {
case *cdssdk.OSSAddress:
client = oss.NewMultiPartUpload(addr)
case *cdssdk.OBSAddress:
client = obs.NewMultiPartUpload(addr)
case *cdssdk.COSAddress:
client = cos.NewMultiPartUpload(addr)
type MultipartInitiator struct {
StorageID cdssdk.StorageID
UploadArgs exec.VarID
UploadedParts []exec.VarID
BypassFileOutput exec.VarID // path of the temp file produced by the multipart upload
BypassCallback exec.VarID // outcome of using the temp file, telling the Initiator how to handle it
}

func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return err
}
defer client.Close()

tempStore, err := manager.GetTempStore(o.StorageID)
tempStore, err := svcmgr.GetComponent[types.TempStore](stgMgr, o.StorageID)
if err != nil {
return err
}
objName := tempStore.CreateTemp()
defer tempStore.Drop(objName)

uploadID, err := client.InitiateMultipartUpload(objName)
initiator, err := svcmgr.GetComponent[types.MultipartInitiator](stgMgr, o.StorageID)
if err != nil {
return err
}
e.PutVar(o.UploadArgs, &InitUploadValue{
UploadID: uploadID,
Key: objName,
})

parts, err := exec.BindVar[*UploadPartOutputValue](e, ctx.Context, o.UploadOutput)
// start a new upload task
initState, err := initiator.Initiate(ctx.Context, objName)
if err != nil {
return err
}
err = client.CompleteMultipartUpload(uploadID, objName, parts.Parts)
// distribute the upload arguments
e.PutVar(o.UploadArgs, &MultipartUploadArgsValue{
Key: objName,
InitState: initState,
})

// collect the results of the part uploads
partInfoValues, err := exec.BindArray[*UploadedPartInfoValue](e, ctx.Context, o.UploadedParts)
if err != nil {
return err
return fmt.Errorf("getting uploaded parts: %v", err)
}

return nil
}
partInfos := make([]types.UploadedPartInfo, len(partInfoValues))
for i, v := range partInfoValues {
partInfos[i] = v.UploadedPartInfo
}

func (o *MultipartManage) String() string {
return "MultipartManage"
}
// complete the multipart upload
compInfo, err := initiator.Complete(ctx.Context, partInfos)
if err != nil {
return fmt.Errorf("completing multipart upload: %v", err)
}

type MultipartManageNode struct {
dag.NodeBase
Address cdssdk.StorageAddress
StorageID cdssdk.StorageID `json:"storageID"`
}
// tell the downstream Op the path of the temp file
e.PutVar(o.BypassFileOutput, &BypassFileInfoValue{
BypassFileInfo: types.BypassFileInfo{
TempFilePath: objName,
FileHash: compInfo.FileHash,
},
})

func (b *GraphNodeBuilder) NewMultipartManage(addr cdssdk.StorageAddress, storageID cdssdk.StorageID) *MultipartManageNode {
node := &MultipartManageNode{
Address: addr,
StorageID: storageID,
// wait for the downstream Op to handle the temp file
cb, err := exec.BindVar[*BypassHandleResultValue](e, ctx.Context, o.BypassCallback)
if err != nil {
return fmt.Errorf("getting temp file callback: %v", err)
}
b.AddNode(node)
return node
}

func (t *MultipartManageNode) GenerateOp() (exec.Op, error) {
return &MultipartManage{
Address: t.Address,
StorageID: t.StorageID,
}, nil
}
if cb.Commited {
tempStore.Commited(objName)
}

type MultipartUpload struct {
Address cdssdk.StorageAddress `json:"address"`
UploadArgs exec.VarID `json:"uploadArgs"`
UploadOutput exec.VarID `json:"uploadOutput"`
PartNumbers []int `json:"partNumbers"`
PartSize []int64 `json:"partSize"`
Input exec.VarID `json:"input"`
return nil
}

type UploadPartOutputValue struct {
Parts []*types.UploadPartOutput `json:"parts"`
func (o *MultipartInitiator) String() string {
return "MultipartInitiator"
}

func (v *UploadPartOutputValue) Clone() exec.VarValue {
return &*v
type MultipartUpload struct {
Storage stgmod.StorageDetail
UploadArgs exec.VarID
UploadResult exec.VarID
PartStream exec.VarID
PartNumber int
PartSize int64
}

func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
initUploadResult, err := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs)
uploadArgs, err := exec.BindVar[*MultipartUploadArgsValue](e, ctx.Context, o.UploadArgs)
if err != nil {
return err
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
partStr, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.PartStream)
if err != nil {
return err
}
defer input.Stream.Close()
defer partStr.Stream.Close()

var client types.MultipartUploader
switch addr := o.Address.(type) {
case *cdssdk.OSSAddress:
client = oss.NewMultiPartUpload(addr)
uploader, err := factory.CreateComponent[types.MultipartUploader](o.Storage)
if err != nil {
return err
}

var parts UploadPartOutputValue
for i := 0; i < len(o.PartNumbers); i++ {
startTime := time.Now()
uploadPart, err := client.UploadPart(initUploadResult.UploadID, initUploadResult.Key, o.PartSize[i], o.PartNumbers[i], io.LimitReader(input.Stream, o.PartSize[i]))
log.Debugf("upload multipart spend time: %v", time.Since(startTime))
if err != nil {
return fmt.Errorf("failed to upload part: %w", err)
}
parts.Parts = append(parts.Parts, uploadPart)
startTime := time.Now()
uploadedInfo, err := uploader.UploadPart(ctx.Context, uploadArgs.InitState, uploadArgs.Key, o.PartSize, o.PartNumber, partStr.Stream)
log.Debugf("upload finished in %v", time.Since(startTime))

if err != nil {
return err
}

e.PutVar(o.UploadOutput, &parts)
e.PutVar(o.UploadResult, &UploadedPartInfoValue{
uploadedInfo,
})

return nil
}

func (o *MultipartUpload) String() string {
return "MultipartUpload"
return fmt.Sprintf("MultipartUpload[PartNumber=%v,PartSize=%v] Args: %v, Result: %v, Stream: %v", o.PartNumber, o.PartSize, o.UploadArgs, o.UploadResult, o.PartStream)
}

type MultipartInitiatorNode struct {
dag.NodeBase
StorageID cdssdk.StorageID `json:"storageID"`
}

func (b *GraphNodeBuilder) NewMultipartInitiator(storageID cdssdk.StorageID) *MultipartInitiatorNode {
node := &MultipartInitiatorNode{
StorageID: storageID,
}
b.AddNode(node)

node.OutputValues().Init(node, 2)
node.InputValues().Init(1)
return node
}

func (n *MultipartInitiatorNode) UploadArgsVar() *dag.ValueVar {
return n.OutputValues().Get(0)
}

func (n *MultipartInitiatorNode) BypassFileInfoVar() *dag.ValueVar {
return n.OutputValues().Get(1)
}

func (n *MultipartInitiatorNode) BypassCallbackSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
Node: n,
Index: 0,
}
}

func (n *MultipartInitiatorNode) AppendPartInfoSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
Node: n,
Index: n.InputStreams().EnlargeOne(),
}
}

func (n *MultipartInitiatorNode) GenerateOp() (exec.Op, error) {
return &MultipartInitiator{
StorageID: n.StorageID,
UploadArgs: n.UploadArgsVar().VarID,
UploadedParts: n.InputValues().GetVarIDsStart(1),
BypassFileOutput: n.BypassFileInfoVar().VarID,
BypassCallback: n.BypassCallbackSlot().Var().VarID,
}, nil
}

type MultipartUploadNode struct {
dag.NodeBase
Address cdssdk.StorageAddress
PartNumbers []int `json:"partNumbers"`
PartSize []int64 `json:"partSize"`
Storage stgmod.StorageDetail
PartNumber int
PartSize int64
}

func (b *GraphNodeBuilder) NewMultipartUpload(addr cdssdk.StorageAddress, partNumbers []int, partSize []int64) *MultipartUploadNode {
func (b *GraphNodeBuilder) NewMultipartUpload(stg stgmod.StorageDetail, partNumber int, partSize int64) *MultipartUploadNode {
node := &MultipartUploadNode{
Address: addr,
PartNumbers: partNumbers,
PartSize: partSize,
Storage: stg,
PartNumber: partNumber,
PartSize: partSize,
}
b.AddNode(node)

node.InputValues().Init(1)
node.OutputValues().Init(node, 1)
node.InputStreams().Init(1)
return node
}

func (t MultipartUploadNode) GenerateOp() (exec.Op, error) {
func (n *MultipartUploadNode) UploadArgsSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
Node: n,
Index: 0,
}
}

func (n *MultipartUploadNode) UploadResultVar() *dag.ValueVar {
return n.OutputValues().Get(0)
}

func (n *MultipartUploadNode) PartStreamSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
Node: n,
Index: 0,
}
}

func (n *MultipartUploadNode) GenerateOp() (exec.Op, error) {
return &MultipartUpload{
Address: t.Address,
PartNumbers: t.PartNumbers,
PartSize: t.PartSize,
Storage: n.Storage,
UploadArgs: n.UploadArgsSlot().Var().VarID,
UploadResult: n.UploadResultVar().VarID,
PartStream: n.PartStreamSlot().Var().VarID,
PartNumber: n.PartNumber,
PartSize: n.PartSize,
}, nil
}
*/
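
In the reworked (still commented-out) multipart.go, each MultipartUpload node now carries a single PartNumber/PartSize pair instead of the former slices, so a planner creates one upload node per part and the MultipartInitiator collects the results. Below is a minimal sketch of the part-splitting arithmetic such a planner could use, assuming the total size is known up front; all names are illustrative and not part of the commit.

package example // illustration only, not part of the commit

// partPlan pairs a part number with its size, mirroring the fields that each
// MultipartUpload node now carries (PartNumber int, PartSize int64).
type partPlan struct {
	PartNumber int
	PartSize   int64
}

// splitIntoParts computes one plan entry per part for a file of totalSize
// bytes, giving every part partSize bytes except a possibly shorter last part.
func splitIntoParts(totalSize, partSize int64) []partPlan {
	var plans []partPlan
	for number, offset := 1, int64(0); offset < totalSize; number++ {
		size := partSize
		if totalSize-offset < size {
			size = totalSize - offset
		}
		plans = append(plans, partPlan{PartNumber: number, PartSize: size})
		offset += size
	}
	return plans
}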

+2 -2 common/pkgs/ioswitch2/ops2/ops.go

@@ -17,13 +17,13 @@ func NewGraphNodeBuilder() *GraphNodeBuilder {
type FromNode interface {
dag.Node
GetFrom() ioswitch2.From
Output() dag.StreamSlot
Output() dag.StreamOutputSlot
}

type ToNode interface {
dag.Node
GetTo() ioswitch2.To
Input() dag.StreamSlot
Input() dag.StreamOutputSlot
SetInput(input *dag.StreamVar)
}



+6 -6 common/pkgs/ioswitch2/ops2/shard_store.go

@@ -137,9 +137,9 @@ func (t *ShardReadNode) GetFrom() ioswitch2.From {
return t.From
}

func (t *ShardReadNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
func (t *ShardReadNode) Output() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}
@@ -184,9 +184,9 @@ func (t *ShardWriteNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}

func (t *ShardWriteNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
func (t *ShardWriteNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}


+3 -3 common/pkgs/ioswitch2/ops2/shared_store.go

@@ -95,9 +95,9 @@ func (t *SharedLoadNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}

func (t *SharedLoadNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
func (t *SharedLoadNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}


+13 -13 common/pkgs/ioswitch2/parser/parser.go

@@ -221,7 +221,7 @@ func extend(ctx *ParseContext) error {
}

ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: frNode.Output().Var,
Stream: frNode.Output().Var(),
StreamIndex: fr.GetStreamIndex(),
})

@@ -230,7 +230,7 @@ func extend(ctx *ParseContext) error {
// generate these instructions only when EC-encoded blocks are read or written
if ctx.UseEC {
splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K)
splitNode.Split(frNode.Output().Var)
splitNode.Split(frNode.Output().Var())
for i := 0; i < ctx.Ft.ECParam.K; i++ {
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: splitNode.SubStream(i),
@@ -242,7 +242,7 @@ func extend(ctx *ParseContext) error {
// same as above
if ctx.UseSegment {
splitNode := ctx.DAG.NewSegmentSplit(ctx.Ft.SegmentParam.Segments)
splitNode.SetInput(frNode.Output().Var)
splitNode.SetInput(frNode.Output().Var())
for i := 0; i < len(ctx.Ft.SegmentParam.Segments); i++ {
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: splitNode.Segment(i),
@@ -893,12 +893,12 @@ func generateRange(ctx *ParseContext) {
if toStrIdx.IsRaw() {
n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.Src.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
*n.Env() = *toInput.Var().Src.Env()
rnged := n.RangeStream(toInput.Var(), exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
})
toInput.Var.NotTo(toNode)
toInput.Var().NotTo(toNode)
toNode.SetInput(rnged)

} else if toStrIdx.IsEC() {
@@ -909,15 +909,15 @@ func generateRange(ctx *ParseContext) {

n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.Src.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
*n.Env() = *toInput.Var().Src.Env()
rnged := n.RangeStream(toInput.Var(), exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
})
toInput.Var.NotTo(toNode)
toInput.Var().NotTo(toNode)
toNode.SetInput(rnged)
} else if toStrIdx.IsSegment() {
// if frNode, ok := toNode.Input().Var.From().Node.(ops2.FromNode); ok {
// if frNode, ok := toNode.Input().Var().From().Node.(ops2.FromNode); ok {
// // currently only a segmented To can be connected to a segment-providing From, in which case no Range instruction needs to be generated
// if frNode.GetFrom().GetStreamIndex().IsSegment() {
// continue
@@ -929,12 +929,12 @@ func generateRange(ctx *ParseContext) {

// n := ctx.DAG.NewRange()
// toInput := toNode.Input()
// *n.Env() = *toInput.Var.From().Node.Env()
// rnged := n.RangeStream(toInput.Var, exec.Range{
// *n.Env() = *toInput.Var().From().Node.Env()
// rnged := n.RangeStream(toInput.Var(), exec.Range{
// Offset: strStart - ctx.StreamRange.Offset,
// Length: toRng.Length,
// })
// toInput.Var.NotTo(toNode, toInput.Index)
// toInput.Var().NotTo(toNode, toInput.Index)
// toNode.SetInput(rnged)
}
}


+2 -2 common/pkgs/ioswitchlrc/ops2/clone.go

@@ -89,7 +89,7 @@ func (t *CloneStreamType) SetInput(raw *dag.StreamVar) {
}

func (t *CloneStreamType) NewOutput() *dag.StreamVar {
return t.OutputStreams().AppendNew(t).Var
return t.OutputStreams().AppendNew(t).Var()
}

func (t *CloneStreamType) GenerateOp() (exec.Op, error) {
@@ -119,7 +119,7 @@ func (t *CloneVarType) SetInput(raw *dag.ValueVar) {
}

func (t *CloneVarType) NewOutput() *dag.ValueVar {
return t.OutputValues().AppendNew(t).Var
return t.OutputValues().AppendNew(t).Var()
}

func (t *CloneVarType) GenerateOp() (exec.Op, error) {


+1 -1 common/pkgs/ioswitchlrc/ops2/ec.go

@@ -141,7 +141,7 @@ func (t *LRCConstructAnyNode) RemoveAllInputs() {

func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
return t.OutputStreams().AppendNew(t).Var
return t.OutputStreams().AppendNew(t).Var()
}

func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) {


+2 -2 common/pkgs/ioswitchlrc/ops2/ops.go

@@ -15,12 +15,12 @@ func NewGraphNodeBuilder() *GraphNodeBuilder {

type FromNode interface {
dag.Node
Output() dag.StreamSlot
Output() dag.StreamOutputSlot
}

type ToNode interface {
dag.Node
Input() dag.StreamSlot
Input() dag.StreamOutputSlot
SetInput(input *dag.StreamVar)
}



+6 -6 common/pkgs/ioswitchlrc/ops2/shard_store.go

@@ -137,9 +137,9 @@ func (t *ShardReadNode) GetFrom() ioswitchlrc.From {
return t.From
}

func (t *ShardReadNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
func (t *ShardReadNode) Output() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}
@@ -184,9 +184,9 @@ func (t *ShardWriteNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}

func (t *ShardWriteNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
func (t *ShardWriteNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
Node: t,
Index: 0,
}
}


+6 -6 common/pkgs/ioswitchlrc/parser/generator.go

@@ -70,7 +70,7 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr
}
ctx.ToNodes[to] = toNode

toNode.SetInput(frNode.Output().Var)
toNode.SetInput(frNode.Output().Var())
} else if idx < ctx.LRC.K {
dataToes = append(dataToes, to)
} else {
@@ -84,7 +84,7 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr

// 需要文件块,则生成Split指令
splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize, ctx.LRC.K)
splitNode.Split(frNode.Output().Var)
splitNode.Split(frNode.Output().Var())

for _, to := range dataToes {
toNode, err := buildToNode(ctx, to)
@@ -173,7 +173,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [
}
ctx.ToNodes[to] = toNode

toNode.SetInput(fr.Output().Var)
toNode.SetInput(fr.Output().Var())
continue
}

@@ -192,7 +192,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [

conNode := ctx.DAG.NewLRCConstructAny(ctx.LRC)
for i, fr := range frNodes {
conNode.AddInput(fr.Output().Var, i)
conNode.AddInput(fr.Output().Var(), i)
}

for _, to := range missedToes {
@@ -218,7 +218,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [
if fr == nil {
joinNode.AddInput(conNode.NewOutput(i))
} else {
joinNode.AddInput(fr.Output().Var)
joinNode.AddInput(fr.Output().Var())
}
}

@@ -278,7 +278,7 @@ func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes
return fmt.Errorf("building from node: %w", err)
}

inputs = append(inputs, frNode.Output().Var)
inputs = append(inputs, frNode.Output().Var())
}

missedGrpIdx := toes[0].GetDataIndex()


+6 -6 common/pkgs/ioswitchlrc/parser/passes.go

@@ -233,12 +233,12 @@ func generateRange(ctx *GenerateContext) {
if toDataIdx == -1 {
n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.Src.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
*n.Env() = *toInput.Var().Src.Env()
rnged := n.RangeStream(toInput.Var(), exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
})
toInput.Var.NotTo(toNode)
toInput.Var().NotTo(toNode)
toNode.SetInput(rnged)

} else {
@@ -249,12 +249,12 @@ func generateRange(ctx *GenerateContext) {

n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.Src.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
*n.Env() = *toInput.Var().Src.Env()
rnged := n.RangeStream(toInput.Var(), exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
})
toInput.Var.NotTo(toNode)
toInput.Var().NotTo(toNode)
toNode.SetInput(rnged)
}
}


+10 -11 common/pkgs/storage/cos/multiPartUploader.go

@@ -3,13 +3,13 @@ package cos
import (
"context"
"fmt"
"github.com/tencentyun/cos-go-sdk-v5"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
"net/http"
"net/url"

"github.com/tencentyun/cos-go-sdk-v5"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type MultiPartUploader struct {
@@ -32,16 +32,15 @@ func NewMultiPartUpload(address *cdssdk.COSAddress) *MultiPartUploader {
}
}

func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) {
func (c *MultiPartUploader) Initiate(objectName string) (string, error) {
v, _, err := c.client.Object.InitiateMultipartUpload(context.Background(), objectName, nil)
if err != nil {
log.Error("Failed to initiate multipart upload: %v", err)
return "", err
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return v.UploadID, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) {
func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) {
resp, err := c.client.Object.UploadPart(
context.Background(), key, uploadID, partNumber, stream, nil,
)
@@ -49,14 +48,14 @@ func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int
return nil, fmt.Errorf("failed to upload part: %w", err)
}

result := &types.UploadPartOutput{
result := &types.UploadedPartInfo{
ETag: resp.Header.Get("ETag"),
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, key string, parts []*types.UploadPartOutput) error {
func (c *MultiPartUploader) Complete(uploadID string, key string, parts []*types.UploadedPartInfo) error {
opt := &cos.CompleteMultipartUploadOptions{}
for i := 0; i < len(parts); i++ {
opt.Parts = append(opt.Parts, cos.Object{
@@ -72,7 +71,7 @@ func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, key string,

return nil
}
func (c *MultiPartUploader) AbortMultipartUpload() {
func (c *MultiPartUploader) Abort() {

}



+7 -6 common/pkgs/storage/obs/multiPartUploader.go

@@ -2,11 +2,12 @@ package obs

import (
"fmt"
"io"

"github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
)

type MultiPartUploader struct {
@@ -26,7 +27,7 @@ func NewMultiPartUpload(address *cdssdk.OBSAddress) *MultiPartUploader {
}
}

func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) {
func (c *MultiPartUploader) Initiate(objectName string) (string, error) {
input := &obs.InitiateMultipartUploadInput{}
input.Bucket = c.bucket
input.Key = objectName
@@ -37,7 +38,7 @@ func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string,
return imur.UploadId, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) {
func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) {
uploadParam := &obs.UploadPartInput{
Bucket: c.bucket,
Key: key,
@@ -51,14 +52,14 @@ func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}
result := &types.UploadPartOutput{
result := &types.UploadedPartInfo{
ETag: part.ETag,
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error {
func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error {
var uploadPart []obs.Part
for i := 0; i < len(parts); i++ {
uploadPart = append(uploadPart, obs.Part{
@@ -80,7 +81,7 @@ func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string,
}
return nil
}
func (c *MultiPartUploader) AbortMultipartUpload() {
func (c *MultiPartUploader) Abort() {

}



+9 -8 common/pkgs/storage/oss/multiPartUploader.go

@@ -2,11 +2,12 @@ package oss

import (
"fmt"
"io"
"log"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"io"
"log"
)

type MultiPartUploader struct {
@@ -14,7 +15,7 @@ type MultiPartUploader struct {
bucket *oss.Bucket
}

func NewMultiPartUpload(address *cdssdk.OSSAddress) *MultiPartUploader {
func NewMultiPartUpload(address *cdssdk.OSSType) *MultiPartUploader {
// create an OSS client instance.
client, err := oss.New(address.Endpoint, address.AK, address.SK)
if err != nil {
@@ -32,7 +33,7 @@ func NewMultiPartUpload(address *cdssdk.OSSAddress) *MultiPartUploader {
}
}

func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string, error) {
func (c *MultiPartUploader) Initiate(objectName string) (string, error) {
imur, err := c.bucket.InitiateMultipartUpload(objectName)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
@@ -40,7 +41,7 @@ func (c *MultiPartUploader) InitiateMultipartUpload(objectName string) (string,
return imur.UploadID, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadPartOutput, error) {
func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) {
uploadParam := oss.InitiateMultipartUploadResult{
UploadID: uploadID,
Key: key,
@@ -50,14 +51,14 @@ func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}
result := &types.UploadPartOutput{
result := &types.UploadedPartInfo{
ETag: part.ETag,
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string, parts []*types.UploadPartOutput) error {
func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error {
notifyParam := oss.InitiateMultipartUploadResult{
UploadID: uploadID,
Key: Key,
@@ -77,7 +78,7 @@ func (c *MultiPartUploader) CompleteMultipartUpload(uploadID string, Key string,
return nil
}

func (c *MultiPartUploader) AbortMultipartUpload() {
func (c *MultiPartUploader) Abort() {

}



+12 -0 common/pkgs/storage/svcmgr/mgr.go

@@ -51,6 +51,18 @@ func (m *Manager) CreateService(detail stgmod.StorageDetail) error {
return nil
}

func (m *Manager) GetInfo(stgID cdssdk.StorageID) (stgmod.StorageDetail, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return stgmod.StorageDetail{}, types.ErrStorageNotFound
}

return stg.Service.Info(), nil
}

// look up the ShardStore component of the given Storage
func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
return GetComponent[types.ShardStore](m, stgID)


+14 -3 common/pkgs/storage/types/bypass.go

@@ -1,7 +1,18 @@
package types

import "io"
import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type BypassWriter interface {
Write(stream io.Reader) (string, error)
// type BypassWriter interface {
// Write(stream io.Reader) (string, error)
// }

type BypassFileInfo struct {
TempFilePath string
FileHash cdssdk.FileHash
}

type BypassNotifier interface {
BypassUploaded(info BypassFileInfo) error
}

+20 -5 common/pkgs/storage/types/s3_client.go

@@ -1,18 +1,33 @@
package types

import (
"context"
"io"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type MultipartInitiator interface {
MultipartUploader
Initiate(ctx context.Context, objectName string) (MultipartInitState, error)
Complete(ctx context.Context, parts []UploadedPartInfo) (CompletedFileInfo, error)
Abort()
}

type MultipartUploader interface {
InitiateMultipartUpload(objectName string) (string, error)
UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*UploadPartOutput, error)
CompleteMultipartUpload(uploadID string, Key string, Parts []*UploadPartOutput) error
AbortMultipartUpload()
UploadPart(ctx context.Context, init MultipartInitState, objectName string, partSize int64, partNumber int, stream io.Reader) (UploadedPartInfo, error)
Close()
}

type UploadPartOutput struct {
type MultipartInitState struct {
UploadID string
}

type UploadedPartInfo struct {
PartNumber int
ETag string
}

type CompletedFileInfo struct {
FileHash cdssdk.FileHash // may be empty; empty means the FileHash could not be obtained
}
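
A minimal sketch (not part of this commit) of how a caller could drive the reworked MultipartInitiator/MultipartUploader interfaces end to end: Initiate, one UploadPart per fixed-size chunk, then Complete, with Abort on failure. The helper name is made up and error handling is simplified.

package example // illustration only, not part of the commit

import (
	"context"
	"io"

	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// uploadAll streams r to the backing store in parts of at most partSize bytes
// and returns the completion info reported by the initiator.
func uploadAll(ctx context.Context, up types.MultipartInitiator, objectName string, r io.Reader, totalSize, partSize int64) (types.CompletedFileInfo, error) {
	defer up.Close()

	state, err := up.Initiate(ctx, objectName)
	if err != nil {
		return types.CompletedFileInfo{}, err
	}

	var parts []types.UploadedPartInfo
	for number, remaining := 1, totalSize; remaining > 0; number++ {
		size := partSize
		if remaining < size {
			size = remaining
		}
		info, err := up.UploadPart(ctx, state, objectName, size, number, io.LimitReader(r, size))
		if err != nil {
			up.Abort()
			return types.CompletedFileInfo{}, err
		}
		parts = append(parts, info)
		remaining -= size
	}

	return up.Complete(ctx, parts)
}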

+2 -2 common/pkgs/storage/types/temp_store.go

@@ -1,10 +1,10 @@
package types

type TempStore interface {
// generate and register a temporary file name; the file will not be cleaned up while the name is valid
// generate and register a temporary file path; the file will not be cleaned up until Commited or Drop is called.
CreateTemp() string
// indicate that a temporary file has been repurposed and no longer needs attention (it should not be deleted either).
Commited(filePath string)
// the temporary file is abandoned and may be deleted
// the temporary file is abandoned and may be deleted. If Commited was called earlier, this function should do nothing.
Drop(filePath string)
}
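
A minimal sketch (not part of this commit) of the CreateTemp/Commited/Drop lifecycle described by the revised comments, assuming the returned path is writable as a local file: Drop is always deferred, and calling Commited first makes the deferred Drop a no-op.

package example // illustration only, not part of the commit

import (
	"os"

	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// writeThroughTempStore reserves a temp path, writes data into it, and hands
// the file off by calling Commited; on any failure the deferred Drop cleans up.
func writeThroughTempStore(ts types.TempStore, data []byte) (string, error) {
	path := ts.CreateTemp()
	defer ts.Drop(path)

	if err := os.WriteFile(path, data, 0o644); err != nil {
		return "", err
	}

	ts.Commited(path)
	return path, nil
}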

+13 -0 common/pkgs/storage/utils/utils.go

@@ -5,8 +5,21 @@ import (
"path/filepath"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string {
return filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", packageID))
}

func FindFeature[T cdssdk.StorageFeature](detail stgmod.StorageDetail) T {
for _, f := range detail.Storage.Features {
f2, ok := f.(T)
if ok {
return f2
}
}

var def T
return def
}
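
A short usage note (not part of this commit): FindFeature returns the matching feature or the zero value of T, so callers can branch on a nil check. The feature type *cdssdk.MultipartUploadFeature below is hypothetical and used purely for illustration.

package example // illustration only, not part of the commit

import (
	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
	stgmod "gitlink.org.cn/cloudream/storage/common/models"
	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

// hasMultipartFeature reports whether the storage declares the (hypothetical)
// multipart upload feature; FindFeature yields nil when no feature matches.
func hasMultipartFeature(detail stgmod.StorageDetail) bool {
	return utils.FindFeature[*cdssdk.MultipartUploadFeature](detail) != nil
}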
