From 9b78962ebe01c84fc04fba1dda1e4d747d33794e Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 16 Jan 2025 10:00:33 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=81=A2=E5=A4=8D=E4=B8=AD=E6=9C=9F?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/server.go | 2 +- client/internal/http/temp.go | 116 +++++++++++++--------------- client/internal/services/storage.go | 16 ++++ 3 files changed, 71 insertions(+), 63 deletions(-) diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 4594910..8f0216a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -43,7 +43,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { rt := s.engine.Use() - // initTemp(rt, s) + initTemp(rt, s) s.routeV1(s.engine) diff --git a/client/internal/http/temp.go b/client/internal/http/temp.go index 3c0b06e..9b46e44 100644 --- a/client/internal/http/temp.go +++ b/client/internal/http/temp.go @@ -1,6 +1,5 @@ package http -/* import ( "net/http" @@ -96,7 +95,7 @@ type TempGetObjectDetailResp struct { type ObjectBlockDetail struct { ObjectID cdssdk.ObjectID `json:"objectID"` Type string `json:"type"` - FileHash string `json:"fileHash"` + FileHash cdssdk.FileHash `json:"fileHash"` LocationType string `json:"locationType"` LocationName string `json:"locationName"` } @@ -123,43 +122,36 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { return } - loadedHubIDs, err := s.svc.PackageSvc().GetLoadedNodes(1, details.Object.PackageID) - if err != nil { - log.Warnf("getting loaded nodes: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) - return - } - - var allHubIDs []cdssdk.HubID - allHubIDs = append(allHubIDs, details.PinnedAt...) + var allStgIDs []cdssdk.StorageID + allStgIDs = append(allStgIDs, details.PinnedAt...) for _, b := range details.Blocks { - allHubIDs = append(allHubIDs, b.StorageID) + allStgIDs = append(allStgIDs, b.StorageID) } - allHubIDs = append(allHubIDs, loadedHubIDs...) - allHubIDs = lo.Uniq(allHubIDs) + allStgIDs = lo.Uniq(allStgIDs) - getNodes, err := s.svc.NodeSvc().GetNodes(allHubIDs) + getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs) if err != nil { log.Warnf("getting nodes: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) return } - allNodes := make(map[cdssdk.HubID]*cdssdk.Node) - for _, n := range getNodes { - n2 := n - allNodes[n.HubID] = &n2 + allStgs := make(map[cdssdk.StorageID]cdssdk.Storage) + for _, n := range getStgs { + if n != nil { + allStgs[n.Storage.StorageID] = n.Storage + } } var blocks []ObjectBlockDetail - for _, hubID := range details.PinnedAt { + for _, stgID := range details.PinnedAt { blocks = append(blocks, ObjectBlockDetail{ Type: "Rep", FileHash: details.Object.FileHash, LocationType: "Agent", - LocationName: allNodes[hubID].Name, + LocationName: allStgs[stgID].Name, }) } @@ -171,7 +163,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -182,7 +174,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -193,20 +185,11 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) { Type: "Block", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } - for _, hubID := range loadedHubIDs { - blocks = append(blocks, ObjectBlockDetail{ - Type: "Rep", - FileHash: details.Object.FileHash, - LocationType: "Storage", - LocationName: allNodes[hubID].Name, - }) - } - ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{ Blocks: blocks, })) @@ -252,15 +235,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { return } - nodes, err := s.svc.NodeSvc().GetNodes(nil) + var allStgIDs []cdssdk.StorageID + for _, obj := range db.Objects { + allStgIDs = append(allStgIDs, obj.PinnedAt...) + for _, blk := range obj.Blocks { + allStgIDs = append(allStgIDs, blk.StorageID) + } + } + + getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs) if err != nil { log.Warnf("getting nodes: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) return } - allNodes := make(map[cdssdk.HubID]cdssdk.Node) - for _, n := range nodes { - allNodes[n.HubID] = n + allStgs := make(map[cdssdk.StorageID]cdssdk.Storage) + for _, n := range getStgs { + if n != nil { + allStgs[n.Storage.StorageID] = n.Storage + } } bkts := make(map[cdssdk.BucketID]*BucketDetail) @@ -274,25 +267,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { type PackageDetail struct { Package cdssdk.Package - Loaded []cdssdk.Node + // Loaded []cdssdk.Node } pkgs := make(map[cdssdk.PackageID]*PackageDetail) for _, pkg := range db.Packages { p := PackageDetail{ Package: pkg, - Loaded: make([]cdssdk.Node, 0), + // Loaded: make([]cdssdk.Node, 0), } - loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID) - if err != nil { - log.Warnf("getting loaded nodes: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) - return - } + // loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID) + // if err != nil { + // log.Warnf("getting loaded nodes: %s", err.Error()) + // ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) + // return + // } - for _, hubID := range loaded { - p.Loaded = append(p.Loaded, allNodes[hubID]) - } + // for _, hubID := range loaded { + // p.Loaded = append(p.Loaded, allNodes[hubID]) + // } pkgs[pkg.PackageID] = &p } @@ -316,7 +309,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Rep", FileHash: obj.Object.FileHash, LocationType: "Agent", - LocationName: allNodes[hubID].Name, + LocationName: allStgs[hubID].Name, }) } @@ -329,7 +322,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -341,7 +334,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Rep", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } @@ -353,20 +346,20 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) { Type: "Block", FileHash: blk.FileHash, LocationType: "Agent", - LocationName: allNodes[blk.StorageID].Name, + LocationName: allStgs[blk.StorageID].Name, }) } } - for _, node := range pkgs[obj.Object.PackageID].Loaded { - blocks = append(blocks, ObjectBlockDetail{ - ObjectID: obj.Object.ObjectID, - Type: "Rep", - FileHash: obj.Object.FileHash, - LocationType: "Storage", - LocationName: allNodes[node.HubID].Name, - }) - } + // for _, node := range pkgs[obj.Object.PackageID].Loaded { + // blocks = append(blocks, ObjectBlockDetail{ + // ObjectID: obj.Object.ObjectID, + // Type: "Rep", + // FileHash: obj.Object.FileHash, + // LocationType: "Storage", + // LocationName: allNodes[node.HubID].Name, + // }) + // } } @@ -390,4 +383,3 @@ func auth(ctx *gin.Context) { ctx.AbortWithStatus(http.StatusUnauthorized) } } -*/ diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 046de63..df437fc 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -10,6 +10,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" @@ -58,6 +59,21 @@ func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model. return &getResp.Storage, nil } +func (svc *StorageService) GetDetails(stgIDs []cdssdk.StorageID) ([]*stgmod.StorageDetail, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(stgIDs)) + if err != nil { + return nil, fmt.Errorf("request to coordinator: %w", err) + } + + return getResp.Storages, nil +} + func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID, rootPath string) error { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { From f8ad8b63159da1d37fdefe77ef765146b3fff61b Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 16 Jan 2025 17:06:27 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/uploader/uploader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go index 30ea971..ad46d85 100644 --- a/common/pkgs/uploader/uploader.go +++ b/common/pkgs/uploader/uploader.go @@ -86,7 +86,7 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff if stg.MasterHub == nil { return nil, fmt.Errorf("load to storage %v has no master hub", stgID) } - if factory.GetBuilder(stg).ShardStoreDesc().Enabled() { + if !factory.GetBuilder(stg).SharedStoreDesc().Enabled() { return nil, fmt.Errorf("load to storage %v has no shared store", stgID) } From bc1d26253d3f665ba67f8853254f519d003a7c90 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 17 Jan 2025 10:01:45 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0sysevent=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/sysevent/config.go | 8 ++ common/pkgs/sysevent/publisher.go | 119 +++++++++++++++++++++++++++++ common/pkgs/sysevent/sysevent.go | 9 +++ common/pkgs/sysevent/watcher.go | 121 ++++++++++++++++++++++++++++++ 4 files changed, 257 insertions(+) create mode 100644 common/pkgs/sysevent/config.go create mode 100644 common/pkgs/sysevent/publisher.go create mode 100644 common/pkgs/sysevent/sysevent.go create mode 100644 common/pkgs/sysevent/watcher.go diff --git a/common/pkgs/sysevent/config.go b/common/pkgs/sysevent/config.go new file mode 100644 index 0000000..dfe1bf5 --- /dev/null +++ b/common/pkgs/sysevent/config.go @@ -0,0 +1,8 @@ +package sysevent + +type Config struct { + Address string `json:"address"` + Account string `json:"account"` + Password string `json:"password"` + VHost string `json:"vhost"` +} diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go new file mode 100644 index 0000000..5b42c93 --- /dev/null +++ b/common/pkgs/sysevent/publisher.go @@ -0,0 +1,119 @@ +package sysevent + +import ( + "fmt" + + "github.com/streadway/amqp" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type PublisherEvent interface{} + +type PublisherExited struct { + Err error +} + +type PublishError struct { + Err error +} + +type OtherError struct { + Err error +} + +type Publisher struct { + connection *amqp.Connection + channel *amqp.Channel + eventChan *async.UnboundChannel[SysEvent] + thisSource Source +} + +func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) { + config := amqp.Config{ + Vhost: cfg.VHost, + } + + url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address) + connection, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + channel, err := connection.Channel() + if err != nil { + connection.Close() + return nil, fmt.Errorf("openning channel on connection: %w", err) + } + + _, err = channel.QueueDeclare( + SysEventQueueName, + false, + true, + false, + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("declare queue: %w", err) + } + + pub := &Publisher{ + connection: connection, + channel: channel, + eventChan: async.NewUnboundChannel[SysEvent](), + thisSource: thisSource, + } + + return pub, nil +} + +func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] { + ch := async.NewUnboundChannel[PublisherEvent]() + go func() { + defer ch.Close() + defer p.channel.Close() + defer p.connection.Close() + + for { + event := <-p.eventChan.Receive().Chan() + if event.Err != nil { + if event.Err == async.ErrChannelClosed { + ch.Send(PublisherExited{Err: nil}) + } else { + ch.Send(PublisherExited{Err: event.Err}) + } + return + } + + eventData, err := serder.ObjectToJSONEx(event.Value) + if err != nil { + ch.Send(OtherError{Err: fmt.Errorf("serialize event data: %w", err)}) + continue + } + + err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: eventData, + Expiration: "60000", // 消息超时时间默认1分钟 + }) + if err != nil { + ch.Send(PublishError{Err: err}) + continue + } + } + }() + + return ch +} + +// Publish 发布事件,自动补齐时间戳和源信息 +func (p *Publisher) Publish(evt SysEvent) { + // TODO 补齐时间戳和源信息 + p.eventChan.Send(evt) +} + +// PublishRaw 完全原样发布事件,不补齐任何信息 +func (p *Publisher) PublishRaw(evt SysEvent) { + p.eventChan.Send(evt) +} diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go new file mode 100644 index 0000000..bbc9afa --- /dev/null +++ b/common/pkgs/sysevent/sysevent.go @@ -0,0 +1,9 @@ +package sysevent + +const ( + SysEventQueueName = "SysEventQueue" +) + +type SysEvent = any + +type Source = any diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go new file mode 100644 index 0000000..6ffbd72 --- /dev/null +++ b/common/pkgs/sysevent/watcher.go @@ -0,0 +1,121 @@ +package sysevent + +import ( + "fmt" + "sync" + + "github.com/streadway/amqp" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type Watcher interface { + OnEvent(event SysEvent) +} + +type WatcherEvent interface{} + +type WatcherExited struct { + Err error +} + +type WatcherHost struct { + watchers []Watcher + lock sync.Mutex + connection *amqp.Connection + channel *amqp.Channel + recvChan <-chan amqp.Delivery +} + +func NewWatcherHost(cfg Config) (*WatcherHost, error) { + config := amqp.Config{ + Vhost: cfg.VHost, + } + + url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address) + connection, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + channel, err := connection.Channel() + if err != nil { + connection.Close() + return nil, fmt.Errorf("openning channel on connection: %w", err) + } + + _, err = channel.QueueDeclare( + SysEventQueueName, + false, + true, + false, + false, + nil, + ) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("declare queue: %w", err) + } + + recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("consume queue: %w", err) + } + + wat := &WatcherHost{ + connection: connection, + channel: channel, + recvChan: recvChan, + } + + return wat, nil +} + +func (w *WatcherHost) Start() *async.UnboundChannel[WatcherEvent] { + ch := async.NewUnboundChannel[WatcherEvent]() + + go func() { + defer ch.Close() + defer w.channel.Close() + defer w.connection.Close() + + for m := range w.recvChan { + evt, err := serder.JSONToObjectEx[SysEvent](m.Body) + if err != nil { + ch.Send(OtherError{Err: fmt.Errorf("deserialize event: %w", err)}) + continue + } + + w.lock.Lock() + ws := make([]Watcher, 0, len(w.watchers)) + ws = append(ws, w.watchers...) + w.lock.Unlock() + + for _, w := range ws { + w.OnEvent(evt) + } + } + + ch.Send(WatcherExited{Err: nil}) + }() + + return ch +} + +func (w *WatcherHost) AddWatcher(watcher Watcher) { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchers = append(w.watchers, watcher) +} + +func (w *WatcherHost) RemoveWatcher(watcher Watcher) { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchers = lo2.Remove(w.watchers, watcher) +} From 04267f90e23783e13bdc144c68f2a07abe38d563 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 17 Jan 2025 10:06:18 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/sysevent/sysevent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go index bbc9afa..082b93e 100644 --- a/common/pkgs/sysevent/sysevent.go +++ b/common/pkgs/sysevent/sysevent.go @@ -4,6 +4,6 @@ const ( SysEventQueueName = "SysEventQueue" ) -type SysEvent = any +type SysEvent = any // TODO 换成具体的类型 -type Source = any +type Source = any // TODO 换成具体的类型 From c9274a2c5cb7ffc73f5210a729a7f6e87590f51f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 20 Jan 2025 10:59:36 +0800 Subject: [PATCH 5/7] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ECMultiplier=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/storage/efile/ec_multiplier.go | 93 ++++++++++++++++++ common/pkgs/storage/efile/efile.go | 109 +++++++++++++++++++++ common/pkgs/storage/local/local.go | 1 + common/pkgs/storage/local/shard_store.go | 1 + common/pkgs/storage/local/shared_store.go | 1 + common/pkgs/storage/mashup/mashup.go | 75 ++++++++++++++ common/pkgs/storage/s3/obs/s2s.go | 2 + common/pkgs/storage/s3/shard_store.go | 1 + common/pkgs/storage/s3/shared_store.go | 11 +-- common/pkgs/storage/types/bypass.go | 12 +++ common/pkgs/storage/types/ec_multiplier.go | 11 +++ common/pkgs/storage/types/empty_builder.go | 8 ++ common/pkgs/storage/types/types.go | 3 + 13 files changed, 320 insertions(+), 8 deletions(-) create mode 100644 common/pkgs/storage/efile/ec_multiplier.go create mode 100644 common/pkgs/storage/efile/efile.go create mode 100644 common/pkgs/storage/mashup/mashup.go create mode 100644 common/pkgs/storage/types/ec_multiplier.go diff --git a/common/pkgs/storage/efile/ec_multiplier.go b/common/pkgs/storage/efile/ec_multiplier.go new file mode 100644 index 0000000..739c337 --- /dev/null +++ b/common/pkgs/storage/efile/ec_multiplier.go @@ -0,0 +1,93 @@ +package efile + +import ( + "fmt" + "net/url" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/os2" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +type ECMultiplier struct { + token string + url string + feat *cdssdk.ECMultiplierFeature + outputs []string + completed bool +} + +// 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 +// 输出为每一个块文件的路径,数组长度 = len(coef) +func (m *ECMultiplier) Multiply(coef [][]byte, inputs []types.HTTPReqeust, chunkSize int64) ([]string, error) { + type Request struct { + Inputs []types.HTTPReqeust `json:"inputs"` + Outputs []string `json:"outputs"` + Coefs [][]byte `json:"coefs"` + ChunkSize int64 `json:"chunkSize"` + } + type Response struct { + Code string `json:"code"` + Msg string `json:"msg"` + } + + fileName := os2.GenerateRandomFileName(10) + m.outputs = make([]string, len(coef)) + for i := range m.outputs { + m.outputs[i] = fmt.Sprintf("%s_%d", fileName, i) + } + + u, err := url.JoinPath(m.url, "efile/openapi/v2/createECTask") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{"token": m.token}, + Body: Request{ + Inputs: inputs, + Outputs: m.outputs, + Coefs: coef, + ChunkSize: chunkSize, + }, + }) + if err != nil { + return nil, err + } + + var r Response + err = serder.JSONToObjectStream(resp.Body, &r) + if err != nil { + return nil, err + } + + if r.Code != "0" { + return nil, fmt.Errorf("code: %s, msg: %s", r.Code, r.Msg) + } + + return m.outputs, nil +} + +// 完成计算 +func (m *ECMultiplier) Complete() { + m.completed = true +} + +// 取消计算。如果已经调用了Complete,则应该无任何影响 +func (m *ECMultiplier) Abort() { + if !m.completed { + u, err := url.JoinPath(m.url, "efile/openapi/v2/file/remove") + if err != nil { + return + } + + for _, output := range m.outputs { + http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{"token": m.token}, + Query: map[string]string{"paths": output}, + }) + } + } +} diff --git a/common/pkgs/storage/efile/efile.go b/common/pkgs/storage/efile/efile.go new file mode 100644 index 0000000..af869d6 --- /dev/null +++ b/common/pkgs/storage/efile/efile.go @@ -0,0 +1,109 @@ +package efile + +import ( + "fmt" + "net/url" + "sync" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" +) + +func init() { + reg.RegisterBuilder[*cdssdk.EFileType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) +} + +type builder struct { + types.EmptyBuilder + detail stgmod.StorageDetail + token string + tokenLock sync.Mutex + getTokenTime time.Time +} + +func (b *builder) getToken() (string, error) { + stgType := b.detail.Storage.Type.(*cdssdk.EFileType) + + b.tokenLock.Lock() + defer b.tokenLock.Unlock() + + if b.token != "" { + dt := time.Since(b.getTokenTime) + if dt < time.Second*time.Duration(stgType.TokenExpire) { + return b.token, nil + } + } + + u, err := url.JoinPath(stgType.TokenURL, "/ac/openapi/v2/tokens") + if err != nil { + return "", err + } + + resp, err := http2.PostJSON(u, http2.RequestParam{ + Header: map[string]string{ + "user": stgType.User, + "password": stgType.Password, + "orgId": stgType.OrgID, + }, + }) + if err != nil { + return "", err + } + + type Response struct { + Code string `json:"code"` + Msg string `json:"msg"` + Data []struct { + ClusterID string `json:"clusterId"` + Token string `json:"token"` + } `json:"data"` + } + + var r Response + err = serder.JSONToObjectStream(resp.Body, &r) + if err != nil { + return "", err + } + + if r.Code != "0" { + return "", fmt.Errorf("code:%s, msg:%s", r.Code, r.Msg) + } + + for _, d := range r.Data { + if d.ClusterID == stgType.ClusterID { + b.token = d.Token + b.getTokenTime = time.Now() + return d.Token, nil + } + } + + return "", fmt.Errorf("clusterID:%s not found", stgType.ClusterID) +} + +func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { + feat := utils.FindFeature[*cdssdk.ECMultiplierFeature](b.detail) + if feat == nil { + return nil, fmt.Errorf("feature ECMultiplier not found") + } + + token, err := b.getToken() + if err != nil { + return nil, fmt.Errorf("get token: %v", err) + } + + return &ECMultiplier{ + token: token, + url: b.detail.Storage.Type.(*cdssdk.EFileType).APIURL, + feat: feat, + }, nil +} diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index ca49db8..1084275 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -19,6 +19,7 @@ func init() { } type builder struct { + types.EmptyBuilder detail stgmod.StorageDetail } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 278cd2b..28ed46a 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -23,6 +23,7 @@ const ( ) type ShardStoreDesc struct { + types.EmptyShardStoreDesc builder *builder } diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go index bfd9730..87eb45c 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/shared_store.go @@ -11,6 +11,7 @@ import ( ) type SharedStoreDesc struct { + types.EmptySharedStoreDesc builder *builder } diff --git a/common/pkgs/storage/mashup/mashup.go b/common/pkgs/storage/mashup/mashup.go new file mode 100644 index 0000000..882aaff --- /dev/null +++ b/common/pkgs/storage/mashup/mashup.go @@ -0,0 +1,75 @@ +package mashup + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" +) + +func init() { + reg.RegisterBuilder[*cdssdk.MashupStorageType](func(detail stgmod.StorageDetail) types.StorageBuilder { + return &builder{ + detail: detail, + } + }) +} + +type builder struct { + detail stgmod.StorageDetail +} + +func (b *builder) CreateAgent() (types.StorageAgent, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.CreateAgent() +} + +func (b *builder) ShardStoreDesc() types.ShardStoreDesc { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.ShardStoreDesc() +} + +func (b *builder) SharedStoreDesc() types.SharedStoreDesc { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Agent + + blder := factory.GetBuilder(detail) + return blder.SharedStoreDesc() +} + +func (b *builder) CreateMultiparter() (types.Multiparter, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateMultiparter() +} + +func (b *builder) CreateS2STransfer() (types.S2STransfer, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateS2STransfer() +} + +func (b *builder) CreateECMultiplier() (types.ECMultiplier, error) { + stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) + detail := b.detail + detail.Storage.Type = stgType.Feature + + blder := factory.GetBuilder(detail) + return blder.CreateECMultiplier() +} diff --git a/common/pkgs/storage/s3/obs/s2s.go b/common/pkgs/storage/s3/obs/s2s.go index 5c28b84..6317092 100644 --- a/common/pkgs/storage/s3/obs/s2s.go +++ b/common/pkgs/storage/s3/obs/s2s.go @@ -170,5 +170,7 @@ func (s *S2STransfer) Abort() { s.omsCli.DeleteTask(&model.DeleteTaskRequest{ TaskId: fmt.Sprintf("%v", *s.taskID), }) + + // TODO 清理临时文件 } } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 7185476..aa98e5f 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -27,6 +27,7 @@ const ( ) type ShardStoreDesc struct { + types.EmptyShardStoreDesc builder *builder } diff --git a/common/pkgs/storage/s3/shared_store.go b/common/pkgs/storage/s3/shared_store.go index bb822e8..c0e74e3 100644 --- a/common/pkgs/storage/s3/shared_store.go +++ b/common/pkgs/storage/s3/shared_store.go @@ -1,12 +1,7 @@ package s3 -type SharedStoreDesc struct { -} +import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -func (d *SharedStoreDesc) Enabled() bool { - return false -} - -func (d *SharedStoreDesc) HasBypassWrite() bool { - return false +type SharedStoreDesc struct { + types.EmptySharedStoreDesc } diff --git a/common/pkgs/storage/types/bypass.go b/common/pkgs/storage/types/bypass.go index 5e8d23f..9760559 100644 --- a/common/pkgs/storage/types/bypass.go +++ b/common/pkgs/storage/types/bypass.go @@ -27,3 +27,15 @@ type BypassFilePath struct { type BypassRead interface { BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error) } + +// 能通过一个Http请求直接访问文件 +// 仅用于分片存储。 +type HTTPBypassRead interface { + HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPReqeust, error) +} + +type HTTPReqeust struct { + SignedUrl string `json:"signedUrl"` + Header map[string]string `json:"header"` + Body string `json:"body"` +} diff --git a/common/pkgs/storage/types/ec_multiplier.go b/common/pkgs/storage/types/ec_multiplier.go new file mode 100644 index 0000000..c9d8d7b --- /dev/null +++ b/common/pkgs/storage/types/ec_multiplier.go @@ -0,0 +1,11 @@ +package types + +type ECMultiplier interface { + // 进行EC运算,coef * inputs。coef为编码矩阵,inputs为待编码数据,chunkSize为分块大小。 + // 输出为每一个块文件的路径,数组长度 = len(coef) + Multiply(coef [][]byte, inputs []HTTPReqeust, chunkSize int64) ([]string, error) + // 完成计算 + Complete() + // 取消计算。如果已经调用了Complete,则应该无任何影响 + Abort() +} diff --git a/common/pkgs/storage/types/empty_builder.go b/common/pkgs/storage/types/empty_builder.go index be0178c..692d020 100644 --- a/common/pkgs/storage/types/empty_builder.go +++ b/common/pkgs/storage/types/empty_builder.go @@ -32,6 +32,10 @@ func (b *EmptyBuilder) CreateS2STransfer() (S2STransfer, error) { return nil, fmt.Errorf("create s2s transfer for %T: %w", b.Detail.Storage.Type, ErrUnsupported) } +func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) { + return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported) +} + type EmptyShardStoreDesc struct { } @@ -47,6 +51,10 @@ func (d *EmptyShardStoreDesc) HasBypassRead() bool { return false } +func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool { + return false +} + type EmptySharedStoreDesc struct { } diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index 11b2516..af1803d 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -50,6 +50,7 @@ type StorageBuilder interface { CreateMultiparter() (Multiparter, error) // 创建一个存储服务直传组件 CreateS2STransfer() (S2STransfer, error) + CreateECMultiplier() (ECMultiplier, error) } type ShardStoreDesc interface { @@ -59,6 +60,8 @@ type ShardStoreDesc interface { HasBypassWrite() bool // 是否能旁路读取 HasBypassRead() bool + // 是否能通过HTTP读取 + HasBypassHTTPRead() bool } type SharedStoreDesc interface { From ce77bda8ed7fd0815b0addfe370e0b31907677bf Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 20 Jan 2025 11:08:32 +0800 Subject: [PATCH 6/7] =?UTF-8?q?SharedStore=E9=87=8D=E5=90=8D=E4=B8=BAPubli?= =?UTF-8?q?cStore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/services/storage.go | 2 +- common/pkgs/ioswitch2/fromto.go | 10 +++--- common/pkgs/ioswitch2/ops2/shared_store.go | 34 +++++++++---------- common/pkgs/ioswitch2/parser/gen/generator.go | 4 +-- common/pkgs/storage/agtpool/pool.go | 6 ++-- common/pkgs/storage/local/agent.go | 16 ++++----- common/pkgs/storage/local/local.go | 14 ++++---- common/pkgs/storage/local/shared_store.go | 28 +++++++-------- common/pkgs/storage/mashup/mashup.go | 4 +-- common/pkgs/storage/s3/agent.go | 2 +- common/pkgs/storage/s3/s3.go | 4 +-- common/pkgs/storage/s3/shared_store.go | 4 +-- common/pkgs/storage/types/empty_builder.go | 10 +++--- common/pkgs/storage/types/shared_store.go | 2 +- common/pkgs/storage/types/types.go | 6 ++-- common/pkgs/uploader/create_load.go | 2 +- common/pkgs/uploader/update.go | 2 +- common/pkgs/uploader/uploader.go | 4 +-- 18 files changed, 77 insertions(+), 77 deletions(-) diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index df437fc..a9263eb 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -119,7 +119,7 @@ func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.Pa return fmt.Errorf("unsupported download strategy: %T", strg) } - ft.AddTo(ioswitch2.NewLoadToShared(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path))) + ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path))) // 顺便保存到同存储服务的分片存储中 if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() { ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), "")) diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 2c72e9b..f153776 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -195,26 +195,26 @@ func (t *ToShardStore) GetRange() math2.Range { return t.Range } -type LoadToShared struct { +type LoadToPublic struct { Hub cdssdk.Hub Storage stgmod.StorageDetail ObjectPath string } -func NewLoadToShared(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToShared { - return &LoadToShared{ +func NewLoadToPublic(hub cdssdk.Hub, storage stgmod.StorageDetail, objectPath string) *LoadToPublic { + return &LoadToPublic{ Hub: hub, Storage: storage, ObjectPath: objectPath, } } -func (t *LoadToShared) GetStreamIndex() StreamIndex { +func (t *LoadToPublic) GetStreamIndex() StreamIndex { return StreamIndex{ Type: StreamIndexRaw, } } -func (t *LoadToShared) GetRange() math2.Range { +func (t *LoadToPublic) GetRange() math2.Range { return math2.Range{} } diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go index 1d1bb11..8846c97 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -13,29 +13,29 @@ import ( ) func init() { - exec.UseOp[*SharedLoad]() + exec.UseOp[*PublicLoad]() } -type SharedLoad struct { +type PublicLoad struct { Input exec.VarID StorageID cdssdk.StorageID ObjectPath string } -func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { +func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. WithField("Input", o.Input). - Debugf("load file to shared store") - defer logger.Debugf("load file to shared store finished") + Debugf("load file to public store") + defer logger.Debugf("load file to public store finished") stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx) if err != nil { return fmt.Errorf("getting storage manager: %w", err) } - store, err := stgAgts.GetSharedStore(o.StorageID) + store, err := stgAgts.GetPublicStore(o.StorageID) if err != nil { - return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err) + return fmt.Errorf("getting public store of storage %v: %w", o.StorageID, err) } input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) @@ -47,19 +47,19 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return store.Write(o.ObjectPath, input.Stream) } -func (o *SharedLoad) String() string { - return fmt.Sprintf("SharedLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath) +func (o *PublicLoad) String() string { + return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.StorageID, o.ObjectPath) } -type SharedLoadNode struct { +type PublicLoadNode struct { dag.NodeBase To ioswitch2.To Storage stgmod.StorageDetail ObjectPath string } -func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *SharedLoadNode { - node := &SharedLoadNode{ +func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, stg stgmod.StorageDetail, objPath string) *PublicLoadNode { + node := &PublicLoadNode{ To: to, Storage: stg, ObjectPath: objPath, @@ -70,23 +70,23 @@ func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stg stgmod.StorageDeta return node } -func (t *SharedLoadNode) GetTo() ioswitch2.To { +func (t *PublicLoadNode) GetTo() ioswitch2.To { return t.To } -func (t *SharedLoadNode) SetInput(input *dag.StreamVar) { +func (t *PublicLoadNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *SharedLoadNode) Input() dag.StreamInputSlot { +func (t *PublicLoadNode) Input() dag.StreamInputSlot { return dag.StreamInputSlot{ Node: t, Index: 0, } } -func (t *SharedLoadNode) GenerateOp() (exec.Op, error) { - return &SharedLoad{ +func (t *PublicLoadNode) GenerateOp() (exec.Op, error) { + return &PublicLoad{ Input: t.InputStreams().Get(0).VarID, StorageID: t.Storage.Storage.StorageID, ObjectPath: t.ObjectPath, diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index de35c62..b9da733 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -361,8 +361,8 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) return n, nil - case *ioswitch2.LoadToShared: - n := ctx.DAG.NewSharedLoad(t, t.Storage, t.ObjectPath) + case *ioswitch2.LoadToPublic: + n := ctx.DAG.NewPublicLoad(t, t.Storage, t.ObjectPath) if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err diff --git a/common/pkgs/storage/agtpool/pool.go b/common/pkgs/storage/agtpool/pool.go index 7b5bb1a..0f93632 100644 --- a/common/pkgs/storage/agtpool/pool.go +++ b/common/pkgs/storage/agtpool/pool.go @@ -87,8 +87,8 @@ func (m *AgentPool) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, err return stg.Agent.GetShardStore() } -// 查找指定Storage的SharedStore组件 -func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) { +// 查找指定Storage的PublicStore组件 +func (m *AgentPool) GetPublicStore(stgID cdssdk.StorageID) (types.PublicStore, error) { m.lock.Lock() defer m.lock.Unlock() @@ -97,5 +97,5 @@ func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, e return nil, types.ErrStorageNotFound } - return stg.Agent.GetSharedStore() + return stg.Agent.GetPublicStore() } diff --git a/common/pkgs/storage/local/agent.go b/common/pkgs/storage/local/agent.go index c2e5e52..ab5f252 100644 --- a/common/pkgs/storage/local/agent.go +++ b/common/pkgs/storage/local/agent.go @@ -8,7 +8,7 @@ import ( type agent struct { Detail stgmod.StorageDetail ShardStore *ShardStore - SharedStore *SharedStore + PublicStore *PublicStore } func (s *agent) Start(ch *types.StorageEventChan) { @@ -16,8 +16,8 @@ func (s *agent) Start(ch *types.StorageEventChan) { s.ShardStore.Start(ch) } - if s.SharedStore != nil { - s.SharedStore.Start(ch) + if s.PublicStore != nil { + s.PublicStore.Start(ch) } } @@ -26,8 +26,8 @@ func (s *agent) Stop() { s.ShardStore.Stop() } - if s.SharedStore != nil { - s.SharedStore.Stop() + if s.PublicStore != nil { + s.PublicStore.Stop() } } @@ -43,10 +43,10 @@ func (a *agent) GetShardStore() (types.ShardStore, error) { return a.ShardStore, nil } -func (a *agent) GetSharedStore() (types.SharedStore, error) { - if a.SharedStore == nil { +func (a *agent) GetPublicStore() (types.PublicStore, error) { + if a.PublicStore == nil { return nil, types.ErrUnsupported } - return a.SharedStore, nil + return a.PublicStore, nil } diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index 1084275..e1a81d7 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -42,18 +42,18 @@ func (b *builder) CreateAgent() (types.StorageAgent, error) { agt.ShardStore = store } - if b.detail.Storage.SharedStore != nil { - local, ok := b.detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage) + if b.detail.Storage.PublicStore != nil { + local, ok := b.detail.Storage.PublicStore.(*cdssdk.LocalPublicStorage) if !ok { - return nil, fmt.Errorf("invalid shared store type %T for local storage", b.detail.Storage.SharedStore) + return nil, fmt.Errorf("invalid public store type %T for local storage", b.detail.Storage.PublicStore) } - store, err := NewSharedStore(agt, *local) + store, err := NewPublicStore(agt, *local) if err != nil { return nil, err } - agt.SharedStore = store + agt.PublicStore = store } return agt, nil @@ -63,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc { return &ShardStoreDesc{builder: b} } -func (b *builder) SharedStoreDesc() types.SharedStoreDesc { - return &SharedStoreDesc{builder: b} +func (b *builder) PublicStoreDesc() types.PublicStoreDesc { + return &PublicStoreDesc{builder: b} } func (b *builder) CreateMultiparter() (types.Multiparter, error) { diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/shared_store.go index 87eb45c..55f96b2 100644 --- a/common/pkgs/storage/local/shared_store.go +++ b/common/pkgs/storage/local/shared_store.go @@ -10,40 +10,40 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type SharedStoreDesc struct { - types.EmptySharedStoreDesc +type PublicStoreDesc struct { + types.EmptyPublicStoreDesc builder *builder } -func (d *SharedStoreDesc) Enabled() bool { - return d.builder.detail.Storage.SharedStore != nil +func (d *PublicStoreDesc) Enabled() bool { + return d.builder.detail.Storage.PublicStore != nil } -func (d *SharedStoreDesc) HasBypassWrite() bool { +func (d *PublicStoreDesc) HasBypassWrite() bool { return false } -type SharedStore struct { +type PublicStore struct { agt *agent - cfg cdssdk.LocalSharedStorage + cfg cdssdk.LocalPublicStorage } -func NewSharedStore(agt *agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) { - return &SharedStore{ +func NewPublicStore(agt *agent, cfg cdssdk.LocalPublicStorage) (*PublicStore, error) { + return &PublicStore{ agt: agt, cfg: cfg, }, nil } -func (s *SharedStore) Start(ch *types.StorageEventChan) { +func (s *PublicStore) Start(ch *types.StorageEventChan) { s.getLogger().Infof("component start, LoadBase: %v", s.cfg.LoadBase) } -func (s *SharedStore) Stop() { +func (s *PublicStore) Stop() { s.getLogger().Infof("component stop") } -func (s *SharedStore) Write(objPath string, stream io.Reader) error { +func (s *PublicStore) Write(objPath string, stream io.Reader) error { fullPath := filepath.Join(s.cfg.LoadBase, objPath) err := os.MkdirAll(filepath.Dir(fullPath), 0755) if err != nil { @@ -64,6 +64,6 @@ func (s *SharedStore) Write(objPath string, stream io.Reader) error { return nil } -func (s *SharedStore) getLogger() logger.Logger { - return logger.WithField("SharedStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) +func (s *PublicStore) getLogger() logger.Logger { + return logger.WithField("PublicStore", "Local").WithField("Storage", s.agt.Detail.Storage.String()) } diff --git a/common/pkgs/storage/mashup/mashup.go b/common/pkgs/storage/mashup/mashup.go index 882aaff..de1e383 100644 --- a/common/pkgs/storage/mashup/mashup.go +++ b/common/pkgs/storage/mashup/mashup.go @@ -38,13 +38,13 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc { return blder.ShardStoreDesc() } -func (b *builder) SharedStoreDesc() types.SharedStoreDesc { +func (b *builder) PublicStoreDesc() types.PublicStoreDesc { stgType := b.detail.Storage.Type.(*cdssdk.MashupStorageType) detail := b.detail detail.Storage.Type = stgType.Agent blder := factory.GetBuilder(detail) - return blder.SharedStoreDesc() + return blder.PublicStoreDesc() } func (b *builder) CreateMultiparter() (types.Multiparter, error) { diff --git a/common/pkgs/storage/s3/agent.go b/common/pkgs/storage/s3/agent.go index f33d6c0..271bcab 100644 --- a/common/pkgs/storage/s3/agent.go +++ b/common/pkgs/storage/s3/agent.go @@ -34,6 +34,6 @@ func (a *Agent) GetShardStore() (types.ShardStore, error) { return a.ShardStore, nil } -func (a *Agent) GetSharedStore() (types.SharedStore, error) { +func (a *Agent) GetPublicStore() (types.PublicStore, error) { return nil, types.ErrUnsupported } diff --git a/common/pkgs/storage/s3/s3.go b/common/pkgs/storage/s3/s3.go index c65a19f..9ac531a 100644 --- a/common/pkgs/storage/s3/s3.go +++ b/common/pkgs/storage/s3/s3.go @@ -63,8 +63,8 @@ func (b *builder) ShardStoreDesc() types.ShardStoreDesc { return &ShardStoreDesc{builder: b} } -func (b *builder) SharedStoreDesc() types.SharedStoreDesc { - return &SharedStoreDesc{} +func (b *builder) PublicStoreDesc() types.PublicStoreDesc { + return &PublicStoreDesc{} } func (b *builder) CreateMultiparter() (types.Multiparter, error) { diff --git a/common/pkgs/storage/s3/shared_store.go b/common/pkgs/storage/s3/shared_store.go index c0e74e3..f65dbcc 100644 --- a/common/pkgs/storage/s3/shared_store.go +++ b/common/pkgs/storage/s3/shared_store.go @@ -2,6 +2,6 @@ package s3 import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" -type SharedStoreDesc struct { - types.EmptySharedStoreDesc +type PublicStoreDesc struct { + types.EmptyPublicStoreDesc } diff --git a/common/pkgs/storage/types/empty_builder.go b/common/pkgs/storage/types/empty_builder.go index 692d020..667af24 100644 --- a/common/pkgs/storage/types/empty_builder.go +++ b/common/pkgs/storage/types/empty_builder.go @@ -19,8 +19,8 @@ func (b *EmptyBuilder) ShardStoreDesc() ShardStoreDesc { return &EmptyShardStoreDesc{} } -func (b *EmptyBuilder) SharedStoreDesc() SharedStoreDesc { - return &EmptySharedStoreDesc{} +func (b *EmptyBuilder) PublicStoreDesc() PublicStoreDesc { + return &EmptyPublicStoreDesc{} } // 创建一个分片上传组件 @@ -55,13 +55,13 @@ func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool { return false } -type EmptySharedStoreDesc struct { +type EmptyPublicStoreDesc struct { } -func (d *EmptySharedStoreDesc) Enabled() bool { +func (d *EmptyPublicStoreDesc) Enabled() bool { return false } -func (d *EmptySharedStoreDesc) HasBypassWrite() bool { +func (d *EmptyPublicStoreDesc) HasBypassWrite() bool { return false } diff --git a/common/pkgs/storage/types/shared_store.go b/common/pkgs/storage/types/shared_store.go index 98456e8..782f2aa 100644 --- a/common/pkgs/storage/types/shared_store.go +++ b/common/pkgs/storage/types/shared_store.go @@ -4,7 +4,7 @@ import ( "io" ) -type SharedStore interface { +type PublicStore interface { Start(ch *StorageEventChan) Stop() diff --git a/common/pkgs/storage/types/types.go b/common/pkgs/storage/types/types.go index af1803d..e74a1af 100644 --- a/common/pkgs/storage/types/types.go +++ b/common/pkgs/storage/types/types.go @@ -31,7 +31,7 @@ type StorageAgent interface { // 获取分片存储服务 GetShardStore() (ShardStore, error) // 获取共享存储服务 - GetSharedStore() (SharedStore, error) + GetPublicStore() (PublicStore, error) } // 创建存储服务的指定组件。 @@ -45,7 +45,7 @@ type StorageBuilder interface { // 是否支持分片存储服务 ShardStoreDesc() ShardStoreDesc // 是否支持共享存储服务 - SharedStoreDesc() SharedStoreDesc + PublicStoreDesc() PublicStoreDesc // 创建一个分片上传组件 CreateMultiparter() (Multiparter, error) // 创建一个存储服务直传组件 @@ -64,7 +64,7 @@ type ShardStoreDesc interface { HasBypassHTTPRead() bool } -type SharedStoreDesc interface { +type PublicStoreDesc interface { // 是否已启动 Enabled() bool // 是否能旁路上传 diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 16590b4..359ea27 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -45,7 +45,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err ft.AddFrom(fromExec) for i, stg := range u.targetStgs { ft.AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "fileHash")) - ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa))) + ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(u.loadRoots[i], pa))) stgIDs = append(stgIDs, stg.Storage.StorageID) } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 862b9f6..596db35 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -51,7 +51,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash")) for i, stg := range w.loadToStgs { - ft.AddTo(ioswitch2.NewLoadToShared(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) + ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) } plans := exec.NewPlanBuilder() diff --git a/common/pkgs/uploader/uploader.go b/common/pkgs/uploader/uploader.go index ad46d85..245e36f 100644 --- a/common/pkgs/uploader/uploader.go +++ b/common/pkgs/uploader/uploader.go @@ -86,8 +86,8 @@ func (u *Uploader) BeginUpdate(userID cdssdk.UserID, pkgID cdssdk.PackageID, aff if stg.MasterHub == nil { return nil, fmt.Errorf("load to storage %v has no master hub", stgID) } - if !factory.GetBuilder(stg).SharedStoreDesc().Enabled() { - return nil, fmt.Errorf("load to storage %v has no shared store", stgID) + if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() { + return nil, fmt.Errorf("load to storage %v has no public store", stgID) } loadToStgs[i] = stg From f9b133125b19f29f76e74390b5f3b27ff3d2a0a4 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 20 Jan 2025 11:10:31 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/ops2/{shared_store.go => public_store.go} | 0 common/pkgs/storage/local/{shared_store.go => public_store.go} | 0 common/pkgs/storage/s3/{shared_store.go => public_store.go} | 0 common/pkgs/storage/types/{shared_store.go => public_store.go} | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename common/pkgs/ioswitch2/ops2/{shared_store.go => public_store.go} (100%) rename common/pkgs/storage/local/{shared_store.go => public_store.go} (100%) rename common/pkgs/storage/s3/{shared_store.go => public_store.go} (100%) rename common/pkgs/storage/types/{shared_store.go => public_store.go} (100%) diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/public_store.go similarity index 100% rename from common/pkgs/ioswitch2/ops2/shared_store.go rename to common/pkgs/ioswitch2/ops2/public_store.go diff --git a/common/pkgs/storage/local/shared_store.go b/common/pkgs/storage/local/public_store.go similarity index 100% rename from common/pkgs/storage/local/shared_store.go rename to common/pkgs/storage/local/public_store.go diff --git a/common/pkgs/storage/s3/shared_store.go b/common/pkgs/storage/s3/public_store.go similarity index 100% rename from common/pkgs/storage/s3/shared_store.go rename to common/pkgs/storage/s3/public_store.go diff --git a/common/pkgs/storage/types/shared_store.go b/common/pkgs/storage/types/public_store.go similarity index 100% rename from common/pkgs/storage/types/shared_store.go rename to common/pkgs/storage/types/public_store.go