Browse Source

SharedStore重名为PublicStore

gitlink
Sydonian 10 months ago
parent
commit
ce77bda8ed
18 changed files with 77 additions and 77 deletions
  1. +1
    -1
      client/internal/services/storage.go
  2. +5
    -5
      common/pkgs/ioswitch2/fromto.go
  3. +17
    -17
      common/pkgs/ioswitch2/ops2/shared_store.go
  4. +2
    -2
      common/pkgs/ioswitch2/parser/gen/generator.go
  5. +3
    -3
      common/pkgs/storage/agtpool/pool.go
  6. +8
    -8
      common/pkgs/storage/local/agent.go
  7. +7
    -7
      common/pkgs/storage/local/local.go
  8. +14
    -14
      common/pkgs/storage/local/shared_store.go
  9. +2
    -2
      common/pkgs/storage/mashup/mashup.go
  10. +1
    -1
      common/pkgs/storage/s3/agent.go
  11. +2
    -2
      common/pkgs/storage/s3/s3.go
  12. +2
    -2
      common/pkgs/storage/s3/shared_store.go
  13. +5
    -5
      common/pkgs/storage/types/empty_builder.go
  14. +1
    -1
      common/pkgs/storage/types/shared_store.go
  15. +3
    -3
      common/pkgs/storage/types/types.go
  16. +1
    -1
      common/pkgs/uploader/create_load.go
  17. +1
    -1
      common/pkgs/uploader/update.go
  18. +2
    -2
      common/pkgs/uploader/uploader.go

+ 1
- 1
client/internal/services/storage.go View File

@@ -119,7 +119,7 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa
return fmt.Errorf("unsupported download strategy: %T", strg) return fmt.Errorf("unsupported download strategy: %T", strg)
} }


ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
// 顺便保存到同存储服务的分片存储中 // 顺便保存到同存储服务的分片存储中
if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() { if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() {
ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), "")) ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), ""))


+ 5
- 5
common/pkgs/ioswitch2/fromto.go View File

@@ -195,26 +195,26 @@ func (t *ToShardStore) GetRange() math2.Range {
return t.Range return t.Range
} }


type LoadToShared struct {
type LoadToPublic struct {
Hub cdssdk.Hub Hub cdssdk.Hub
Storage stgmod.StorageDetail Storage stgmod.StorageDetail
ObjectPath string ObjectPath string
} }


func NewLoadToShared(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToShared {
return &LoadToShared{
func NewLoadToPublic(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToPublic {
return &LoadToPublic{
Hub: hub, Hub: hub,
Storage: storage, Storage: storage,
ObjectPath: objectPath, ObjectPath: objectPath,
} }
} }


func (t *LoadToShared) GetStreamIndex() StreamIndex {
func (t *LoadToPublic) GetStreamIndex() StreamIndex {
return StreamIndex{ return StreamIndex{
Type: StreamIndexRaw, Type: StreamIndexRaw,
} }
} }


func (t *LoadToShared) GetRange() math2.Range {
func (t *LoadToPublic) GetRange() math2.Range {
return math2.Range{} return math2.Range{}
} }

+ 17
- 17
common/pkgs/ioswitch2/ops2/shared_store.go View File

@@ -13,29 +13,29 @@ import (
) )


func init() { func init() {
exec.UseOp[*SharedLoad]()
exec.UseOp[*PublicLoad]()
} }


type SharedLoad struct {
type PublicLoad struct {
Input exec.VarID Input exec.VarID
StorageID cdssdk.StorageID StorageID cdssdk.StorageID
ObjectPath string ObjectPath string
} }


func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
logger. logger.
WithField("Input", o.Input). WithField("Input", o.Input).
Debugf("load file to shared store")
defer logger.Debugf("load file to shared store finished")
Debugf("load file to public store")
defer logger.Debugf("load file to public store finished")


stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil { if err != nil {
return fmt.Errorf("getting storage manager: %w", err) return fmt.Errorf("getting storage manager: %w", err)
} }


store, err := stgAgts.GetSharedStore(o.StorageID)
store, err := stgAgts.GetPublicStore(o.StorageID)
if err != nil { if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
return fmt.Errorf("getting public store of storage %v: %w", o.StorageID, err)
} }


input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
@@ -47,19 +47,19 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
return store.Write(o.ObjectPath, input.Stream) return store.Write(o.ObjectPath, input.Stream)
} }


func (o *SharedLoad) String() string {
return fmt.Sprintf("SharedLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath)
func (o *PublicLoad) String() string {
return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath)
} }


type SharedLoadNode struct {
type PublicLoadNode struct {
dag.NodeBase dag.NodeBase
To ioswitch2.To To ioswitch2.To
Storage stgmod.StorageDetail Storage stgmod.StorageDetail
ObjectPath string ObjectPath string
} }


func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *SharedLoadNode {
node := &SharedLoadNode{
func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *PublicLoadNode {
node := &PublicLoadNode{
To: to, To: to,
Storage: stg, Storage: stg,
ObjectPath: objPath, ObjectPath: objPath,
@@ -70,23 +70,23 @@ func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDeta
return node return node
} }


func (t *SharedLoadNode) GetTo() ioswitch2.To {
func (t *PublicLoadNode) GetTo() ioswitch2.To {
return t.To return t.To
} }


func (t *SharedLoadNode) SetInput(input *dag.StreamVar) {
func (t *PublicLoadNode) SetInput(input *dag.StreamVar) {
input.To(t, 0) input.To(t, 0)
} }


func (t *SharedLoadNode) Input() dag.StreamInputSlot {
func (t *PublicLoadNode) Input() dag.StreamInputSlot {
return dag.StreamInputSlot{ return dag.StreamInputSlot{
Node: t, Node: t,
Index: 0, Index: 0,
} }
} }


func (t *SharedLoadNode) GenerateOp() (exec.Op, error) {
return &SharedLoad{
func (t *PublicLoadNode) GenerateOp() (exec.Op, error) {
return &PublicLoad{
Input: t.InputStreams().Get(0).VarID, Input: t.InputStreams().Get(0).VarID,
StorageID: t.Storage.Storage.StorageID, StorageID: t.Storage.Storage.StorageID,
ObjectPath: t.ObjectPath, ObjectPath: t.ObjectPath,


+ 2
- 2
common/pkgs/ioswitch2/parser/gen/generator.go View File

@@ -361,8 +361,8 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error)


return n, nil return n, nil


case *ioswitch2.LoadToShared:
n := ctx.DAG.NewSharedLoad(t, t.Storage, t.ObjectPath)
case *ioswitch2.LoadToPublic:
n := ctx.DAG.NewPublicLoad(t, t.Storage, t.ObjectPath)


if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
return nil, err return nil, err


+ 3
- 3
common/pkgs/storage/agtpool/pool.go View File

@@ -87,8 +87,8 @@ func (m *AgentPool) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, err
return stg.Agent.GetShardStore() return stg.Agent.GetShardStore()
} }


// 查找指定Storage的SharedStore组件
func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
// 查找指定Storage的PublicStore组件
func (m *AgentPool) GetPublicStore(stgID cdssdk.StorageID) (types.PublicStore, error) {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()


@@ -97,5 +97,5 @@ func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, e
return nil, types.ErrStorageNotFound return nil, types.ErrStorageNotFound
} }


return stg.Agent.GetSharedStore()
return stg.Agent.GetPublicStore()
} }

+ 8
- 8
common/pkgs/storage/local/agent.go View File

@@ -8,7 +8,7 @@ import (
type agent struct { type agent struct {
Detail stgmod.StorageDetail Detail stgmod.StorageDetail
ShardStore *ShardStore ShardStore *ShardStore
SharedStore *SharedStore
PublicStore *PublicStore
} }


func (s *agent) Start(ch *types.StorageEventChan) { func (s *agent) Start(ch *types.StorageEventChan) {
@@ -16,8 +16,8 @@ func (s *agent) Start(ch *types.StorageEventChan) {
s.ShardStore.Start(ch) s.ShardStore.Start(ch)
} }


if s.SharedStore != nil {
s.SharedStore.Start(ch)
if s.PublicStore != nil {
s.PublicStore.Start(ch)
} }
} }


@@ -26,8 +26,8 @@ func (s *agent) Stop() {
s.ShardStore.Stop() s.ShardStore.Stop()
} }


if s.SharedStore != nil {
s.SharedStore.Stop()
if s.PublicStore != nil {
s.PublicStore.Stop()
} }
} }


@@ -43,10 +43,10 @@ func (a *agent) GetShardStore() (types.ShardStore, error) {
return a.ShardStore, nil return a.ShardStore, nil
} }


func (a *agent) GetSharedStore() (types.SharedStore, error) {
if a.SharedStore == nil {
func (a *agent) GetPublicStore() (types.PublicStore, error) {
if a.PublicStore == nil {
return nil, types.ErrUnsupported return nil, types.ErrUnsupported
} }


return a.SharedStore, nil
return a.PublicStore, nil
} }

+ 7
- 7
common/pkgs/storage/local/local.go View File

@@ -42,18 +42,18 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) {
agt.ShardStore = store agt.ShardStore = store
} }


if b.detail.Storage.SharedStore != nil {
local, ok := b.detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage)
if b.detail.Storage.PublicStore != nil {
local, ok := b.detail.Storage.PublicStore.(*cdssdk.LocalPublicStorage)
if !ok { if !ok {
return nil, fmt.Errorf("invalid shared store type %T for local storage", b.detail.Storage.SharedStore)
return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore)
} }


store, err := NewSharedStore(agt, *local)
store, err := NewPublicStore(agt, *local)
if err != nil { if err != nil {
return nil, err return nil, err
} }


agt.SharedStore = store
agt.PublicStore = store
} }


return agt, nil return agt, nil
@@ -63,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
return &ShardStoreDesc{builder: b} return &ShardStoreDesc{builder: b}
} }


func (b *builder) SharedStoreDesc() types.SharedStoreDesc {
return &SharedStoreDesc{builder: b}
func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
return &PublicStoreDesc{builder: b}
} }


func (b *builder) CreateMultiparter() (types.Multiparter, error) { func (b *builder) CreateMultiparter() (types.Multiparter, error) {


+ 14
- 14
common/pkgs/storage/local/shared_store.go View File

@@ -10,40 +10,40 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
) )


type SharedStoreDesc struct {
types.EmptySharedStoreDesc
type PublicStoreDesc struct {
types.EmptyPublicStoreDesc
builder *builder builder *builder
} }


func (d *SharedStoreDesc) Enabled() bool {
return d.builder.detail.Storage.SharedStore != nil
func (d *PublicStoreDesc) Enabled() bool {
return d.builder.detail.Storage.PublicStore != nil
} }


func (d *SharedStoreDesc) HasBypassWrite() bool {
func (d *PublicStoreDesc) HasBypassWrite() bool {
return false return false
} }


type SharedStore struct {
type PublicStore struct {
agt *agent agt *agent
cfg cdssdk.LocalSharedStorage
cfg cdssdk.LocalPublicStorage
} }


func NewSharedStore(agt *agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
return &SharedStore{
func NewPublicStore(agt *agent, cfg cdssdk.LocalPublicStorage) (*PublicStore, error) {
return &PublicStore{
agt: agt, agt: agt,
cfg: cfg, cfg: cfg,
}, nil }, nil
} }


func (s *SharedStore) Start(ch *types.StorageEventChan) {
func (s *PublicStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase) s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase)
} }


func (s *SharedStore) Stop() {
func (s *PublicStore) Stop() {
s.getLogger().Infof("component stop") s.getLogger().Infof("component stop")
} }


func (s *SharedStore) Write(objPath string, stream io.Reader) error {
func (s *PublicStore) Write(objPath string, stream io.Reader) error {
fullPath := filepath.Join(s.cfg.LoadBase, objPath) fullPath := filepath.Join(s.cfg.LoadBase, objPath)
err := os.MkdirAll(filepath.Dir(fullPath), 0755) err := os.MkdirAll(filepath.Dir(fullPath), 0755)
if err != nil { if err != nil {
@@ -64,6 +64,6 @@ func (s *SharedStore) Write(objPath string, stream io.Reader) error {
return nil return nil
} }


func (s *SharedStore) getLogger() logger.Logger {
return logger.WithField("SharedStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
func (s *PublicStore) getLogger() logger.Logger {
return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
} }

+ 2
- 2
common/pkgs/storage/mashup/mashup.go View File

@@ -38,13 +38,13 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
return blder.ShardStoreDesc() return blder.ShardStoreDesc()
} }


func (b *builder) SharedStoreDesc() types.SharedStoreDesc {
func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType)
detail := b.detail detail := b.detail
detail.Storage.Type = stgType.Agent detail.Storage.Type = stgType.Agent


blder := factory.GetBuilder(detail) blder := factory.GetBuilder(detail)
return blder.SharedStoreDesc()
return blder.PublicStoreDesc()
} }


func (b *builder) CreateMultiparter() (types.Multiparter, error) { func (b *builder) CreateMultiparter() (types.Multiparter, error) {


+ 1
- 1
common/pkgs/storage/s3/agent.go View File

@@ -34,6 +34,6 @@ func (a *Agent) GetShardStore() (types.ShardStore, error) {
return a.ShardStore, nil return a.ShardStore, nil
} }


func (a *Agent) GetSharedStore() (types.SharedStore, error) {
func (a *Agent) GetPublicStore() (types.PublicStore, error) {
return nil, types.ErrUnsupported return nil, types.ErrUnsupported
} }

+ 2
- 2
common/pkgs/storage/s3/s3.go View File

@@ -63,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc {
return &ShardStoreDesc{builder: b} return &ShardStoreDesc{builder: b}
} }


func (b *builder) SharedStoreDesc() types.SharedStoreDesc {
return &SharedStoreDesc{}
func (b *builder) PublicStoreDesc() types.PublicStoreDesc {
return &PublicStoreDesc{}
} }


func (b *builder) CreateMultiparter() (types.Multiparter, error) { func (b *builder) CreateMultiparter() (types.Multiparter, error) {


+ 2
- 2
common/pkgs/storage/s3/shared_store.go View File

@@ -2,6 +2,6 @@ package s3


import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"


type SharedStoreDesc struct {
types.EmptySharedStoreDesc
type PublicStoreDesc struct {
types.EmptyPublicStoreDesc
} }

+ 5
- 5
common/pkgs/storage/types/empty_builder.go View File

@@ -19,8 +19,8 @@ func (b *EmptyBuilder) ShardStoreDesc() ShardStoreDesc {
return &EmptyShardStoreDesc{} return &EmptyShardStoreDesc{}
} }


func (b *EmptyBuilder) SharedStoreDesc() SharedStoreDesc {
return &EmptySharedStoreDesc{}
func (b *EmptyBuilder) PublicStoreDesc() PublicStoreDesc {
return &EmptyPublicStoreDesc{}
} }


// 创建一个分片上传组件 // 创建一个分片上传组件
@@ -55,13 +55,13 @@ func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool {
return false return false
} }


type EmptySharedStoreDesc struct {
type EmptyPublicStoreDesc struct {
} }


func (d *EmptySharedStoreDesc) Enabled() bool {
func (d *EmptyPublicStoreDesc) Enabled() bool {
return false return false
} }


func (d *EmptySharedStoreDesc) HasBypassWrite() bool {
func (d *EmptyPublicStoreDesc) HasBypassWrite() bool {
return false return false
} }

+ 1
- 1
common/pkgs/storage/types/shared_store.go View File

@@ -4,7 +4,7 @@ import (
"io" "io"
) )


type SharedStore interface {
type PublicStore interface {
Start(ch *StorageEventChan) Start(ch *StorageEventChan)
Stop() Stop()




+ 3
- 3
common/pkgs/storage/types/types.go View File

@@ -31,7 +31,7 @@ type StorageAgent interface {
// 获取分片存储服务 // 获取分片存储服务
GetShardStore() (ShardStore, error) GetShardStore() (ShardStore, error)
// 获取共享存储服务 // 获取共享存储服务
GetSharedStore() (SharedStore, error)
GetPublicStore() (PublicStore, error)
} }


// 创建存储服务的指定组件。 // 创建存储服务的指定组件。
@@ -45,7 +45,7 @@ type StorageBuilder interface {
// 是否支持分片存储服务 // 是否支持分片存储服务
ShardStoreDesc() ShardStoreDesc ShardStoreDesc() ShardStoreDesc
// 是否支持共享存储服务 // 是否支持共享存储服务
SharedStoreDesc() SharedStoreDesc
PublicStoreDesc() PublicStoreDesc
// 创建一个分片上传组件 // 创建一个分片上传组件
CreateMultiparter() (Multiparter, error) CreateMultiparter() (Multiparter, error)
// 创建一个存储服务直传组件 // 创建一个存储服务直传组件
@@ -64,7 +64,7 @@ type ShardStoreDesc interface {
HasBypassHTTPRead() bool HasBypassHTTPRead() bool
} }


type SharedStoreDesc interface {
type PublicStoreDesc interface {
// 是否已启动 // 是否已启动
Enabled() bool Enabled() bool
// 是否能旁路上传 // 是否能旁路上传


+ 1
- 1
common/pkgs/uploader/create_load.go View File

@@ -45,7 +45,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
ft.AddFrom(fromExec) ft.AddFrom(fromExec)
for i, stg := range u.targetStgs { for i, stg := range u.targetStgs {
ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash")) ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash"))
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa)))
ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa)))
stgIDs = append(stgIDs, stg.Storage.StorageID) stgIDs = append(stgIDs, stg.Storage.StorageID)
} }




+ 1
- 1
common/pkgs/uploader/update.go View File

@@ -51,7 +51,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error
AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash")) AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash"))


for i, stg := range w.loadToStgs { for i, stg := range w.loadToStgs {
ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
} }


plans := exec.NewPlanBuilder() plans := exec.NewPlanBuilder()


+ 2
- 2
common/pkgs/uploader/uploader.go View File

@@ -86,8 +86,8 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff
if stg.MasterHub == nil { if stg.MasterHub == nil {
return nil, fmt.Errorf("load to storage %v has no master hub", stgID) return nil, fmt.Errorf("load to storage %v has no master hub", stgID)
} }
if !factory.GetBuilder(stg).SharedStoreDesc().Enabled() {
return nil, fmt.Errorf("load to storage %v has no shared store", stgID)
if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() {
return nil, fmt.Errorf("load to storage %v has no public store", stgID)
} }


loadToStgs[i] = stg loadToStgs[i] = stg


Loading…
Cancel
Save