From 3d546092dea43c4b87d17f16a380a169857e5ca2 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Aug 2023 10:20:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ClientPool=EF=BC=8C=E8=A7=A3?= =?UTF-8?q?=E9=99=A4=E5=AF=B9config=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consts/consts.go | 3 +- globals/globals.go | 11 + globals/pools.go | 36 ++ go.mod | 1 + go.sum | 2 + magefiles/main.go | 17 + models/models.go | 6 + pkgs/cmd/cmd.go | 3 + pkgs/cmd/create_ec_package.go | 238 ++++-------- pkgs/cmd/create_rep_package.go | 189 ++++------ pkgs/cmd/download_package.go | 66 ++-- pkgs/cmd/update_ec_package.go | 64 ++-- pkgs/cmd/update_rep_package.go | 55 ++- pkgs/grpc/agent/agent.pb.go | 345 ++++++++++++++++++ .../agent/agent.proto} | 12 +- pkgs/grpc/agent/agent_grpc.pb.go | 209 +++++++++++ pkgs/grpc/agent/client.go | 131 +++++++ pkgs/grpc/agent/pool.go | 43 +++ pkgs/grpc/config.go | 12 + pkgs/iterator/ec_object_iterator.go | 191 ++-------- pkgs/iterator/rep_object_iterator.go | 84 ++--- pkgs/mq/agent/client.go | 40 +- pkgs/mq/coordinator/client.go | 40 +- pkgs/mq/scanner/client.go | 40 +- pkgs/proto/Makefile | 2 - pkgs/proto/file_transport.pb.go | 343 ----------------- pkgs/proto/file_transport_grpc.pb.go | 209 ----------- utils/grpc/file_transport.go | 120 ------ 28 files changed, 1263 insertions(+), 1249 deletions(-) create mode 100644 globals/globals.go create mode 100644 globals/pools.go create mode 100644 magefiles/main.go create mode 100644 pkgs/cmd/cmd.go create mode 100644 pkgs/grpc/agent/agent.pb.go rename pkgs/{proto/file_transport.proto => grpc/agent/agent.proto} (57%) create mode 100644 pkgs/grpc/agent/agent_grpc.pb.go create mode 100644 pkgs/grpc/agent/client.go create mode 100644 pkgs/grpc/agent/pool.go create mode 100644 pkgs/grpc/config.go delete mode 100644 pkgs/proto/Makefile delete mode 100644 pkgs/proto/file_transport.pb.go delete mode 100644 pkgs/proto/file_transport_grpc.pb.go delete mode 100644 utils/grpc/file_transport.go diff --git a/consts/consts.go b/consts/consts.go index 36b27e9..6fc4bcb 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -1,7 +1,8 @@ package consts const ( - IPFSStateOK = "OK" + IPFSStateOK = "OK" + IPFSStateUnavailable = "Unavailable" StorageDirectoryStateOK = "OK" diff --git a/globals/globals.go b/globals/globals.go new file mode 100644 index 0000000..7393611 --- /dev/null +++ b/globals/globals.go @@ -0,0 +1,11 @@ +package globals + +import ( + stgmodels "gitlink.org.cn/cloudream/storage-common/models" +) + +var Local *stgmodels.LocalMachineInfo + +func InitLocal(info *stgmodels.LocalMachineInfo) { + Local = info +} diff --git a/globals/pools.go b/globals/pools.go new file mode 100644 index 0000000..51dd572 --- /dev/null +++ b/globals/pools.go @@ -0,0 +1,36 @@ +package globals + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ipfs" + agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" + agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" +) + +var AgentMQPool *agtmq.Pool + +var CoordinatorMQPool *coormq.Pool + +var ScannerMQPool *scmq.Pool + +func InitMQPool(cfg *stgmq.Config) { + AgentMQPool = agtmq.NewPool(cfg) + + CoordinatorMQPool = coormq.NewPool(cfg) + + ScannerMQPool = scmq.NewPool(cfg) +} + +var AgentRPCPool *agtrpc.Pool + +func InitAgentRPCPool(cfg *agtrpc.PoolConfig) { + AgentRPCPool = agtrpc.NewPool(cfg) +} + +var IPFSPool *ipfs.Pool + +func InitIPFSPool(cfg *ipfs.Config) { + IPFSPool = ipfs.NewPool(cfg) +} diff --git a/go.mod b/go.mod index 4c702ee..1f02dcc 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-ping/ping v1.1.0 github.com/go-sql-driver/mysql v1.7.1 github.com/jmoiron/sqlx v1.3.5 + github.com/magefile/mage v1.15.0 github.com/samber/lo v1.36.0 github.com/smartystreets/goconvey v1.8.0 gitlink.org.cn/cloudream/common v0.0.0 diff --git a/go.sum b/go.sum index b099d7e..5cb2600 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFG github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+PM= github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= diff --git a/magefiles/main.go b/magefiles/main.go new file mode 100644 index 0000000..b97191c --- /dev/null +++ b/magefiles/main.go @@ -0,0 +1,17 @@ +//go:build mage + +package main + +import ( + "path/filepath" + + "github.com/magefile/mage/sh" +) + +func Protos() error { + return proto("pkgs/grpc/agent", "agent.proto") +} + +func proto(dir string, fileName string) error { + return sh.Run("protoc", "--go_out="+dir, "--go-grpc_out="+dir, filepath.Join(dir, fileName)) +} diff --git a/models/models.go b/models/models.go index d92d282..f260816 100644 --- a/models/models.go +++ b/models/models.go @@ -81,3 +81,9 @@ func NewObjectECData(blocks []ObjectBlockData) ObjectECData { Blocks: blocks, } } + +type LocalMachineInfo struct { + NodeID *int64 `json:"nodeID"` + ExternalIP string `json:"externalIP"` + LocalIP string `json:"localIP"` +} diff --git a/pkgs/cmd/cmd.go b/pkgs/cmd/cmd.go new file mode 100644 index 0000000..0149075 --- /dev/null +++ b/pkgs/cmd/cmd.go @@ -0,0 +1,3 @@ +package cmd + +// 这个包主要存放一些公共的业务逻辑代码 diff --git a/pkgs/cmd/create_ec_package.go b/pkgs/cmd/create_ec_package.go index d24eb23..3eb6e5b 100644 --- a/pkgs/cmd/create_ec_package.go +++ b/pkgs/cmd/create_ec_package.go @@ -1,41 +1,34 @@ package cmd import ( - "bytes" "fmt" "io" "math/rand" "os" "path/filepath" "sync" - "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/common/pkgs/logger" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/ec" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" - agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type CreateECPackage struct { - userID int64 - bucketID int64 - name string - objectIter iterator.UploadingObjectIterator - redundancy models.ECRedundancyInfo - ecPacketSize int64 - uploadConfig UploadConfig - - Result CreateECPackageResult + userID int64 + bucketID int64 + name string + objectIter iterator.UploadingObjectIterator + redundancy models.ECRedundancyInfo +} + +type UpdateECPackageContext struct { + *UpdatePackageContext + ECPacketSize int64 } type CreateECPackageResult struct { @@ -49,27 +42,24 @@ type ECObjectUploadResult struct { ObjectID int64 } -func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, ecPacketSize int64, uploadConfig UploadConfig) *CreateECPackage { +func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { return &CreateECPackage{ - userID: userID, - bucketID: bucketID, - name: name, - objectIter: objIter, - redundancy: redundancy, - ecPacketSize: ecPacketSize, - uploadConfig: uploadConfig, + userID: userID, + bucketID: bucketID, + name: name, + objectIter: objIter, + redundancy: redundancy, } } -func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - t.objectIter.Close() - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} +func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackageResult, error) { + defer t.objectIter.Close() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } -func (t *CreateECPackage) do(ctx TaskContext) error { // TODO2 /* reqBlder := reqbuilder.NewBuilder() @@ -101,20 +91,20 @@ func (t *CreateECPackage) do(ctx TaskContext) error { defer mutex.Unlock() */ - createPkgResp, err := ctx.Coordinator().CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, + createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy))) if err != nil { - return fmt.Errorf("creating package: %w", err) + return nil, fmt.Errorf("creating package: %w", err) } - getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID)) + getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { - return fmt.Errorf("getting user nodes: %w", err) + return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) if err != nil { - return fmt.Errorf("finding client location: %w", err) + return nil, fmt.Errorf("finding client location: %w", err) } uploadNodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { @@ -124,9 +114,9 @@ func (t *CreateECPackage) do(ctx TaskContext) error { } }) - getECResp, err := ctx.Coordinator().GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName)) + getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName)) if err != nil { - return fmt.Errorf("getting ec: %w", err) + return nil, fmt.Errorf("getting ec: %w", err) } /* @@ -141,17 +131,23 @@ func (t *CreateECPackage) do(ctx TaskContext) error { defer mutex2.Unlock() */ - rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config, t.ecPacketSize, t.uploadConfig) + rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config) if err != nil { - return err + return nil, err } - t.Result.PackageID = createPkgResp.PackageID - t.Result.ObjectResults = rets - return nil + return &CreateECPackageResult{ + PackageID: createPkgResp.PackageID, + ObjectResults: rets, + }, nil } -func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ec model.Ec, ecPacketSize int64, uploadConfig UploadConfig) ([]ECObjectUploadResult, error) { +func uploadAndUpdateECPackage(ctx *UpdateECPackageContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ec model.Ec) ([]ECObjectUploadResult, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + var uploadRets []ECObjectUploadResult //上传文件夹 var adds []coormq.AddECObjectInfo @@ -164,7 +160,7 @@ func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter itera return nil, fmt.Errorf("reading object: %w", err) } - fileHashes, uploadedNodeIDs, err := uploadECObject(ctx, objInfo, uploadNodes, ec, ecPacketSize, uploadConfig) + fileHashes, uploadedNodeIDs, err := uploadECObject(ctx, objInfo, uploadNodes, ec) uploadRets = append(uploadRets, ECObjectUploadResult{ Info: objInfo, Error: err, @@ -176,7 +172,7 @@ func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter itera adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) } - _, err := ctx.Coordinator().UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil)) + _, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil)) if err != nil { return nil, fmt.Errorf("updating package: %w", err) } @@ -185,7 +181,7 @@ func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter itera } // 上传文件 -func uploadECObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ec model.Ec, ecPacketSize int64, uploadConfig UploadConfig) ([]string, []int64, error) { +func uploadECObject(ctx *UpdateECPackageContext, obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ec model.Ec) ([]string, []int64, error) { //生成纠删码的写入节点序列 nodes := make([]UploadNodeInfo, ec.EcN) numNodes := len(uploadNodes) @@ -194,7 +190,7 @@ func uploadECObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNo nodes[i] = uploadNodes[(startWriteNodeID+i)%numNodes] } - hashs, err := ecWrite(obj.File, obj.Size, ec.EcK, ec.EcN, nodes, ecPacketSize, uploadConfig) + hashs, err := ecWrite(ctx, obj.File, obj.Size, ec.EcK, ec.EcN, nodes) if err != nil { return nil, nil, fmt.Errorf("EcWrite failed, err: %w", err) } @@ -219,14 +215,13 @@ func (t *CreateECPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInf return nodes[rand.Intn(len(nodes))] } -func ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []UploadNodeInfo, ecPacketSize int64, uploadConfig UploadConfig) ([]string, error) { - +func ecWrite(ctx *UpdateECPackageContext, file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []UploadNodeInfo) ([]string, error) { // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 //获取文件大小 var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN //计算每个块的packet数 - numPacket := (fileSize + int64(ecK)*ecPacketSize - 1) / (int64(ecK) * ecPacketSize) + numPacket := (fileSize + int64(ecK)*ctx.ECPacketSize - 1) / (int64(ecK) * ctx.ECPacketSize) //fmt.Println(numPacket) //创建channel loadBufs := make([]chan []byte, ecN) @@ -239,7 +234,7 @@ func ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []Uploa } hashs := make([]string, ecN) //正式开始写入 - go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), ecPacketSize) //从本地文件系统加载数据 + go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), ctx.ECPacketSize) //从本地文件系统加载数据 go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) var wg sync.WaitGroup @@ -253,8 +248,18 @@ func ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []Uploa } defer mutex.Unlock() */ - for i := 0; i < ecN; i++ { - go send(nodes[i], encodeBufs[i], numPacket, &wg, hashs, i, uploadConfig) + for idx := 0; idx < ecN; idx++ { + i := idx + reader := channelBytesReader{ + channel: encodeBufs[idx], + packetCount: numPacket, + } + go func() { + // TODO 处理错误 + fileHash, _ := uploadFile(&reader, nodes[i]) + hashs[i] = fileHash + wg.Done() + }() } wg.Wait() @@ -310,115 +315,26 @@ func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int6 } } -func send(node UploadNodeInfo, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int, uploadConfig UploadConfig) error { - // TODO zkx 先直接复制client\internal\task\upload_rep_objects.go中的uploadToNode和uploadToLocalIPFS来替代这部分逻辑 - // 方便之后异步化处理 - // uploadToAgent的逻辑反了,而且中间步骤失败,就必须打印日志后停止后续操作 +type channelBytesReader struct { + channel chan []byte + packetCount int64 + readingData []byte +} - uploadToAgent := true - if uploadConfig.LocalIPFS != nil { //使用IPFS传输 - //创建IPFS文件 - logger.Infof("try to use local IPFS to upload block") - writer, err := uploadConfig.LocalIPFS.CreateFile() - if err != nil { - uploadToAgent = false - fmt.Errorf("create IPFS file failed, err: %w", err) - } - //逐packet写进ipfs - for i := 0; int64(i) < numPacket; i++ { - buf := <-inBuf - reader := bytes.NewReader(buf) - _, err = io.Copy(writer, reader) - if err != nil { - uploadToAgent = false - fmt.Errorf("copying block data to IPFS file failed, err: %w", err) - } - } - //finish, 获取哈希 - fileHash, err := writer.Finish() - if err != nil { - logger.Warnf("upload block to local IPFS failed, so try to upload by agent, err: %s", err.Error()) - uploadToAgent = false - fmt.Errorf("finish writing blcok to IPFS failed, err: %w", err) +func (r *channelBytesReader) Read(buf []byte) (int, error) { + if len(r.readingData) == 0 { + if r.packetCount == 0 { + return 0, io.EOF } - hashs[idx] = fileHash - if err != nil { - } - nodeID := node.Node.NodeID - // 然后让最近节点pin本地上传的文件 - agentClient, err := agtmq.NewClient(nodeID, uploadConfig.MQ) - if err != nil { - uploadToAgent = false - fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err) - } - defer agentClient.Close() - pinObjResp, err := agentClient.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) - if err != nil { - uploadToAgent = false - fmt.Errorf("start pinning object: %w", err) - } - for { - waitResp, err := agentClient.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) - if err != nil { - uploadToAgent = false - fmt.Errorf("waitting pinning object: %w", err) - } - if waitResp.IsComplete { - if waitResp.Error != "" { - uploadToAgent = false - fmt.Errorf("agent pinning object: %s", waitResp.Error) - } - break - } - } - if uploadToAgent == false { - return nil - } + r.readingData = <-r.channel + r.packetCount-- } - //////////////////////////////通过Agent上传 - if uploadToAgent == true { - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := node.Node.ExternalIP - if node.IsSameLocation { - nodeIP = node.Node.LocalIP - - logger.Infof("client and node %d are at the same location, use local ip\n", node.Node.NodeID) - } - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, uploadConfig.GRPCPort) - grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - defer grpcCon.Close() + len := copy(buf, r.readingData) + r.readingData = r.readingData[:len] - client := agentcaller.NewFileTransportClient(grpcCon) - upload, err := mygrpc.SendFileAsStream(client) - if err != nil { - return fmt.Errorf("request to send file failed, err: %w", err) - } - // 发送文件数据 - for i := 0; int64(i) < numPacket; i++ { - buf := <-inBuf - reader := bytes.NewReader(buf) - _, err = io.Copy(upload, reader) - if err != nil { - // 发生错误则关闭连接 - upload.Abort(io.ErrClosedPipe) - return fmt.Errorf("copy block date to upload stream failed, err: %w", err) - } - } - // 发送EOF消息,并获得FileHash - fileHash, err := upload.Finish() - if err != nil { - upload.Abort(io.ErrClosedPipe) - return fmt.Errorf("send EOF failed, err: %w", err) - } - hashs[idx] = fileHash - wg.Done() - } - return nil + return len, nil } func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) { diff --git a/pkgs/cmd/create_rep_package.go b/pkgs/cmd/create_rep_package.go index 18d0d71..e9e77bc 100644 --- a/pkgs/cmd/create_rep_package.go +++ b/pkgs/cmd/create_rep_package.go @@ -9,19 +9,14 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" + distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" ) type UploadNodeInfo struct { @@ -30,22 +25,15 @@ type UploadNodeInfo struct { } type CreateRepPackage struct { - userID int64 - bucketID int64 - name string - objectIter iterator.UploadingObjectIterator - redundancy models.RepRedundancyInfo - uploadConfig UploadConfig - - Result CreateRepPackageResult + userID int64 + bucketID int64 + name string + objectIter iterator.UploadingObjectIterator + redundancy models.RepRedundancyInfo } -type UploadConfig struct { - LocalIPFS *ipfs.IPFS - LocalNodeID *int64 - ExternalIP string - GRPCPort int - MQ *mymq.Config +type UpdatePackageContext struct { + Distlock *distsvc.Service } type CreateRepPackageResult struct { @@ -60,26 +48,24 @@ type RepObjectUploadResult struct { ObjectID int64 } -func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, uploadConfig UploadConfig) *CreateRepPackage { +func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { return &CreateRepPackage{ - userID: userID, - bucketID: bucketID, - name: name, - objectIter: objIter, - redundancy: redundancy, - uploadConfig: uploadConfig, + userID: userID, + bucketID: bucketID, + name: name, + objectIter: objIter, + redundancy: redundancy, } } -func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - t.objectIter.Close() - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} +func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackageResult, error) { + defer t.objectIter.Close() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } -func (t *CreateRepPackage) do(ctx TaskContext) error { /* // TODO2 reqBlder := reqbuilder.NewBuilder() @@ -110,20 +96,20 @@ func (t *CreateRepPackage) do(ctx TaskContext) error { } defer mutex.Unlock() */ - createPkgResp, err := ctx.Coordinator().CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, + createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy))) if err != nil { - return fmt.Errorf("creating package: %w", err) + return nil, fmt.Errorf("creating package: %w", err) } - getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID)) + getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { - return fmt.Errorf("getting user nodes: %w", err) + return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) if err != nil { - return fmt.Errorf("finding client location: %w", err) + return nil, fmt.Errorf("finding client location: %w", err) } nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { @@ -137,25 +123,30 @@ func (t *CreateRepPackage) do(ctx TaskContext) error { // 防止上传的副本被清除 mutex2, err := reqbuilder.NewBuilder(). IPFS().CreateAnyRep(uploadNode.Node.NodeID). - MutexLock(ctx.DistLock()) + MutexLock(ctx.Distlock) if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) + return nil, fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex2.Unlock() - rets, err := uploadAndUpdateRepPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNode, t.uploadConfig) + rets, err := uploadAndUpdateRepPackage(createPkgResp.PackageID, t.objectIter, uploadNode) if err != nil { - return err + return nil, err } - t.Result.PackageID = createPkgResp.PackageID - t.Result.ObjectResults = rets - return nil + return &CreateRepPackageResult{ + PackageID: createPkgResp.PackageID, + ObjectResults: rets, + }, nil } -func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo, uploadConfig UploadConfig) ([]RepObjectUploadResult, error) { +func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo) ([]RepObjectUploadResult, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + var uploadRets []RepObjectUploadResult - //上传文件夹 var adds []coormq.AddRepObjectInfo for { objInfo, err := objectIter.MoveNext() @@ -166,7 +157,7 @@ func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iter return nil, fmt.Errorf("reading object: %w", err) } - fileHash, uploadedNodeIDs, err := uploadObject(ctx, objInfo, uploadNode, uploadConfig) + fileHash, err := uploadFile(objInfo.File, uploadNode) uploadRets = append(uploadRets, RepObjectUploadResult{ Info: objInfo, Error: err, @@ -176,10 +167,10 @@ func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iter return nil, fmt.Errorf("uploading object: %w", err) } - adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, uploadedNodeIDs)) + adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID})) } - _, err := ctx.Coordinator().UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil)) + _, err = coorCli.UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil)) if err != nil { return nil, fmt.Errorf("updating package: %w", err) } @@ -188,15 +179,15 @@ func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iter } // 上传文件 -func uploadObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNode UploadNodeInfo, uploadConfig UploadConfig) (string, []int64, error) { +func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { // 本地有IPFS,则直接从本地IPFS上传 - if uploadConfig.LocalIPFS != nil { + if globals.IPFSPool != nil { logger.Infof("try to use local IPFS to upload file") // 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件 - fileHash, err := uploadToLocalIPFS(uploadConfig.LocalIPFS, obj.File, uploadNode.Node.NodeID, uploadConfig.LocalNodeID == nil, uploadConfig) + fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, globals.Local.NodeID == nil) if err == nil { - return fileHash, []int64{*uploadConfig.LocalNodeID}, nil + return fileHash, nil } else { logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error()) @@ -212,12 +203,12 @@ func uploadObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNode logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.Node.NodeID) } - fileHash, err := uploadToNode(obj.File, nodeIP, uploadConfig) + fileHash, err := uploadToNode(file, nodeIP) if err != nil { - return "", nil, fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) + return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err) } - return fileHash, []int64{uploadNode.Node.NodeID}, nil + return fileHash, nil } // chooseUploadNode 选择一个上传文件的节点 @@ -232,86 +223,68 @@ func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeIn return nodes[rand.Intn(len(nodes))] } -func uploadToNode(file io.ReadCloser, nodeIP string, uploadConfig UploadConfig) (string, error) { - // 建立grpc连接,发送请求 - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, uploadConfig.GRPCPort) - grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - defer grpcCon.Close() - - client := agentcaller.NewFileTransportClient(grpcCon) - upload, err := mygrpc.SendFileAsStream(client) +func uploadToNode(file io.Reader, nodeIP string) (string, error) { + rpcCli, err := globals.AgentRPCPool.Acquire(nodeIP) if err != nil { - return "", fmt.Errorf("request to send file failed, err: %w", err) + return "", fmt.Errorf("new agent rpc client: %w", err) } + defer rpcCli.Close() - // 发送文件数据 - _, err = io.Copy(upload, file) - if err != nil { - // 发生错误则关闭连接 - upload.Abort(io.ErrClosedPipe) - return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err) - } + return rpcCli.SendIPFSFile(file) +} - // 发送EOF消息,并获得FileHash - fileHash, err := upload.Finish() +func uploadToLocalIPFS(file io.Reader, nodeID int64, shouldPin bool) (string, error) { + ipfsCli, err := globals.IPFSPool.Acquire() if err != nil { - upload.Abort(io.ErrClosedPipe) - return "", fmt.Errorf("send EOF failed, err: %w", err) + return "", fmt.Errorf("new ipfs client: %w", err) } + defer ipfsCli.Close() - return fileHash, nil -} - -func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser, nodeID int64, shouldPin bool, uploadConfig UploadConfig) (string, error) { // 从本地IPFS上传文件 - writer, err := ipfs.CreateFile() + fileHash, err := ipfsCli.CreateFile(file) if err != nil { - return "", fmt.Errorf("create IPFS file failed, err: %w", err) + return "", fmt.Errorf("creating ipfs file: %w", err) } - _, err = io.Copy(writer, file) - if err != nil { - return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err) + if !shouldPin { + return fileHash, nil } - fileHash, err := writer.Finish() + err = pinIPFSFile(nodeID, fileHash) if err != nil { - return "", fmt.Errorf("finish writing IPFS failed, err: %w", err) + return "", err } - if !shouldPin { - return fileHash, nil - } + return fileHash, nil +} - // 然后让最近节点pin本地上传的文件 - agentClient, err := agtmq.NewClient(nodeID, uploadConfig.MQ) +func pinIPFSFile(nodeID int64, fileHash string) error { + agtCli, err := globals.AgentMQPool.Acquire(nodeID) if err != nil { - return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err) + return fmt.Errorf("new agent client: %w", err) } - defer agentClient.Close() + defer agtCli.Close() - pinObjResp, err := agentClient.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) + // 然后让最近节点pin本地上传的文件 + pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) if err != nil { - return "", fmt.Errorf("start pinning object: %w", err) + return fmt.Errorf("start pinning object: %w", err) } for { - waitResp, err := agentClient.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) + waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) if err != nil { - return "", fmt.Errorf("waitting pinning object: %w", err) + return fmt.Errorf("waitting pinning object: %w", err) } if waitResp.IsComplete { if waitResp.Error != "" { - return "", fmt.Errorf("agent pinning object: %s", waitResp.Error) + return fmt.Errorf("agent pinning object: %s", waitResp.Error) } break } } - return fileHash, nil + return nil } diff --git a/pkgs/cmd/download_package.go b/pkgs/cmd/download_package.go index fabcba1..4bd60db 100644 --- a/pkgs/cmd/download_package.go +++ b/pkgs/cmd/download_package.go @@ -5,10 +5,11 @@ import ( "io" "os" "path/filepath" - "time" "gitlink.org.cn/cloudream/common/models" + distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" @@ -20,6 +21,11 @@ type DownloadPackage struct { outputPath string } +type DownloadPackageContext struct { + Distlock *distsvc.Service + ECPacketSize int64 +} + func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { return &DownloadPackage{ userID: userID, @@ -28,15 +34,14 @@ func NewDownloadPackage(userID int64, packageID int64, outputPath string) *Downl } } -func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} +func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() -func (t *DownloadPackage) do(ctx TaskContext) error { - getPkgResp, err := ctx.Coordinator().GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) if err != nil { return fmt.Errorf("getting package: %w", err) @@ -56,34 +61,43 @@ func (t *DownloadPackage) do(ctx TaskContext) error { return t.writeObject(objIter) } -func (t *DownloadPackage) downloadRep(ctx TaskContext) (iterator.DownloadingObjectIterator, error) { - getObjsResp, err := ctx.Coordinator().GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) +func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.DownloadingObjectIterator, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) if err != nil { return nil, fmt.Errorf("getting package objects: %w", err) } - getObjRepDataResp, err := ctx.Coordinator().GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(t.packageID)) + getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(t.packageID)) if err != nil { return nil, fmt.Errorf("getting package object rep data: %w", err) } - iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, ctx.Coordinator(), svc.distlock, myos.DownloadConfig{ - LocalIPFS: svc.ipfs, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, + iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, &iterator.DownloadContext{ + Distlock: ctx.Distlock, }) return iter, nil } -func (t *DownloadPackage) downloadEC(ctx TaskContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) { - getObjsResp, err := ctx.Coordinator().GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) +func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + + getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID)) if err != nil { return nil, fmt.Errorf("getting package objects: %w", err) } - getObjECDataResp, err := ctx.Coordinator().GetPackageObjectECData(coormq.NewGetPackageObjectECData(t.packageID)) + getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(t.packageID)) if err != nil { return nil, fmt.Errorf("getting package object ec data: %w", err) } @@ -93,16 +107,16 @@ func (t *DownloadPackage) downloadEC(ctx TaskContext, pkg model.Package) (iterat return nil, fmt.Errorf("get ec redundancy info: %w", err) } - getECResp, err := ctx.Coordinator().GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) + getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) if err != nil { return nil, fmt.Errorf("getting ec: %w", err) } - iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, ctx.Coordinator(), svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, myos.DownloadConfig{ - LocalIPFS: svc.ipfs, - ExternalIP: config.Cfg().ExternalIP, - GRPCPort: config.Cfg().GRPCPort, - MQ: &config.Cfg().RabbitMQ, + iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{ + DownloadContext: &iterator.DownloadContext{ + Distlock: ctx.Distlock, + }, + ECPacketSize: ctx.ECPacketSize, }) return iter, nil diff --git a/pkgs/cmd/update_ec_package.go b/pkgs/cmd/update_ec_package.go index 1d41cab..2b6b55f 100644 --- a/pkgs/cmd/update_ec_package.go +++ b/pkgs/cmd/update_ec_package.go @@ -2,51 +2,44 @@ package cmd import ( "fmt" - "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/utils/serder" mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type UpdateECPackage struct { - userID int64 - packageID int64 - objectIter iterator.UploadingObjectIterator - ecPacketSize int64 - uploadConfig UploadConfig - - Result UpdateECPackageResult + userID int64 + packageID int64 + objectIter iterator.UploadingObjectIterator } type UpdateECPackageResult struct { ObjectResults []ECObjectUploadResult } -func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator, ecPacketSize int64, uploadConfig UploadConfig) *UpdateECPackage { +func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) *UpdateECPackage { return &UpdateECPackage{ - userID: userID, - packageID: packageID, - objectIter: objIter, - ecPacketSize: ecPacketSize, - uploadConfig: uploadConfig, + userID: userID, + packageID: packageID, + objectIter: objIter, } } -func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - t.objectIter.Close() - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} +func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackageResult, error) { + defer t.objectIter.Close() + + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } -func (t *UpdateECPackage) do(ctx TaskContext) error { /* TODO2 reqBlder := reqbuilder.NewBuilder() @@ -78,19 +71,19 @@ func (t *UpdateECPackage) do(ctx TaskContext) error { defer mutex.Unlock() */ - getPkgResp, err := ctx.Coordinator().GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) if err != nil { - return fmt.Errorf("getting package: %w", err) + return nil, fmt.Errorf("getting package: %w", err) } - getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID)) + getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { - return fmt.Errorf("getting user nodes: %w", err) + return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) if err != nil { - return fmt.Errorf("finding client location: %w", err) + return nil, fmt.Errorf("finding client location: %w", err) } nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { @@ -102,12 +95,12 @@ func (t *UpdateECPackage) do(ctx TaskContext) error { var ecRed models.ECRedundancyInfo if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil { - return fmt.Errorf("get ec redundancy info: %w", err) + return nil, fmt.Errorf("get ec redundancy info: %w", err) } - getECResp, err := ctx.Coordinator().GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) + getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName)) if err != nil { - return fmt.Errorf("getting ec: %w", err) + return nil, fmt.Errorf("getting ec: %w", err) } /* @@ -122,13 +115,14 @@ func (t *UpdateECPackage) do(ctx TaskContext) error { defer mutex2.Unlock() */ - rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config, t.ecPacketSize, t.uploadConfig) + rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config) if err != nil { - return err + return nil, err } - t.Result.ObjectResults = rets - return nil + return &UpdateECPackageResult{ + ObjectResults: rets, + }, nil } // chooseUploadNode 选择一个上传文件的节点 diff --git a/pkgs/cmd/update_rep_package.go b/pkgs/cmd/update_rep_package.go index 1a075b1..df27f7c 100644 --- a/pkgs/cmd/update_rep_package.go +++ b/pkgs/cmd/update_rep_package.go @@ -2,24 +2,21 @@ package cmd import ( "fmt" - "time" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type UpdateRepPackage struct { - userID int64 - packageID int64 - objectIter iterator.UploadingObjectIterator - uploadConfig UploadConfig - - Result UpdateRepPackageResult + userID int64 + packageID int64 + objectIter iterator.UploadingObjectIterator } type UpdateNodeInfo struct { @@ -31,24 +28,21 @@ type UpdateRepPackageResult struct { ObjectResults []RepObjectUploadResult } -func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator, uploadConfig UploadConfig) *UpdateRepPackage { +func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage { return &UpdateRepPackage{ - userID: userID, - packageID: packageID, - objectIter: objectIter, - uploadConfig: uploadConfig, + userID: userID, + packageID: packageID, + objectIter: objectIter, } } -func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - t.objectIter.Close() - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} +func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackageResult, error) { + defer t.objectIter.Close() -func (t *UpdateRepPackage) do(ctx TaskContext) error { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } /* TODO2 reqBlder := reqbuilder.NewBuilder() @@ -79,14 +73,14 @@ func (t *UpdateRepPackage) do(ctx TaskContext) error { } defer mutex.Unlock() */ - getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID)) + getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { - return fmt.Errorf("getting user nodes: %w", err) + return nil, fmt.Errorf("getting user nodes: %w", err) } - findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) if err != nil { - return fmt.Errorf("finding client location: %w", err) + return nil, fmt.Errorf("finding client location: %w", err) } nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UpdateNodeInfo { @@ -108,19 +102,20 @@ func (t *UpdateRepPackage) do(ctx TaskContext) error { // 防止上传的副本被清除 mutex2, err := reqbuilder.NewBuilder(). IPFS().CreateAnyRep(uploadNode.Node.NodeID). - MutexLock(ctx.DistLock()) + MutexLock(ctx.Distlock) if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) + return nil, fmt.Errorf("acquire locks failed, err: %w", err) } defer mutex2.Unlock() - rets, err := uploadAndUpdateRepPackage(ctx, t.packageID, t.objectIter, uploadNode.UploadNodeInfo, t.uploadConfig) + rets, err := uploadAndUpdateRepPackage(t.packageID, t.objectIter, uploadNode.UploadNodeInfo) if err != nil { - return err + return nil, err } - t.Result.ObjectResults = rets - return nil + return &UpdateRepPackageResult{ + ObjectResults: rets, + }, nil } // chooseUploadNode 选择一个上传文件的节点 diff --git a/pkgs/grpc/agent/agent.pb.go b/pkgs/grpc/agent/agent.pb.go new file mode 100644 index 0000000..d90d461 --- /dev/null +++ b/pkgs/grpc/agent/agent.pb.go @@ -0,0 +1,345 @@ +// 使用的语法版本 + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.30.0 +// protoc v4.22.3 +// source: pkgs/grpc/agent/agent.proto + +package agent + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type FileDataPacketType int32 + +const ( + FileDataPacketType_Data FileDataPacketType = 0 + FileDataPacketType_EOF FileDataPacketType = 1 +) + +// Enum value maps for FileDataPacketType. +var ( + FileDataPacketType_name = map[int32]string{ + 0: "Data", + 1: "EOF", + } + FileDataPacketType_value = map[string]int32{ + "Data": 0, + "EOF": 1, + } +) + +func (x FileDataPacketType) Enum() *FileDataPacketType { + p := new(FileDataPacketType) + *p = x + return p +} + +func (x FileDataPacketType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor { + return file_pkgs_grpc_agent_agent_proto_enumTypes[0].Descriptor() +} + +func (FileDataPacketType) Type() protoreflect.EnumType { + return &file_pkgs_grpc_agent_agent_proto_enumTypes[0] +} + +func (x FileDataPacketType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FileDataPacketType.Descriptor instead. +func (FileDataPacketType) EnumDescriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} +} + +// 文件数据。注意:只在Type为Data的时候,Data字段才能有数据 +type FileDataPacket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` +} + +func (x *FileDataPacket) Reset() { + *x = FileDataPacket{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FileDataPacket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileDataPacket) ProtoMessage() {} + +func (x *FileDataPacket) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead. +func (*FileDataPacket) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} +} + +func (x *FileDataPacket) GetType() FileDataPacketType { + if x != nil { + return x.Type + } + return FileDataPacketType_Data +} + +func (x *FileDataPacket) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type SendIPFSFileResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"` +} + +func (x *SendIPFSFileResp) Reset() { + *x = SendIPFSFileResp{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendIPFSFileResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendIPFSFileResp) ProtoMessage() {} + +func (x *SendIPFSFileResp) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendIPFSFileResp.ProtoReflect.Descriptor instead. +func (*SendIPFSFileResp) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{1} +} + +func (x *SendIPFSFileResp) GetFileHash() string { + if x != nil { + return x.FileHash + } + return "" +} + +type GetIPFSFileReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"` +} + +func (x *GetIPFSFileReq) Reset() { + *x = GetIPFSFileReq{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetIPFSFileReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetIPFSFileReq) ProtoMessage() {} + +func (x *GetIPFSFileReq) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetIPFSFileReq.ProtoReflect.Descriptor instead. +func (*GetIPFSFileReq) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{2} +} + +func (x *GetIPFSFileReq) GetFileHash() string { + if x != nil { + return x.FileHash + } + return "" +} + +var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor + +var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x67, 0x65, 0x6e, + 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, + 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, + 0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, + 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, + 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2e, 0x0a, 0x10, + 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2c, 0x0a, 0x0e, + 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1a, + 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69, + 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, + 0x46, 0x10, 0x01, 0x32, 0x74, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c, + 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, + 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e, + 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, + 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, + 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, + 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, + 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkgs_grpc_agent_agent_proto_rawDescOnce sync.Once + file_pkgs_grpc_agent_agent_proto_rawDescData = file_pkgs_grpc_agent_agent_proto_rawDesc +) + +func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { + file_pkgs_grpc_agent_agent_proto_rawDescOnce.Do(func() { + file_pkgs_grpc_agent_agent_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_grpc_agent_agent_proto_rawDescData) + }) + return file_pkgs_grpc_agent_agent_proto_rawDescData +} + +var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ + (FileDataPacketType)(0), // 0: FileDataPacketType + (*FileDataPacket)(nil), // 1: FileDataPacket + (*SendIPFSFileResp)(nil), // 2: SendIPFSFileResp + (*GetIPFSFileReq)(nil), // 3: GetIPFSFileReq +} +var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ + 0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType + 1, // 1: Agent.SendIPFSFile:input_type -> FileDataPacket + 3, // 2: Agent.GetIPFSFile:input_type -> GetIPFSFileReq + 2, // 3: Agent.SendIPFSFile:output_type -> SendIPFSFileResp + 1, // 4: Agent.GetIPFSFile:output_type -> FileDataPacket + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_pkgs_grpc_agent_agent_proto_init() } +func file_pkgs_grpc_agent_agent_proto_init() { + if File_pkgs_grpc_agent_agent_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkgs_grpc_agent_agent_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FileDataPacket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendIPFSFileResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetIPFSFileReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc, + NumEnums: 1, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkgs_grpc_agent_agent_proto_goTypes, + DependencyIndexes: file_pkgs_grpc_agent_agent_proto_depIdxs, + EnumInfos: file_pkgs_grpc_agent_agent_proto_enumTypes, + MessageInfos: file_pkgs_grpc_agent_agent_proto_msgTypes, + }.Build() + File_pkgs_grpc_agent_agent_proto = out.File + file_pkgs_grpc_agent_agent_proto_rawDesc = nil + file_pkgs_grpc_agent_agent_proto_goTypes = nil + file_pkgs_grpc_agent_agent_proto_depIdxs = nil +} diff --git a/pkgs/proto/file_transport.proto b/pkgs/grpc/agent/agent.proto similarity index 57% rename from pkgs/proto/file_transport.proto rename to pkgs/grpc/agent/agent.proto index ee7a319..d8f53b4 100644 --- a/pkgs/proto/file_transport.proto +++ b/pkgs/grpc/agent/agent.proto @@ -2,7 +2,7 @@ syntax = "proto3"; // 生成的go文件包 -option go_package = "../proto;proto";//grpc这里生效了 +option go_package = ".;agent";//grpc这里生效了 enum FileDataPacketType { @@ -15,16 +15,16 @@ message FileDataPacket { bytes Data = 2; } -message SendResp { +message SendIPFSFileResp { string FileHash = 1; } -message GetReq { +message GetIPFSFileReq { string FileHash = 1; } -service FileTransport { - rpc SendFile(stream FileDataPacket)returns(SendResp){} - rpc GetFile(GetReq)returns(stream FileDataPacket){} +service Agent { + rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){} + rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){} } diff --git a/pkgs/grpc/agent/agent_grpc.pb.go b/pkgs/grpc/agent/agent_grpc.pb.go new file mode 100644 index 0000000..d32adde --- /dev/null +++ b/pkgs/grpc/agent/agent_grpc.pb.go @@ -0,0 +1,209 @@ +// 使用的语法版本 + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.22.3 +// source: pkgs/grpc/agent/agent.proto + +package agent + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Agent_SendIPFSFile_FullMethodName = "/Agent/SendIPFSFile" + Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile" +) + +// AgentClient is the client API for Agent service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type AgentClient interface { + SendIPFSFile(ctx context.Context, opts ...grpc.CallOption) (Agent_SendIPFSFileClient, error) + GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) +} + +type agentClient struct { + cc grpc.ClientConnInterface +} + +func NewAgentClient(cc grpc.ClientConnInterface) AgentClient { + return &agentClient{cc} +} + +func (c *agentClient) SendIPFSFile(ctx context.Context, opts ...grpc.CallOption) (Agent_SendIPFSFileClient, error) { + stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[0], Agent_SendIPFSFile_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &agentSendIPFSFileClient{stream} + return x, nil +} + +type Agent_SendIPFSFileClient interface { + Send(*FileDataPacket) error + CloseAndRecv() (*SendIPFSFileResp, error) + grpc.ClientStream +} + +type agentSendIPFSFileClient struct { + grpc.ClientStream +} + +func (x *agentSendIPFSFileClient) Send(m *FileDataPacket) error { + return x.ClientStream.SendMsg(m) +} + +func (x *agentSendIPFSFileClient) CloseAndRecv() (*SendIPFSFileResp, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SendIPFSFileResp) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *agentClient) GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) { + stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[1], Agent_GetIPFSFile_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &agentGetIPFSFileClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Agent_GetIPFSFileClient interface { + Recv() (*FileDataPacket, error) + grpc.ClientStream +} + +type agentGetIPFSFileClient struct { + grpc.ClientStream +} + +func (x *agentGetIPFSFileClient) Recv() (*FileDataPacket, error) { + m := new(FileDataPacket) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// AgentServer is the server API for Agent service. +// All implementations must embed UnimplementedAgentServer +// for forward compatibility +type AgentServer interface { + SendIPFSFile(Agent_SendIPFSFileServer) error + GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error + mustEmbedUnimplementedAgentServer() +} + +// UnimplementedAgentServer must be embedded to have forward compatible implementations. +type UnimplementedAgentServer struct { +} + +func (UnimplementedAgentServer) SendIPFSFile(Agent_SendIPFSFileServer) error { + return status.Errorf(codes.Unimplemented, "method SendIPFSFile not implemented") +} +func (UnimplementedAgentServer) GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error { + return status.Errorf(codes.Unimplemented, "method GetIPFSFile not implemented") +} +func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {} + +// UnsafeAgentServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AgentServer will +// result in compilation errors. +type UnsafeAgentServer interface { + mustEmbedUnimplementedAgentServer() +} + +func RegisterAgentServer(s grpc.ServiceRegistrar, srv AgentServer) { + s.RegisterService(&Agent_ServiceDesc, srv) +} + +func _Agent_SendIPFSFile_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(AgentServer).SendIPFSFile(&agentSendIPFSFileServer{stream}) +} + +type Agent_SendIPFSFileServer interface { + SendAndClose(*SendIPFSFileResp) error + Recv() (*FileDataPacket, error) + grpc.ServerStream +} + +type agentSendIPFSFileServer struct { + grpc.ServerStream +} + +func (x *agentSendIPFSFileServer) SendAndClose(m *SendIPFSFileResp) error { + return x.ServerStream.SendMsg(m) +} + +func (x *agentSendIPFSFileServer) Recv() (*FileDataPacket, error) { + m := new(FileDataPacket) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Agent_GetIPFSFile_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetIPFSFileReq) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AgentServer).GetIPFSFile(m, &agentGetIPFSFileServer{stream}) +} + +type Agent_GetIPFSFileServer interface { + Send(*FileDataPacket) error + grpc.ServerStream +} + +type agentGetIPFSFileServer struct { + grpc.ServerStream +} + +func (x *agentGetIPFSFileServer) Send(m *FileDataPacket) error { + return x.ServerStream.SendMsg(m) +} + +// Agent_ServiceDesc is the grpc.ServiceDesc for Agent service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Agent_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Agent", + HandlerType: (*AgentServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SendIPFSFile", + Handler: _Agent_SendIPFSFile_Handler, + ClientStreams: true, + }, + { + StreamName: "GetIPFSFile", + Handler: _Agent_GetIPFSFile_Handler, + ServerStreams: true, + }, + }, + Metadata: "pkgs/grpc/agent/agent.proto", +} diff --git a/pkgs/grpc/agent/client.go b/pkgs/grpc/agent/client.go new file mode 100644 index 0000000..e23a6ee --- /dev/null +++ b/pkgs/grpc/agent/client.go @@ -0,0 +1,131 @@ +package agent + +import ( + "context" + "fmt" + "io" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type Client struct { + con *grpc.ClientConn + cli AgentClient +} + +func NewClient(addr string) (*Client, error) { + con, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + + return &Client{ + con: con, + cli: NewAgentClient(con), + }, nil +} + +func (c *Client) SendIPFSFile(file io.Reader) (string, error) { + sendCli, err := c.cli.SendIPFSFile(context.Background()) + if err != nil { + return "", err + } + + buf := make([]byte, 4096) + for { + rd, err := file.Read(buf) + if err == io.EOF { + err := sendCli.Send(&FileDataPacket{ + Type: FileDataPacketType_EOF, + Data: buf[:rd], + }) + if err != nil { + return "", fmt.Errorf("sending EOF packet: %w", err) + } + + resp, err := sendCli.CloseAndRecv() + if err != nil { + return "", fmt.Errorf("receiving response: %w", err) + } + + return resp.FileHash, nil + } + + if err != nil { + return "", fmt.Errorf("reading file data: %w", err) + } + + err = sendCli.Send(&FileDataPacket{ + Type: FileDataPacketType_Data, + Data: buf[:rd], + }) + if err != nil { + return "", fmt.Errorf("sending data packet: %w", err) + } + } +} + +type fileReadCloser struct { + io.ReadCloser + stream Agent_GetIPFSFileClient + cancelFn context.CancelFunc + readingData []byte + recvEOF bool +} + +func (s *fileReadCloser) Read(p []byte) (int, error) { + if len(s.readingData) == 0 && !s.recvEOF { + resp, err := s.stream.Recv() + if err != nil { + return 0, err + } + + if resp.Type == FileDataPacketType_Data { + s.readingData = resp.Data + + } else if resp.Type == FileDataPacketType_EOF { + s.readingData = resp.Data + s.recvEOF = true + + } else { + return 0, fmt.Errorf("unsupported packt type: %v", resp.Type) + } + } + + cnt := copy(p, s.readingData) + s.readingData = s.readingData[cnt:] + + if len(s.readingData) == 0 && s.recvEOF { + return cnt, io.EOF + } + + return cnt, nil +} + +func (s *fileReadCloser) Close() error { + s.cancelFn() + + return nil +} + +func (c *Client) GetIPFSFile(fileHash string) (io.ReadCloser, error) { + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := c.cli.GetIPFSFile(ctx, &GetIPFSFileReq{ + FileHash: fileHash, + }) + if err != nil { + cancel() + return nil, fmt.Errorf("request grpc failed, err: %w", err) + } + + return &fileReadCloser{ + stream: stream, + cancelFn: cancel, + }, nil +} + +func (c *Client) Close() { + c.con.Close() +} diff --git a/pkgs/grpc/agent/pool.go b/pkgs/grpc/agent/pool.go new file mode 100644 index 0000000..342fb93 --- /dev/null +++ b/pkgs/grpc/agent/pool.go @@ -0,0 +1,43 @@ +package agent + +import ( + "fmt" +) + +type PoolConfig struct { + Port int `json:"port"` +} + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + grpcCfg *PoolConfig +} + +func NewPool(grpcCfg *PoolConfig) *Pool { + return &Pool{ + grpcCfg: grpcCfg, + } +} +func (p *Pool) Acquire(ip string) (*PoolClient, error) { + cli, err := NewClient(fmt.Sprintf("%s:%d", ip, p.grpcCfg.Port)) + if err != nil { + return nil, err + } + + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + cli.Close() +} diff --git a/pkgs/grpc/config.go b/pkgs/grpc/config.go new file mode 100644 index 0000000..f3dfb54 --- /dev/null +++ b/pkgs/grpc/config.go @@ -0,0 +1,12 @@ +package grpc + +import "fmt" + +type Config struct { + IP string `json:"ip"` + Port int `json:"port"` +} + +func (c *Config) MakeListenAddress() string { + return fmt.Sprintf("%s:%d", c.IP, c.Port) +} diff --git a/pkgs/iterator/ec_object_iterator.go b/pkgs/iterator/ec_object_iterator.go index b6a39f0..3b489e4 100644 --- a/pkgs/iterator/ec_object_iterator.go +++ b/pkgs/iterator/ec_object_iterator.go @@ -7,19 +7,12 @@ import ( "os" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" - distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/models" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - myio "gitlink.org.cn/cloudream/common/utils/io" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" ) type ECObjectIterator struct { @@ -28,31 +21,36 @@ type ECObjectIterator struct { currentIndex int inited bool - coorCli *coormq.Client - distlock *distsvc.Service - ec model.Ec - ecPacketSize int64 - downloadConfig DownloadConfig - cliLocation model.Location + ec model.Ec + downloadCtx *ECDownloadContext + cliLocation model.Location +} + +type ECDownloadContext struct { + *DownloadContext + ECPacketSize int64 } -func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECData, coorCli *coormq.Client, distlock *distsvc.Service, ec model.Ec, ecPacketSize int64, downloadConfig DownloadConfig) *ECObjectIterator { +func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECData, ec model.Ec, downloadCtx *ECDownloadContext) *ECObjectIterator { return &ECObjectIterator{ - objects: objects, - objectECData: objectECData, - coorCli: coorCli, - distlock: distlock, - ec: ec, - ecPacketSize: ecPacketSize, - downloadConfig: downloadConfig, + objects: objects, + objectECData: objectECData, + ec: ec, + downloadCtx: downloadCtx, } } func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + if !i.inited { i.inited = true - findCliLocResp, err := i.coorCli.FindClientLocation(coormq.NewFindClientLocation(i.downloadConfig.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } @@ -63,12 +61,12 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { return nil, ErrNoMoreItem } - item, err := i.doMove() + item, err := i.doMove(coorCli) i.currentIndex++ return item, err } -func (iter *ECObjectIterator) doMove() (*IterDownloadingObject, error) { +func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadingObject, error) { obj := iter.objects[iter.currentIndex] ecData := iter.objectECData[iter.currentIndex] @@ -82,7 +80,7 @@ func (iter *ECObjectIterator) doMove() (*IterDownloadingObject, error) { for i := 0; i < ecK; i++ { hashs[i] = blocks[i].FileHash - getNodesResp, err := iter.coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs)) + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs)) if err != nil { return nil, fmt.Errorf("getting nodes: %w", err) } @@ -140,25 +138,10 @@ func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downlo return entries[rand.Intn(len(entries))] } -func (i *ECObjectIterator) downloadObject(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { - if i.downloadConfig.LocalIPFS != nil { - logger.Infof("try to use local IPFS to download file") - - reader, err := i.downloadFromLocalIPFS(fileHash) - if err == nil { - return reader, nil - } - - logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) - } - - return i.downloadFromNode(nodeID, nodeIP, fileHash) -} - func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) { // TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构 //wg := sync.WaitGroup{} - numPacket := (fileSize + int64(ecK)*iter.ecPacketSize - 1) / (int64(ecK) * iter.ecPacketSize) + numPacket := (fileSize + int64(ecK)*iter.downloadCtx.ECPacketSize - 1) / (int64(ecK) * iter.downloadCtx.ECPacketSize) getBufs := make([]chan []byte, ecN) decodeBufs := make([]chan []byte, ecK) for i := 0; i < ecN; i++ { @@ -167,8 +150,19 @@ func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, for i := 0; i < ecK; i++ { decodeBufs[i] = make(chan []byte) } - for i := 0; i < len(blockIDs); i++ { - go iter.get(hashs[i], nodeIPs[i], getBufs[blockIDs[i]], numPacket) + for idx := 0; idx < len(blockIDs); idx++ { + i := idx + go func() { + // TODO 处理错误 + file, _ := downloadFile(iter.downloadCtx.DownloadContext, nodeIDs[i], nodeIPs[i], hashs[i]) + + for p := int64(0); p < numPacket; p++ { + buf := make([]byte, iter.downloadCtx.ECPacketSize) + // TODO 处理错误 + io.ReadFull(file, buf) + getBufs[blockIDs[i]] <- buf + } + }() } print(numPacket) go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket) @@ -189,74 +183,6 @@ func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, return r, nil } -func (iter *ECObjectIterator) get(fileHash string, nodeIP string, getBuf chan []byte, numPacket int64) error { - downloadFromAgent := false - //使用本地IPFS获取 - if iter.downloadConfig.LocalIPFS != nil { - logger.Infof("try to use local IPFS to download file") - //获取IPFS的reader - reader, err := iter.downloadFromLocalIPFS(fileHash) - if err != nil { - downloadFromAgent = true - fmt.Errorf("read ipfs block failed, err: %w", err) - } - defer reader.Close() - for i := 0; int64(i) < numPacket; i++ { - buf := make([]byte, iter.ecPacketSize) - _, err := io.ReadFull(reader, buf) - if err != nil { - downloadFromAgent = true - fmt.Errorf("read file falied, err:%w", err) - } - getBuf <- buf - } - if downloadFromAgent == false { - close(getBuf) - return nil - } - } else { - downloadFromAgent = true - } - //从agent获取 - if downloadFromAgent == true { - /*// 二次获取锁 - mutex, err := reqbuilder.NewBuilder(). - // 用于从IPFS下载文件 - IPFS().ReadOneRep(nodeID, fileHash). - MutexLock(svc.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ - // 连接grpc - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, iter.downloadConfig.GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - // 下载文件 - client := agentcaller.NewFileTransportClient(conn) - reader, err := mygrpc.GetFileAsStream(client, fileHash) - if err != nil { - conn.Close() - return fmt.Errorf("request to get file failed, err: %w", err) - } - for index := 0; int64(index) < numPacket; index++ { - buf := make([]byte, iter.ecPacketSize) - _, _ = reader.Read(buf) - fmt.Println(buf) - fmt.Println(numPacket, "\n") - getBuf <- buf - } - close(getBuf) - reader.Close() - return nil - } - return nil - -} - func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) { fmt.Println("decode ") var tmpIn [][]byte @@ -296,44 +222,3 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int close(outBufs[i]) } } - -func (i *ECObjectIterator) downloadFromNode(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { - // 二次获取锁 - mutex, err := reqbuilder.NewBuilder(). - // 用于从IPFS下载文件 - IPFS().ReadOneRep(nodeID, fileHash). - MutexLock(i.distlock) - if err != nil { - return nil, fmt.Errorf("acquire locks failed, err: %w", err) - } - - // 连接grpc - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, i.downloadConfig.GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - } - - // 下载文件 - client := agentcaller.NewFileTransportClient(conn) - reader, err := mygrpc.GetFileAsStream(client, fileHash) - if err != nil { - conn.Close() - return nil, fmt.Errorf("request to get file failed, err: %w", err) - } - - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - conn.Close() - mutex.Unlock() - }) - return reader, nil -} - -func (i *ECObjectIterator) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) { - reader, err := i.downloadConfig.LocalIPFS.OpenRead(fileHash) - if err != nil { - return nil, fmt.Errorf("read ipfs file failed, err: %w", err) - } - - return reader, nil -} diff --git a/pkgs/iterator/rep_object_iterator.go b/pkgs/iterator/rep_object_iterator.go index f887938..c316ee0 100644 --- a/pkgs/iterator/rep_object_iterator.go +++ b/pkgs/iterator/rep_object_iterator.go @@ -9,17 +9,11 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/ipfs" + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/models" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - myio "gitlink.org.cn/cloudream/common/utils/io" - agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" - mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" ) type DownloadingObjectIterator = Iterator[*IterDownloadingObject] @@ -30,10 +24,8 @@ type RepObjectIterator struct { currentIndex int inited bool - coorCli *coormq.Client - distlock *distsvc.Service - downloadConfig DownloadConfig - cliLocation model.Location + downloadCtx *DownloadContext + cliLocation model.Location } type IterDownloadingObject struct { @@ -46,29 +38,29 @@ type DownloadNodeInfo struct { IsSameLocation bool } -type DownloadConfig struct { - LocalIPFS *ipfs.IPFS - LocalNodeID *int64 - ExternalIP string - GRPCPort int - MQ *mymq.Config +type DownloadContext struct { + Distlock *distsvc.Service } -func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectRepData, coorCli *coormq.Client, distlock *distsvc.Service, downloadConfig DownloadConfig) *RepObjectIterator { +func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectRepData, downloadCtx *DownloadContext) *RepObjectIterator { return &RepObjectIterator{ - objects: objects, - objectRepData: objectRepData, - coorCli: coorCli, - distlock: distlock, - downloadConfig: downloadConfig, + objects: objects, + objectRepData: objectRepData, + downloadCtx: downloadCtx, } } func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { + coorCli, err := globals.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer coorCli.Close() + if !i.inited { i.inited = true - findCliLocResp, err := i.coorCli.FindClientLocation(coormq.NewFindClientLocation(i.downloadConfig.ExternalIP)) + findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP)) if err != nil { return nil, fmt.Errorf("finding client location: %w", err) } @@ -79,18 +71,18 @@ func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { return nil, ErrNoMoreItem } - item, err := i.doMove() + item, err := i.doMove(coorCli) i.currentIndex++ return item, err } -func (i *RepObjectIterator) doMove() (*IterDownloadingObject, error) { +func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadingObject, error) { repData := i.objectRepData[i.currentIndex] if len(repData.NodeIDs) == 0 { return nil, fmt.Errorf("no node has this file %s", repData.FileHash) } - getNodesResp, err := i.coorCli.GetNodes(coormq.NewGetNodes(repData.NodeIDs)) + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(repData.NodeIDs)) if err != nil { return nil, fmt.Errorf("getting nodes: %w", err) } @@ -113,7 +105,7 @@ func (i *RepObjectIterator) doMove() (*IterDownloadingObject, error) { logger.Infof("client and node %d are at the same location, use local ip\n", downloadNode.Node.NodeID) } - reader, err := i.downloadObject(downloadNode.Node.NodeID, nodeIP, repData.FileHash) + reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, repData.FileHash) if err != nil { return nil, fmt.Errorf("rep read failed, err: %w", err) } @@ -139,11 +131,11 @@ func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downl return entries[rand.Intn(len(entries))] } -func (i *RepObjectIterator) downloadObject(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { - if i.downloadConfig.LocalIPFS != nil { +func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { + if globals.IPFSPool != nil { logger.Infof("try to use local IPFS to download file") - reader, err := i.downloadFromLocalIPFS(fileHash) + reader, err := downloadFromLocalIPFS(fileHash) if err == nil { return reader, nil } @@ -151,43 +143,43 @@ func (i *RepObjectIterator) downloadObject(nodeID int64, nodeIP string, fileHash logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) } - return i.downloadFromNode(nodeID, nodeIP, fileHash) + return downloadFromNode(ctx, nodeID, nodeIP, fileHash) } -func (i *RepObjectIterator) downloadFromNode(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { +func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) { // 二次获取锁 mutex, err := reqbuilder.NewBuilder(). // 用于从IPFS下载文件 IPFS().ReadOneRep(nodeID, fileHash). - MutexLock(i.distlock) + MutexLock(ctx.Distlock) if err != nil { return nil, fmt.Errorf("acquire locks failed, err: %w", err) } // 连接grpc - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, i.downloadConfig.GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + agtCli, err := globals.AgentRPCPool.Acquire(nodeIP) if err != nil { - return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) + return nil, fmt.Errorf("new agent grpc client: %w", err) } - // 下载文件 - client := agentcaller.NewFileTransportClient(conn) - reader, err := mygrpc.GetFileAsStream(client, fileHash) + reader, err := agtCli.GetIPFSFile(fileHash) if err != nil { - conn.Close() - return nil, fmt.Errorf("request to get file failed, err: %w", err) + return nil, fmt.Errorf("getting ipfs file: %w", err) } reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - conn.Close() mutex.Unlock() }) return reader, nil } -func (i *RepObjectIterator) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) { - reader, err := i.downloadConfig.LocalIPFS.OpenRead(fileHash) +func downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) { + ipfsCli, err := globals.IPFSPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new ipfs client: %w", err) + } + + reader, err := ipfsCli.OpenRead(fileHash) if err != nil { return nil, fmt.Errorf("read ipfs file failed, err: %w", err) } diff --git a/pkgs/mq/agent/client.go b/pkgs/mq/agent/client.go index fd78214..04edb1f 100644 --- a/pkgs/mq/agent/client.go +++ b/pkgs/mq/agent/client.go @@ -2,7 +2,7 @@ package agent import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Client struct { @@ -10,8 +10,8 @@ type Client struct { id int64 } -func NewClient(id int64, cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.MakeAgentQueueName(id), "") +func NewClient(id int64, cfg *stgmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(id), "") if err != nil { return nil, err } @@ -25,3 +25,37 @@ func NewClient(id int64, cfg *mymq.Config) (*Client, error) { func (c *Client) Close() { c.rabbitCli.Close() } + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + mqcfg *stgmq.Config +} + +func NewPool(mqcfg *stgmq.Config) *Pool { + return &Pool{ + mqcfg: mqcfg, + } +} +func (p *Pool) Acquire(id int64) (*PoolClient, error) { + cli, err := NewClient(id, p.mqcfg) + if err != nil { + return nil, err + } + + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + cli.Client.Close() +} diff --git a/pkgs/mq/coordinator/client.go b/pkgs/mq/coordinator/client.go index a3ccca2..353d3af 100644 --- a/pkgs/mq/coordinator/client.go +++ b/pkgs/mq/coordinator/client.go @@ -2,15 +2,15 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Client struct { rabbitCli *mq.RabbitMQClient } -func NewClient(cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.COORDINATOR_QUEUE_NAME, "") +func NewClient(cfg *stgmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.COORDINATOR_QUEUE_NAME, "") if err != nil { return nil, err } @@ -23,3 +23,37 @@ func NewClient(cfg *mymq.Config) (*Client, error) { func (c *Client) Close() { c.rabbitCli.Close() } + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + mqcfg *stgmq.Config +} + +func NewPool(mqcfg *stgmq.Config) *Pool { + return &Pool{ + mqcfg: mqcfg, + } +} +func (p *Pool) Acquire() (*PoolClient, error) { + cli, err := NewClient(p.mqcfg) + if err != nil { + return nil, err + } + + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + cli.Client.Close() +} diff --git a/pkgs/mq/scanner/client.go b/pkgs/mq/scanner/client.go index da5fcdd..b87f567 100644 --- a/pkgs/mq/scanner/client.go +++ b/pkgs/mq/scanner/client.go @@ -2,15 +2,15 @@ package scanner import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" + stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" ) type Client struct { rabbitCli *mq.RabbitMQClient } -func NewClient(cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.SCANNER_QUEUE_NAME, "") +func NewClient(cfg *stgmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.SCANNER_QUEUE_NAME, "") if err != nil { return nil, err } @@ -23,3 +23,37 @@ func NewClient(cfg *mymq.Config) (*Client, error) { func (c *Client) Close() { c.rabbitCli.Close() } + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + mqcfg *stgmq.Config +} + +func NewPool(mqcfg *stgmq.Config) *Pool { + return &Pool{ + mqcfg: mqcfg, + } +} +func (p *Pool) Acquire() (*PoolClient, error) { + cli, err := NewClient(p.mqcfg) + if err != nil { + return nil, err + } + + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + cli.Client.Close() +} diff --git a/pkgs/proto/Makefile b/pkgs/proto/Makefile deleted file mode 100644 index 3c43934..0000000 --- a/pkgs/proto/Makefile +++ /dev/null @@ -1,2 +0,0 @@ -protoc: - protoc --go_out=. --go-grpc_out=. .\file_transport.proto \ No newline at end of file diff --git a/pkgs/proto/file_transport.pb.go b/pkgs/proto/file_transport.pb.go deleted file mode 100644 index 8c7ffa0..0000000 --- a/pkgs/proto/file_transport.pb.go +++ /dev/null @@ -1,343 +0,0 @@ -// 使用的语法版本 - -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.30.0 -// protoc v4.22.3 -// source: file_transport.proto - -package proto - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type FileDataPacketType int32 - -const ( - FileDataPacketType_Data FileDataPacketType = 0 - FileDataPacketType_EOF FileDataPacketType = 1 -) - -// Enum value maps for FileDataPacketType. -var ( - FileDataPacketType_name = map[int32]string{ - 0: "Data", - 1: "EOF", - } - FileDataPacketType_value = map[string]int32{ - "Data": 0, - "EOF": 1, - } -) - -func (x FileDataPacketType) Enum() *FileDataPacketType { - p := new(FileDataPacketType) - *p = x - return p -} - -func (x FileDataPacketType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor { - return file_file_transport_proto_enumTypes[0].Descriptor() -} - -func (FileDataPacketType) Type() protoreflect.EnumType { - return &file_file_transport_proto_enumTypes[0] -} - -func (x FileDataPacketType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use FileDataPacketType.Descriptor instead. -func (FileDataPacketType) EnumDescriptor() ([]byte, []int) { - return file_file_transport_proto_rawDescGZIP(), []int{0} -} - -// 文件数据。注意:只在Type为Data的时候,Data字段才能有数据 -type FileDataPacket struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` -} - -func (x *FileDataPacket) Reset() { - *x = FileDataPacket{} - if protoimpl.UnsafeEnabled { - mi := &file_file_transport_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *FileDataPacket) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*FileDataPacket) ProtoMessage() {} - -func (x *FileDataPacket) ProtoReflect() protoreflect.Message { - mi := &file_file_transport_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead. -func (*FileDataPacket) Descriptor() ([]byte, []int) { - return file_file_transport_proto_rawDescGZIP(), []int{0} -} - -func (x *FileDataPacket) GetType() FileDataPacketType { - if x != nil { - return x.Type - } - return FileDataPacketType_Data -} - -func (x *FileDataPacket) GetData() []byte { - if x != nil { - return x.Data - } - return nil -} - -type SendResp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"` -} - -func (x *SendResp) Reset() { - *x = SendResp{} - if protoimpl.UnsafeEnabled { - mi := &file_file_transport_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SendResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SendResp) ProtoMessage() {} - -func (x *SendResp) ProtoReflect() protoreflect.Message { - mi := &file_file_transport_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SendResp.ProtoReflect.Descriptor instead. -func (*SendResp) Descriptor() ([]byte, []int) { - return file_file_transport_proto_rawDescGZIP(), []int{1} -} - -func (x *SendResp) GetFileHash() string { - if x != nil { - return x.FileHash - } - return "" -} - -type GetReq struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"` -} - -func (x *GetReq) Reset() { - *x = GetReq{} - if protoimpl.UnsafeEnabled { - mi := &file_file_transport_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetReq) ProtoMessage() {} - -func (x *GetReq) ProtoReflect() protoreflect.Message { - mi := &file_file_transport_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetReq.ProtoReflect.Descriptor instead. -func (*GetReq) Descriptor() ([]byte, []int) { - return file_file_transport_proto_rawDescGZIP(), []int{2} -} - -func (x *GetReq) GetFileHash() string { - if x != nil { - return x.FileHash - } - return "" -} - -var File_file_transport_proto protoreflect.FileDescriptor - -var file_file_transport_proto_rawDesc = []byte{ - 0x0a, 0x14, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, - 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, - 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x26, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73, - 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x24, 0x0a, - 0x06, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, - 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, - 0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, - 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, - 0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x01, 0x32, 0x64, 0x0a, 0x0d, - 0x46, 0x69, 0x6c, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x2a, 0x0a, - 0x08, 0x53, 0x65, 0x6e, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, - 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x09, 0x2e, 0x53, 0x65, 0x6e, - 0x64, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x27, 0x0a, 0x07, 0x47, 0x65, 0x74, - 0x46, 0x69, 0x6c, 0x65, 0x12, 0x07, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, - 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, - 0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_file_transport_proto_rawDescOnce sync.Once - file_file_transport_proto_rawDescData = file_file_transport_proto_rawDesc -) - -func file_file_transport_proto_rawDescGZIP() []byte { - file_file_transport_proto_rawDescOnce.Do(func() { - file_file_transport_proto_rawDescData = protoimpl.X.CompressGZIP(file_file_transport_proto_rawDescData) - }) - return file_file_transport_proto_rawDescData -} - -var file_file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_file_transport_proto_goTypes = []interface{}{ - (FileDataPacketType)(0), // 0: FileDataPacketType - (*FileDataPacket)(nil), // 1: FileDataPacket - (*SendResp)(nil), // 2: SendResp - (*GetReq)(nil), // 3: GetReq -} -var file_file_transport_proto_depIdxs = []int32{ - 0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType - 1, // 1: FileTransport.SendFile:input_type -> FileDataPacket - 3, // 2: FileTransport.GetFile:input_type -> GetReq - 2, // 3: FileTransport.SendFile:output_type -> SendResp - 1, // 4: FileTransport.GetFile:output_type -> FileDataPacket - 3, // [3:5] is the sub-list for method output_type - 1, // [1:3] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_file_transport_proto_init() } -func file_file_transport_proto_init() { - if File_file_transport_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_file_transport_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FileDataPacket); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_file_transport_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SendResp); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_file_transport_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*GetReq); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_file_transport_proto_rawDesc, - NumEnums: 1, - NumMessages: 3, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_file_transport_proto_goTypes, - DependencyIndexes: file_file_transport_proto_depIdxs, - EnumInfos: file_file_transport_proto_enumTypes, - MessageInfos: file_file_transport_proto_msgTypes, - }.Build() - File_file_transport_proto = out.File - file_file_transport_proto_rawDesc = nil - file_file_transport_proto_goTypes = nil - file_file_transport_proto_depIdxs = nil -} diff --git a/pkgs/proto/file_transport_grpc.pb.go b/pkgs/proto/file_transport_grpc.pb.go deleted file mode 100644 index 7f3dea0..0000000 --- a/pkgs/proto/file_transport_grpc.pb.go +++ /dev/null @@ -1,209 +0,0 @@ -// 使用的语法版本 - -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v4.22.3 -// source: file_transport.proto - -package proto - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -const ( - FileTransport_SendFile_FullMethodName = "/FileTransport/SendFile" - FileTransport_GetFile_FullMethodName = "/FileTransport/GetFile" -) - -// FileTransportClient is the client API for FileTransport service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type FileTransportClient interface { - SendFile(ctx context.Context, opts ...grpc.CallOption) (FileTransport_SendFileClient, error) - GetFile(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (FileTransport_GetFileClient, error) -} - -type fileTransportClient struct { - cc grpc.ClientConnInterface -} - -func NewFileTransportClient(cc grpc.ClientConnInterface) FileTransportClient { - return &fileTransportClient{cc} -} - -func (c *fileTransportClient) SendFile(ctx context.Context, opts ...grpc.CallOption) (FileTransport_SendFileClient, error) { - stream, err := c.cc.NewStream(ctx, &FileTransport_ServiceDesc.Streams[0], FileTransport_SendFile_FullMethodName, opts...) - if err != nil { - return nil, err - } - x := &fileTransportSendFileClient{stream} - return x, nil -} - -type FileTransport_SendFileClient interface { - Send(*FileDataPacket) error - CloseAndRecv() (*SendResp, error) - grpc.ClientStream -} - -type fileTransportSendFileClient struct { - grpc.ClientStream -} - -func (x *fileTransportSendFileClient) Send(m *FileDataPacket) error { - return x.ClientStream.SendMsg(m) -} - -func (x *fileTransportSendFileClient) CloseAndRecv() (*SendResp, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - m := new(SendResp) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *fileTransportClient) GetFile(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (FileTransport_GetFileClient, error) { - stream, err := c.cc.NewStream(ctx, &FileTransport_ServiceDesc.Streams[1], FileTransport_GetFile_FullMethodName, opts...) - if err != nil { - return nil, err - } - x := &fileTransportGetFileClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type FileTransport_GetFileClient interface { - Recv() (*FileDataPacket, error) - grpc.ClientStream -} - -type fileTransportGetFileClient struct { - grpc.ClientStream -} - -func (x *fileTransportGetFileClient) Recv() (*FileDataPacket, error) { - m := new(FileDataPacket) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// FileTransportServer is the server API for FileTransport service. -// All implementations must embed UnimplementedFileTransportServer -// for forward compatibility -type FileTransportServer interface { - SendFile(FileTransport_SendFileServer) error - GetFile(*GetReq, FileTransport_GetFileServer) error - mustEmbedUnimplementedFileTransportServer() -} - -// UnimplementedFileTransportServer must be embedded to have forward compatible implementations. -type UnimplementedFileTransportServer struct { -} - -func (UnimplementedFileTransportServer) SendFile(FileTransport_SendFileServer) error { - return status.Errorf(codes.Unimplemented, "method SendFile not implemented") -} -func (UnimplementedFileTransportServer) GetFile(*GetReq, FileTransport_GetFileServer) error { - return status.Errorf(codes.Unimplemented, "method GetFile not implemented") -} -func (UnimplementedFileTransportServer) mustEmbedUnimplementedFileTransportServer() {} - -// UnsafeFileTransportServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to FileTransportServer will -// result in compilation errors. -type UnsafeFileTransportServer interface { - mustEmbedUnimplementedFileTransportServer() -} - -func RegisterFileTransportServer(s grpc.ServiceRegistrar, srv FileTransportServer) { - s.RegisterService(&FileTransport_ServiceDesc, srv) -} - -func _FileTransport_SendFile_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(FileTransportServer).SendFile(&fileTransportSendFileServer{stream}) -} - -type FileTransport_SendFileServer interface { - SendAndClose(*SendResp) error - Recv() (*FileDataPacket, error) - grpc.ServerStream -} - -type fileTransportSendFileServer struct { - grpc.ServerStream -} - -func (x *fileTransportSendFileServer) SendAndClose(m *SendResp) error { - return x.ServerStream.SendMsg(m) -} - -func (x *fileTransportSendFileServer) Recv() (*FileDataPacket, error) { - m := new(FileDataPacket) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func _FileTransport_GetFile_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(GetReq) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(FileTransportServer).GetFile(m, &fileTransportGetFileServer{stream}) -} - -type FileTransport_GetFileServer interface { - Send(*FileDataPacket) error - grpc.ServerStream -} - -type fileTransportGetFileServer struct { - grpc.ServerStream -} - -func (x *fileTransportGetFileServer) Send(m *FileDataPacket) error { - return x.ServerStream.SendMsg(m) -} - -// FileTransport_ServiceDesc is the grpc.ServiceDesc for FileTransport service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var FileTransport_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "FileTransport", - HandlerType: (*FileTransportServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "SendFile", - Handler: _FileTransport_SendFile_Handler, - ClientStreams: true, - }, - { - StreamName: "GetFile", - Handler: _FileTransport_GetFile_Handler, - ServerStreams: true, - }, - }, - Metadata: "file_transport.proto", -} diff --git a/utils/grpc/file_transport.go b/utils/grpc/file_transport.go deleted file mode 100644 index fd542ac..0000000 --- a/utils/grpc/file_transport.go +++ /dev/null @@ -1,120 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - "io" - - myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/storage-common/pkgs/proto" -) - -type fileReadCloser struct { - io.ReadCloser - stream proto.FileTransport_GetFileClient - cancelFn context.CancelFunc - readData []byte -} - -func (s *fileReadCloser) Read(p []byte) (int, error) { - - if s.readData == nil { - resp, err := s.stream.Recv() - if err != nil { - return 0, err - } - - if resp.Type == proto.FileDataPacketType_Data { - s.readData = resp.Data - - } else if resp.Type == proto.FileDataPacketType_EOF { - return 0, io.EOF - - } else { - return 0, fmt.Errorf("unsuppoted packt type: %v", resp.Type) - } - } - - cnt := copy(p, s.readData) - - if len(s.readData) == cnt { - s.readData = nil - } else { - s.readData = s.readData[cnt:] - } - - return cnt, nil -} - -func (s *fileReadCloser) Close() error { - s.cancelFn() - - return nil -} - -func GetFileAsStream(client proto.FileTransportClient, fileHash string) (io.ReadCloser, error) { - ctx, cancel := context.WithCancel(context.Background()) - - stream, err := client.GetFile(ctx, &proto.GetReq{ - FileHash: fileHash, - }) - if err != nil { - cancel() - return nil, fmt.Errorf("request grpc failed, err: %w", err) - } - - return &fileReadCloser{ - stream: stream, - cancelFn: cancel, - }, nil -} - -type fileWriteCloser struct { - myio.PromiseWriteCloser[string] - stream proto.FileTransport_SendFileClient -} - -func (s *fileWriteCloser) Write(p []byte) (int, error) { - err := s.stream.Send(&proto.FileDataPacket{ - Type: proto.FileDataPacketType_Data, - Data: p, - }) - - if err != nil { - return 0, err - } - - return len(p), nil -} - -func (s *fileWriteCloser) Abort(err error) { - s.stream.CloseSend() -} - -func (s *fileWriteCloser) Finish() (string, error) { - err := s.stream.Send(&proto.FileDataPacket{ - Type: proto.FileDataPacketType_EOF, - }) - - if err != nil { - return "", fmt.Errorf("send EOF packet failed, err: %w", err) - } - - resp, err := s.stream.CloseAndRecv() - if err != nil { - return "", fmt.Errorf("receive response failed, err: %w", err) - } - - return resp.FileHash, nil -} - -func SendFileAsStream(client proto.FileTransportClient) (myio.PromiseWriteCloser[string], error) { - stream, err := client.SendFile(context.Background()) - if err != nil { - return nil, err - } - - return &fileWriteCloser{ - stream: stream, - }, nil -}