diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index 84b61ae..49d7805 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -28,7 +28,11 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe s.swWorker.Add(sw) defer s.swWorker.Remove(sw) - _, err = sw.Run(ctx) + execCtx := exec.NewWithContext(ctx) + + // TODO2 注入依赖 + + _, err = sw.Run(execCtx) if err != nil { return nil, fmt.Errorf("running io plan: %w", err) } diff --git a/agent/internal/http/hub_io.go b/agent/internal/http/hub_io.go index 2fdeb48..8abe0b4 100644 --- a/agent/internal/http/hub_io.go +++ b/agent/internal/http/hub_io.go @@ -1,7 +1,6 @@ package http import ( - "bytes" "context" "fmt" "io" @@ -123,9 +122,6 @@ func (s *IOService) GetStream(ctx *gin.Context) { } func (s *IOService) SendStream(ctx *gin.Context) { - //planID := ctx.PostForm("plan_id") - //varID := ctx.PostForm("var_id") - var req cdsapi.SendStreamReq if err := ctx.ShouldBindJSON(&req); err != nil { logger.Warnf("binding body: %s", err.Error()) @@ -194,25 +190,14 @@ func (s *IOService) SendStream(ctx *gin.Context) { } func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { - bodyBytes, err := io.ReadAll(ctx.Request.Body) + data, err := io.ReadAll(ctx.Request.Body) if err != nil { logger.Warnf("reading body: %s", err.Error()) ctx.JSON(http.StatusInternalServerError, Failed("400", "internal error")) return } - println("Received body: %s", string(bodyBytes)) - ctx.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) // Reset body for subsequent reads - - var req cdsapi.ExecuteIOPlanReq - if err := ctx.ShouldBindJSON(&req); err != nil { - logger.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - planBytes, err := serder.ObjectToJSON(req.Plan) - // 反序列化 Plan - plan, err := serder.JSONToObjectEx[exec.Plan](planBytes) + plan, err := serder.JSONToObjectEx[exec.Plan](data) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("deserializing plan: %v", err)}) return @@ -226,11 +211,11 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { s.svc.swWorker.Add(sw) defer s.svc.swWorker.Remove(sw) - // 设置上下文超时 - c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) - defer cancel() + execCtx := exec.NewWithContext(ctx.Request.Context()) + + // TODO 注入依赖 - _, err = sw.Run(c) + _, err = sw.Run(execCtx) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("running io plan: %v", err)}) return diff --git a/agent/internal/mq/cache.go b/agent/internal/mq/cache.go index fd18684..907e989 100644 --- a/agent/internal/mq/cache.go +++ b/agent/internal/mq/cache.go @@ -1,62 +1,51 @@ package mq import ( + "fmt" "time" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { - ipfsCli, err := stgglb.IPFSPool.Acquire() + store, err := svc.shardStorePool.Get(msg.StorageID) if err != nil { - logger.Warnf("new ipfs client: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "new ipfs client failed") + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("finding shard store: %v", err)) } - defer ipfsCli.Close() - files, err := ipfsCli.GetPinnedFiles() + infos, err := store.ListAll() if err != nil { - logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get pinned files from ipfs failed") + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("listting file in shard store: %v", err)) } - return mq.ReplyOK(agtmq.NewCheckCacheResp(lo.Keys(files))) + var fileHashes []types.FileHash + for _, info := range infos { + fileHashes = append(fileHashes, info.Hash) + } + + return mq.ReplyOK(agtmq.NewCheckCacheResp(fileHashes)) } func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) { - ipfsCli, err := stgglb.IPFSPool.Acquire() + store, err := svc.shardStorePool.Get(msg.StorageID) if err != nil { - logger.Warnf("new ipfs client: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "new ipfs client failed") + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("finding shard store: %v", err)) } - defer ipfsCli.Close() - files, err := ipfsCli.GetPinnedFiles() + err = store.Purge(msg.Avaiables) if err != nil { - logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get pinned files from ipfs failed") - } - - // unpin所有没有没记录到元数据的文件 - shouldPinnedFiles := lo.SliceToMap(msg.PinnedFileHashes, func(hash string) (string, bool) { return hash, true }) - for hash := range files { - if !shouldPinnedFiles[hash] { - ipfsCli.Unpin(hash) - logger.WithField("FileHash", hash).Debugf("unpinned by gc") - } + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err)) } return mq.ReplyOK(agtmq.RespCacheGC()) } func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID)) + tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.StorageID)) return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) } diff --git a/agent/internal/mq/service.go b/agent/internal/mq/service.go index 18c3e56..a72aa68 100644 --- a/agent/internal/mq/service.go +++ b/agent/internal/mq/service.go @@ -2,14 +2,17 @@ package mq import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool" ) type Service struct { - taskManager *task.Manager + taskManager *task.Manager + shardStorePool *pool.ShardStorePool } -func NewService(taskMgr *task.Manager) *Service { +func NewService(taskMgr *task.Manager, shardStorePool *pool.ShardStorePool) *Service { return &Service{ - taskManager: taskMgr, + taskManager: taskMgr, + shardStorePool: shardStorePool, } } diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 4be9bf3..cb03028 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -2,6 +2,7 @@ package task import ( "fmt" + "io" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -16,12 +17,14 @@ import ( type CacheMovePackage struct { userID cdssdk.UserID packageID cdssdk.PackageID + storageID cdssdk.StorageID } -func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *CacheMovePackage { +func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *CacheMovePackage { return &CacheMovePackage{ userID: userID, packageID: packageID, + storageID: storageID, } } @@ -37,6 +40,11 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") + store, err := ctx.shardStorePool.Get(t.storageID) + if err != nil { + return fmt.Errorf("getting shard store: %w", err) + } + mutex, err := reqbuilder.NewBuilder(). // 保护解码出来的Object数据 IPFS().Buzy(*stgglb.Local.NodeID). @@ -52,12 +60,6 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { } defer stgglb.CoordinatorMQPool.Release(coorCli) - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return fmt.Errorf("new ipfs client: %w", err) - } - defer ipfsCli.Close() - // TODO 可以考虑优化,比如rep类型的直接pin就可以 objIter := ctx.downloader.DownloadPackage(t.packageID) defer objIter.Close() @@ -72,15 +74,20 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { } defer obj.File.Close() - _, err = ipfsCli.CreateFile(obj.File) + writer := store.New() + _, err = io.Copy(writer, obj.File) + if err != nil { + return fmt.Errorf("writing to store: %w", err) + } + _, err = writer.Finish() if err != nil { - return fmt.Errorf("creating ipfs file: %w", err) + return fmt.Errorf("finishing store: %w", err) } ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, *stgglb.Local.NodeID, 1) } - _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, *stgglb.Local.NodeID)) + _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, t.storageID)) if err != nil { return fmt.Errorf("request to coordinator: %w", err) } diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 31c27f8..be00b71 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -6,13 +6,15 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool" ) type TaskContext struct { - distlock *distlock.Service - connectivity *connectivity.Collector - downloader *downloader.Downloader - accessStat *accessstat.AccessStat + distlock *distlock.Service + connectivity *connectivity.Collector + downloader *downloader.Downloader + accessStat *accessstat.AccessStat + shardStorePool *pool.ShardStorePool } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -27,11 +29,12 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, shardStorePool *pool.ShardStorePool) Manager { return task.NewManager(TaskContext{ - distlock: distlock, - connectivity: connectivity, - downloader: downloader, - accessStat: accessStat, + distlock: distlock, + connectivity: connectivity, + downloader: downloader, + accessStat: accessStat, + shardStorePool: shardStorePool, }) } diff --git a/agent/main.go b/agent/main.go index 69f9620..22333df 100644 --- a/agent/main.go +++ b/agent/main.go @@ -2,11 +2,12 @@ package main import ( "fmt" - "gitlink.org.cn/cloudream/storage/agent/internal/http" "net" "os" "time" + "gitlink.org.cn/cloudream/storage/agent/internal/http" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -18,6 +19,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool" "google.golang.org/grpc" @@ -104,6 +106,9 @@ func main() { }) go serveAccessStat(acStat) + // TODO2 根据配置实例化Store并加入到Pool中 + shardStorePool := pool.New() + distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { logger.Fatalf("new ipfs failed, err: %s", err.Error()) @@ -111,11 +116,11 @@ func main() { dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) - taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat) + taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, shardStorePool) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, shardStorePool), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) } diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index ac32e92..e4c5f90 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -7,19 +7,19 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { +func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { startTime := time.Now() defer func() { fmt.Printf("%v\n", time.Since(startTime).Seconds()) }() - taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(1, packageID, nodeID) + hubID, taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(1, packageID, stgID) if err != nil { return fmt.Errorf("start cache moving package: %w", err) } for { - complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10) + complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(hubID, taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 02f9e37..41c59f4 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -1,5 +1,6 @@ package cmdline +/* import ( "context" "fmt" @@ -248,3 +249,4 @@ func init() { }, }) } +*/ diff --git a/client/internal/http/cache.go b/client/internal/http/cache.go index 0e48f19..cde49d2 100644 --- a/client/internal/http/cache.go +++ b/client/internal/http/cache.go @@ -22,9 +22,9 @@ func (s *Server) Cache() *CacheService { } type CacheMovePackageReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` - NodeID *cdssdk.NodeID `json:"nodeID" binding:"required"` + UserID cdssdk.UserID `json:"userID" binding:"required"` + PackageID cdssdk.PackageID `json:"packageID" binding:"required"` + StorageID cdssdk.StorageID `json:"storageID" binding:"required"` } type CacheMovePackageResp = cdsapi.CacheMovePackageResp @@ -38,7 +38,7 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { return } - taskID, err := s.svc.CacheSvc().StartCacheMovePackage(*req.UserID, *req.PackageID, *req.NodeID) + hubID, taskID, err := s.svc.CacheSvc().StartCacheMovePackage(req.UserID, req.PackageID, req.StorageID) if err != nil { log.Warnf("start cache move package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed")) @@ -46,7 +46,7 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { } for { - complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) + complete, err := s.svc.CacheSvc().WaitCacheMovePackage(hubID, taskID, time.Second*10) if complete { if err != nil { log.Warnf("moving complete with: %s", err.Error()) diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go index a32e456..9c26efa 100644 --- a/client/internal/services/cache.go +++ b/client/internal/services/cache.go @@ -19,23 +19,38 @@ func (svc *Service) CacheSvc() *CacheService { return &CacheService{Service: svc} } -func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) (string, error) { - agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) +func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) (cdssdk.NodeID, string, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return 0, "", fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getStg, err := coorCli.GetStorageDetail(coormq.ReqGetStorageDetail(stgID)) + if err != nil { + return 0, "", fmt.Errorf("get storage detail: %w", err) + } + + if getStg.Storage.Shard == nil { + return 0, "", fmt.Errorf("shard storage is not enabled") + } + + agentCli, err := stgglb.AgentMQPool.Acquire(getStg.Storage.Shard.MasterHub) if err != nil { - return "", fmt.Errorf("new agent client: %w", err) + return 0, "", fmt.Errorf("new agent client: %w", err) } defer stgglb.AgentMQPool.Release(agentCli) - startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID)) + startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID, stgID)) if err != nil { - return "", fmt.Errorf("start cache move package: %w", err) + return 0, "", fmt.Errorf("start cache move package: %w", err) } - return startResp.TaskID, nil + return getStg.Storage.Shard.MasterHub, startResp.TaskID, nil } -func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, error) { - agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) +func (svc *CacheService) WaitCacheMovePackage(hubID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, error) { + agentCli, err := stgglb.AgentMQPool.Acquire(hubID) if err != nil { return true, fmt.Errorf("new agent client: %w", err) } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index e961319..ff3ee74 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -57,12 +57,16 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package } defer stgglb.CoordinatorMQPool.Release(coorCli) - stgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID)) + stgResp, err := coorCli.GetStorageDetail(coormq.ReqGetStorageDetail(storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.NodeID) + if stgResp.Storage.Shard == nil { + return 0, "", fmt.Errorf("shard storage is not enabled") + } + + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.Shard.MasterHub) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -73,7 +77,7 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package return 0, "", fmt.Errorf("start storage load package: %w", err) } - return stgResp.Storage.NodeID, startResp.TaskID, nil + return stgResp.Storage.Shard.MasterHub, startResp.TaskID, nil } type StorageLoadPackageResult struct { @@ -124,12 +128,16 @@ func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucke } defer stgglb.CoordinatorMQPool.Release(coorCli) - stgResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID)) + stgResp, err := coorCli.GetStorageDetail(coormq.ReqGetStorageDetail(storageID)) if err != nil { return 0, "", fmt.Errorf("getting storage info: %w", err) } - agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.NodeID) + if stgResp.Storage.Shard == nil { + return 0, "", fmt.Errorf("shard storage is not enabled") + } + + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storage.Shard.MasterHub) if err != nil { return 0, "", fmt.Errorf("new agent client: %w", err) } @@ -140,7 +148,7 @@ func (svc *StorageService) StartStorageCreatePackage(userID cdssdk.UserID, bucke return 0, "", fmt.Errorf("start storage upload package: %w", err) } - return stgResp.Storage.NodeID, startResp.TaskID, nil + return stgResp.Storage.Shard.MasterHub, startResp.TaskID, nil } func (svc *StorageService) WaitStorageCreatePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, cdssdk.PackageID, error) { diff --git a/common/models/models.go b/common/models/models.go index b11fb3d..c60cce7 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -64,7 +64,7 @@ func DetailsFillPinnedAt(objs []ObjectDetail, pinnedAt []cdssdk.PinnedObject) { if pinnedAt[pinnedCur].ObjectID != obj.Object.ObjectID { break } - obj.PinnedAt = append(obj.PinnedAt, pinnedAt[pinnedCur].NodeID) + obj.PinnedAt = append(obj.PinnedAt, pinnedAt[pinnedCur].StorageID) } } } @@ -116,7 +116,7 @@ type ObjectAccessStat struct { } type StorageDetail struct { - Storage cdssdk.Storage `json:"storage"` - Shard cdssdk.ShardStorage `json:"shard"` - Shared cdssdk.SharedStorage `json:"shared"` + Storage cdssdk.Storage `json:"storage"` + Shard *cdssdk.ShardStorage `json:"shard"` + Shared *cdssdk.SharedStorage `json:"shared"` } diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index cfc9a15..162736a 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -225,7 +225,10 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { return "", fmt.Errorf("parsing plan: %w", err) } - exec := plans.Execute() + // TODO2 注入依赖 + exeCtx := exec.NewExecContext() + + exec := plans.Execute(exeCtx) exec.BeginWrite(io.NopCloser(file), hd) ret, err := exec.Wait(context.TODO()) if err != nil { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index a8dcc3c..d732c85 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -341,7 +341,7 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.Updating for _, p := range obj.PinnedAt { pinneds = append(pinneds, cdssdk.PinnedObject{ ObjectID: obj.ObjectID, - NodeID: p, + StorageID: p, CreateTime: time.Now(), }) } diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go deleted file mode 100644 index 63964ac..0000000 --- a/common/pkgs/db/pinned_object.go +++ /dev/null @@ -1,139 +0,0 @@ -package db - -import ( - "time" - - "github.com/jmoiron/sqlx" - "github.com/samber/lo" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" -) - -type PinnedObjectDB struct { - *DB -} - -func (db *DB) PinnedObject() *PinnedObjectDB { - return &PinnedObjectDB{DB: db} -} - -func (*PinnedObjectDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdssdk.PinnedObject, error) { - var ret []cdssdk.PinnedObject - err := sqlx.Select(ctx, &ret, "select * from PinnedObject where NodeID = ?", nodeID) - return ret, err -} - -func (*PinnedObjectDB) GetObjectsByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]cdssdk.Object, error) { - var ret []model.TempObject - err := sqlx.Select(ctx, &ret, "select Object.* from PinnedObject, Object where PinnedObject.ObjectID = Object.ObjectID and NodeID = ?", nodeID) - return lo.Map(ret, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), err -} - -func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { - _, err := ctx.Exec("insert into PinnedObject values(?,?,?)", nodeID, objectID, createTime) - return err -} - -func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) { - if len(objectIDs) == 0 { - return nil, nil - } - - stmt, args, err := sqlx.In("select * from PinnedObject where ObjectID in (?) order by ObjectID asc", objectIDs) - if err != nil { - return nil, err - } - stmt = ctx.Rebind(stmt) - - var pinneds []cdssdk.PinnedObject - err = sqlx.Select(ctx, &pinneds, stmt, args...) - if err != nil { - return nil, err - } - - return pinneds, nil -} - -func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { - _, err := ctx.Exec("insert ignore into PinnedObject values(?,?,?)", nodeID, objectID, createTime) - return err -} - -func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { - if len(pinneds) == 0 { - return nil - } - - return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:ObjectID,:NodeID,:CreateTime)", 3, pinneds, nil) -} - -func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec( - "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", - nodeID, - time.Now(), - packageID, - ) - return err -} - -func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error { - if len(nodeIDs) == 0 { - return nil - } - - for _, id := range nodeIDs { - err := db.TryCreate(ctx, id, objectID, time.Now()) - if err != nil { - return err - } - } - return nil -} - -func (*PinnedObjectDB) Delete(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID) error { - _, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?", nodeID, objectID) - return err -} - -func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { - _, err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID) - return err -} - -func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { - if len(objectIDs) == 0 { - return nil - } - - // TODO in语句有长度限制 - query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs) - if err != nil { - return err - } - _, err = ctx.Exec(query, args...) - return err -} - -func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { - _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ?", packageID) - return err -} - -func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and NodeID = ?", packageID, nodeID) - return err -} - -func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { - if len(objectIDs) == 0 { - return nil - } - - query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", nodeID, objectIDs) - if err != nil { - return err - } - _, err = ctx.Exec(query, args...) - return err -} diff --git a/common/pkgs/db2/pinned_object.go b/common/pkgs/db2/pinned_object.go new file mode 100644 index 0000000..8b31dca --- /dev/null +++ b/common/pkgs/db2/pinned_object.go @@ -0,0 +1,118 @@ +package db2 + +import ( + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gorm.io/gorm/clause" +) + +type PinnedObjectDB struct { + *DB +} + +func (db *DB) PinnedObject() *PinnedObjectDB { + return &PinnedObjectDB{DB: db} +} + +func (*PinnedObjectDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.PinnedObject, error) { + var ret []cdssdk.PinnedObject + err := ctx.Table("PinnedObject").Find(&ret, "StorageID = ?", stgID).Error + return ret, err +} + +func (*PinnedObjectDB) GetObjectsByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) { + var ret []cdssdk.Object + err := ctx.Table("Object").Joins("inner join PinnedObject on Object.ObjectID = PinnedObject.ObjectID").Where("StorageID = ?", stgID).Find(&ret).Error + return ret, err +} + +func (*PinnedObjectDB) Create(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error { + return ctx.Table("PinnedObject").Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error +} + +func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) { + if len(objectIDs) == 0 { + return nil, nil + } + + var pinneds []cdssdk.PinnedObject + err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Order("ObjectID asc").Find(&pinneds).Error + return pinneds, err +} + +func (*PinnedObjectDB) TryCreate(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error { + err := ctx.Clauses(clause.Insert{Modifier: "ignore"}).Table("PinnedObject").Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error + return err +} + +func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { + if len(pinneds) == 0 { + return nil + } + + err := ctx.Clauses(clause.Insert{Modifier: "ignore"}).Table("PinnedObject").Create(pinneds).Error + return err +} + +func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { + err := ctx.Exec( + "insert ignore into PinnedObject(StorageID, ObjectID, CreateTime) select ? as StorageID, ObjectID, ? as CreateTime from Object where PackageID = ?", + stgID, + time.Now(), + packageID, + ).Error + return err +} + +func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, stgIDs []cdssdk.StorageID) error { + if len(stgIDs) == 0 { + return nil + } + + for _, id := range stgIDs { + err := db.TryCreate(ctx, id, objectID, time.Now()) + if err != nil { + return err + } + } + return nil +} + +func (*PinnedObjectDB) Delete(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID) error { + err := ctx.Exec("delete from PinnedObject where StorageID = ? and ObjectID = ?", stgID, objectID).Error + return err +} + +func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { + err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID).Error + return err +} + +func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + if len(objectIDs) == 0 { + return nil + } + + err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Delete(&cdssdk.PinnedObject{}).Error + return err +} + +func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { + err := ctx.Table("PinnedObject").Where("ObjectID in (select ObjectID from Object where PackageID = ?)", packageID).Delete(&cdssdk.PinnedObject{}).Error + return err +} + +func (*PinnedObjectDB) DeleteInPackageAtStorage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error { + err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and StorageID = ?", packageID, stgID).Error + return err +} + +func (*PinnedObjectDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, objectIDs []cdssdk.ObjectID) error { + if len(objectIDs) == 0 { + return nil + } + + err := ctx.Table("PinnedObject").Where("StorageID = ? and ObjectID in (?)", stgID, objectIDs).Delete(&cdssdk.PinnedObject{}).Error + return err +} diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 21c979d..732631d 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -396,7 +396,10 @@ func (iter *DownloadObjectIterator) downloadFromNode(node *cdssdk.Node, req down return nil, fmt.Errorf("parsing plan: %w", err) } - exec := plans.Execute() + // TODO2 注入依赖 + exeCtx := exec.NewExecContext() + + exec := plans.Execute(exeCtx) go exec.Wait(context.TODO()) return exec.BeginRead(strHandle) diff --git a/common/pkgs/downloader/lrc_strip_iterator.go b/common/pkgs/downloader/lrc_strip_iterator.go index 94a00ac..9e40ec3 100644 --- a/common/pkgs/downloader/lrc_strip_iterator.go +++ b/common/pkgs/downloader/lrc_strip_iterator.go @@ -109,7 +109,11 @@ func (s *LRCStripIterator) downloading() { s.sendToDataChan(dataChanEntry{Error: err}) return } - exec := plans.Execute() + + // TODO2 注入依赖 + exeCtx := exec.NewExecContext() + + exec := plans.Execute(exeCtx) ctx, cancel := context.WithCancel(context.Background()) go exec.Wait(ctx) diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index fd74cdf..1e26035 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -209,7 +209,11 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { if err != nil { return 0, err } - exec := plans.Execute() + + // TODo2 注入依赖 + exeCtx := exec.NewExecContext() + + exec := plans.Execute(exeCtx) ctx, cancel := context.WithCancel(context.Background()) go exec.Wait(ctx) diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 36d5cb1..b6125c7 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -3,7 +3,7 @@ package ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) type From interface { diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index b013b2a..15cb69f 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -10,8 +10,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/pool" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) func init() { diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index b04ce75..81f6cdb 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) type DefaultParser struct { diff --git a/common/pkgs/ioswitchlrc/fromto.go b/common/pkgs/ioswitchlrc/fromto.go index 2272d9e..5f24b56 100644 --- a/common/pkgs/ioswitchlrc/fromto.go +++ b/common/pkgs/ioswitchlrc/fromto.go @@ -3,7 +3,7 @@ package ioswitchlrc import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) type From interface { diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index b013b2a..15cb69f 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -10,8 +10,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/pool" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) func init() { diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index cdf72de..94dfbfd 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -9,7 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) // 计算输入流的打开范围。会把流的范围按条带大小取整 diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go index 58a7030..2af9b17 100644 --- a/common/pkgs/mq/agent/cache.go +++ b/common/pkgs/mq/agent/cache.go @@ -3,6 +3,7 @@ package agent import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) type CacheService interface { @@ -19,16 +20,17 @@ var _ = Register(Service.CheckCache) type CheckCache struct { mq.MessageBodyBase + StorageID cdssdk.StorageID `json:"storageID"` } type CheckCacheResp struct { mq.MessageBodyBase - FileHashes []string `json:"fileHashes"` + FileHashes []types.FileHash `json:"fileHashes"` } -func NewCheckCache() *CheckCache { - return &CheckCache{} +func NewCheckCache(stgID cdssdk.StorageID) *CheckCache { + return &CheckCache{StorageID: stgID} } -func NewCheckCacheResp(fileHashes []string) *CheckCacheResp { +func NewCheckCacheResp(fileHashes []types.FileHash) *CheckCacheResp { return &CheckCacheResp{ FileHashes: fileHashes, } @@ -42,15 +44,17 @@ var _ = Register(Service.CacheGC) type CacheGC struct { mq.MessageBodyBase - PinnedFileHashes []string `json:"pinnedFileHashes"` + StorageID cdssdk.StorageID `json:"storageID"` + Avaiables []types.FileHash `json:"avaiables"` } type CacheGCResp struct { mq.MessageBodyBase } -func ReqCacheGC(pinnedFileHashes []string) *CacheGC { +func ReqCacheGC(stgID cdssdk.StorageID, avaiables []types.FileHash) *CacheGC { return &CacheGC{ - PinnedFileHashes: pinnedFileHashes, + StorageID: stgID, + Avaiables: avaiables, } } func RespCacheGC() *CacheGCResp { @@ -67,16 +71,18 @@ type StartCacheMovePackage struct { mq.MessageBodyBase UserID cdssdk.UserID `json:"userID"` PackageID cdssdk.PackageID `json:"packageID"` + StorageID cdssdk.StorageID `json:"storageID"` } type StartCacheMovePackageResp struct { mq.MessageBodyBase TaskID string `json:"taskID"` } -func NewStartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *StartCacheMovePackage { +func NewStartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *StartCacheMovePackage { return &StartCacheMovePackage{ UserID: userID, PackageID: packageID, + StorageID: stgID, } } func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp { diff --git a/common/pkgs/mq/agent/object.go b/common/pkgs/mq/agent/object.go deleted file mode 100644 index fd1b952..0000000 --- a/common/pkgs/mq/agent/object.go +++ /dev/null @@ -1,32 +0,0 @@ -package agent - -import "gitlink.org.cn/cloudream/common/pkgs/mq" - -type ObjectService interface { - PinObject(msg *PinObject) (*PinObjectResp, *mq.CodeMessage) -} - -// 启动Pin对象的任务 -var _ = Register(Service.PinObject) - -type PinObject struct { - mq.MessageBodyBase - FileHashes []string `json:"fileHashes"` - IsBackground bool `json:"isBackground"` -} -type PinObjectResp struct { - mq.MessageBodyBase -} - -func ReqPinObject(fileHashes []string, isBackground bool) *PinObject { - return &PinObject{ - FileHashes: fileHashes, - IsBackground: isBackground, - } -} -func RespPinObject() *PinObjectResp { - return &PinObjectResp{} -} -func (client *Client) PinObject(msg *PinObject, opts ...mq.RequestOption) (*PinObjectResp, error) { - return mq.Request(Service.PinObject, client.rabbitCli, msg, opts...) -} diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index 8819c17..ece9016 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -6,8 +6,6 @@ import ( ) type Service interface { - ObjectService - StorageService CacheService diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index ff75895..8aea944 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -17,16 +17,16 @@ var _ = Register(Service.CachePackageMoved) type CachePackageMoved struct { mq.MessageBodyBase PackageID cdssdk.PackageID `json:"packageID"` - NodeID cdssdk.NodeID `json:"nodeID"` + StorageID cdssdk.StorageID `json:"storageID"` } type CachePackageMovedResp struct { mq.MessageBodyBase } -func NewCachePackageMoved(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) *CachePackageMoved { +func NewCachePackageMoved(packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CachePackageMoved { return &CachePackageMoved{ PackageID: packageID, - NodeID: nodeID, + StorageID: stgID, } } func NewCachePackageMovedResp() *CachePackageMovedResp { diff --git a/common/pkgs/shardstore/pool/pool.go b/common/pkgs/storage/shard/pool/pool.go similarity index 66% rename from common/pkgs/shardstore/pool/pool.go rename to common/pkgs/storage/shard/pool/pool.go index 65fa3ba..12ecee8 100644 --- a/common/pkgs/shardstore/pool/pool.go +++ b/common/pkgs/storage/shard/pool/pool.go @@ -2,12 +2,16 @@ package pool import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" ) type ShardStorePool struct { } +func New() *ShardStorePool { + +} + func (p *ShardStorePool) Get(stgID cdssdk.StorageID) (types.ShardStore, error) { } diff --git a/common/pkgs/shardstore/storages/local/config.go b/common/pkgs/storage/shard/storages/local/config.go similarity index 100% rename from common/pkgs/shardstore/storages/local/config.go rename to common/pkgs/storage/shard/storages/local/config.go diff --git a/common/pkgs/shardstore/storages/local/local.go b/common/pkgs/storage/shard/storages/local/local.go similarity index 100% rename from common/pkgs/shardstore/storages/local/local.go rename to common/pkgs/storage/shard/storages/local/local.go diff --git a/common/pkgs/shardstore/storages/local/writer.go b/common/pkgs/storage/shard/storages/local/writer.go similarity index 100% rename from common/pkgs/shardstore/storages/local/writer.go rename to common/pkgs/storage/shard/storages/local/writer.go diff --git a/common/pkgs/shardstore/types/option.go b/common/pkgs/storage/shard/types/option.go similarity index 100% rename from common/pkgs/shardstore/types/option.go rename to common/pkgs/storage/shard/types/option.go diff --git a/common/pkgs/shardstore/types/shardstore.go b/common/pkgs/storage/shard/types/shardstore.go similarity index 86% rename from common/pkgs/shardstore/types/shardstore.go rename to common/pkgs/storage/shard/types/shardstore.go index 44c8402..fdc346f 100644 --- a/common/pkgs/shardstore/types/shardstore.go +++ b/common/pkgs/storage/shard/types/shardstore.go @@ -23,8 +23,10 @@ type ShardStore interface { Open(opt OpenOption) (io.ReadCloser, error) // 删除文件 Remove(hash FileHash) error - // 遍历所有文件,callback返回false则停止遍历 - Walk(callback func(info FileInfo) bool) error + // 获取所有文件信息,尽量保证操作是原子的 + ListAll() ([]FileInfo, error) + // 清除其他文件,只保留给定的文件,尽量保证操作是原子的 + Purge(availables []FileHash) error // 获得存储系统信息 Stats() Stats } diff --git a/coordinator/internal/mq/cache.go b/coordinator/internal/mq/cache.go index 0e097f8..2db8f54 100644 --- a/coordinator/internal/mq/cache.go +++ b/coordinator/internal/mq/cache.go @@ -18,12 +18,12 @@ func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.Ca return fmt.Errorf("getting package by id: %w", err) } - _, err = svc.db.Node().GetByID(tx, msg.NodeID) + _, err = svc.db.Node().GetByID(tx, msg.StorageID) if err != nil { return fmt.Errorf("getting node by id: %w", err) } - err = svc.db.PinnedObject().CreateFromPackage(tx, msg.PackageID, msg.NodeID) + err = svc.db.PinnedObject().CreateFromPackage(tx, msg.PackageID, msg.StorageID) if err != nil { return fmt.Errorf("creating pinned objects from package: %w", err) } @@ -31,7 +31,7 @@ func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.Ca return nil }) if err != nil { - logger.WithField("PackageID", msg.PackageID).WithField("NodeID", msg.NodeID).Warn(err.Error()) + logger.WithField("PackageID", msg.PackageID).WithField("NodeID", msg.StorageID).Warn(err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "create package pinned objects failed") } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 283d383..0d025f4 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -7,6 +7,7 @@ import ( "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gorm.io/gorm" "gitlink.org.cn/cloudream/common/pkgs/mq" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -15,7 +16,7 @@ import ( ) func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) { - stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) + stg, err := svc.db2.Storage().GetUserStorage(svc.db2.DefCtx(), msg.UserID, msg.StorageID) if err != nil { logger.Warnf("getting user storage: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") @@ -32,21 +33,21 @@ func (svc *Service) GetStorageDetail(msg *coormq.GetStorageDetail) (*coormq.GetS if err != nil { return fmt.Errorf("getting storage: %w", err) } + ret.Storage = stg shard, err := svc.db2.ShardStorage().GetByStorageID(tx, msg.StorageID) - if err != nil { + if err == nil { + ret.Shard = &shard + } else if err != gorm.ErrRecordNotFound { return fmt.Errorf("getting shard storage: %w", err) } shared, err := svc.db2.SharedStorage().GetByStorageID(tx, msg.StorageID) - if err != nil { + if err == nil { + ret.Shared = &shared + } else if err != gorm.ErrRecordNotFound { return fmt.Errorf("getting shared storage: %w", err) } - - ret.Storage = stg - ret.Shard = shard - ret.Shared = shared - return nil }) @@ -54,7 +55,7 @@ func (svc *Service) GetStorageDetail(msg *coormq.GetStorageDetail) (*coormq.GetS } func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) { - stg, err := svc.db.Storage().GetUserStorageByName(svc.db.SQLCtx(), msg.UserID, msg.Name) + stg, err := svc.db2.Storage().GetUserStorageByName(svc.db2.DefCtx(), msg.UserID, msg.Name) if err != nil { logger.Warnf("getting user storage by name: %s", err.Error()) diff --git a/go.mod b/go.mod index bb3cb29..1ca1e5f 100644 --- a/go.mod +++ b/go.mod @@ -95,5 +95,7 @@ require ( google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + gorm.io/driver/mysql v1.5.7 lukechampine.com/blake3 v1.1.7 // indirect ) diff --git a/go.sum b/go.sum index 1d5ea58..d88e489 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,7 @@ github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn github.com/go-playground/validator/v10 v10.8.0 h1:1kAa0fCrnpv+QYdkdcRzrRM7AyYs5o8+jZdJCz9xj6k= github.com/go-playground/validator/v10 v10.8.0/go.mod h1:9JhgTzTaE31GZDpH/HSvHiRJrJ3iKAgqqH0Bl/Ocjdk= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -271,6 +272,9 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo= +gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= +gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=