Browse Source

增加ClientPool,解除对config的依赖

gitlink
Sydonian 2 years ago
parent
commit
41a9aed6e9
17 changed files with 283 additions and 146 deletions
  1. +13
    -13
      internal/config/config.go
  2. +0
    -19
      internal/services/cmd/agent.go
  3. +0
    -21
      internal/services/cmd/service.go
  4. +34
    -24
      internal/services/grpc/service.go
  5. +29
    -0
      internal/services/mq/agent.go
  6. +18
    -9
      internal/services/mq/ipfs.go
  7. +1
    -1
      internal/services/mq/object.go
  8. +15
    -0
      internal/services/mq/service.go
  9. +28
    -27
      internal/services/mq/storage.go
  10. +38
    -0
      internal/task/create_ec_package.go
  11. +34
    -0
      internal/task/create_rep_package.go
  12. +26
    -0
      internal/task/download_package.go
  13. +14
    -1
      internal/task/ipfs_pin.go
  14. +14
    -1
      internal/task/ipfs_read.go
  15. +3
    -9
      internal/task/task.go
  16. +13
    -17
      main.go
  17. +3
    -4
      status_report.go

+ 13
- 13
internal/config/config.go View File

@@ -2,25 +2,25 @@ package config


import ( import (
"gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config" c "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/common/utils/ipfs"
stgmodels "gitlink.org.cn/cloudream/storage-common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/grpc"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
) )


type Config struct { type Config struct {
ID int64 `json:"id"`
GRPCListenAddress string `json:"grpcListenAddress"`
GRPCPort int `json:"grpcPort"`
ECPacketSize int64 `json:"ecPacketSize"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
StorageBaseDir string `json:"storageBaseDir"`
TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒
Logger log.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"`
ID int64 `json:"id"`
Local stgmodels.LocalMachineInfo `json:"local"`
GRPC *grpc.Config `json:"grpc"`
ECPacketSize int64 `json:"ecPacketSize"`
StorageBaseDir string `json:"storageBaseDir"`
TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒
Logger log.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"`
} }


var cfg Config var cfg Config


+ 0
- 19
internal/services/cmd/agent.go View File

@@ -1,19 +0,0 @@
package cmd

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/consts"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) {
var ipfsState string

if svc.ipfs.IsUp() {
ipfsState = consts.IPFSStateOK
} else {
ipfsState = consts.IPFSStateOK
}

return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState))
}

+ 0
- 21
internal/services/cmd/service.go View File

@@ -1,21 +0,0 @@
package cmd

import (
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-agent/internal/task"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type Service struct {
ipfs *ipfs.IPFS
taskManager *task.Manager
coordinator *coormq.Client
}

func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager, coordinator *coormq.Client) *Service {
return &Service{
ipfs: ipfs,
taskManager: taskMgr,
coordinator: coordinator,
}
}

internal/services/grpc/grpc_service.go → internal/services/grpc/service.go View File

@@ -6,25 +6,29 @@ import (


log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io" myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/ipfs"
agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
"gitlink.org.cn/cloudream/storage-common/globals"
agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
) )


type GRPCService struct {
agentserver.FileTransportServer
ipfs *ipfs.IPFS
type Service struct {
agentserver.AgentServer
} }


func NewService(ipfs *ipfs.IPFS) *GRPCService {
return &GRPCService{
ipfs: ipfs,
}
func NewService() *Service {
return &Service{}
} }


func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) error {
func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error {
log.Debugf("client upload file") log.Debugf("client upload file")


writer, err := s.ipfs.CreateFile()
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
log.Warnf("new ipfs client: %s", err.Error())
return fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

writer, err := ipfsCli.CreateFileStream()
if err != nil { if err != nil {
log.Warnf("create file failed, err: %s", err.Error()) log.Warnf("create file failed, err: %s", err.Error())
return fmt.Errorf("create file failed, err: %w", err) return fmt.Errorf("create file failed, err: %w", err)
@@ -45,18 +49,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer)
return fmt.Errorf("recv message failed, err: %w", err) return fmt.Errorf("recv message failed, err: %w", err)
} }


if msg.Type == agentserver.FileDataPacketType_Data {
err = myio.WriteAll(writer, msg.Data)
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Abort(io.ErrClosedPipe)
log.Warnf("write data to file failed, err: %s", err.Error())
return fmt.Errorf("write data to file failed, err: %w", err)
}
err = myio.WriteAll(writer, msg.Data)
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Abort(io.ErrClosedPipe)
log.Warnf("write data to file failed, err: %s", err.Error())
return fmt.Errorf("write data to file failed, err: %w", err)
}


recvSize += int64(len(msg.Data))
recvSize += int64(len(msg.Data))


} else if msg.Type == agentserver.FileDataPacketType_EOF {
if msg.Type == agentserver.FileDataPacketType_EOF {
// 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash
hash, err := writer.Finish() hash, err := writer.Finish()
if err != nil { if err != nil {
@@ -65,7 +68,7 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer)
} }


// 并将结果返回到客户端 // 并将结果返回到客户端
err = server.SendAndClose(&agentserver.SendResp{
err = server.SendAndClose(&agentserver.SendIPFSFileResp{
FileHash: hash, FileHash: hash,
}) })
if err != nil { if err != nil {
@@ -78,10 +81,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer)
} }
} }


func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTransport_GetFileServer) error {
func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error {
log.WithField("FileHash", req.FileHash).Debugf("client download file") log.WithField("FileHash", req.FileHash).Debugf("client download file")


reader, err := s.ipfs.OpenRead(req.FileHash)
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
log.Warnf("new ipfs client: %s", err.Error())
return fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

reader, err := ipfsCli.OpenRead(req.FileHash)
if err != nil { if err != nil {
log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error()) log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error())
return fmt.Errorf("open file to read failed, err: %w", err) return fmt.Errorf("open file to read failed, err: %w", err)

+ 29
- 0
internal/services/mq/agent.go View File

@@ -0,0 +1,29 @@
package mq

import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) {
var ipfsState string

ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
logger.Warnf("new ipfs client: %s", err.Error())
ipfsState = consts.IPFSStateUnavailable

} else {
if ipfsCli.IsUp() {
ipfsState = consts.IPFSStateOK
} else {
ipfsState = consts.IPFSStateUnavailable
}
ipfsCli.Close()
}

return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState))
}

internal/services/cmd/ipfs.go → internal/services/mq/ipfs.go View File

@@ -1,20 +1,29 @@
package cmd
package mq


import ( import (
"time" "time"


shell "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api"
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
) )


func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) { func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
filesMap, err := svc.ipfs.GetPinnedFiles()
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
logger.Warnf("new ipfs client: %s", err.Error())
return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "new ipfs client failed")
}
defer ipfsCli.Close()

filesMap, err := ipfsCli.GetPinnedFiles()
if err != nil { if err != nil {
logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error())
return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
@@ -22,13 +31,13 @@ func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.C


// TODO 根据锁定清单过滤被锁定的文件的记录 // TODO 根据锁定清单过滤被锁定的文件的记录
if msg.IsComplete { if msg.IsComplete {
return svc.checkComplete(msg, filesMap)
return svc.checkComplete(msg, filesMap, ipfsCli)
} else { } else {
return svc.checkIncrement(msg, filesMap)
return svc.checkIncrement(msg, filesMap, ipfsCli)
} }
} }


func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches { for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash] _, ok := filesMap[cache.FileHash]
@@ -37,7 +46,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she
// 不处理 // 不处理
} else if cache.State == consts.CacheStateTemp { } else if cache.State == consts.CacheStateTemp {
logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp")
err := svc.ipfs.Unpin(cache.FileHash)
err := ipfsCli.Unpin(cache.FileHash)
if err != nil { if err != nil {
logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error())
} }
@@ -63,7 +72,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she
return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries)) return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
} }


func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches { for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash] _, ok := filesMap[cache.FileHash]
@@ -72,7 +81,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel
// 不处理 // 不处理
} else if cache.State == consts.CacheStateTemp { } else if cache.State == consts.CacheStateTemp {
logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp")
err := svc.ipfs.Unpin(cache.FileHash)
err := ipfsCli.Unpin(cache.FileHash)
if err != nil { if err != nil {
logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error())
} }
@@ -96,7 +105,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel
// map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录 // map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录
for hash := range filesMap { for hash := range filesMap {
logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry") logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry")
err := svc.ipfs.Unpin(hash)
err := ipfsCli.Unpin(hash)
if err != nil { if err != nil {
logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error())
} }

internal/services/cmd/object.go → internal/services/mq/object.go View File

@@ -1,4 +1,4 @@
package cmd
package mq


import ( import (
"time" "time"

+ 15
- 0
internal/services/mq/service.go View File

@@ -0,0 +1,15 @@
package mq

import (
"gitlink.org.cn/cloudream/storage-agent/internal/task"
)

type Service struct {
taskManager *task.Manager
}

func NewService(taskMgr *task.Manager) *Service {
return &Service{
taskManager: taskMgr,
}
}

internal/services/cmd/storage.go → internal/services/mq/storage.go View File

@@ -1,4 +1,4 @@
package cmd
package mq


import ( import (
"io/fs" "io/fs"
@@ -13,18 +13,26 @@ import (
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" mytask "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/utils" "gitlink.org.cn/cloudream/storage-common/utils"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
stgcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
myos "gitlink.org.cn/cloudream/storage-common/utils/os"
) )


func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) { func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) {
getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer coorCli.Close()

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil { if err != nil {
logger.WithField("StorageID", msg.StorageID). logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error()) Warnf("getting storage info: %s", err.Error())
@@ -40,7 +48,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage)
return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
} }


tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](stgcmd.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)))
tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))
return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID())) return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID()))
} }


@@ -152,7 +160,15 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs
} }


func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer coorCli.Close()

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil { if err != nil {
logger.WithField("StorageID", msg.StorageID). logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error()) Warnf("getting storage info: %s", err.Error())
@@ -180,7 +196,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") return nil, mq.Failed(errorcode.OperationFailed, "read directory failed")
} }


objIter := myos.NewUploadingObjectIterator(fullPath, uploadFilePathes)
objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes)


if msg.Redundancy.Type == models.RedundancyRep { if msg.Redundancy.Type == models.RedundancyRep {
repInfo, err := msg.Redundancy.ToRepInfo() repInfo, err := msg.Redundancy.ToRepInfo()
@@ -190,14 +206,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed")
} }


tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](
stgcmd.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, stgcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: &config.Cfg().ID,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
} }


@@ -208,14 +217,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed")
} }


tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](
stgcmd.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, config.Cfg().ECPacketSize, stgcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: &config.Cfg().ID,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
} }


@@ -231,17 +233,16 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0))
} }


wrapTask := tsk.Body().(*stgcmd.TaskWrapper[mytask.TaskContext])

if tsk.Error() != nil { if tsk.Error() != nil {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0))
} }


if repTask, ok := wrapTask.InnerTask().(*stgcmd.CreateRepPackage); ok {
// TODO 避免判断类型
if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID)) return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID))
} }


if ecTask, ok := wrapTask.InnerTask().(*stgcmd.CreateECPackage); ok {
if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID)) return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID))
} }



+ 38
- 0
internal/task/create_ec_package.go View File

@@ -0,0 +1,38 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateECPackageResult = cmd.CreateECPackageResult

type CreateECPackage struct {
cmd cmd.CreateECPackage

Result *CreateECPackageResult
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{
UpdatePackageContext: &cmd.UpdatePackageContext{
Distlock: ctx.distlock,
},
ECPacketSize: config.Cfg().ECPacketSize,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 34
- 0
internal/task/create_rep_package.go View File

@@ -0,0 +1,34 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateRepPackageResult = cmd.CreateRepPackageResult

type CreateRepPackage struct {
cmd cmd.CreateRepPackage

Result *CreateRepPackageResult
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage {
return &CreateRepPackage{
cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 26
- 0
internal/task/download_package.go View File

@@ -0,0 +1,26 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
)

type DownloadPackage struct {
cmd *cmd.DownloadPackage
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
return &DownloadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
}
}
func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 14
- 1
internal/task/ipfs_pin.go View File

@@ -5,6 +5,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
) )


type IPFSPin struct { type IPFSPin struct {
@@ -31,7 +32,19 @@ func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) {
log.Debugf("begin with %v", logger.FormatStruct(t)) log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end") defer log.Debugf("end")


err := ctx.ipfs.Pin(t.FileHash)
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
err := fmt.Errorf("new ipfs client: %w", err)
log.Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer ipfsCli.Close()

err = ipfsCli.Pin(t.FileHash)
if err != nil { if err != nil {
err := fmt.Errorf("pin file failed, err: %w", err) err := fmt.Errorf("pin file failed, err: %w", err)
log.WithField("FileHash", t.FileHash).Warn(err.Error()) log.WithField("FileHash", t.FileHash).Warn(err.Error())


+ 14
- 1
internal/task/ipfs_read.go View File

@@ -8,6 +8,7 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
) )


type IPFSRead struct { type IPFSRead struct {
@@ -61,7 +62,19 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) {
} }
defer outputFile.Close() defer outputFile.Close()


rd, err := ctx.ipfs.OpenRead(t.FileHash)
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
err := fmt.Errorf("new ipfs client: %w", err)
log.Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer ipfsCli.Close()

rd, err := ipfsCli.OpenRead(t.FileHash)
if err != nil { if err != nil {
err := fmt.Errorf("read ipfs file failed, err: %w", err) err := fmt.Errorf("read ipfs file failed, err: %w", err)
log.WithField("FileHash", t.FileHash).Warn(err.Error()) log.WithField("FileHash", t.FileHash).Warn(err.Error())


+ 3
- 9
internal/task/task.go View File

@@ -3,14 +3,10 @@ package task
import ( import (
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/common/utils/ipfs"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type TaskContext struct { type TaskContext struct {
ipfs *ipfs.IPFS
coordinator *coormq.Client
distlock *distsvc.Service
distlock *distsvc.Service
} }


// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -25,10 +21,8 @@ type Task = task.Task[TaskContext]


type CompleteOption = task.CompleteOption type CompleteOption = task.CompleteOption


func NewManager(ipfs *ipfs.IPFS, coorCli *coormq.Client, distLock *distsvc.Service) Manager {
func NewManager(distlock *distsvc.Service) Manager {
return task.NewManager(TaskContext{ return task.NewManager(TaskContext{
ipfs: ipfs,
coordinator: coorCli,
distlock: distLock,
distlock: distlock,
}) })
} }

+ 13
- 17
main.go View File

@@ -8,18 +8,17 @@ import (


distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task" "gitlink.org.cn/cloudream/storage-agent/internal/task"
agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
"gitlink.org.cn/cloudream/storage-common/globals"
agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"


"google.golang.org/grpc" "google.golang.org/grpc"


agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"


cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd"
grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/mq"
) )


// TODO 此数据是否在运行时会发生变化? // TODO 此数据是否在运行时会发生变化?
@@ -41,15 +40,12 @@ func main() {
os.Exit(1) os.Exit(1)
} }


ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
}

coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
}
globals.InitLocal(&config.Cfg().Local)
globals.InitMQPool(&config.Cfg().RabbitMQ)
globals.InitAgentRPCPool(&agtrpc.PoolConfig{
Port: config.Cfg().GRPC.Port,
})
globals.InitIPFSPool(&config.Cfg().IPFS)


distlock, err := distsvc.NewService(&config.Cfg().DistLock) distlock, err := distsvc.NewService(&config.Cfg().DistLock)
if err != nil { if err != nil {
@@ -60,11 +56,11 @@ func main() {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(5) wg.Add(5)


taskMgr := task.NewManager(ipfs, coorCli, distlock)
taskMgr := task.NewManager(distlock)


// 启动命令服务器 // 启动命令服务器
// TODO 需要设计AgentID持久化机制 // TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(ipfs, &taskMgr, coorCli), config.Cfg().ID, &config.Cfg().RabbitMQ)
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error()) log.Fatalf("new agent server failed, err: %s", err.Error())
} }
@@ -76,14 +72,14 @@ func main() {
go reportStatus(&wg) //网络延迟感知 go reportStatus(&wg) //网络延迟感知


//面向客户端收发数据 //面向客户端收发数据
listenAddr := config.Cfg().GRPCListenAddress
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr) lis, err := net.Listen("tcp", listenAddr)
if err != nil { if err != nil {
log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
} }


s := grpc.NewServer() s := grpc.NewServer()
agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs))
agtrpc.RegisterAgentServer(s, grpcsvc.NewService())
go serveGRPC(s, lis, &wg) go serveGRPC(s, lis, &wg)


go serveDistLock(distlock) go serveDistLock(distlock)


+ 3
- 4
status_report.go View File

@@ -7,13 +7,12 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/consts" "gitlink.org.cn/cloudream/storage-common/consts"
coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage-common/utils" "gitlink.org.cn/cloudream/storage-common/utils"
) )


func reportStatus(wg *sync.WaitGroup) { func reportStatus(wg *sync.WaitGroup) {
coorCli, err := coorcli.NewClient(&config.Cfg().RabbitMQ)
coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil { if err != nil {
wg.Done() wg.Done()
log.Error("new coordinator client failed, err: %w", err) log.Error("new coordinator client failed, err: %w", err)
@@ -56,7 +55,7 @@ func reportStatus(wg *sync.WaitGroup) {


//发送心跳 //发送心跳
// TODO 由于数据结构未定,暂时不发送真实数据 // TODO 由于数据结构未定,暂时不发送真实数据
coorCli.AgentStatusReport(coormsg.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus))
coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus))


time.Sleep(time.Minute * 5) time.Sleep(time.Minute * 5)
} }


Loading…
Cancel
Save