diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go new file mode 100644 index 0000000..739c337 --- /dev/null +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -0,0 +1,93 @@ +package efile + +import ( + "fmt" + "net/url" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/os2" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type ECMultiplier struct { + token string + url string + feat *cdssdk.ECMultiplierFeature + outputs []string + completed bool +} + +// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 +// 输出为每一个块文件的路径,数组长度 = len(coef) +func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunkSize int64) ([]string, error) { + type Request struct { + Inputs []types.HTTPReqeust `json:"inputs"` + Outputs []string `json:"outputs"` + Coefs [][]byte `json:"coefs"` + ChunkSize int64 `json:"chunkSize"` + } + type Response struct { + Code string `json:"code"` + Msg string `json:"msg"` + } + + fileName := os2.GenerateRandomFileName(10) + m.outputs = make([]string, len(coef)) + for i := range m.outputs { + m.outputs[i] = fmt.Sprintf("%s_%d", fileName, i) + } + + u, err := url.JoinPath(m.url, "efile/openapi/v2/createECTask") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{"token": m.token}, + Body: Request{ + Inputs: inputs, + Outputs: m.outputs, + Coefs: coef, + ChunkSize: chunkSize, + }, + }) + if err != nil { + return nil, err + } + + var r Response + err = serder.JSONToObjectStream(resp.Body, &r) + if err != nil { + return nil, err + } + + if r.Code != "0" { + return nil, fmt.Errorf("code: %s, msg: %s", r.Code, r.Msg) + } + + return m.outputs, nil +} + +// 完成计算 +func (m *ECMultiplier) Complete() { + m.completed = true +} + +// 取消计算。如果已经调用了Complete,则应该无任何影响 +func (m *ECMultiplier) Abort() { + if !m.completed { + u, err := url.JoinPath(m.url, "efile/openapi/v2/file/remove") + if err != nil { + return + } + + for _, output := range m.outputs { + http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{"token": m.token}, + Query: map[string]string{"paths": output}, + }) + } + } +} diff --git a/common/pkgs/storage/efile/efile.go b/common/pkgs/storage/efile/efile.go new file mode 100644 index 0000000..af869d6 --- /dev/null +++ b/common/pkgs/storage/efile/efile.go @@ -0,0 +1,109 @@ +package efile + +import ( + "fmt" + "net/url" + "sync" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" +) + +func init() { + reg.RegisterBuilder[*cdssdk.EFileType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) +} + +type builder struct { + types.EmptyBuilder + detail stgmod.StorageDetail + token string + tokenLock sync.Mutex + getTokenTime time.Time +} + +func (b *builder) getToken() (string, error) { + stgType := b.detail.Storage.Type.(*cdssdk.EFileType) + + b.tokenLock.Lock() + defer b.tokenLock.Unlock() + + if b.token != "" { + dt := time.Since(b.getTokenTime) + if dt < time.Second*time.Duration(stgType.TokenExpire) { + return b.token, nil + } + } + + u, err := url.JoinPath(stgType.TokenURL, "/ac/openapi/v2/tokens") + if err != nil { + return "", err + } + + resp, err := http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{ + "user": stgType.User, + "password": stgType.Password, + "orgId": stgType.OrgID, + }, + }) + if err != nil { + return "", err + } + + type Response struct { + Code string `json:"code"` + Msg string `json:"msg"` + Data []struct { + ClusterID string `json:"clusterId"` + Token string `json:"token"` + } `json:"data"` + } + + var r Response + err = serder.JSONToObjectStream(resp.Body, &r) + if err != nil { + return "", err + } + + if r.Code != "0" { + return "", fmt.Errorf("code:%s, msg:%s", r.Code, r.Msg) + } + + for _, d := range r.Data { + if d.ClusterID == stgType.ClusterID { + b.token = d.Token + b.getTokenTime = time.Now() + return d.Token, nil + } + } + + return "", fmt.Errorf("clusterID:%s not found", stgType.ClusterID) +} + +func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { + feat := utils.FindFeature[*cdssdk.ECMultiplierFeature](b.detail) + if feat == nil { + return nil, fmt.Errorf("feature ECMultiplier not found") + } + + token, err := b.getToken() + if err != nil { + return nil, fmt.Errorf("get token: %v", err) + } + + return &ECMultiplier{ + token: token, + url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL, + feat: feat, + }, nil +} diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index ca49db8..1084275 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -19,6 +19,7 @@ func init() { } type builder struct { + types.EmptyBuilder detail stgmod.StorageDetail } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 278cd2b..28ed46a 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -23,6 +23,7 @@ const ( ) type ShardStoreDesc struct { + types.EmptyShardStoreDesc builder *builder } diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go index bfd9730..87eb45c 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/shared_store.go @@ -11,6 +11,7 @@ import ( ) type SharedStoreDesc struct { + types.EmptySharedStoreDesc builder *builder } diff --git a/common/pkgs/storage/mashup/mashup.go b/common/pkgs/storage/mashup/mashup.go new file mode 100644 index 0000000..882aaff --- /dev/null +++ b/common/pkgs/storage/mashup/mashup.go @@ -0,0 +1,75 @@ +package mashup + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +func init() { + reg.RegisterBuilder[*cdssdk.MashupStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) +} + +type builder struct { + detail stgmod.StorageDetail +} + +func (b *builder) CreateAgent() (types.StorageAgent, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.CreateAgent() +} + +func (b *builder) ShardStoreDesc() types.ShardStoreDesc { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.ShardStoreDesc() +} + +func (b *builder) SharedStoreDesc() types.SharedStoreDesc { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.SharedStoreDesc() +} + +func (b *builder) CreateMultiparter() (types.Multiparter, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateMultiparter() +} + +func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateS2STransfer() +} + +func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateECMultiplier() +} diff --git a/common/pkgs/storage/s3/obs/s2s.go b/common/pkgs/storage/s3/obs/s2s.go index 5c28b84..6317092 100644 --- a/common/pkgs/storage/s3/obs/s2s.go +++ b/common/pkgs/storage/s3/obs/s2s.go @@ -170,5 +170,7 @@ func (s *S2STransfer) Abort() { s.omsCli.DeleteTask(&model.DeleteTaskRequest{ TaskId: fmt.Sprintf("%v", *s.taskID), }) + + // TODO 清理临时文件 } } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 7185476..aa98e5f 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -27,6 +27,7 @@ const ( ) type ShardStoreDesc struct { + types.EmptyShardStoreDesc builder *builder } diff --git a/common/pkgs/storage/s3/shared_store.go b/common/pkgs/storage/s3/shared_store.go index bb822e8..c0e74e3 100644 --- a/common/pkgs/storage/s3/shared_store.go +++ b/common/pkgs/storage/s3/shared_store.go @@ -1,12 +1,7 @@ package s3 -type SharedStoreDesc struct { -} +import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -func (d *SharedStoreDesc) Enabled() bool { - return false -} - -func (d *SharedStoreDesc) HasBypassWrite() bool { - return false +type SharedStoreDesc struct { + types.EmptySharedStoreDesc } diff --git a/common/pkgs/storage/types/bypass.go b/common/pkgs/storage/types/bypass.go index 5e8d23f..9760559 100644 --- a/common/pkgs/storage/types/bypass.go +++ b/common/pkgs/storage/types/bypass.go @@ -27,3 +27,15 @@ type BypassFilePath struct { type BypassRead interface { BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error) } + +// 能通过一个Http请求直接访问文件 +// 仅用于分片存储。 +type HTTPBypassRead interface { + HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPReqeust, error) +} + +type HTTPReqeust struct { + SignedUrl string `json:"signedUrl"` + Header map[string]string `json:"header"` + Body string `json:"body"` +} diff --git a/common/pkgs/storage/types/ec_multiplier.go b/common/pkgs/storage/types/ec_multiplier.go new file mode 100644 index 0000000..c9d8d7b --- /dev/null +++ b/common/pkgs/storage/types/ec_multiplier.go @@ -0,0 +1,11 @@ +package types + +type ECMultiplier interface { + // 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 + // 输出为每一个块文件的路径,数组长度 = len(coef) + Multiply(coef [][]byte, inputs []HTTPReqeust, chunkSize int64) ([]string, error) + // 完成计算 + Complete() + // 取消计算。如果已经调用了Complete,则应该无任何影响 + Abort() +} diff --git a/common/pkgs/storage/types/empty_builder.go b/common/pkgs/storage/types/empty_builder.go index be0178c..692d020 100644 --- a/common/pkgs/storage/types/empty_builder.go +++ b/common/pkgs/storage/types/empty_builder.go @@ -32,6 +32,10 @@ func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) { return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported) } +func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) { + return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported) +} + type EmptyShardStoreDesc struct { } @@ -47,6 +51,10 @@ func (d *EmptyShardStoreDesc) HasBypassRead() bool { return false } +func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool { + return false +} + type EmptySharedStoreDesc struct { } diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 11b2516..af1803d 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -50,6 +50,7 @@ type StorageBuilder interface { CreateMultiparter() (Multiparter, error) // 创建一个存储服务直传组件 CreateS2STransfer() (S2STransfer, error) + CreateECMultiplier() (ECMultiplier, error) } type ShardStoreDesc interface { @@ -59,6 +60,8 @@ type ShardStoreDesc interface { HasBypassWrite() bool // 是否能旁路读取 HasBypassRead() bool + // 是否能通过HTTP读取 + HasBypassHTTPRead() bool } type SharedStoreDesc interface {