From 41a9aed6e96e54ab5ed3022ae9a901b013bdc605 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Aug 2023 10:15:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ClientPool=EF=BC=8C=E8=A7=A3?= =?UTF-8?q?=E9=99=A4=E5=AF=B9config=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/config/config.go | 26 ++++----- internal/services/cmd/agent.go | 19 ------ internal/services/cmd/service.go | 21 ------- .../grpc/{grpc_service.go => service.go} | 58 +++++++++++-------- internal/services/mq/agent.go | 29 ++++++++++ internal/services/{cmd => mq}/ipfs.go | 27 ++++++--- internal/services/{cmd => mq}/object.go | 2 +- internal/services/mq/service.go | 15 +++++ internal/services/{cmd => mq}/storage.go | 55 +++++++++--------- internal/task/create_ec_package.go | 38 ++++++++++++ internal/task/create_rep_package.go | 34 +++++++++++ internal/task/download_package.go | 26 +++++++++ internal/task/ipfs_pin.go | 15 ++++- internal/task/ipfs_read.go | 15 ++++- internal/task/task.go | 12 +--- main.go | 30 +++++----- status_report.go | 7 +-- 17 files changed, 283 insertions(+), 146 deletions(-) delete mode 100644 internal/services/cmd/agent.go delete mode 100644 internal/services/cmd/service.go rename internal/services/grpc/{grpc_service.go => service.go} (67%) create mode 100644 internal/services/mq/agent.go rename internal/services/{cmd => mq}/ipfs.go (81%) rename internal/services/{cmd => mq}/object.go (99%) create mode 100644 internal/services/mq/service.go rename internal/services/{cmd => mq}/storage.go (82%) create mode 100644 internal/task/create_ec_package.go create mode 100644 internal/task/create_rep_package.go create mode 100644 internal/task/download_package.go diff --git a/internal/config/config.go b/internal/config/config.go index c4617f1..a04949b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,25 +2,25 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" - "gitlink.org.cn/cloudream/common/utils/ipfs" + stgmodels "gitlink.org.cn/cloudream/storage-common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/grpc" stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Config struct { - ID int64 `json:"id"` - GRPCListenAddress string `json:"grpcListenAddress"` - GRPCPort int `json:"grpcPort"` - ECPacketSize int64 `json:"ecPacketSize"` - LocalIP string `json:"localIP"` - ExternalIP string `json:"externalIP"` - StorageBaseDir string `json:"storageBaseDir"` - TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 - Logger log.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS ipfs.Config `json:"ipfs"` - DistLock distlock.Config `json:"distlock"` + ID int64 `json:"id"` + Local stgmodels.LocalMachineInfo `json:"local"` + GRPC *grpc.Config `json:"grpc"` + ECPacketSize int64 `json:"ecPacketSize"` + StorageBaseDir string `json:"storageBaseDir"` + TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 + Logger log.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS ipfs.Config `json:"ipfs"` + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/services/cmd/agent.go b/internal/services/cmd/agent.go deleted file mode 100644 index 6fbfc7f..0000000 --- a/internal/services/cmd/agent.go +++ /dev/null @@ -1,19 +0,0 @@ -package cmd - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage-common/consts" - agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" -) - -func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { - var ipfsState string - - if svc.ipfs.IsUp() { - ipfsState = consts.IPFSStateOK - } else { - ipfsState = consts.IPFSStateOK - } - - return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) -} diff --git a/internal/services/cmd/service.go b/internal/services/cmd/service.go deleted file mode 100644 index c9eecd1..0000000 --- a/internal/services/cmd/service.go +++ /dev/null @@ -1,21 +0,0 @@ -package cmd - -import ( - "gitlink.org.cn/cloudream/common/utils/ipfs" - "gitlink.org.cn/cloudream/storage-agent/internal/task" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" -) - -type Service struct { - ipfs *ipfs.IPFS - taskManager *task.Manager - coordinator *coormq.Client -} - -func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager, coordinator *coormq.Client) *Service { - return &Service{ - ipfs: ipfs, - taskManager: taskMgr, - coordinator: coordinator, - } -} diff --git a/internal/services/grpc/grpc_service.go b/internal/services/grpc/service.go similarity index 67% rename from internal/services/grpc/grpc_service.go rename to internal/services/grpc/service.go index 11210b6..57a76e4 100644 --- a/internal/services/grpc/grpc_service.go +++ b/internal/services/grpc/service.go @@ -6,25 +6,29 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/common/utils/ipfs" - agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" + "gitlink.org.cn/cloudream/storage-common/globals" + agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" ) -type GRPCService struct { - agentserver.FileTransportServer - ipfs *ipfs.IPFS +type Service struct { + agentserver.AgentServer } -func NewService(ipfs *ipfs.IPFS) *GRPCService { - return &GRPCService{ - ipfs: ipfs, - } +func NewService() *Service { + return &Service{} } -func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) error { +func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { log.Debugf("client upload file") - writer, err := s.ipfs.CreateFile() + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + log.Warnf("new ipfs client: %s", err.Error()) + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + writer, err := ipfsCli.CreateFileStream() if err != nil { log.Warnf("create file failed, err: %s", err.Error()) return fmt.Errorf("create file failed, err: %w", err) @@ -45,18 +49,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) return fmt.Errorf("recv message failed, err: %w", err) } - if msg.Type == agentserver.FileDataPacketType_Data { - err = myio.WriteAll(writer, msg.Data) - if err != nil { - // 关闭文件写入,不需要返回的hash和error - writer.Abort(io.ErrClosedPipe) - log.Warnf("write data to file failed, err: %s", err.Error()) - return fmt.Errorf("write data to file failed, err: %w", err) - } + err = myio.WriteAll(writer, msg.Data) + if err != nil { + // 关闭文件写入,不需要返回的hash和error + writer.Abort(io.ErrClosedPipe) + log.Warnf("write data to file failed, err: %s", err.Error()) + return fmt.Errorf("write data to file failed, err: %w", err) + } - recvSize += int64(len(msg.Data)) + recvSize += int64(len(msg.Data)) - } else if msg.Type == agentserver.FileDataPacketType_EOF { + if msg.Type == agentserver.FileDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash hash, err := writer.Finish() if err != nil { @@ -65,7 +68,7 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) } // 并将结果返回到客户端 - err = server.SendAndClose(&agentserver.SendResp{ + err = server.SendAndClose(&agentserver.SendIPFSFileResp{ FileHash: hash, }) if err != nil { @@ -78,10 +81,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) } } -func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTransport_GetFileServer) error { +func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error { log.WithField("FileHash", req.FileHash).Debugf("client download file") - reader, err := s.ipfs.OpenRead(req.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + log.Warnf("new ipfs client: %s", err.Error()) + return fmt.Errorf("new ipfs client: %w", err) + } + defer ipfsCli.Close() + + reader, err := ipfsCli.OpenRead(req.FileHash) if err != nil { log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error()) return fmt.Errorf("open file to read failed, err: %w", err) diff --git a/internal/services/mq/agent.go b/internal/services/mq/agent.go new file mode 100644 index 0000000..5246465 --- /dev/null +++ b/internal/services/mq/agent.go @@ -0,0 +1,29 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" +) + +func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { + var ipfsState string + + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + logger.Warnf("new ipfs client: %s", err.Error()) + ipfsState = consts.IPFSStateUnavailable + + } else { + if ipfsCli.IsUp() { + ipfsState = consts.IPFSStateOK + } else { + ipfsState = consts.IPFSStateUnavailable + } + ipfsCli.Close() + } + + return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) +} diff --git a/internal/services/cmd/ipfs.go b/internal/services/mq/ipfs.go similarity index 81% rename from internal/services/cmd/ipfs.go rename to internal/services/mq/ipfs.go index 5d56fb9..fa101ae 100644 --- a/internal/services/cmd/ipfs.go +++ b/internal/services/mq/ipfs.go @@ -1,20 +1,29 @@ -package cmd +package mq import ( "time" shell "github.com/ipfs/go-ipfs-api" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" ) func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { - filesMap, err := svc.ipfs.GetPinnedFiles() + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + logger.Warnf("new ipfs client: %s", err.Error()) + return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "new ipfs client failed") + } + defer ipfsCli.Close() + + filesMap, err := ipfsCli.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") @@ -22,13 +31,13 @@ func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.C // TODO 根据锁定清单过滤被锁定的文件的记录 if msg.IsComplete { - return svc.checkComplete(msg, filesMap) + return svc.checkComplete(msg, filesMap, ipfsCli) } else { - return svc.checkIncrement(msg, filesMap) + return svc.checkIncrement(msg, filesMap, ipfsCli) } } -func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -37,7 +46,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she // 不处理 } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") - err := svc.ipfs.Unpin(cache.FileHash) + err := ipfsCli.Unpin(cache.FileHash) if err != nil { logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } @@ -63,7 +72,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) } -func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { +func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { var entries []agtmq.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -72,7 +81,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel // 不处理 } else if cache.State == consts.CacheStateTemp { logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") - err := svc.ipfs.Unpin(cache.FileHash) + err := ipfsCli.Unpin(cache.FileHash) if err != nil { logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } @@ -96,7 +105,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel // map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录 for hash := range filesMap { logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry") - err := svc.ipfs.Unpin(hash) + err := ipfsCli.Unpin(hash) if err != nil { logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) } diff --git a/internal/services/cmd/object.go b/internal/services/mq/object.go similarity index 99% rename from internal/services/cmd/object.go rename to internal/services/mq/object.go index c1972c4..499ffb5 100644 --- a/internal/services/cmd/object.go +++ b/internal/services/mq/object.go @@ -1,4 +1,4 @@ -package cmd +package mq import ( "time" diff --git a/internal/services/mq/service.go b/internal/services/mq/service.go new file mode 100644 index 0000000..52a9599 --- /dev/null +++ b/internal/services/mq/service.go @@ -0,0 +1,15 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/storage-agent/internal/task" +) + +type Service struct { + taskManager *task.Manager +} + +func NewService(taskMgr *task.Manager) *Service { + return &Service{ + taskManager: taskMgr, + } +} diff --git a/internal/services/cmd/storage.go b/internal/services/mq/storage.go similarity index 82% rename from internal/services/cmd/storage.go rename to internal/services/mq/storage.go index 19e4859..06711ed 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/mq/storage.go @@ -1,4 +1,4 @@ -package cmd +package mq import ( "io/fs" @@ -13,18 +13,26 @@ import ( "gitlink.org.cn/cloudream/storage-agent/internal/config" mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-common/consts" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/utils" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/mq" - stgcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - myos "gitlink.org.cn/cloudream/storage-common/utils/os" ) func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) { - getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -40,7 +48,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](stgcmd.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))) + tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID())) } @@ -152,7 +160,15 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs } func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { - getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + logger.Warnf("new coordinator client: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") + } + defer coorCli.Close() + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) if err != nil { logger.WithField("StorageID", msg.StorageID). Warnf("getting storage info: %s", err.Error()) @@ -180,7 +196,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") } - objIter := myos.NewUploadingObjectIterator(fullPath, uploadFilePathes) + objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) if msg.Redundancy.Type == models.RedundancyRep { repInfo, err := msg.Redundancy.ToRepInfo() @@ -190,14 +206,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext]( - stgcmd.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, stgcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: &config.Cfg().ID, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -208,14 +217,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") } - tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext]( - stgcmd.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, config.Cfg().ECPacketSize, stgcmd.UploadConfig{ - LocalIPFS: svc.ipfs, - LocalNodeID: &config.Cfg().ID, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, - }))) + tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) } @@ -231,17 +233,16 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) } - wrapTask := tsk.Body().(*stgcmd.TaskWrapper[mytask.TaskContext]) - if tsk.Error() != nil { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) } - if repTask, ok := wrapTask.InnerTask().(*stgcmd.CreateRepPackage); ok { + // TODO 避免判断类型 + if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID)) } - if ecTask, ok := wrapTask.InnerTask().(*stgcmd.CreateECPackage); ok { + if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID)) } diff --git a/internal/task/create_ec_package.go b/internal/task/create_ec_package.go new file mode 100644 index 0000000..fb69e8c --- /dev/null +++ b/internal/task/create_ec_package.go @@ -0,0 +1,38 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-agent/internal/config" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateECPackageResult = cmd.CreateECPackageResult + +type CreateECPackage struct { + cmd cmd.CreateECPackage + + Result *CreateECPackageResult +} + +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { + return &CreateECPackage{ + cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{ + UpdatePackageContext: &cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }, + ECPacketSize: config.Cfg().ECPacketSize, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/create_rep_package.go b/internal/task/create_rep_package.go new file mode 100644 index 0000000..7b15c64 --- /dev/null +++ b/internal/task/create_rep_package.go @@ -0,0 +1,34 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" + "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" +) + +type CreateRepPackageResult = cmd.CreateRepPackageResult + +type CreateRepPackage struct { + cmd cmd.CreateRepPackage + + Result *CreateRepPackageResult +} + +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { + return &CreateRepPackage{ + cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), + } +} + +func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { + ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ + Distlock: ctx.distlock, + }) + t.Result = ret + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/download_package.go b/internal/task/download_package.go new file mode 100644 index 0000000..c3c39ab --- /dev/null +++ b/internal/task/download_package.go @@ -0,0 +1,26 @@ +package task + +import ( + "time" + + "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" +) + +type DownloadPackage struct { + cmd *cmd.DownloadPackage +} + +func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { + return &DownloadPackage{ + cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), + } +} +func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) { + err := t.cmd.Execute(&cmd.DownloadPackageContext{ + Distlock: ctx.distlock, + }) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/internal/task/ipfs_pin.go b/internal/task/ipfs_pin.go index 1a7fe7a..0489d5c 100644 --- a/internal/task/ipfs_pin.go +++ b/internal/task/ipfs_pin.go @@ -5,6 +5,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" ) type IPFSPin struct { @@ -31,7 +32,19 @@ func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) { log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") - err := ctx.ipfs.Pin(t.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + err := fmt.Errorf("new ipfs client: %w", err) + log.Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer ipfsCli.Close() + + err = ipfsCli.Pin(t.FileHash) if err != nil { err := fmt.Errorf("pin file failed, err: %w", err) log.WithField("FileHash", t.FileHash).Warn(err.Error()) diff --git a/internal/task/ipfs_read.go b/internal/task/ipfs_read.go index 91fb877..7ddfa6f 100644 --- a/internal/task/ipfs_read.go +++ b/internal/task/ipfs_read.go @@ -8,6 +8,7 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" ) type IPFSRead struct { @@ -61,7 +62,19 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) { } defer outputFile.Close() - rd, err := ctx.ipfs.OpenRead(t.FileHash) + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + err := fmt.Errorf("new ipfs client: %w", err) + log.Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + defer ipfsCli.Close() + + rd, err := ipfsCli.OpenRead(t.FileHash) if err != nil { err := fmt.Errorf("read ipfs file failed, err: %w", err) log.WithField("FileHash", t.FileHash).Warn(err.Error()) diff --git a/internal/task/task.go b/internal/task/task.go index 6bbd84f..3653981 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -3,14 +3,10 @@ package task import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/task" - "gitlink.org.cn/cloudream/common/utils/ipfs" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type TaskContext struct { - ipfs *ipfs.IPFS - coordinator *coormq.Client - distlock *distsvc.Service + distlock *distsvc.Service } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +21,8 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(ipfs *ipfs.IPFS, coorCli *coormq.Client, distLock *distsvc.Service) Manager { +func NewManager(distlock *distsvc.Service) Manager { return task.NewManager(TaskContext{ - ipfs: ipfs, - coordinator: coorCli, - distlock: distLock, + distlock: distlock, }) } diff --git a/main.go b/main.go index b568b39..396b052 100644 --- a/main.go +++ b/main.go @@ -8,18 +8,17 @@ import ( distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" - agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" + "gitlink.org.cn/cloudream/storage-common/globals" + agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" "google.golang.org/grpc" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd" grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" + cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/mq" ) // TODO 此数据是否在运行时会发生变化? @@ -41,15 +40,12 @@ func main() { os.Exit(1) } - ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS) - if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) - } - - coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) - } + globals.InitLocal(&config.Cfg().Local) + globals.InitMQPool(&config.Cfg().RabbitMQ) + globals.InitAgentRPCPool(&agtrpc.PoolConfig{ + Port: config.Cfg().GRPC.Port, + }) + globals.InitIPFSPool(&config.Cfg().IPFS) distlock, err := distsvc.NewService(&config.Cfg().DistLock) if err != nil { @@ -60,11 +56,11 @@ func main() { wg := sync.WaitGroup{} wg.Add(5) - taskMgr := task.NewManager(ipfs, coorCli, distlock) + taskMgr := task.NewManager(distlock) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(ipfs, &taskMgr, coorCli), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -76,14 +72,14 @@ func main() { go reportStatus(&wg) //网络延迟感知 //面向客户端收发数据 - listenAddr := config.Cfg().GRPCListenAddress + listenAddr := config.Cfg().GRPC.MakeListenAddress() lis, err := net.Listen("tcp", listenAddr) if err != nil { log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() - agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs)) + agtrpc.RegisterAgentServer(s, grpcsvc.NewService()) go serveGRPC(s, lis, &wg) go serveDistLock(distlock) diff --git a/status_report.go b/status_report.go index e4ebc71..c7ebbe7 100644 --- a/status_report.go +++ b/status_report.go @@ -7,13 +7,12 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-common/consts" - coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" - coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage-common/utils" ) func reportStatus(wg *sync.WaitGroup) { - coorCli, err := coorcli.NewClient(&config.Cfg().RabbitMQ) + coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) if err != nil { wg.Done() log.Error("new coordinator client failed, err: %w", err) @@ -56,7 +55,7 @@ func reportStatus(wg *sync.WaitGroup) { //发送心跳 // TODO 由于数据结构未定,暂时不发送真实数据 - coorCli.AgentStatusReport(coormsg.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) + coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) time.Sleep(time.Minute * 5) }