Browse Source

Merge branch 'master' of https://gitlink.org.cn/cloudream/storage into feature_zqj

gitlink
Jake 10 months ago
parent
commit
09a0ca5c52
32 changed files with 724 additions and 147 deletions
  1. +1
    -1
      client/internal/http/server.go
  2. +54
    -62
      client/internal/http/temp.go
  3. +17
    -1
      client/internal/services/storage.go
  4. +5
    -5
      common/pkgs/ioswitch2/fromto.go
  5. +17
    -17
      common/pkgs/ioswitch2/ops2/public_store.go
  6. +2
    -2
      common/pkgs/ioswitch2/parser/gen/generator.go
  7. +3
    -3
      common/pkgs/storage/agtpool/pool.go
  8. +93
    -0
      common/pkgs/storage/efile/ec_multiplier.go
  9. +109
    -0
      common/pkgs/storage/efile/efile.go
  10. +8
    -8
      common/pkgs/storage/local/agent.go
  11. +8
    -7
      common/pkgs/storage/local/local.go
  12. +14
    -13
      common/pkgs/storage/local/public_store.go
  13. +1
    -0
      common/pkgs/storage/local/shard_store.go
  14. +75
    -0
      common/pkgs/storage/mashup/mashup.go
  15. +1
    -1
      common/pkgs/storage/s3/agent.go
  16. +2
    -0
      common/pkgs/storage/s3/obs/s2s.go
  17. +7
    -0
      common/pkgs/storage/s3/public_store.go
  18. +2
    -2
      common/pkgs/storage/s3/s3.go
  19. +1
    -0
      common/pkgs/storage/s3/shard_store.go
  20. +0
    -12
      common/pkgs/storage/s3/shared_store.go
  21. +12
    -0
      common/pkgs/storage/types/bypass.go
  22. +11
    -0
      common/pkgs/storage/types/ec_multiplier.go
  23. +13
    -5
      common/pkgs/storage/types/empty_builder.go
  24. +1
    -1
      common/pkgs/storage/types/public_store.go
  25. +6
    -3
      common/pkgs/storage/types/types.go
  26. +8
    -0
      common/pkgs/sysevent/config.go
  27. +119
    -0
      common/pkgs/sysevent/publisher.go
  28. +9
    -0
      common/pkgs/sysevent/sysevent.go
  29. +121
    -0
      common/pkgs/sysevent/watcher.go
  30. +1
    -1
      common/pkgs/uploader/create_load.go
  31. +1
    -1
      common/pkgs/uploader/update.go
  32. +2
    -2
      common/pkgs/uploader/uploader.go

+ 1
- 1
client/internal/http/server.go View File

@@ -43,7 +43,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() {
rt := s.engine.Use()

// initTemp(rt, s)
initTemp(rt, s)

s.routeV1(s.engine)



+ 54
- 62
client/internal/http/temp.go View File

@@ -1,6 +1,5 @@
package http

/*
import (
"net/http"

@@ -96,7 +95,7 @@ type TempGetObjectDetailResp struct {
type ObjectBlockDetail struct {
ObjectID cdssdk.ObjectID `json:"objectID"`
Type string `json:"type"`
FileHash string `json:"fileHash"`
FileHash cdssdk.FileHash `json:"fileHash"`
LocationType string `json:"locationType"`
LocationName string `json:"locationName"`
}
@@ -123,43 +122,36 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
return
}

loadedHubIDs, err := s.svc.PackageSvc().GetLoadedNodes(1, details.Object.PackageID)
if err != nil {
log.Warnf("getting loaded nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
return
}

var allHubIDs []cdssdk.HubID
allHubIDs = append(allHubIDs, details.PinnedAt...)
var allStgIDs []cdssdk.StorageID
allStgIDs = append(allStgIDs, details.PinnedAt...)
for _, b := range details.Blocks {
allHubIDs = append(allHubIDs, b.StorageID)
allStgIDs = append(allStgIDs, b.StorageID)
}
allHubIDs = append(allHubIDs, loadedHubIDs...)

allHubIDs = lo.Uniq(allHubIDs)
allStgIDs = lo.Uniq(allStgIDs)

getNodes, err := s.svc.NodeSvc().GetNodes(allHubIDs)
getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs)
if err != nil {
log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return
}

allNodes := make(map[cdssdk.HubID]*cdssdk.Node)
for _, n := range getNodes {
n2 := n
allNodes[n.HubID] = &n2
allStgs := make(map[cdssdk.StorageID]cdssdk.Storage)
for _, n := range getStgs {
if n != nil {
allStgs[n.Storage.StorageID] = n.Storage
}
}

var blocks []ObjectBlockDetail

for _, hubID := range details.PinnedAt {
for _, stgID := range details.PinnedAt {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: details.Object.FileHash,
LocationType: "Agent",
LocationName: allNodes[hubID].Name,
LocationName: allStgs[stgID].Name,
})
}

@@ -171,7 +163,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
})
}
}
@@ -182,7 +174,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
})
}
}
@@ -193,20 +185,11 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
Type: "Block",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
})
}
}

for _, hubID := range loadedHubIDs {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: details.Object.FileHash,
LocationType: "Storage",
LocationName: allNodes[hubID].Name,
})
}

ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{
Blocks: blocks,
}))
@@ -252,15 +235,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
return
}

nodes, err := s.svc.NodeSvc().GetNodes(nil)
var allStgIDs []cdssdk.StorageID
for _, obj := range db.Objects {
allStgIDs = append(allStgIDs, obj.PinnedAt...)
for _, blk := range obj.Blocks {
allStgIDs = append(allStgIDs, blk.StorageID)
}
}

getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs)
if err != nil {
log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return
}
allNodes := make(map[cdssdk.HubID]cdssdk.Node)
for _, n := range nodes {
allNodes[n.HubID] = n
allStgs := make(map[cdssdk.StorageID]cdssdk.Storage)
for _, n := range getStgs {
if n != nil {
allStgs[n.Storage.StorageID] = n.Storage
}
}

bkts := make(map[cdssdk.BucketID]*BucketDetail)
@@ -274,25 +267,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {

type PackageDetail struct {
Package cdssdk.Package
Loaded []cdssdk.Node
// Loaded []cdssdk.Node
}
pkgs := make(map[cdssdk.PackageID]*PackageDetail)
for _, pkg := range db.Packages {
p := PackageDetail{
Package: pkg,
Loaded: make([]cdssdk.Node, 0),
// Loaded: make([]cdssdk.Node, 0),
}

loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID)
if err != nil {
log.Warnf("getting loaded nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
return
}
// loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID)
// if err != nil {
// log.Warnf("getting loaded nodes: %s", err.Error())
// ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
// return
// }

for _, hubID := range loaded {
p.Loaded = append(p.Loaded, allNodes[hubID])
}
// for _, hubID := range loaded {
// p.Loaded = append(p.Loaded, allNodes[hubID])
// }

pkgs[pkg.PackageID] = &p
}
@@ -316,7 +309,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Rep",
FileHash: obj.Object.FileHash,
LocationType: "Agent",
LocationName: allNodes[hubID].Name,
LocationName: allStgs[hubID].Name,
})
}

@@ -329,7 +322,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
})
}
}
@@ -341,7 +334,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
})
}
}
@@ -353,20 +346,20 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Block",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
})
}
}

for _, node := range pkgs[obj.Object.PackageID].Loaded {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Rep",
FileHash: obj.Object.FileHash,
LocationType: "Storage",
LocationName: allNodes[node.HubID].Name,
})
}
// for _, node := range pkgs[obj.Object.PackageID].Loaded {
// blocks = append(blocks, ObjectBlockDetail{
// ObjectID: obj.Object.ObjectID,
// Type: "Rep",
// FileHash: obj.Object.FileHash,
// LocationType: "Storage",
// LocationName: allNodes[node.HubID].Name,
// })
// }

}

@@ -390,4 +383,3 @@ func auth(ctx *gin.Context) {
ctx.AbortWithStatus(http.StatusUnauthorized)
}
}
*/

+ 17
- 1
client/internal/services/storage.go View File

@@ -10,6 +10,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
@@ -58,6 +59,21 @@ func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model.
return &getResp.Storage, nil
}

func (svc *StorageService) GetDetails(stgIDs []cdssdk.StorageID) ([]*stgmod.StorageDetail, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(stgIDs))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return getResp.Storages, nil
}

func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID, rootPath string) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
@@ -103,7 +119,7 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa
return fmt.Errorf("unsupported download strategy: %T", strg)
}

ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
// 顺便保存到同存储服务的分片存储中
if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() {
ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), ""))


+ 5
- 5
common/pkgs/ioswitch2/fromto.go View File

@@ -195,26 +195,26 @@ func (t *ToShardStore) GetRange() math2.Range {
return t.Range
}

type LoadToShared struct {
type LoadToPublic struct {
Hub cdssdk.Hub
Storage stgmod.StorageDetail
ObjectPath string
}

func NewLoadToShared(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToShared {
return &LoadToShared{
func NewLoadToPublic(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToPublic {
return &LoadToPublic{
Hub: hub,
Storage: storage,
ObjectPath: objectPath,
}
}

func (t *LoadToShared) GetStreamIndex() StreamIndex {
func (t *LoadToPublic) GetStreamIndex() StreamIndex {
return StreamIndex{
Type: StreamIndexRaw,
}
}

func (t *LoadToShared) GetRange() math2.Range {
func (t *LoadToPublic) GetRange() math2.Range {
return math2.Range{}
}

common/pkgs/ioswitch2/ops2/shared_store.go → common/pkgs/ioswitch2/ops2/public_store.go View File

@@ -13,29 +13,29 @@ import (
)

func init() {
exec.UseOp[*SharedLoad]()
exec.UseOp[*PublicLoad]()
}

type SharedLoad struct {
type PublicLoad struct {
Input exec.VarID
StorageID cdssdk.StorageID
ObjectPath string
}

func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
logger.
WithField("Input", o.Input).
Debugf("load file to shared store")
defer logger.Debugf("load file to shared store finished")
Debugf("load file to public store")
defer logger.Debugf("load file to public store finished")

stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}

store, err := stgAgts.GetSharedStore(o.StorageID)
store, err := stgAgts.GetPublicStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
return fmt.Errorf("getting public store of storage %v: %w", o.StorageID, err)
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
@@ -47,19 +47,19 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
return store.Write(o.ObjectPath, input.Stream)
}

func (o *SharedLoad) String() string {
return fmt.Sprintf("SharedLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath)
func (o *PublicLoad) String() string {
return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath)
}

type SharedLoadNode struct {
type PublicLoadNode struct {
dag.NodeBase
To ioswitch2.To
Storage stgmod.StorageDetail
ObjectPath string
}

func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *SharedLoadNode {
node := &SharedLoadNode{
func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *PublicLoadNode {
node := &PublicLoadNode{
To: to,
Storage: stg,
ObjectPath: objPath,
@@ -70,23 +70,23 @@ func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDeta
return node
}

func (t *SharedLoadNode) GetTo() ioswitch2.To {
func (t *PublicLoadNode) GetTo() ioswitch2.To {
return t.To
}

func (t *SharedLoadNode) SetInput(input *dag.StreamVar) {
func (t *PublicLoadNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}

func (t *SharedLoadNode) Input() dag.StreamInputSlot {
func (t *PublicLoadNode) Input() dag.StreamInputSlot {
return dag.StreamInputSlot{
Node: t,
Index: 0,
}
}

func (t *SharedLoadNode) GenerateOp() (exec.Op, error) {
return &SharedLoad{
func (t *PublicLoadNode) GenerateOp() (exec.Op, error) {
return &PublicLoad{
Input: t.InputStreams().Get(0).VarID,
StorageID: t.Storage.Storage.StorageID,
ObjectPath: t.ObjectPath,

+ 2
- 2
common/pkgs/ioswitch2/parser/gen/generator.go View File

@@ -361,8 +361,8 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error)

return n, nil

case *ioswitch2.LoadToShared:
n := ctx.DAG.NewSharedLoad(t, t.Storage, t.ObjectPath)
case *ioswitch2.LoadToPublic:
n := ctx.DAG.NewPublicLoad(t, t.Storage, t.ObjectPath)

if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
return nil, err


+ 3
- 3
common/pkgs/storage/agtpool/pool.go View File

@@ -87,8 +87,8 @@ func (m *AgentPool) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, err
return stg.Agent.GetShardStore()
}

// 查找指定Storage的SharedStore组件
func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
// 查找指定Storage的PublicStore组件
func (m *AgentPool) GetPublicStore(stgID cdssdk.StorageID) (types.PublicStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

@@ -97,5 +97,5 @@ func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, e
return nil, types.ErrStorageNotFound
}

return stg.Agent.GetSharedStore()
return stg.Agent.GetPublicStore()
}

+ 93
- 0
common/pkgs/storage/efile/ec_multiplier.go View File

@@ -0,0 +1,93 @@
package efile

import (
"fmt"
"net/url"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/os2"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type ECMultiplier struct {
token string
url string
feat *cdssdk.ECMultiplierFeature
outputs []string
completed bool
}

// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。
// 输出为每一个块文件的路径,数组长度 = len(coef)
func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunkSize int64) ([]string, error) {
type Request struct {
Inputs []types.HTTPReqeust `json:"inputs"`
Outputs []string `json:"outputs"`
Coefs [][]byte `json:"coefs"`
ChunkSize int64 `json:"chunkSize"`
}
type Response struct {
Code string `json:"code"`
Msg string `json:"msg"`
}

fileName := os2.GenerateRandomFileName(10)
m.outputs = make([]string, len(coef))
for i := range m.outputs {
m.outputs[i] = fmt.Sprintf("%s_%d", fileName, i)
}

u, err := url.JoinPath(m.url, "efile/openapi/v2/createECTask")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(u, http2.RequestParam{
Header: map[string]string{"token": m.token},
Body: Request{
Inputs: inputs,
Outputs: m.outputs,
Coefs: coef,
ChunkSize: chunkSize,
},
})
if err != nil {
return nil, err
}

var r Response
err = serder.JSONToObjectStream(resp.Body, &r)
if err != nil {
return nil, err
}

if r.Code != "0" {
return nil, fmt.Errorf("code: %s, msg: %s", r.Code, r.Msg)
}

return m.outputs, nil
}

// 完成计算
func (m *ECMultiplier) Complete() {
m.completed = true
}

// 取消计算。如果已经调用了Complete,则应该无任何影响
func (m *ECMultiplier) Abort() {
if !m.completed {
u, err := url.JoinPath(m.url, "efile/openapi/v2/file/remove")
if err != nil {
return
}

for _, output := range m.outputs {
http2.PostJSON(u, http2.RequestParam{
Header: map[string]string{"token": m.token},
Query: map[string]string{"paths": output},
})
}
}
}

+ 109
- 0
common/pkgs/storage/efile/efile.go View File

@@ -0,0 +1,109 @@
package efile

import (
"fmt"
"net/url"
"sync"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

func init() {
reg.RegisterBuilder[*cdssdk.EFileType](func(detail stgmod.StorageDetail) types.StorageBuilder {
return &builder{
detail: detail,
}
})
}

type builder struct {
types.EmptyBuilder
detail stgmod.StorageDetail
token string
tokenLock sync.Mutex
getTokenTime time.Time
}

func (b *builder) getToken() (string, error) {
stgType := b.detail.Storage.Type.(*cdssdk.EFileType)

b.tokenLock.Lock()
defer b.tokenLock.Unlock()

if b.token != "" {
dt := time.Since(b.getTokenTime)
if dt < time.Second*time.Duration(stgType.TokenExpire) {
return b.token, nil
}
}

u, err := url.JoinPath(stgType.TokenURL, "/ac/openapi/v2/tokens")
if err != nil {
return "", err
}

resp, err := http2.PostJSON(u, http2.RequestParam{
Header: map[string]string{
"user": stgType.User,
"password": stgType.Password,
"orgId": stgType.OrgID,
},
})
if err != nil {
return "", err
}

type Response struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data []struct {
ClusterID string `json:"clusterId"`
Token string `json:"token"`
} `json:"data"`
}

var r Response
err = serder.JSONToObjectStream(resp.Body, &r)
if err != nil {
return "", err
}

if r.Code != "0" {
return "", fmt.Errorf("code:%s, msg:%s", r.Code, r.Msg)
}

for _, d := range r.Data {
if d.ClusterID == stgType.ClusterID {
b.token = d.Token
b.getTokenTime = time.Now()
return d.Token, nil
}
}

return "", fmt.Errorf("clusterID:%s not found", stgType.ClusterID)
}

func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) {
feat := utils.FindFeature[*cdssdk.ECMultiplierFeature](b.detail)
if feat == nil {
return nil, fmt.Errorf("feature ECMultiplier not found")
}

token, err := b.getToken()
if err != nil {
return nil, fmt.Errorf("get token: %v", err)
}

return &ECMultiplier{
token: token,
url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL,
feat: feat,
}, nil
}

+ 8
- 8
common/pkgs/storage/local/agent.go View File

@@ -8,7 +8,7 @@ import (
type agent struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
SharedStore *SharedStore
PublicStore *PublicStore
}

func (s *agent) Start(ch *types.StorageEventChan) {
@@ -16,8 +16,8 @@ func (s *agent) Start(ch *types.StorageEventChan) {
s.ShardStore.Start(ch)
}

if s.SharedStore != nil {
s.SharedStore.Start(ch)
if s.PublicStore != nil {
s.PublicStore.Start(ch)
}
}

@@ -26,8 +26,8 @@ func (s *agent) Stop() {
s.ShardStore.Stop()
}

if s.SharedStore != nil {
s.SharedStore.Stop()
if s.PublicStore != nil {
s.PublicStore.Stop()
}
}

@@ -43,10 +43,10 @@ func (a *agent) GetShardStore() (types.ShardStore, error) {
return a.ShardStore, nil
}

func (a *agent) GetSharedStore() (types.SharedStore, error) {
if a.SharedStore == nil {
func (a *agent) GetPublicStore() (types.PublicStore, error) {
if a.PublicStore == nil {
return nil, types.ErrUnsupported
}

return a.SharedStore, nil
return a.PublicStore, nil
}

+ 8
- 7
common/pkgs/storage/local/local.go View File

@@ -19,6 +19,7 @@ func init() {
}

type builder struct {
types.EmptyBuilder
detail stgmod.StorageDetail
}

@@ -41,18 +42,18 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) {
agt.ShardStore = store
}

if b.detail.Storage.SharedStore != nil {
local, ok := b.detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage)
if b.detail.Storage.PublicStore != nil {
local, ok := b.detail.Storage.PublicStore.(*cdssdk.LocalPublicStorage)
if !ok {
return nil, fmt.Errorf("invalid shared store type %T for local storage", b.detail.Storage.SharedStore)
return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore)
}

store, err := NewSharedStore(agt, *local)
store, err := NewPublicStore(agt, *local)
if err != nil {
return nil, err
}

agt.SharedStore = store
agt.PublicStore = store
}

return agt, nil
@@ -62,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
return &ShardStoreDesc{builder: b}
}

func (b *builder) SharedStoreDesc() types.SharedStoreDesc {
return &SharedStoreDesc{builder: b}
func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
return &PublicStoreDesc{builder: b}
}

func (b *builder) CreateMultiparter() (types.Multiparter, error) {


common/pkgs/storage/local/shared_store.go → common/pkgs/storage/local/public_store.go View File

@@ -10,39 +10,40 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type SharedStoreDesc struct {
type PublicStoreDesc struct {
types.EmptyPublicStoreDesc
builder *builder
}

func (d *SharedStoreDesc) Enabled() bool {
return d.builder.detail.Storage.SharedStore != nil
func (d *PublicStoreDesc) Enabled() bool {
return d.builder.detail.Storage.PublicStore != nil
}

func (d *SharedStoreDesc) HasBypassWrite() bool {
func (d *PublicStoreDesc) HasBypassWrite() bool {
return false
}

type SharedStore struct {
type PublicStore struct {
agt *agent
cfg cdssdk.LocalSharedStorage
cfg cdssdk.LocalPublicStorage
}

func NewSharedStore(agt *agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
return &SharedStore{
func NewPublicStore(agt *agent, cfg cdssdk.LocalPublicStorage) (*PublicStore, error) {
return &PublicStore{
agt: agt,
cfg: cfg,
}, nil
}

func (s *SharedStore) Start(ch *types.StorageEventChan) {
func (s *PublicStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase)
}

func (s *SharedStore) Stop() {
func (s *PublicStore) Stop() {
s.getLogger().Infof("component stop")
}

func (s *SharedStore) Write(objPath string, stream io.Reader) error {
func (s *PublicStore) Write(objPath string, stream io.Reader) error {
fullPath := filepath.Join(s.cfg.LoadBase, objPath)
err := os.MkdirAll(filepath.Dir(fullPath), 0755)
if err != nil {
@@ -63,6 +64,6 @@ func (s *SharedStore) Write(objPath string, stream io.Reader) error {
return nil
}

func (s *SharedStore) getLogger() logger.Logger {
return logger.WithField("SharedStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
func (s *PublicStore) getLogger() logger.Logger {
return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
}

+ 1
- 0
common/pkgs/storage/local/shard_store.go View File

@@ -23,6 +23,7 @@ const (
)

type ShardStoreDesc struct {
types.EmptyShardStoreDesc
builder *builder
}



+ 75
- 0
common/pkgs/storage/mashup/mashup.go View File

@@ -0,0 +1,75 @@
package mashup

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func init() {
reg.RegisterBuilder[*cdssdk.MashupStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder {
return &builder{
detail: detail,
}
})
}

type builder struct {
detail stgmod.StorageDetail
}

func (b *builder) CreateAgent() (types.StorageAgent, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Agent

blder := factory.GetBuilder(detail)
return blder.CreateAgent()
}

func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Agent

blder := factory.GetBuilder(detail)
return blder.ShardStoreDesc()
}

func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Agent

blder := factory.GetBuilder(detail)
return blder.PublicStoreDesc()
}

func (b *builder) CreateMultiparter() (types.Multiparter, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Feature

blder := factory.GetBuilder(detail)
return blder.CreateMultiparter()
}

func (b *builder) CreateS2STransfer() (types.S2STransfer, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Feature

blder := factory.GetBuilder(detail)
return blder.CreateS2STransfer()
}

func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail
detail.Storage.Type = stgType.Feature

blder := factory.GetBuilder(detail)
return blder.CreateECMultiplier()
}

+ 1
- 1
common/pkgs/storage/s3/agent.go View File

@@ -34,6 +34,6 @@ func (a *Agent) GetShardStore() (types.ShardStore, error) {
return a.ShardStore, nil
}

func (a *Agent) GetSharedStore() (types.SharedStore, error) {
func (a *Agent) GetPublicStore() (types.PublicStore, error) {
return nil, types.ErrUnsupported
}

+ 2
- 0
common/pkgs/storage/s3/obs/s2s.go View File

@@ -170,5 +170,7 @@ func (s *S2STransfer) Abort() {
s.omsCli.DeleteTask(&model.DeleteTaskRequest{
TaskId: fmt.Sprintf("%v", *s.taskID),
})

// TODO 清理临时文件
}
}

+ 7
- 0
common/pkgs/storage/s3/public_store.go View File

@@ -0,0 +1,7 @@
package s3

import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"

type PublicStoreDesc struct {
types.EmptyPublicStoreDesc
}

+ 2
- 2
common/pkgs/storage/s3/s3.go View File

@@ -63,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
return &ShardStoreDesc{builder: b}
}

func (b *builder) SharedStoreDesc() types.SharedStoreDesc {
return &SharedStoreDesc{}
func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
return &PublicStoreDesc{}
}

func (b *builder) CreateMultiparter() (types.Multiparter, error) {


+ 1
- 0
common/pkgs/storage/s3/shard_store.go View File

@@ -27,6 +27,7 @@ const (
)

type ShardStoreDesc struct {
types.EmptyShardStoreDesc
builder *builder
}



+ 0
- 12
common/pkgs/storage/s3/shared_store.go View File

@@ -1,12 +0,0 @@
package s3

type SharedStoreDesc struct {
}

func (d *SharedStoreDesc) Enabled() bool {
return false
}

func (d *SharedStoreDesc) HasBypassWrite() bool {
return false
}

+ 12
- 0
common/pkgs/storage/types/bypass.go View File

@@ -27,3 +27,15 @@ type BypassFilePath struct {
type BypassRead interface {
BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error)
}

// 能通过一个Http请求直接访问文件
// 仅用于分片存储。
type HTTPBypassRead interface {
HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPReqeust, error)
}

type HTTPReqeust struct {
SignedUrl string `json:"signedUrl"`
Header map[string]string `json:"header"`
Body string `json:"body"`
}

+ 11
- 0
common/pkgs/storage/types/ec_multiplier.go View File

@@ -0,0 +1,11 @@
package types

type ECMultiplier interface {
// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。
// 输出为每一个块文件的路径,数组长度 = len(coef)
Multiply(coef [][]byte, inputs []HTTPReqeust, chunkSize int64) ([]string, error)
// 完成计算
Complete()
// 取消计算。如果已经调用了Complete,则应该无任何影响
Abort()
}

+ 13
- 5
common/pkgs/storage/types/empty_builder.go View File

@@ -19,8 +19,8 @@ func (b *EmptyBuilder) ShardStoreDesc() ShardStoreDesc {
return &EmptyShardStoreDesc{}
}

func (b *EmptyBuilder) SharedStoreDesc() SharedStoreDesc {
return &EmptySharedStoreDesc{}
func (b *EmptyBuilder) PublicStoreDesc() PublicStoreDesc {
return &EmptyPublicStoreDesc{}
}

// 创建一个分片上传组件
@@ -32,6 +32,10 @@ func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) {
return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}

func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) {
return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}

type EmptyShardStoreDesc struct {
}

@@ -47,13 +51,17 @@ func (d *EmptyShardStoreDesc) HasBypassRead() bool {
return false
}

type EmptySharedStoreDesc struct {
func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool {
return false
}

type EmptyPublicStoreDesc struct {
}

func (d *EmptySharedStoreDesc) Enabled() bool {
func (d *EmptyPublicStoreDesc) Enabled() bool {
return false
}

func (d *EmptySharedStoreDesc) HasBypassWrite() bool {
func (d *EmptyPublicStoreDesc) HasBypassWrite() bool {
return false
}

common/pkgs/storage/types/shared_store.go → common/pkgs/storage/types/public_store.go View File

@@ -4,7 +4,7 @@ import (
"io"
)

type SharedStore interface {
type PublicStore interface {
Start(ch *StorageEventChan)
Stop()


+ 6
- 3
common/pkgs/storage/types/types.go View File

@@ -31,7 +31,7 @@ type StorageAgent interface {
// 获取分片存储服务
GetShardStore() (ShardStore, error)
// 获取共享存储服务
GetSharedStore() (SharedStore, error)
GetPublicStore() (PublicStore, error)
}

// 创建存储服务的指定组件。
@@ -45,11 +45,12 @@ type StorageBuilder interface {
// 是否支持分片存储服务
ShardStoreDesc() ShardStoreDesc
// 是否支持共享存储服务
SharedStoreDesc() SharedStoreDesc
PublicStoreDesc() PublicStoreDesc
// 创建一个分片上传组件
CreateMultiparter() (Multiparter, error)
// 创建一个存储服务直传组件
CreateS2STransfer() (S2STransfer, error)
CreateECMultiplier() (ECMultiplier, error)
}

type ShardStoreDesc interface {
@@ -59,9 +60,11 @@ type ShardStoreDesc interface {
HasBypassWrite() bool
// 是否能旁路读取
HasBypassRead() bool
// 是否能通过HTTP读取
HasBypassHTTPRead() bool
}

type SharedStoreDesc interface {
type PublicStoreDesc interface {
// 是否已启动
Enabled() bool
// 是否能旁路上传


+ 8
- 0
common/pkgs/sysevent/config.go View File

@@ -0,0 +1,8 @@
package sysevent

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
VHost string `json:"vhost"`
}

+ 119
- 0
common/pkgs/sysevent/publisher.go View File

@@ -0,0 +1,119 @@
package sysevent

import (
"fmt"

"github.com/streadway/amqp"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type PublisherEvent interface{}

type PublisherExited struct {
Err error
}

type PublishError struct {
Err error
}

type OtherError struct {
Err error
}

type Publisher struct {
connection *amqp.Connection
channel *amqp.Channel
eventChan *async.UnboundChannel[SysEvent]
thisSource Source
}

func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) {
config := amqp.Config{
Vhost: cfg.VHost,
}

url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address)
connection, err := amqp.DialConfig(url, config)
if err != nil {
return nil, err
}

channel, err := connection.Channel()
if err != nil {
connection.Close()
return nil, fmt.Errorf("openning channel on connection: %w", err)
}

_, err = channel.QueueDeclare(
SysEventQueueName,
false,
true,
false,
false,
nil,
)
if err != nil {
return nil, fmt.Errorf("declare queue: %w", err)
}

pub := &Publisher{
connection: connection,
channel: channel,
eventChan: async.NewUnboundChannel[SysEvent](),
thisSource: thisSource,
}

return pub, nil
}

func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] {
ch := async.NewUnboundChannel[PublisherEvent]()
go func() {
defer ch.Close()
defer p.channel.Close()
defer p.connection.Close()

for {
event := <-p.eventChan.Receive().Chan()
if event.Err != nil {
if event.Err == async.ErrChannelClosed {
ch.Send(PublisherExited{Err: nil})
} else {
ch.Send(PublisherExited{Err: event.Err})
}
return
}

eventData, err := serder.ObjectToJSONEx(event.Value)
if err != nil {
ch.Send(OtherError{Err: fmt.Errorf("serialize event data: %w", err)})
continue
}

err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: eventData,
Expiration: "60000", // 消息超时时间默认1分钟
})
if err != nil {
ch.Send(PublishError{Err: err})
continue
}
}
}()

return ch
}

// Publish 发布事件,自动补齐时间戳和源信息
func (p *Publisher) Publish(evt SysEvent) {
// TODO 补齐时间戳和源信息
p.eventChan.Send(evt)
}

// PublishRaw 完全原样发布事件,不补齐任何信息
func (p *Publisher) PublishRaw(evt SysEvent) {
p.eventChan.Send(evt)
}

+ 9
- 0
common/pkgs/sysevent/sysevent.go View File

@@ -0,0 +1,9 @@
package sysevent

const (
SysEventQueueName = "SysEventQueue"
)

type SysEvent = any // TODO 换成具体的类型

type Source = any // TODO 换成具体的类型

+ 121
- 0
common/pkgs/sysevent/watcher.go View File

@@ -0,0 +1,121 @@
package sysevent

import (
"fmt"
"sync"

"github.com/streadway/amqp"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type Watcher interface {
OnEvent(event SysEvent)
}

type WatcherEvent interface{}

type WatcherExited struct {
Err error
}

type WatcherHost struct {
watchers []Watcher
lock sync.Mutex
connection *amqp.Connection
channel *amqp.Channel
recvChan <-chan amqp.Delivery
}

func NewWatcherHost(cfg Config) (*WatcherHost, error) {
config := amqp.Config{
Vhost: cfg.VHost,
}

url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address)
connection, err := amqp.DialConfig(url, config)
if err != nil {
return nil, err
}

channel, err := connection.Channel()
if err != nil {
connection.Close()
return nil, fmt.Errorf("openning channel on connection: %w", err)
}

_, err = channel.QueueDeclare(
SysEventQueueName,
false,
true,
false,
false,
nil,
)
if err != nil {
channel.Close()
connection.Close()
return nil, fmt.Errorf("declare queue: %w", err)
}

recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil)
if err != nil {
channel.Close()
connection.Close()
return nil, fmt.Errorf("consume queue: %w", err)
}

wat := &WatcherHost{
connection: connection,
channel: channel,
recvChan: recvChan,
}

return wat, nil
}

func (w *WatcherHost) Start() *async.UnboundChannel[WatcherEvent] {
ch := async.NewUnboundChannel[WatcherEvent]()

go func() {
defer ch.Close()
defer w.channel.Close()
defer w.connection.Close()

for m := range w.recvChan {
evt, err := serder.JSONToObjectEx[SysEvent](m.Body)
if err != nil {
ch.Send(OtherError{Err: fmt.Errorf("deserialize event: %w", err)})
continue
}

w.lock.Lock()
ws := make([]Watcher, 0, len(w.watchers))
ws = append(ws, w.watchers...)
w.lock.Unlock()

for _, w := range ws {
w.OnEvent(evt)
}
}

ch.Send(WatcherExited{Err: nil})
}()

return ch
}

func (w *WatcherHost) AddWatcher(watcher Watcher) {
w.lock.Lock()
defer w.lock.Unlock()

w.watchers = append(w.watchers, watcher)
}

func (w *WatcherHost) RemoveWatcher(watcher Watcher) {
w.lock.Lock()
defer w.lock.Unlock()

w.watchers = lo2.Remove(w.watchers, watcher)
}

+ 1
- 1
common/pkgs/uploader/create_load.go View File

@@ -45,7 +45,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
ft.AddFrom(fromExec)
for i, stg := range u.targetStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa)))
ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa)))
stgIDs = append(stgIDs, stg.Storage.StorageID)
}



+ 1
- 1
common/pkgs/uploader/update.go View File

@@ -51,7 +51,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error
AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash"))

for i, stg := range w.loadToStgs {
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
}

plans := exec.NewPlanBuilder()


+ 2
- 2
common/pkgs/uploader/uploader.go View File

@@ -86,8 +86,8 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff
if stg.MasterHub == nil {
return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
}
if factory.GetBuilder(stg).ShardStoreDesc().Enabled() {
return nil, fmt.Errorf("load to storage %v has no shared store", stgID)
if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() {
return nil, fmt.Errorf("load to storage %v has no public store", stgID)
}

loadToStgs[i] = stg


Loading…
Cancel
Save