Browse Source

调整s3接口目录

gitlink
JeshuaRen 1 year ago
parent
commit
880184cbce
54 changed files with 940 additions and 378 deletions
  1. +16
    -0
      agent/internal/cmd/cmd.go
  2. +245
    -0
      agent/internal/cmd/serve.go
  3. +8
    -3
      agent/internal/config/config.go
  4. +7
    -7
      agent/internal/mq/cache.go
  5. +6
    -6
      agent/internal/mq/service.go
  6. +3
    -3
      agent/internal/task/cache_move_package.go
  7. +4
    -4
      agent/internal/task/storage_load_package.go
  8. +12
    -12
      agent/internal/task/task.go
  9. +5
    -32
      agent/main.go
  10. +1
    -1
      client/internal/cmdline/package.go
  11. +2
    -2
      client/internal/http/package.go
  12. +2
    -2
      client/internal/services/package.go
  13. +15
    -0
      common/models/models.go
  14. +2
    -0
      common/pkgs/db/cache.go
  15. +2
    -0
      common/pkgs/db/db.go
  16. +2
    -0
      common/pkgs/db/location.go
  17. +2
    -0
      common/pkgs/db/node.go
  18. +2
    -0
      common/pkgs/db/node_connectivity.go
  19. +2
    -0
      common/pkgs/db/object_access_stat.go
  20. +2
    -0
      common/pkgs/db/object_block.go
  21. +2
    -0
      common/pkgs/db/package_access_stat.go
  22. +2
    -0
      common/pkgs/db/storage_package.go
  23. +2
    -0
      common/pkgs/db/user.go
  24. +2
    -0
      common/pkgs/db/user_bucket.go
  25. +2
    -0
      common/pkgs/db/utils.go
  26. +6
    -0
      common/pkgs/db2/storage.go
  27. +8
    -6
      common/pkgs/ioswitch2/ops2/multipart.go
  28. +12
    -12
      common/pkgs/ioswitch2/ops2/shard_store.go
  29. +1
    -1
      common/pkgs/ioswitch2/parser/parser.go
  30. +12
    -12
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  31. +1
    -1
      common/pkgs/ioswitchlrc/parser/passes.go
  32. +3
    -3
      common/pkgs/mq/agent/server.go
  33. +30
    -0
      common/pkgs/mq/coordinator/node.go
  34. +20
    -19
      common/pkgs/storage/local/shard_store.go
  35. +1
    -1
      common/pkgs/storage/local/temp_store.go
  36. +6
    -6
      common/pkgs/storage/local/writer.go
  37. +10
    -0
      common/pkgs/storage/mgr/create_components.go
  38. +27
    -0
      common/pkgs/storage/mgr/create_shardstore.go
  39. +10
    -0
      common/pkgs/storage/mgr/create_sharedstore.go
  40. +167
    -0
      common/pkgs/storage/mgr/mgr.go
  41. +24
    -0
      common/pkgs/storage/obs/obs.go
  42. +56
    -0
      common/pkgs/storage/oss/oss.go
  43. +0
    -68
      common/pkgs/storage/shard/pool/pool.go
  44. +0
    -26
      common/pkgs/storage/shard/storages/utils/utils.go
  45. +0
    -58
      common/pkgs/storage/shard/types/option.go
  46. +30
    -0
      common/pkgs/storage/types/s3_client.go
  47. +55
    -5
      common/pkgs/storage/types/shard_store.go
  48. +5
    -0
      common/pkgs/storage/types/shared_store.go
  49. +5
    -0
      common/pkgs/storage/types/temp_store.go
  50. +12
    -0
      common/pkgs/storage/types/types.go
  51. +26
    -0
      common/pkgs/storage/utils/utils.go
  52. +56
    -0
      coordinator/internal/mq/node.go
  53. +5
    -26
      go.mod
  54. +2
    -62
      go.sum

+ 16
- 0
agent/internal/cmd/cmd.go View File

@@ -0,0 +1,16 @@
package cmd

import "github.com/spf13/cobra"

var RootCmd = &cobra.Command{
Use: "agent",
}

func init() {
var configPath string
RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "path to config file")

RootCmd.Run = func(cmd *cobra.Command, args []string) {
serve(configPath)
}
}

+ 245
- 0
agent/internal/cmd/serve.go View File

@@ -0,0 +1,245 @@
package cmd

import (
"fmt"
"net"
"os"
"time"

"gitlink.org.cn/cloudream/storage/agent/internal/http"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"

"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq"
)

func serve(configPath string) {
err := config.Init(configPath)
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

stgglb.InitLocal(&config.Cfg().Local)
stgglb.InitMQPool(&config.Cfg().RabbitMQ)
stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})

hubCfg := downloadHubConfig()

stgMgr := mgr.NewManager()
for _, stg := range hubCfg.Storages {
err := stgMgr.InitStorage(stg)
if err != nil {
fmt.Printf("init storage %v: %v", stg, err)
os.Exit(1)
}
}

sw := exec.NewWorker()

httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&sw))
if err != nil {
logger.Fatalf("new http server failed, err: %s", err.Error())
}
go serveHTTP(httpSvr)

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) {
log := logger.WithField("Connectivity", "")

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
log.Warnf("acquire coordinator mq failed, err: %s", err.Error())
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

cons := collector.GetAll()
nodeCons := make([]cdssdk.NodeConnectivity, 0, len(cons))
for _, con := range cons {
var delay *float32
if con.Delay != nil {
v := float32(con.Delay.Microseconds()) / 1000
delay = &v
}

nodeCons = append(nodeCons, cdssdk.NodeConnectivity{
FromNodeID: *stgglb.Local.NodeID,
ToNodeID: con.ToNodeID,
Delay: delay,
TestTime: con.TestTime,
})
}

_, err = coorCli.UpdateNodeConnectivities(coormq.ReqUpdateNodeConnectivities(nodeCons))
if err != nil {
log.Warnf("update node connectivities: %v", err)
}
})
conCol.CollectInPlace()

acStat := accessstat.NewAccessStat(accessstat.Config{
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
})
go serveAccessStat(acStat)

distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Fatalf("new ipfs failed, err: %s", err.Error())
}

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgMgr), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new agent server failed, err: %s", err.Error())
}
agtSvr.OnError(func(err error) {
logger.Warnf("agent server err: %s", err.Error())
})
go serveAgentServer(agtSvr)

//面向客户端收发数据
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
go serveGRPC(s, lis)

go serveDistLock(distlock)

foever := make(chan struct{})
<-foever
}

func downloadHubConfig() coormq.GetHubConfigResp {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Errorf("new coordinator client: %v", err)
os.Exit(1)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cdssdk.NodeID(config.Cfg().ID)))
if err != nil {
logger.Errorf("getting hub config: %v", err)
os.Exit(1)
}

return *cfgResp
}

func serveAgentServer(server *agtmq.Server) {
logger.Info("start serving command server")

err := server.Start()

if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
}

logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveGRPC(s *grpc.Server, lis net.Listener) {
logger.Info("start serving grpc")

err := s.Serve(lis)

if err != nil {
logger.Errorf("grpc stopped with error: %s", err.Error())
}

logger.Info("grpc stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveHTTP(server *http.Server) {
logger.Info("start serving http")

err := server.Serve()

if err != nil {
logger.Errorf("http stopped with error: %s", err.Error())
}

logger.Info("http stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")

err := svc.Serve()

if err != nil {
logger.Errorf("distlock stopped with error: %s", err.Error())
}

logger.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveAccessStat(svc *accessstat.AccessStat) {
logger.Info("start serving access stat")

ch := svc.Start()
loop:
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("access stat stopped with error: %v", err)
break
}

switch val := val.(type) {
case error:
logger.Errorf("access stat stopped with error: %v", val)
break loop
}
}
logger.Info("access stat stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

+ 8
- 3
agent/internal/config/config.go View File

@@ -3,6 +3,7 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
c "gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
@@ -12,7 +13,7 @@ import (
)

type Config struct {
ID int64 `json:"id"`
ID cdssdk.NodeID `json:"id"`
ListenAddr string `json:"listenAddr"`
Local stgmodels.LocalMachineInfo `json:"local"`
GRPC *grpc.Config `json:"grpc"`
@@ -25,8 +26,12 @@ type Config struct {

var cfg Config

func Init() error {
return c.DefaultLoad("agent", &cfg)
func Init(path string) error {
if path == "" {
return c.DefaultLoad("agent", &cfg)
}

return c.Load(path, &cfg)
}

func Cfg() *Config {


+ 7
- 7
agent/internal/mq/cache.go View File

@@ -12,9 +12,9 @@ import (
)

func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
store := svc.shardStorePool.Get(msg.StorageID)
if store == nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("storage %v has no shard store", msg.StorageID))
store, err := svc.stgMgr.GetShardStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err))
}

infos, err := store.ListAll()
@@ -31,12 +31,12 @@ func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *m
}

func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) {
store := svc.shardStorePool.Get(msg.StorageID)
if store == nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("storage %v has no shard store", msg.StorageID))
store, err := svc.stgMgr.GetShardStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err))
}

err := store.Purge(msg.Avaiables)
err = store.Purge(msg.Avaiables)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err))
}


+ 6
- 6
agent/internal/mq/service.go View File

@@ -2,17 +2,17 @@ package mq

import (
"gitlink.org.cn/cloudream/storage/agent/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

type Service struct {
taskManager *task.Manager
shardStorePool *pool.ShardStorePool
taskManager *task.Manager
stgMgr *mgr.Manager
}

func NewService(taskMgr *task.Manager, shardStorePool *pool.ShardStorePool) *Service {
func NewService(taskMgr *task.Manager, stgMgr *mgr.Manager) *Service {
return &Service{
taskManager: taskMgr,
shardStorePool: shardStorePool,
taskManager: taskMgr,
stgMgr: stgMgr,
}
}

+ 3
- 3
agent/internal/task/cache_move_package.go View File

@@ -40,9 +40,9 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

store := ctx.shardStorePool.Get(t.storageID)
if store == nil {
return fmt.Errorf("storage has no shard store")
store, err := ctx.stgMgr.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}

mutex, err := reqbuilder.NewBuilder().


+ 4
- 4
agent/internal/task/storage_load_package.go View File

@@ -22,7 +22,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/utils"
)

@@ -94,9 +94,9 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
return fmt.Errorf("getting package object details: %w", err)
}

shardstore := ctx.shardStorePool.Get(t.storageID)
if shardstore == nil {
return fmt.Errorf("shard store %v not found on this hub", t.storageID)
shardstore, err := ctx.stgMgr.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}

mutex, err := reqbuilder.NewBuilder().


+ 12
- 12
agent/internal/task/task.go View File

@@ -6,16 +6,16 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务、IO开关和网络连接状态收集器
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
shardStorePool *pool.ShardStorePool
distlock *distlock.Service
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgMgr *mgr.Manager
}

// CompleteFn 类型定义了任务完成时需要执行的函数,用于设置任务的执行结果
@@ -33,12 +33,12 @@ type Task = task.Task[TaskContext]
// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, shardStorePool *pool.ShardStorePool) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *mgr.Manager) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
shardStorePool: shardStorePool,
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
stgMgr: stgMgr,
})
}

+ 5
- 32
agent/main.go View File

@@ -1,39 +1,9 @@
package main

import (
"fmt"
"net"
"os"
"time"

"gitlink.org.cn/cloudream/storage/agent/internal/http"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"

"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq"
)

// TODO 此数据是否在运行时会发生变化?
var AgentIpList []string
import "gitlink.org.cn/cloudream/storage/agent/internal/cmd"

func main() {
<<<<<<< HEAD
// TODO 放到配置里读取
AgentIpList = []string{"pcm01", "pcm1", "pcm2"}

@@ -228,4 +198,7 @@ loop:

// TODO 仅简单结束了程序
os.Exit(1)
=======
cmd.RootCmd.Execute()
>>>>>>> 3fe245a05a09f784fe0081982bca46f964279b4e
}

+ 1
- 1
client/internal/cmdline/package.go View File

@@ -173,7 +173,7 @@ func PackageDeletePackage(ctx CommandContext, packageID cdssdk.PackageID) error
// error - 操作过程中发生的任何错误。
func PackageGetCachedStorages(ctx CommandContext, packageID cdssdk.PackageID) error {
userID := cdssdk.UserID(1)
resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID)
resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedStorages(userID, packageID)
fmt.Printf("resp: %v\n", resp)
if err != nil {
return fmt.Errorf("get package %d cached nodes failed, err: %w", packageID, err)


+ 2
- 2
client/internal/http/package.go View File

@@ -134,14 +134,14 @@ func (s *PackageService) ListBucketPackages(ctx *gin.Context) {
func (s *PackageService) GetCachedStorages(ctx *gin.Context) {
log := logger.WithField("HTTP", "Package.GetCachedStorages")

var req cdsapi.PackageGetCachedNodesReq
var req cdsapi.PackageGetCachedStoragesReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

resp, err := s.svc.PackageSvc().GetCachedNodes(req.UserID, req.PackageID)
resp, err := s.svc.PackageSvc().GetCachedStorages(req.UserID, req.PackageID)
if err != nil {
log.Warnf("get package cached nodes failed: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed"))


+ 2
- 2
client/internal/services/package.go View File

@@ -106,8 +106,8 @@ func (svc *PackageService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.
return nil
}

// GetCachedNodes 获取指定包的缓存节点信息
func (svc *PackageService) GetCachedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) {
// GetCachedStorages 获取指定包的缓存节点信息
func (svc *PackageService) GetCachedStorages(userID cdssdk.UserID, packageID cdssdk.PackageID) (cdssdk.PackageCachingInfo, error) {
// 从协调器MQ池中获取客户端
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {


+ 15
- 0
common/models/models.go View File

@@ -133,3 +133,18 @@ type StorageDetail struct {
Shard *cdssdk.ShardStorage `json:"shard"`
Shared *cdssdk.SharedStorage `json:"shared"`
}

type ObjectStorage struct {
Manufacturer string `json:"manufacturer"`
Region string `json:"region"`
AK string `json:"access_key_id"`
SK string `json:"secret_access_key"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

const (
HuaweiCloud = "HuaweiCloud"
AliCloud = "AliCloud"
SugonCloud = "SugonCloud"
)

+ 2
- 0
common/pkgs/db/cache.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"time"

@@ -121,3 +122,4 @@ func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID cdssdk.NodeID, f
" UserNode.UserID = ? and UserNode.NodeID = Node.NodeID", fileHash, userID)
return x, err
}
*/

+ 2
- 0
common/pkgs/db/db.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"context"
"database/sql"
@@ -64,3 +65,4 @@ func (db *DB) DoTx(isolation sql.IsolationLevel, fn func(tx *sqlx.Tx) error) err
func (db *DB) SQLCtx() SQLContext {
return db.d
}
*/

+ 2
- 0
common/pkgs/db/location.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"fmt"

@@ -35,3 +36,4 @@ func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (model

return loc, nil
}
*/

+ 2
- 0
common/pkgs/db/node.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"time"

@@ -39,3 +40,4 @@ func (db *NodeDB) UpdateState(ctx SQLContext, nodeID cdssdk.NodeID, state string
_, err := ctx.Exec("update Node set State = ?, LastReportTime = ? where NodeID = ?", state, time.Now(), nodeID)
return err
}
*/

+ 2
- 0
common/pkgs/db/node_connectivity.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -38,3 +39,4 @@ func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.N
"insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+
" on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil)
}
*/

+ 2
- 0
common/pkgs/db/object_access_stat.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -119,3 +120,4 @@ func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.Pack
_, err := ctx.Exec("delete ObjectAccessStat from ObjectAccessStat inner join Object on ObjectAccessStat.ObjectID = Object.ObjectID where PackageID = ?", packageID)
return err
}
*/

+ 2
- 0
common/pkgs/db/object_block.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"database/sql"
"strconv"
@@ -134,3 +135,4 @@ func splitConcatedFileHash(idStr string) []string {
idStrs := strings.Split(idStr, ",")
return idStrs
}
*/

+ 2
- 0
common/pkgs/db/package_access_stat.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -82,3 +83,4 @@ func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.Packa
_, err := ctx.Exec("delete from PackageAccessStat where PackageID=?", pkgID)
return err
}
*/

+ 2
- 0
common/pkgs/db/storage_package.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"fmt"

@@ -115,3 +116,4 @@ func (*StoragePackageDB) FindPackageStorages(ctx SQLContext, packageID cdssdk.Pa
)
return ret, err
}
*/

+ 2
- 0
common/pkgs/db/user.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -19,3 +20,4 @@ func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (model.User, err
err := sqlx.Get(ctx, &ret, "select * from User where UserID = ?", userID)
return ret, err
}
*/

+ 2
- 0
common/pkgs/db/user_bucket.go View File

@@ -1,5 +1,6 @@
package db

/*
type UserBucketDB struct {
*DB
}
@@ -12,3 +13,4 @@ func (*UserBucketDB) Create(ctx SQLContext, userID int64, bucketID int64) error
_, err := ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID)
return err
}
*/

+ 2
- 0
common/pkgs/db/utils.go View File

@@ -1,5 +1,6 @@
package db

/*
import (
"database/sql"

@@ -73,3 +74,4 @@ func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, cal
}
return nil
}
*/

+ 6
- 0
common/pkgs/db2/storage.go View File

@@ -76,3 +76,9 @@ func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID,

return stg, err
}

func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.NodeID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "NodeID = ?", hubID).Error
return stgs, err
}

+ 8
- 6
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -2,10 +2,12 @@ package ops2

import (
"encoding/json"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/sdks/cloudstorage"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

@@ -30,9 +32,9 @@ type MultipartManage struct {
}

func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
manager, err2 := exec.ValueByType[*mgr.Manager](ctx)

var oss cloudstorage.ObjectStorage

var oss stgmod.ObjectStorage
switch addr := o.Address.(type) {
case *cdssdk.LocalStorageAddress:
err := json.Unmarshal([]byte(addr.String()), &oss)
@@ -41,7 +43,7 @@ func (o *MultipartManage) Execute(ctx *exec.ExecContext, e *exec.Executor) error
}
}

client, err := cloudstorage.NewObjectStorageClient(oss)
client, err := types.NewObjectStorageClient(oss)
if err != nil {
return err
}
@@ -96,7 +98,7 @@ type MultipartUpload struct {
func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
value, err2 := exec.BindVar[*InitUploadValue](e, ctx.Context, o.UploadArgs)

var oss cloudstorage.ObjectStorage
var oss stgmod.ObjectStorage
switch addr := o.Address.(type) {
case *cdssdk.LocalStorageAddress:
err := json.Unmarshal([]byte(addr.String()), &oss)
@@ -105,7 +107,7 @@ func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error
}
}

client, err := cloudstorage.NewObjectStorageClient(oss)
client, err := types.NewObjectStorageClient(oss)
if err != nil {
return err
}


+ 12
- 12
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -10,8 +10,8 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func init() {
@@ -40,14 +40,14 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

file, err := store.Open(o.Open)
@@ -82,14 +82,14 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)


+ 1
- 1
common/pkgs/ioswitch2/parser/parser.go View File

@@ -12,7 +12,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type DefaultParser struct {


+ 12
- 12
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -10,8 +10,8 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func init() {
@@ -40,14 +40,14 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

file, err := store.Open(o.Open)
@@ -82,14 +82,14 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)


+ 1
- 1
common/pkgs/ioswitchlrc/parser/passes.go View File

@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// 计算输入流的打开范围。会把流的范围按条带大小取整


+ 3
- 3
common/pkgs/mq/agent/server.go View File

@@ -2,7 +2,7 @@ package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/sync2"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

@@ -19,14 +19,14 @@ type Server struct {
rabbitSvr mq.RabbitMQServer
}

func NewServer(svc Service, id int64, cfg *mymq.Config) (*Server, error) {
func NewServer(svc Service, id cdssdk.NodeID, cfg *mymq.Config) (*Server, error) {
srv := &Server{
service: svc,
}

rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
mymq.MakeAgentQueueName(id),
mymq.MakeAgentQueueName(int64(id)),
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},


+ 30
- 0
common/pkgs/mq/coordinator/node.go View File

@@ -3,9 +3,12 @@ package coordinator
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type NodeService interface {
GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage)

GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, *mq.CodeMessage)

GetNodes(msg *GetNodes) (*GetNodesResp, *mq.CodeMessage)
@@ -15,6 +18,33 @@ type NodeService interface {
UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, *mq.CodeMessage)
}

var _ = Register(Service.GetHubConfig)

type GetHubConfig struct {
mq.MessageBodyBase
HubID cdssdk.NodeID `json:"hubID"`
}
type GetHubConfigResp struct {
mq.MessageBodyBase
Hub cdssdk.Node `json:"hub"`
Storages []stgmod.StorageDetail `json:"storages"`
}

func ReqGetHubConfig(hubID cdssdk.NodeID) *GetHubConfig {
return &GetHubConfig{
HubID: hubID,
}
}
func RespGetHubConfig(hub cdssdk.Node, storages []stgmod.StorageDetail) *GetHubConfigResp {
return &GetHubConfigResp{
Hub: hub,
Storages: storages,
}
}
func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) {
return mq.Request(Service.GetHubConfig, client.rabbitCli, msg)
}

// 查询用户可用的节点
var _ = Register(Service.GetUserNodes)



common/pkgs/storage/shard/storages/local/local.go → common/pkgs/storage/local/shard_store.go View File

@@ -8,12 +8,11 @@ import (
"os"
"path/filepath"

"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/utils"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

const (
@@ -21,33 +20,35 @@ const (
BlocksDir = "blocks"
)

type Local struct {
type ShardStore struct {
cfg cdssdk.LocalShardStorage
}

func New(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*Local, error) {
func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
_, ok := stg.Address.(*cdssdk.LocalStorageAddress)
if !ok {
return nil, fmt.Errorf("storage address(%T) is not local", stg)
}

return &Local{
return &ShardStore{
cfg: cfg,
}, nil
}

func (s *Local) Start() *async.UnboundChannel[types.StoreEvent] {
// TODO 暂时没有需要后台执行的任务
return async.NewUnboundChannel[types.StoreEvent]()
func (s *ShardStore) Start(ch *types.StorageEventChan) {

}

func (s *ShardStore) Stop() {
}

func (s *Local) New() types.Writer {
func (s *ShardStore) New() types.ShardWriter {
file, err := os.CreateTemp(filepath.Join(s.cfg.Root, "tmp"), "tmp-*")
if err != nil {
return utils.ErrorWriter(err)
return utils.ErrorShardWriter(err)
}

return &Writer{
return &ShardWriter{
path: filepath.Join(s.cfg.Root, "tmp", file.Name()),
file: file,
hasher: sha256.New(),
@@ -56,7 +57,7 @@ func (s *Local) New() types.Writer {
}

// 使用F函数创建Option对象
func (s *Local) Open(opt types.OpenOption) (io.ReadCloser, error) {
func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) {
fileName := string(opt.FileHash)
if len(fileName) < 2 {
return nil, fmt.Errorf("invalid file name")
@@ -83,7 +84,7 @@ func (s *Local) Open(opt types.OpenOption) (io.ReadCloser, error) {
return file, nil
}

func (s *Local) ListAll() ([]types.FileInfo, error) {
func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
var infos []types.FileInfo

blockDir := filepath.Join(s.cfg.Root, BlocksDir)
@@ -113,7 +114,7 @@ func (s *Local) ListAll() ([]types.FileInfo, error) {
return infos, nil
}

func (s *Local) Purge(removes []cdssdk.FileHash) error {
func (s *ShardStore) Purge(removes []cdssdk.FileHash) error {
for _, hash := range removes {
fileName := string(hash)

@@ -128,19 +129,19 @@ func (s *Local) Purge(removes []cdssdk.FileHash) error {
return nil
}

func (s *Local) Stats() types.Stats {
func (s *ShardStore) Stats() types.Stats {
// TODO 统计本地存储的相关信息
return types.Stats{
Status: types.StatusOK,
}
}

func (s *Local) onWritterAbort(w *Writer) {
func (s *ShardStore) onWritterAbort(w *ShardWriter) {
logger.Debugf("writting file %v aborted", w.path)
s.removeTempFile(w.path)
}

func (s *Local) onWritterFinish(w *Writer, hash cdssdk.FileHash) (types.FileInfo, error) {
func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) {
logger.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash)

blockDir := filepath.Join(s.cfg.Root, BlocksDir, string(hash)[:2])
@@ -166,7 +167,7 @@ func (s *Local) onWritterFinish(w *Writer, hash cdssdk.FileHash) (types.FileInfo
}, nil
}

func (s *Local) removeTempFile(path string) {
func (s *ShardStore) removeTempFile(path string) {
err := os.Remove(path)
if err != nil {
logger.Warnf("removing temp file %v: %v", path, err)

common/pkgs/storage/temp/local.go → common/pkgs/storage/local/temp_store.go View File

@@ -1,4 +1,4 @@
package tempstore
package local

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"


common/pkgs/storage/shard/storages/local/writer.go → common/pkgs/storage/local/writer.go View File

@@ -8,19 +8,19 @@ import (
"strings"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Writer struct {
type ShardWriter struct {
path string
file *os.File
hasher hash.Hash
size int64
closed bool
owner *Local
owner *ShardStore
}

func (w *Writer) Write(data []byte) (int, error) {
func (w *ShardWriter) Write(data []byte) (int, error) {
n, err := w.file.Write(data)
if err != nil {
return 0, err
@@ -32,7 +32,7 @@ func (w *Writer) Write(data []byte) (int, error) {
}

// 取消写入
func (w *Writer) Abort() error {
func (w *ShardWriter) Abort() error {
if w.closed {
return nil
}
@@ -44,7 +44,7 @@ func (w *Writer) Abort() error {
}

// 结束写入,获得文件哈希值
func (w *Writer) Finish() (types.FileInfo, error) {
func (w *ShardWriter) Finish() (types.FileInfo, error) {
if w.closed {
return types.FileInfo{}, fmt.Errorf("stream closed")
}

+ 10
- 0
common/pkgs/storage/mgr/create_components.go View File

@@ -0,0 +1,10 @@
package mgr

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createComponents(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
return nil
}

+ 27
- 0
common/pkgs/storage/mgr/create_shardstore.go View File

@@ -0,0 +1,27 @@
package mgr

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/local"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createShardStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
switch confg := detail.Shard.Config.(type) {
case *cdssdk.LocalShardStorage:
store, err := local.NewShardStore(detail.Storage, *confg)
if err != nil {
return fmt.Errorf("new local shard store: %v", err)
}

store.Start(ch)
stg.Shard = store
return nil

default:
return fmt.Errorf("unsupported shard store type: %T", confg)
}
}

+ 10
- 0
common/pkgs/storage/mgr/create_sharedstore.go View File

@@ -0,0 +1,10 @@
package mgr

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createSharedStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
return nil
}

+ 167
- 0
common/pkgs/storage/mgr/mgr.go View File

@@ -0,0 +1,167 @@
package mgr

import (
"errors"
"reflect"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

var ErrStorageNotFound = errors.New("storage not found")

var ErrComponentNotFound = errors.New("component not found")

var ErrStorageExists = errors.New("storage already exists")

type storage struct {
Shard types.ShardStore
Shared types.SharedStore
Components []types.StorageComponent
}

type Manager struct {
storages map[cdssdk.StorageID]*storage
lock sync.Mutex
eventChan *types.StorageEventChan
}

func NewManager() *Manager {
return &Manager{
storages: make(map[cdssdk.StorageID]*storage),
eventChan: async.NewUnboundChannel[types.StorageEvent](),
}
}

func (m *Manager) InitStorage(detail stgmod.StorageDetail) error {
m.lock.Lock()
defer m.lock.Unlock()

if _, ok := m.storages[detail.Storage.StorageID]; ok {
return ErrStorageExists
}

stg := &storage{}

if detail.Shard != nil {
err := createShardStore(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}
}

if detail.Shared != nil {
err := createSharedStore(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}
}

// 创建其他组件
err := createComponents(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}

m.storages[detail.Storage.StorageID] = stg
return nil
}

func stopStorage(stg *storage) {
if stg.Shard != nil {
stg.Shard.Stop()
}

if stg.Shared != nil {
stg.Shared.Stop()
}

for _, c := range stg.Components {
c.Stop()
}
}

// 查找指定Storage的ShardStore组件
func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

if stg.Shard == nil {
return nil, ErrComponentNotFound
}

return stg.Shard, nil
}

// 查找指定Storage的SharedStore组件
func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

if stg.Shared == nil {
return nil, ErrComponentNotFound
}

return stg.Shared, nil
}

// 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件
func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (types.StorageComponent, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

switch typ {
case reflect2.TypeOf[types.ShardStore]():
if stg.Shard == nil {
return nil, ErrComponentNotFound
}

return stg.Shard, nil
case reflect2.TypeOf[types.SharedStore]():
if stg.Shared == nil {
return nil, ErrComponentNotFound
}

return stg.Shared, nil
default:
for _, c := range stg.Components {
if reflect.TypeOf(c) == typ {
return c, nil
}
}

return nil, ErrComponentNotFound
}
}

func GetComponent[T types.StorageComponent](mgr *Manager, stgID cdssdk.StorageID) (T, error) {
ret, err := mgr.GetComponent(stgID, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
}

return ret.(T), nil
}

+ 24
- 0
common/pkgs/storage/obs/obs.go View File

@@ -0,0 +1,24 @@
package obs

type OBSClient struct {
}

func (c *OBSClient) InitiateMultipartUpload(objectName string) (string, error) {
return "", nil
}

func (c *OBSClient) UploadPart() {

}

func (c *OBSClient) CompleteMultipartUpload() (string, error) {
return "", nil
}

func (c *OBSClient) AbortMultipartUpload() {

}

func (c *OBSClient) Close() {

}

+ 56
- 0
common/pkgs/storage/oss/oss.go View File

@@ -0,0 +1,56 @@
package oss

import (
"fmt"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"log"
)

type OSSClient struct {
client *oss.Client
bucket *oss.Bucket
}

func (c *OSSClient) InitiateMultipartUpload(objectName string) (string, error) {
imur, err := c.bucket.InitiateMultipartUpload(objectName)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadID, nil
}

func NewOSSClient(obs stgmod.ObjectStorage) *OSSClient {
// 创建OSSClient实例。
client, err := oss.New(obs.Endpoint, obs.AK, obs.SK)
if err != nil {
log.Fatalf("Error: %v", err)
}

bucket, err := client.Bucket(obs.Bucket)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &OSSClient{
client: client,
bucket: bucket,
}
}

func (c *OSSClient) UploadPart() {

}

func (c *OSSClient) CompleteMultipartUpload() (string, error) {
return "", nil
}

func (c *OSSClient) AbortMultipartUpload() {

}

func (c *OSSClient) Close() {
// 关闭client

}

+ 0
- 68
common/pkgs/storage/shard/pool/pool.go View File

@@ -1,68 +0,0 @@
package pool

import (
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/local"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
)

type ShardStorePool struct {
stores map[cdssdk.StorageID]*shardStore
lock sync.Mutex
}

func New() *ShardStorePool {
return &ShardStorePool{
stores: make(map[cdssdk.StorageID]*shardStore),
}
}

func (p *ShardStorePool) PutNew(stg cdssdk.Storage, config cdssdk.ShardStoreConfig) error {
p.lock.Lock()
defer p.lock.Unlock()

switch confg := config.(type) {
case *cdssdk.LocalShardStorage:
if _, ok := p.stores[stg.StorageID]; ok {
return fmt.Errorf("storage %s already exists", stg.StorageID)
}

store, err := local.New(stg, *confg)
if err != nil {
return fmt.Errorf("new local shard store: %v", err)
}

ch := store.Start()

p.stores[stg.StorageID] = &shardStore{
Store: store,
EventChan: ch,
}
return nil

default:
return fmt.Errorf("unsupported shard store type: %T", confg)
}
}

// 不存在时返回nil
func (p *ShardStorePool) Get(stgID cdssdk.StorageID) types.ShardStore {
p.lock.Lock()
defer p.lock.Unlock()

store, ok := p.stores[stgID]
if !ok {
return nil
}

return store.Store
}

type shardStore struct {
Store types.ShardStore
EventChan *async.UnboundChannel[types.StoreEvent]
}

+ 0
- 26
common/pkgs/storage/shard/storages/utils/utils.go View File

@@ -1,26 +0,0 @@
package utils

import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"

type errorWriter struct {
err error
}

func (w *errorWriter) Write(data []byte) (int, error) {
return 0, w.err
}

// 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。
// 方便defer机制
func (w *errorWriter) Abort() error {
return w.err
}

// 结束写入,获得文件哈希值
func (w *errorWriter) Finish() (types.FileInfo, error) {
return types.FileInfo{}, w.err
}

func ErrorWriter(err error) types.Writer {
return &errorWriter{err: err}
}

+ 0
- 58
common/pkgs/storage/shard/types/option.go View File

@@ -1,58 +0,0 @@
package types

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type OpenOption struct {
FileHash cdssdk.FileHash
Offset int64
Length int64
}

func NewOpen(fileHash cdssdk.FileHash) OpenOption {
return OpenOption{
FileHash: fileHash,
Offset: 0,
Length: -1,
}
}

func (o *OpenOption) WithLength(len int64) OpenOption {
o.Length = len
return *o
}

// [start, end],即包含end
func (o *OpenOption) WithRange(start int64, end int64) OpenOption {
o.Offset = start
o.Length = end - start + 1
return *o
}

func (o *OpenOption) WithNullableLength(offset int64, length *int64) {
o.Offset = offset
if length != nil {
o.Length = *length
}
}

func (o *OpenOption) String() string {
rangeStart := ""
if o.Offset > 0 {
rangeStart = fmt.Sprintf("%d", o.Offset)
}

rangeEnd := ""
if o.Length >= 0 {
rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length-1)
}

if rangeStart == "" && rangeEnd == "" {
return string(o.FileHash)
}

return fmt.Sprintf("%s[%s:%s]", string(o.FileHash), rangeStart, rangeEnd)
}

+ 30
- 0
common/pkgs/storage/types/s3_client.go View File

@@ -0,0 +1,30 @@
package types

import (
"fmt"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/obs"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/oss"
)

//type ObjectStorageInfo interface {
// NewClient() (ObjectStorageClient, error)
//}

type ObjectStorageClient interface {
InitiateMultipartUpload(objectName string) (string, error)
UploadPart()
CompleteMultipartUpload() (string, error)
AbortMultipartUpload()
Close()
}

func NewObjectStorageClient(info stgmod.ObjectStorage) (ObjectStorageClient, error) {
switch info.Manufacturer {
case stgmod.AliCloud:
return oss.NewOSSClient(info), nil
case stgmod.HuaweiCloud:
return &obs.OBSClient{}, nil
}
return nil, fmt.Errorf("unknown cloud storage manufacturer %s", info.Manufacturer)
}

common/pkgs/storage/shard/types/shardstore.go → common/pkgs/storage/types/shard_store.go View File

@@ -1,9 +1,9 @@
package types

import (
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

@@ -23,10 +23,9 @@ type StoreEvent interface {
}

type ShardStore interface {
// 启动服务
Start() *async.UnboundChannel[StoreEvent]
StorageComponent
// 准备写入一个新文件,写入后获得FileHash
New() Writer
New() ShardWriter
// 使用F函数创建Option对象
Open(opt OpenOption) (io.ReadCloser, error)
// 获取所有文件信息,尽量保证操作是原子的
@@ -62,7 +61,7 @@ type Stats struct {
Description string
}

type Writer interface {
type ShardWriter interface {
io.Writer
// 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。
// 方便defer机制
@@ -70,3 +69,54 @@ type Writer interface {
// 结束写入,获得文件哈希值
Finish() (FileInfo, error)
}

type OpenOption struct {
FileHash cdssdk.FileHash
Offset int64
Length int64
}

func NewOpen(fileHash cdssdk.FileHash) OpenOption {
return OpenOption{
FileHash: fileHash,
Offset: 0,
Length: -1,
}
}

func (o *OpenOption) WithLength(len int64) OpenOption {
o.Length = len
return *o
}

// [start, end],即包含end
func (o *OpenOption) WithRange(start int64, end int64) OpenOption {
o.Offset = start
o.Length = end - start + 1
return *o
}

func (o *OpenOption) WithNullableLength(offset int64, length *int64) {
o.Offset = offset
if length != nil {
o.Length = *length
}
}

func (o *OpenOption) String() string {
rangeStart := ""
if o.Offset > 0 {
rangeStart = fmt.Sprintf("%d", o.Offset)
}

rangeEnd := ""
if o.Length >= 0 {
rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length-1)
}

if rangeStart == "" && rangeEnd == "" {
return string(o.FileHash)
}

return fmt.Sprintf("%s[%s:%s]", string(o.FileHash), rangeStart, rangeEnd)
}

+ 5
- 0
common/pkgs/storage/types/shared_store.go View File

@@ -0,0 +1,5 @@
package types

type SharedStore interface {
StorageComponent
}

+ 5
- 0
common/pkgs/storage/types/temp_store.go View File

@@ -0,0 +1,5 @@
package types

type TempStore interface {
StorageComponent
}

+ 12
- 0
common/pkgs/storage/types/types.go View File

@@ -0,0 +1,12 @@
package types

import "gitlink.org.cn/cloudream/common/pkgs/async"

type StorageEvent interface{}

type StorageEventChan = async.UnboundChannel[StorageEvent]

type StorageComponent interface {
Start(ch *StorageEventChan)
Stop()
}

+ 26
- 0
common/pkgs/storage/utils/utils.go View File

@@ -0,0 +1,26 @@
package utils

import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"

type errorShardWriter struct {
err error
}

func (w *errorShardWriter) Write(data []byte) (int, error) {
return 0, w.err
}

// 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。
// 方便defer机制
func (w *errorShardWriter) Abort() error {
return w.err
}

// 结束写入,获得文件哈希值
func (w *errorShardWriter) Finish() (types.FileInfo, error) {
return types.FileInfo{}, w.err
}

func ErrorShardWriter(err error) types.ShardWriter {
return &errorShardWriter{err: err}
}

+ 56
- 0
coordinator/internal/mq/node.go View File

@@ -2,6 +2,8 @@ package mq

import (
"fmt"

stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"

"gitlink.org.cn/cloudream/common/consts/errorcode"
@@ -11,6 +13,60 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) {
log := logger.WithField("HubID", msg.HubID)

hub, err := svc.db2.Node().GetByID(svc.db2.DefCtx(), msg.HubID)
if err != nil {
log.Warnf("getting hub: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err))
}

detailsMap := make(map[cdssdk.StorageID]*stgmod.StorageDetail)

stgs, err := svc.db2.Storage().GetHubStorages(svc.db2.DefCtx(), msg.HubID)
if err != nil {
log.Warnf("getting hub storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub storages: %v", err))
}

var stgIDs []cdssdk.StorageID
for _, stg := range stgs {
detailsMap[stg.StorageID] = &stgmod.StorageDetail{
Storage: stg,
MasterHub: &hub,
}
stgIDs = append(stgIDs, stg.StorageID)
}

shards, err := svc.db2.ShardStorage().BatchGetByStorageIDs(svc.db2.DefCtx(), stgIDs)
if err != nil {
log.Warnf("getting shard storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting shard storages: %v", err))
}
for _, shard := range shards {
sh := shard
detailsMap[shard.StorageID].Shard = &sh
}

shareds, err := svc.db2.SharedStorage().BatchGetByStorageIDs(svc.db2.DefCtx(), stgIDs)
if err != nil {
log.Warnf("getting shared storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting shared storages: %v", err))
}
for _, shared := range shareds {
sh := shared
detailsMap[shared.StorageID].Shared = &sh
}

var details []stgmod.StorageDetail
for _, detail := range detailsMap {
details = append(details, *detail)
}

return mq.ReplyOK(coormq.RespGetHubConfig(hub, details))
}

func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodesResp, *mq.CodeMessage) {
nodes, err := svc.db2.Node().GetUserNodes(svc.db2.DefCtx(), msg.UserID)
if err != nil {


+ 5
- 26
go.mod View File

@@ -10,7 +10,6 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
github.com/jedib0t/go-pretty/v6 v6.4.7
github.com/jmoiron/sqlx v1.3.5
github.com/klauspost/reedsolomon v1.11.8
github.com/magefile/mage v1.15.0
github.com/samber/lo v1.38.1
@@ -19,21 +18,21 @@ require (
gitlink.org.cn/cloudream/common v0.0.0
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
gorm.io/gorm v1.25.7
)

require (
github.com/google/uuid v1.3.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

require (
github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
@@ -46,36 +45,18 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/boxo v0.12.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/ipfs/go-ipfs-api v0.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.26.3 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/sirupsen/logrus v1.9.2
github.com/smarty/assertions v1.15.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
@@ -95,7 +76,5 @@ require (
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/mysql v1.5.7
lukechampine.com/blake3 v1.1.7 // indirect
)

+ 2
- 62
go.sum View File

@@ -1,23 +1,14 @@
github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ=
github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg=
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
@@ -31,7 +22,6 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-playground/validator/v10 v10.8.0 h1:1kAa0fCrnpv+QYdkdcRzrRM7AyYs5o8+jZdJCz9xj6k=
github.com/go-playground/validator/v10 v10.8.0/go.mod h1:9JhgTzTaE31GZDpH/HSvHiRJrJ3iKAgqqH0Bl/Ocjdk=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
@@ -63,20 +53,12 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ=
github.com/ipfs/boxo v0.12.0/go.mod h1:xAnfiU6PtxWCnRqu7dcXQ10bB5/kvI1kXRotuGqGBhg=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
github.com/ipfs/go-ipfs-api v0.7.0 h1:CMBNCUl0b45coC+lQCXEVpMhwoqjiaCwUIrM+coYW2Q=
github.com/ipfs/go-ipfs-api v0.7.0/go.mod h1:AIxsTNB0+ZhkqIfTZpdZ0VR/cpX5zrXjATa3prSay3g=
github.com/jedib0t/go-pretty/v6 v6.4.7 h1:lwiTJr1DEkAgzljsUsORmWsVn5MQjt1BPJdPCtJ6KXE=
github.com/jedib0t/go-pretty/v6 v6.4.7/go.mod h1:Ndk3ase2CkQbXLLNf5QDHoYb6J9WtVfmHZu9n8rk2xs=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
@@ -84,8 +66,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY=
@@ -94,14 +74,6 @@ github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgx
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+PM=
github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
@@ -109,12 +81,6 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -123,24 +89,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE=
github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI=
github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0=
github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4=
github.com/multiformats/go-multiaddr v0.8.0 h1:aqjksEcqK+iD/Foe1RRFsGZh8+XFiGo7FgUCZlpv3LU=
github.com/multiformats/go-multiaddr v0.8.0/go.mod h1:Fs50eBDWvZu+l3/9S6xAE7ZYj6yhxlvaVZjakWN7xRs=
github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g=
github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk=
github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg=
github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k=
github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U=
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3de4741sbiSdfo=
github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -156,8 +104,6 @@ github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGB
github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec=
github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY=
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
@@ -176,6 +122,7 @@ github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
@@ -219,8 +166,6 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -241,8 +186,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@@ -274,8 +217,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
gorm.io/gorm v1.25.7 h1:VsD6acwRjz2zFxGO50gPO6AkNs7KKnvfzUjHQhZDz/A=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=

Loading…
Cancel
Save