Browse Source

增加ECMultiplier功能

gitlink
Sydonian 10 months ago
parent
commit
c9274a2c5c
13 changed files with 320 additions and 8 deletions
  1. +93
    -0
      common/pkgs/storage/efile/ec_multiplier.go
  2. +109
    -0
      common/pkgs/storage/efile/efile.go
  3. +1
    -0
      common/pkgs/storage/local/local.go
  4. +1
    -0
      common/pkgs/storage/local/shard_store.go
  5. +1
    -0
      common/pkgs/storage/local/shared_store.go
  6. +75
    -0
      common/pkgs/storage/mashup/mashup.go
  7. +2
    -0
      common/pkgs/storage/s3/obs/s2s.go
  8. +1
    -0
      common/pkgs/storage/s3/shard_store.go
  9. +3
    -8
      common/pkgs/storage/s3/shared_store.go
  10. +12
    -0
      common/pkgs/storage/types/bypass.go
  11. +11
    -0
      common/pkgs/storage/types/ec_multiplier.go
  12. +8
    -0
      common/pkgs/storage/types/empty_builder.go
  13. +3
    -0
      common/pkgs/storage/types/types.go

+ 93
- 0
common/pkgs/storage/efile/ec_multiplier.go View File

@@ -0,0 +1,93 @@
package efile

import (
"fmt"
"net/url"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/os2"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type ECMultiplier struct {
token string
url string
feat *cdssdk.ECMultiplierFeature
outputs []string
completed bool
}

// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。
// 输出为每一个块文件的路径,数组长度 = len(coef)
func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunkSize int64) ([]string, error) {
type Request struct {
Inputs []types.HTTPReqeust `json:"inputs"`
Outputs []string `json:"outputs"`
Coefs [][]byte `json:"coefs"`
ChunkSize int64 `json:"chunkSize"`
}
type Response struct {
Code string `json:"code"`
Msg string `json:"msg"`
}

fileName := os2.GenerateRandomFileName(10)
m.outputs = make([]string, len(coef))
for i := range m.outputs {
m.outputs[i] = fmt.Sprintf("%s_%d", fileName, i)
}

u, err := url.JoinPath(m.url, "efile/openapi/v2/createECTask")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(u, http2.RequestParam{
Header: map[string]string{"token": m.token},
Body: Request{
Inputs: inputs,
Outputs: m.outputs,
Coefs: coef,
ChunkSize: chunkSize,
},
})
if err != nil {
return nil, err
}

var r Response
err = serder.JSONToObjectStream(resp.Body, &r)
if err != nil {
return nil, err
}

if r.Code != "0" {
return nil, fmt.Errorf("code: %s, msg: %s", r.Code, r.Msg)
}

return m.outputs, nil
}

// 完成计算
func (m *ECMultiplier) Complete() {
m.completed = true
}

// 取消计算。如果已经调用了Complete,则应该无任何影响
func (m *ECMultiplier) Abort() {
if !m.completed {
u, err := url.JoinPath(m.url, "efile/openapi/v2/file/remove")
if err != nil {
return
}

for _, output := range m.outputs {
http2.PostJSON(u, http2.RequestParam{
Header: map[string]string{"token": m.token},
Query: map[string]string{"paths": output},
})
}
}
}

+ 109
- 0
common/pkgs/storage/efile/efile.go View File

@@ -0,0 +1,109 @@
package efile

import (
"fmt"
"net/url"
"sync"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

func init() {
reg.RegisterBuilder[*cdssdk.EFileType](func(detail stgmod.StorageDetail) types.StorageBuilder {
return &builder{
detail: detail,
}
})
}

type builder struct {
types.EmptyBuilder
detail stgmod.StorageDetail
token string
tokenLock sync.Mutex
getTokenTime time.Time
}

func (b *builder) getToken() (string, error) {
stgType := b.detail.Storage.Type.(*cdssdk.EFileType)

b.tokenLock.Lock()
defer b.tokenLock.Unlock()

if b.token != "" {
dt := time.Since(b.getTokenTime)
if dt < time.Second*time.Duration(stgType.TokenExpire) {
return b.token, nil
}
}

u, err := url.JoinPath(stgType.TokenURL, "/ac/openapi/v2/tokens")
if err != nil {
return "", err
}

resp, err := http2.PostJSON(u, http2.RequestParam{
Header: map[string]string{
"user": stgType.User,
"password": stgType.Password,
"orgId": stgType.OrgID,
},
})
if err != nil {
return "", err
}

type Response struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data []struct {
ClusterID string `json:"clusterId"`
Token string `json:"token"`
} `json:"data"`
}

var r Response
err = serder.JSONToObjectStream(resp.Body, &r)
if err != nil {
return "", err
}

if r.Code != "0" {
return "", fmt.Errorf("code:%s, msg:%s", r.Code, r.Msg)
}

for _, d := range r.Data {
if d.ClusterID == stgType.ClusterID {
b.token = d.Token
b.getTokenTime = time.Now()
return d.Token, nil
}
}

return "", fmt.Errorf("clusterID:%s not found", stgType.ClusterID)
}

func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) {
feat := utils.FindFeature[*cdssdk.ECMultiplierFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature ECMultiplier not found")
}

token, err := b.getToken()
if err != nil {
return nil, fmt.Errorf("get token: %v", err)
}

return &ECMultiplier{
token: token,
url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL,
feat: feat,
}, nil
}

+ 1
- 0
common/pkgs/storage/local/local.go View File

@@ -19,6 +19,7 @@ func init() {
} }


type builder struct { type builder struct {
types.EmptyBuilder
detail stgmod.StorageDetail detail stgmod.StorageDetail
} }




+ 1
- 0
common/pkgs/storage/local/shard_store.go View File

@@ -23,6 +23,7 @@ const (
) )


type ShardStoreDesc struct { type ShardStoreDesc struct {
types.EmptyShardStoreDesc
builder *builder builder *builder
} }




+ 1
- 0
common/pkgs/storage/local/shared_store.go View File

@@ -11,6 +11,7 @@ import (
) )


type SharedStoreDesc struct { type SharedStoreDesc struct {
types.EmptySharedStoreDesc
builder *builder builder *builder
} }




+ 75
- 0
common/pkgs/storage/mashup/mashup.go View File

@@ -0,0 +1,75 @@
package mashup

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func init() {
reg.RegisterBuilder[*cdssdk.MashupStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder {
return &builder{
detail: detail,
}
})
}

type builder struct {
detail stgmod.StorageDetail
}

func (b *builder) CreateAgent() (types.StorageAgent, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Agent

blder := factory.GetBuilder(detail)
return blder.CreateAgent()
}

func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Agent

blder := factory.GetBuilder(detail)
return blder.ShardStoreDesc()
}

func (b *builder) SharedStoreDesc() types.SharedStoreDesc {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Agent

blder := factory.GetBuilder(detail)
return blder.SharedStoreDesc()
}

func (b *builder) CreateMultiparter() (types.Multiparter, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Feature

blder := factory.GetBuilder(detail)
return blder.CreateMultiparter()
}

func (b *builder) CreateS2STransfer() (types.S2STransfer, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Feature

blder := factory.GetBuilder(detail)
return blder.CreateS2STransfer()
}

func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Feature

blder := factory.GetBuilder(detail)
return blder.CreateECMultiplier()
}

+ 2
- 0
common/pkgs/storage/s3/obs/s2s.go View File

@@ -170,5 +170,7 @@ func (s *S2STransfer) Abort() {
s.omsCli.DeleteTask(&model.DeleteTaskRequest{ s.omsCli.DeleteTask(&model.DeleteTaskRequest{
TaskId: fmt.Sprintf("%v", *s.taskID), TaskId: fmt.Sprintf("%v", *s.taskID),
}) })

// TODO 清理临时文件
} }
} }

+ 1
- 0
common/pkgs/storage/s3/shard_store.go View File

@@ -27,6 +27,7 @@ const (
) )


type ShardStoreDesc struct { type ShardStoreDesc struct {
types.EmptyShardStoreDesc
builder *builder builder *builder
} }




+ 3
- 8
common/pkgs/storage/s3/shared_store.go View File

@@ -1,12 +1,7 @@
package s3 package s3


type SharedStoreDesc struct {
}
import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"


func (d *SharedStoreDesc) Enabled() bool {
return false
}

func (d *SharedStoreDesc) HasBypassWrite() bool {
return false
type SharedStoreDesc struct {
types.EmptySharedStoreDesc
} }

+ 12
- 0
common/pkgs/storage/types/bypass.go View File

@@ -27,3 +27,15 @@ type BypassFilePath struct {
type BypassRead interface { type BypassRead interface {
BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error) BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error)
} }

// 能通过一个Http请求直接访问文件
// 仅用于分片存储。
type HTTPBypassRead interface {
HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPReqeust, error)
}

type HTTPReqeust struct {
SignedUrl string `json:"signedUrl"`
Header map[string]string `json:"header"`
Body string `json:"body"`
}

+ 11
- 0
common/pkgs/storage/types/ec_multiplier.go View File

@@ -0,0 +1,11 @@
package types

type ECMultiplier interface {
// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。
// 输出为每一个块文件的路径,数组长度 = len(coef)
Multiply(coef [][]byte, inputs []HTTPReqeust, chunkSize int64) ([]string, error)
// 完成计算
Complete()
// 取消计算。如果已经调用了Complete,则应该无任何影响
Abort()
}

+ 8
- 0
common/pkgs/storage/types/empty_builder.go View File

@@ -32,6 +32,10 @@ func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) {
return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported) return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
} }


func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) {
return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}

type EmptyShardStoreDesc struct { type EmptyShardStoreDesc struct {
} }


@@ -47,6 +51,10 @@ func (d *EmptyShardStoreDesc) HasBypassRead() bool {
return false return false
} }


func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool {
return false
}

type EmptySharedStoreDesc struct { type EmptySharedStoreDesc struct {
} }




+ 3
- 0
common/pkgs/storage/types/types.go View File

@@ -50,6 +50,7 @@ type StorageBuilder interface {
CreateMultiparter() (Multiparter, error) CreateMultiparter() (Multiparter, error)
// 创建一个存储服务直传组件 // 创建一个存储服务直传组件
CreateS2STransfer() (S2STransfer, error) CreateS2STransfer() (S2STransfer, error)
CreateECMultiplier() (ECMultiplier, error)
} }


type ShardStoreDesc interface { type ShardStoreDesc interface {
@@ -59,6 +60,8 @@ type ShardStoreDesc interface {
HasBypassWrite() bool HasBypassWrite() bool
// 是否能旁路读取 // 是否能旁路读取
HasBypassRead() bool HasBypassRead() bool
// 是否能通过HTTP读取
HasBypassHTTPRead() bool
} }


type SharedStoreDesc interface { type SharedStoreDesc interface {


Loading…
Cancel
Save