
Merge pull request 'Add ClientPool and remove the dependency on config' (#3) from feature_gxh into master

gitlink / baohan, 2 years ago
commit a9e09e6923
28 changed files with 1263 additions and 1249 deletions
  1. consts/consts.go (+2, -1)
  2. globals/globals.go (+11, -0)
  3. globals/pools.go (+36, -0)
  4. go.mod (+1, -0)
  5. go.sum (+2, -0)
  6. magefiles/main.go (+17, -0)
  7. models/models.go (+6, -0)
  8. pkgs/cmd/cmd.go (+3, -0)
  9. pkgs/cmd/create_ec_package.go (+77, -161)
  10. pkgs/cmd/create_rep_package.go (+81, -108)
  11. pkgs/cmd/download_package.go (+40, -26)
  12. pkgs/cmd/update_ec_package.go (+29, -35)
  13. pkgs/cmd/update_rep_package.go (+25, -30)
  14. pkgs/grpc/agent/agent.pb.go (+345, -0)
  15. pkgs/grpc/agent/agent.proto (+6, -6)
  16. pkgs/grpc/agent/agent_grpc.pb.go (+209, -0)
  17. pkgs/grpc/agent/client.go (+131, -0)
  18. pkgs/grpc/agent/pool.go (+43, -0)
  19. pkgs/grpc/config.go (+12, -0)
  20. pkgs/iterator/ec_object_iterator.go (+38, -153)
  21. pkgs/iterator/rep_object_iterator.go (+38, -46)
  22. pkgs/mq/agent/client.go (+37, -3)
  23. pkgs/mq/coordinator/client.go (+37, -3)
  24. pkgs/mq/scanner/client.go (+37, -3)
  25. pkgs/proto/Makefile (+0, -2)
  26. pkgs/proto/file_transport.pb.go (+0, -343)
  27. pkgs/proto/file_transport_grpc.pb.go (+0, -209)
  28. utils/grpc/file_transport.go (+0, -120)

consts/consts.go (+2, -1)

@@ -1,7 +1,8 @@
 package consts

 const (
-	IPFSStateOK = "OK"
+	IPFSStateOK          = "OK"
+	IPFSStateUnavailable = "Unavailable"

 	StorageDirectoryStateOK = "OK"




globals/globals.go (+11, -0)

@@ -0,0 +1,11 @@
package globals

import (
stgmodels "gitlink.org.cn/cloudream/storage-common/models"
)

var Local *stgmodels.LocalMachineInfo

func InitLocal(info *stgmodels.LocalMachineInfo) {
Local = info
}

globals/pools.go (+36, -0)

@@ -0,0 +1,36 @@
package globals

import (
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner"
)

var AgentMQPool *agtmq.Pool

var CoordinatorMQPool *coormq.Pool

var ScannerMQPool *scmq.Pool

func InitMQPool(cfg *stgmq.Config) {
AgentMQPool = agtmq.NewPool(cfg)

CoordinatorMQPool = coormq.NewPool(cfg)

ScannerMQPool = scmq.NewPool(cfg)
}

var AgentRPCPool *agtrpc.Pool

func InitAgentRPCPool(cfg *agtrpc.PoolConfig) {
AgentRPCPool = agtrpc.NewPool(cfg)
}

var IPFSPool *ipfs.Pool

func InitIPFSPool(cfg *ipfs.Config) {
IPFSPool = ipfs.NewPool(cfg)
}
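
The new pools above are what the rest of this change acquires its clients from. A minimal wiring sketch, assuming a caller-side AppConfig struct (the struct and its field names are illustrative, not part of this repository) that carries the MQ, agent RPC and IPFS settings:

package example

import (
	"gitlink.org.cn/cloudream/common/pkgs/ipfs"
	"gitlink.org.cn/cloudream/storage-common/globals"
	stgmodels "gitlink.org.cn/cloudream/storage-common/models"
	agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
	stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

// AppConfig is a hypothetical caller-side config; each service keeps these
// values in its own config file.
type AppConfig struct {
	Local    stgmodels.LocalMachineInfo
	RabbitMQ stgmq.Config
	AgentRPC agtrpc.PoolConfig
	IPFS     *ipfs.Config // nil when no local IPFS daemon is available
}

// InitGlobals wires the pools once at startup. After this, command code calls
// globals.CoordinatorMQPool.Acquire() and friends instead of receiving an
// UploadConfig/DownloadConfig argument.
func InitGlobals(cfg *AppConfig) {
	globals.InitLocal(&cfg.Local)
	globals.InitMQPool(&cfg.RabbitMQ)
	globals.InitAgentRPCPool(&cfg.AgentRPC)
	if cfg.IPFS != nil {
		globals.InitIPFSPool(cfg.IPFS)
	}
}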

go.mod (+1, -0)

@@ -6,6 +6,7 @@ require (
 	github.com/go-ping/ping v1.1.0
 	github.com/go-sql-driver/mysql v1.7.1
 	github.com/jmoiron/sqlx v1.3.5
+	github.com/magefile/mage v1.15.0
 	github.com/samber/lo v1.36.0
 	github.com/smartystreets/goconvey v1.8.0
 	gitlink.org.cn/cloudream/common v0.0.0


go.sum (+2, -0)

@@ -69,6 +69,8 @@ github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFG
 github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
 github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+PM=
 github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8=
+github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
+github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
 github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
 github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
 github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=


magefiles/main.go (+17, -0)

@@ -0,0 +1,17 @@
//go:build mage

package main

import (
"path/filepath"

"github.com/magefile/mage/sh"
)

func Protos() error {
return proto("pkgs/grpc/agent", "agent.proto")
}

func proto(dir string, fileName string) error {
return sh.Run("protoc", "--go_out="+dir, "--go-grpc_out="+dir, filepath.Join(dir, fileName))
}
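
For reference, the Protos target above expands to a single protoc invocation, run from the repository root (it assumes protoc, protoc-gen-go and protoc-gen-go-grpc are installed and on PATH):

protoc --go_out=pkgs/grpc/agent --go-grpc_out=pkgs/grpc/agent pkgs/grpc/agent/agent.proto

Invoking it through mage (for example `mage protos`) regenerates agent.pb.go and agent_grpc.pb.go in place.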

models/models.go (+6, -0)

@@ -81,3 +81,9 @@ func NewObjectECData(blocks []ObjectBlockData) ObjectECData {
 		Blocks: blocks,
 	}
 }
+
+type LocalMachineInfo struct {
+	NodeID     *int64 `json:"nodeID"`
+	ExternalIP string `json:"externalIP"`
+	LocalIP    string `json:"localIP"`
+}

pkgs/cmd/cmd.go (+3, -0)

@@ -0,0 +1,3 @@
package cmd

// 这个包主要存放一些公共的业务逻辑代码

pkgs/cmd/create_ec_package.go (+77, -161)

@@ -1,41 +1,34 @@
package cmd package cmd


import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"


"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/ec" "gitlink.org.cn/cloudream/storage-common/pkgs/ec"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )


type CreateECPackage struct { type CreateECPackage struct {
userID int64
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy models.ECRedundancyInfo
ecPacketSize int64
uploadConfig UploadConfig

Result CreateECPackageResult
userID int64
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy models.ECRedundancyInfo
}

type UpdateECPackageContext struct {
*UpdatePackageContext
ECPacketSize int64
} }


type CreateECPackageResult struct { type CreateECPackageResult struct {
@@ -49,27 +42,24 @@ type ECObjectUploadResult struct {
ObjectID int64 ObjectID int64
} }


func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo, ecPacketSize int64, uploadConfig UploadConfig) *CreateECPackage {
func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage {
return &CreateECPackage{ return &CreateECPackage{
userID: userID,
bucketID: bucketID,
name: name,
objectIter: objIter,
redundancy: redundancy,
ecPacketSize: ecPacketSize,
uploadConfig: uploadConfig,
userID: userID,
bucketID: bucketID,
name: name,
objectIter: objIter,
redundancy: redundancy,
} }
} }


func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
t.objectIter.Close()
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackageResult, error) {
defer t.objectIter.Close()
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}


func (t *CreateECPackage) do(ctx TaskContext) error {
// TODO2 // TODO2
/* /*
reqBlder := reqbuilder.NewBuilder() reqBlder := reqbuilder.NewBuilder()
@@ -101,20 +91,20 @@ func (t *CreateECPackage) do(ctx TaskContext) error {
defer mutex.Unlock() defer mutex.Unlock()
*/ */


createPkgResp, err := ctx.Coordinator().CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy))) models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy)))
if err != nil { if err != nil {
return fmt.Errorf("creating package: %w", err)
return nil, fmt.Errorf("creating package: %w", err)
} }


getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID))
getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil { if err != nil {
return fmt.Errorf("getting user nodes: %w", err)
return nil, fmt.Errorf("getting user nodes: %w", err)
} }


findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP))
findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP))
if err != nil { if err != nil {
return fmt.Errorf("finding client location: %w", err)
return nil, fmt.Errorf("finding client location: %w", err)
} }


uploadNodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { uploadNodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
@@ -124,9 +114,9 @@ func (t *CreateECPackage) do(ctx TaskContext) error {
} }
}) })


getECResp, err := ctx.Coordinator().GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName))
getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(t.redundancy.ECName))
if err != nil { if err != nil {
return fmt.Errorf("getting ec: %w", err)
return nil, fmt.Errorf("getting ec: %w", err)
} }


/* /*
@@ -141,17 +131,23 @@ func (t *CreateECPackage) do(ctx TaskContext) error {
defer mutex2.Unlock() defer mutex2.Unlock()
*/ */


rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config, t.ecPacketSize, t.uploadConfig)
rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config)
if err != nil { if err != nil {
return err
return nil, err
} }


t.Result.PackageID = createPkgResp.PackageID
t.Result.ObjectResults = rets
return nil
return &CreateECPackageResult{
PackageID: createPkgResp.PackageID,
ObjectResults: rets,
}, nil
} }


func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ec model.Ec, ecPacketSize int64, uploadConfig UploadConfig) ([]ECObjectUploadResult, error) {
func uploadAndUpdateECPackage(ctx *UpdateECPackageContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNodes []UploadNodeInfo, ec model.Ec) ([]ECObjectUploadResult, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}

var uploadRets []ECObjectUploadResult var uploadRets []ECObjectUploadResult
//上传文件夹 //上传文件夹
var adds []coormq.AddECObjectInfo var adds []coormq.AddECObjectInfo
@@ -164,7 +160,7 @@ func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter itera
return nil, fmt.Errorf("reading object: %w", err) return nil, fmt.Errorf("reading object: %w", err)
} }


fileHashes, uploadedNodeIDs, err := uploadECObject(ctx, objInfo, uploadNodes, ec, ecPacketSize, uploadConfig)
fileHashes, uploadedNodeIDs, err := uploadECObject(ctx, objInfo, uploadNodes, ec)
uploadRets = append(uploadRets, ECObjectUploadResult{ uploadRets = append(uploadRets, ECObjectUploadResult{
Info: objInfo, Info: objInfo,
Error: err, Error: err,
@@ -176,7 +172,7 @@ func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter itera
adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs)) adds = append(adds, coormq.NewAddECObjectInfo(objInfo.Path, objInfo.Size, fileHashes, uploadedNodeIDs))
} }


_, err := ctx.Coordinator().UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil))
_, err = coorCli.UpdateECPackage(coormq.NewUpdateECPackage(packageID, adds, nil))
if err != nil { if err != nil {
return nil, fmt.Errorf("updating package: %w", err) return nil, fmt.Errorf("updating package: %w", err)
} }
@@ -185,7 +181,7 @@ func uploadAndUpdateECPackage(ctx TaskContext, packageID int64, objectIter itera
} }


// 上传文件 // 上传文件
func uploadECObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ec model.Ec, ecPacketSize int64, uploadConfig UploadConfig) ([]string, []int64, error) {
func uploadECObject(ctx *UpdateECPackageContext, obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ec model.Ec) ([]string, []int64, error) {
//生成纠删码的写入节点序列 //生成纠删码的写入节点序列
nodes := make([]UploadNodeInfo, ec.EcN) nodes := make([]UploadNodeInfo, ec.EcN)
numNodes := len(uploadNodes) numNodes := len(uploadNodes)
@@ -194,7 +190,7 @@ func uploadECObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNo
nodes[i] = uploadNodes[(startWriteNodeID+i)%numNodes] nodes[i] = uploadNodes[(startWriteNodeID+i)%numNodes]
} }


hashs, err := ecWrite(obj.File, obj.Size, ec.EcK, ec.EcN, nodes, ecPacketSize, uploadConfig)
hashs, err := ecWrite(ctx, obj.File, obj.Size, ec.EcK, ec.EcN, nodes)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("EcWrite failed, err: %w", err) return nil, nil, fmt.Errorf("EcWrite failed, err: %w", err)
} }
@@ -219,14 +215,13 @@ func (t *CreateECPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInf
return nodes[rand.Intn(len(nodes))] return nodes[rand.Intn(len(nodes))]
} }


func ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []UploadNodeInfo, ecPacketSize int64, uploadConfig UploadConfig) ([]string, error) {

func ecWrite(ctx *UpdateECPackageContext, file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []UploadNodeInfo) ([]string, error) {
// TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理
//获取文件大小 //获取文件大小


var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN
//计算每个块的packet数 //计算每个块的packet数
numPacket := (fileSize + int64(ecK)*ecPacketSize - 1) / (int64(ecK) * ecPacketSize)
numPacket := (fileSize + int64(ecK)*ctx.ECPacketSize - 1) / (int64(ecK) * ctx.ECPacketSize)
//fmt.Println(numPacket) //fmt.Println(numPacket)
//创建channel //创建channel
loadBufs := make([]chan []byte, ecN) loadBufs := make([]chan []byte, ecN)
@@ -239,7 +234,7 @@ func ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []Uploa
} }
hashs := make([]string, ecN) hashs := make([]string, ecN)
//正式开始写入 //正式开始写入
go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), ecPacketSize) //从本地文件系统加载数据
go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), ctx.ECPacketSize) //从本地文件系统加载数据
go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)


var wg sync.WaitGroup var wg sync.WaitGroup
@@ -253,8 +248,18 @@ func ecWrite(file io.ReadCloser, fileSize int64, ecK int, ecN int, nodes []Uploa
} }
defer mutex.Unlock() defer mutex.Unlock()
*/ */
for i := 0; i < ecN; i++ {
go send(nodes[i], encodeBufs[i], numPacket, &wg, hashs, i, uploadConfig)
for idx := 0; idx < ecN; idx++ {
i := idx
reader := channelBytesReader{
channel: encodeBufs[idx],
packetCount: numPacket,
}
go func() {
// TODO 处理错误
fileHash, _ := uploadFile(&reader, nodes[i])
hashs[i] = fileHash
wg.Done()
}()
} }
wg.Wait() wg.Wait()


@@ -310,115 +315,26 @@ func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int6
} }
} }


func send(node UploadNodeInfo, inBuf chan []byte, numPacket int64, wg *sync.WaitGroup, hashs []string, idx int, uploadConfig UploadConfig) error {
// TODO zkx 先直接复制client\internal\task\upload_rep_objects.go中的uploadToNode和uploadToLocalIPFS来替代这部分逻辑
// 方便之后异步化处理
// uploadToAgent的逻辑反了,而且中间步骤失败,就必须打印日志后停止后续操作
type channelBytesReader struct {
channel chan []byte
packetCount int64
readingData []byte
}


uploadToAgent := true
if uploadConfig.LocalIPFS != nil { //使用IPFS传输
//创建IPFS文件
logger.Infof("try to use local IPFS to upload block")
writer, err := uploadConfig.LocalIPFS.CreateFile()
if err != nil {
uploadToAgent = false
fmt.Errorf("create IPFS file failed, err: %w", err)
}
//逐packet写进ipfs
for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf
reader := bytes.NewReader(buf)
_, err = io.Copy(writer, reader)
if err != nil {
uploadToAgent = false
fmt.Errorf("copying block data to IPFS file failed, err: %w", err)
}
}
//finish, 获取哈希
fileHash, err := writer.Finish()
if err != nil {
logger.Warnf("upload block to local IPFS failed, so try to upload by agent, err: %s", err.Error())
uploadToAgent = false
fmt.Errorf("finish writing blcok to IPFS failed, err: %w", err)
func (r *channelBytesReader) Read(buf []byte) (int, error) {
if len(r.readingData) == 0 {
if r.packetCount == 0 {
return 0, io.EOF
} }
hashs[idx] = fileHash
if err != nil {
}
nodeID := node.Node.NodeID
// 然后让最近节点pin本地上传的文件
agentClient, err := agtmq.NewClient(nodeID, uploadConfig.MQ)
if err != nil {
uploadToAgent = false
fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
}
defer agentClient.Close()


pinObjResp, err := agentClient.StartPinningObject(agtmq.NewStartPinningObject(fileHash))
if err != nil {
uploadToAgent = false
fmt.Errorf("start pinning object: %w", err)
}
for {
waitResp, err := agentClient.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
if err != nil {
uploadToAgent = false
fmt.Errorf("waitting pinning object: %w", err)
}
if waitResp.IsComplete {
if waitResp.Error != "" {
uploadToAgent = false
fmt.Errorf("agent pinning object: %s", waitResp.Error)
}
break
}
}
if uploadToAgent == false {
return nil
}
r.readingData = <-r.channel
r.packetCount--
} }
//////////////////////////////通过Agent上传
if uploadToAgent == true {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.Node.ExternalIP
if node.IsSameLocation {
nodeIP = node.Node.LocalIP

logger.Infof("client and node %d are at the same location, use local ip\n", node.Node.NodeID)
}


grpcAddr := fmt.Sprintf("%s:%d", nodeIP, uploadConfig.GRPCPort)
grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer grpcCon.Close()
len := copy(buf, r.readingData)
r.readingData = r.readingData[:len]


client := agentcaller.NewFileTransportClient(grpcCon)
upload, err := mygrpc.SendFileAsStream(client)
if err != nil {
return fmt.Errorf("request to send file failed, err: %w", err)
}
// 发送文件数据
for i := 0; int64(i) < numPacket; i++ {
buf := <-inBuf
reader := bytes.NewReader(buf)
_, err = io.Copy(upload, reader)
if err != nil {
// 发生错误则关闭连接
upload.Abort(io.ErrClosedPipe)
return fmt.Errorf("copy block date to upload stream failed, err: %w", err)
}
}
// 发送EOF消息,并获得FileHash
fileHash, err := upload.Finish()
if err != nil {
upload.Abort(io.ErrClosedPipe)
return fmt.Errorf("send EOF failed, err: %w", err)
}
hashs[idx] = fileHash
wg.Done()
}
return nil
return len, nil
} }


func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) { func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
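
One detail worth flagging in the new channelBytesReader above: as committed, Read keeps the prefix it just copied (r.readingData[:len]) instead of dropping it, so a short destination buffer receives the same bytes again on the next call, and a large one keeps re-reading the same packet and never reaches EOF. A corrected sketch of the presumably intended semantics, reusing the struct from this diff (and avoiding shadowing the builtin len):

func (r *channelBytesReader) Read(buf []byte) (int, error) {
	if len(r.readingData) == 0 {
		if r.packetCount == 0 {
			return 0, io.EOF
		}

		r.readingData = <-r.channel
		r.packetCount--
	}

	n := copy(buf, r.readingData)
	r.readingData = r.readingData[n:] // advance past the bytes just consumed
	return n, nil
}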


pkgs/cmd/create_rep_package.go (+81, -108)

@@ -9,19 +9,14 @@ import (
"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"


"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
) )


type UploadNodeInfo struct { type UploadNodeInfo struct {
@@ -30,22 +25,15 @@ type UploadNodeInfo struct {
} }


type CreateRepPackage struct { type CreateRepPackage struct {
userID int64
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy models.RepRedundancyInfo
uploadConfig UploadConfig

Result CreateRepPackageResult
userID int64
bucketID int64
name string
objectIter iterator.UploadingObjectIterator
redundancy models.RepRedundancyInfo
} }


type UploadConfig struct {
LocalIPFS *ipfs.IPFS
LocalNodeID *int64
ExternalIP string
GRPCPort int
MQ *mymq.Config
type UpdatePackageContext struct {
Distlock *distsvc.Service
} }


type CreateRepPackageResult struct { type CreateRepPackageResult struct {
@@ -60,26 +48,24 @@ type RepObjectUploadResult struct {
ObjectID int64 ObjectID int64
} }


func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo, uploadConfig UploadConfig) *CreateRepPackage {
func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage {
return &CreateRepPackage{ return &CreateRepPackage{
userID: userID,
bucketID: bucketID,
name: name,
objectIter: objIter,
redundancy: redundancy,
uploadConfig: uploadConfig,
userID: userID,
bucketID: bucketID,
name: name,
objectIter: objIter,
redundancy: redundancy,
} }
} }


func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
t.objectIter.Close()
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackageResult, error) {
defer t.objectIter.Close()
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}


func (t *CreateRepPackage) do(ctx TaskContext) error {
/* /*
// TODO2 // TODO2
reqBlder := reqbuilder.NewBuilder() reqBlder := reqbuilder.NewBuilder()
@@ -110,20 +96,20 @@ func (t *CreateRepPackage) do(ctx TaskContext) error {
} }
defer mutex.Unlock() defer mutex.Unlock()
*/ */
createPkgResp, err := ctx.Coordinator().CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy))) models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy)))
if err != nil { if err != nil {
return fmt.Errorf("creating package: %w", err)
return nil, fmt.Errorf("creating package: %w", err)
} }


getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID))
getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil { if err != nil {
return fmt.Errorf("getting user nodes: %w", err)
return nil, fmt.Errorf("getting user nodes: %w", err)
} }


findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP))
findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP))
if err != nil { if err != nil {
return fmt.Errorf("finding client location: %w", err)
return nil, fmt.Errorf("finding client location: %w", err)
} }


nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
@@ -137,25 +123,30 @@ func (t *CreateRepPackage) do(ctx TaskContext) error {
// 防止上传的副本被清除 // 防止上传的副本被清除
mutex2, err := reqbuilder.NewBuilder(). mutex2, err := reqbuilder.NewBuilder().
IPFS().CreateAnyRep(uploadNode.Node.NodeID). IPFS().CreateAnyRep(uploadNode.Node.NodeID).
MutexLock(ctx.DistLock())
MutexLock(ctx.Distlock)
if err != nil { if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
} }
defer mutex2.Unlock() defer mutex2.Unlock()


rets, err := uploadAndUpdateRepPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNode, t.uploadConfig)
rets, err := uploadAndUpdateRepPackage(createPkgResp.PackageID, t.objectIter, uploadNode)
if err != nil { if err != nil {
return err
return nil, err
} }


t.Result.PackageID = createPkgResp.PackageID
t.Result.ObjectResults = rets
return nil
return &CreateRepPackageResult{
PackageID: createPkgResp.PackageID,
ObjectResults: rets,
}, nil
} }


func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo, uploadConfig UploadConfig) ([]RepObjectUploadResult, error) {
func uploadAndUpdateRepPackage(packageID int64, objectIter iterator.UploadingObjectIterator, uploadNode UploadNodeInfo) ([]RepObjectUploadResult, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}

var uploadRets []RepObjectUploadResult var uploadRets []RepObjectUploadResult
//上传文件夹
var adds []coormq.AddRepObjectInfo var adds []coormq.AddRepObjectInfo
for { for {
objInfo, err := objectIter.MoveNext() objInfo, err := objectIter.MoveNext()
@@ -166,7 +157,7 @@ func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iter
return nil, fmt.Errorf("reading object: %w", err) return nil, fmt.Errorf("reading object: %w", err)
} }


fileHash, uploadedNodeIDs, err := uploadObject(ctx, objInfo, uploadNode, uploadConfig)
fileHash, err := uploadFile(objInfo.File, uploadNode)
uploadRets = append(uploadRets, RepObjectUploadResult{ uploadRets = append(uploadRets, RepObjectUploadResult{
Info: objInfo, Info: objInfo,
Error: err, Error: err,
@@ -176,10 +167,10 @@ func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iter
return nil, fmt.Errorf("uploading object: %w", err) return nil, fmt.Errorf("uploading object: %w", err)
} }


adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, uploadedNodeIDs))
adds = append(adds, coormq.NewAddRepObjectInfo(objInfo.Path, objInfo.Size, fileHash, []int64{uploadNode.Node.NodeID}))
} }


_, err := ctx.Coordinator().UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil))
_, err = coorCli.UpdateRepPackage(coormq.NewUpdateRepPackage(packageID, adds, nil))
if err != nil { if err != nil {
return nil, fmt.Errorf("updating package: %w", err) return nil, fmt.Errorf("updating package: %w", err)
} }
@@ -188,15 +179,15 @@ func uploadAndUpdateRepPackage(ctx TaskContext, packageID int64, objectIter iter
} }


// 上传文件 // 上传文件
func uploadObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNode UploadNodeInfo, uploadConfig UploadConfig) (string, []int64, error) {
func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) {
// 本地有IPFS,则直接从本地IPFS上传 // 本地有IPFS,则直接从本地IPFS上传
if uploadConfig.LocalIPFS != nil {
if globals.IPFSPool != nil {
logger.Infof("try to use local IPFS to upload file") logger.Infof("try to use local IPFS to upload file")


// 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件 // 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件
fileHash, err := uploadToLocalIPFS(uploadConfig.LocalIPFS, obj.File, uploadNode.Node.NodeID, uploadConfig.LocalNodeID == nil, uploadConfig)
fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, globals.Local.NodeID == nil)
if err == nil { if err == nil {
return fileHash, []int64{*uploadConfig.LocalNodeID}, nil
return fileHash, nil


} else { } else {
logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error()) logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error())
@@ -212,12 +203,12 @@ func uploadObject(ctx TaskContext, obj *iterator.IterUploadingObject, uploadNode
logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.Node.NodeID) logger.Infof("client and node %d are at the same location, use local ip\n", uploadNode.Node.NodeID)
} }


fileHash, err := uploadToNode(obj.File, nodeIP, uploadConfig)
fileHash, err := uploadToNode(file, nodeIP)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
return "", fmt.Errorf("upload to node %s failed, err: %w", nodeIP, err)
} }


return fileHash, []int64{uploadNode.Node.NodeID}, nil
return fileHash, nil
} }


// chooseUploadNode 选择一个上传文件的节点 // chooseUploadNode 选择一个上传文件的节点
@@ -232,86 +223,68 @@ func (t *CreateRepPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeIn
return nodes[rand.Intn(len(nodes))] return nodes[rand.Intn(len(nodes))]
} }


func uploadToNode(file io.ReadCloser, nodeIP string, uploadConfig UploadConfig) (string, error) {
// 建立grpc连接,发送请求
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, uploadConfig.GRPCPort)
grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "", fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
defer grpcCon.Close()

client := agentcaller.NewFileTransportClient(grpcCon)
upload, err := mygrpc.SendFileAsStream(client)
func uploadToNode(file io.Reader, nodeIP string) (string, error) {
rpcCli, err := globals.AgentRPCPool.Acquire(nodeIP)
if err != nil { if err != nil {
return "", fmt.Errorf("request to send file failed, err: %w", err)
return "", fmt.Errorf("new agent rpc client: %w", err)
} }
defer rpcCli.Close()


// 发送文件数据
_, err = io.Copy(upload, file)
if err != nil {
// 发生错误则关闭连接
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("copy file date to upload stream failed, err: %w", err)
}
return rpcCli.SendIPFSFile(file)
}


// 发送EOF消息,并获得FileHash
fileHash, err := upload.Finish()
func uploadToLocalIPFS(file io.Reader, nodeID int64, shouldPin bool) (string, error) {
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil { if err != nil {
upload.Abort(io.ErrClosedPipe)
return "", fmt.Errorf("send EOF failed, err: %w", err)
return "", fmt.Errorf("new ipfs client: %w", err)
} }
defer ipfsCli.Close()


return fileHash, nil
}

func uploadToLocalIPFS(ipfs *ipfs.IPFS, file io.ReadCloser, nodeID int64, shouldPin bool, uploadConfig UploadConfig) (string, error) {
// 从本地IPFS上传文件 // 从本地IPFS上传文件
writer, err := ipfs.CreateFile()
fileHash, err := ipfsCli.CreateFile(file)
if err != nil { if err != nil {
return "", fmt.Errorf("create IPFS file failed, err: %w", err)
return "", fmt.Errorf("creating ipfs file: %w", err)
} }


_, err = io.Copy(writer, file)
if err != nil {
return "", fmt.Errorf("copy file data to IPFS failed, err: %w", err)
if !shouldPin {
return fileHash, nil
} }


fileHash, err := writer.Finish()
err = pinIPFSFile(nodeID, fileHash)
if err != nil { if err != nil {
return "", fmt.Errorf("finish writing IPFS failed, err: %w", err)
return "", err
} }


if !shouldPin {
return fileHash, nil
}
return fileHash, nil
}


// 然后让最近节点pin本地上传的文件
agentClient, err := agtmq.NewClient(nodeID, uploadConfig.MQ)
func pinIPFSFile(nodeID int64, fileHash string) error {
agtCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil { if err != nil {
return "", fmt.Errorf("create agent client to %d failed, err: %w", nodeID, err)
return fmt.Errorf("new agent client: %w", err)
} }
defer agentClient.Close()
defer agtCli.Close()


pinObjResp, err := agentClient.StartPinningObject(agtmq.NewStartPinningObject(fileHash))
// 然后让最近节点pin本地上传的文件
pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash))
if err != nil { if err != nil {
return "", fmt.Errorf("start pinning object: %w", err)
return fmt.Errorf("start pinning object: %w", err)
} }


for { for {
waitResp, err := agentClient.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5))
if err != nil { if err != nil {
return "", fmt.Errorf("waitting pinning object: %w", err)
return fmt.Errorf("waitting pinning object: %w", err)
} }


if waitResp.IsComplete { if waitResp.IsComplete {
if waitResp.Error != "" { if waitResp.Error != "" {
return "", fmt.Errorf("agent pinning object: %s", waitResp.Error)
return fmt.Errorf("agent pinning object: %s", waitResp.Error)
} }


break break
} }
} }


return fileHash, nil
return nil
} }
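
With this refactor the rep-package command reads its environment from globals and only takes the distlock service through the context. A usage sketch (the IDs, package name and zero-value redundancy info are placeholders; objIter is assumed to be built elsewhere with the iterator package):

import (
	"fmt"

	"gitlink.org.cn/cloudream/common/models"
	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
	"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
	"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

func createRepExample(distlock *distsvc.Service, objIter iterator.UploadingObjectIterator) error {
	t := cmd.NewCreateRepPackage(1, 2, "my-package", objIter, models.RepRedundancyInfo{})
	ret, err := t.Execute(&cmd.UpdatePackageContext{Distlock: distlock})
	if err != nil {
		return err
	}

	fmt.Printf("package %d created, %d objects uploaded\n", ret.PackageID, len(ret.ObjectResults))
	return nil
}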

pkgs/cmd/download_package.go (+40, -26)

@@ -5,10 +5,11 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"time"


"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
@@ -20,6 +21,11 @@ type DownloadPackage struct {
outputPath string outputPath string
} }


type DownloadPackageContext struct {
Distlock *distsvc.Service
ECPacketSize int64
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
return &DownloadPackage{ return &DownloadPackage{
userID: userID, userID: userID,
@@ -28,15 +34,14 @@ func NewDownloadPackage(userID int64, packageID int64, outputPath string) *Downl
} }
} }


func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()


func (t *DownloadPackage) do(ctx TaskContext) error {
getPkgResp, err := ctx.Coordinator().GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
if err != nil { if err != nil {


return fmt.Errorf("getting package: %w", err) return fmt.Errorf("getting package: %w", err)
@@ -56,34 +61,43 @@ func (t *DownloadPackage) do(ctx TaskContext) error {
return t.writeObject(objIter) return t.writeObject(objIter)
} }


func (t *DownloadPackage) downloadRep(ctx TaskContext) (iterator.DownloadingObjectIterator, error) {
getObjsResp, err := ctx.Coordinator().GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID))
func (t *DownloadPackage) downloadRep(ctx *DownloadPackageContext) (iterator.DownloadingObjectIterator, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID))
if err != nil { if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err) return nil, fmt.Errorf("getting package objects: %w", err)
} }


getObjRepDataResp, err := ctx.Coordinator().GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(t.packageID))
getObjRepDataResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(t.packageID))
if err != nil { if err != nil {
return nil, fmt.Errorf("getting package object rep data: %w", err) return nil, fmt.Errorf("getting package object rep data: %w", err)
} }


iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, ctx.Coordinator(), svc.distlock, myos.DownloadConfig{
LocalIPFS: svc.ipfs,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
iter := iterator.NewRepObjectIterator(getObjsResp.Objects, getObjRepDataResp.Data, &iterator.DownloadContext{
Distlock: ctx.Distlock,
}) })


return iter, nil return iter, nil
} }


func (t *DownloadPackage) downloadEC(ctx TaskContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) {
getObjsResp, err := ctx.Coordinator().GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID))
func (t *DownloadPackage) downloadEC(ctx *DownloadPackageContext, pkg model.Package) (iterator.DownloadingObjectIterator, error) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

getObjsResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(t.userID, t.packageID))
if err != nil { if err != nil {
return nil, fmt.Errorf("getting package objects: %w", err) return nil, fmt.Errorf("getting package objects: %w", err)
} }


getObjECDataResp, err := ctx.Coordinator().GetPackageObjectECData(coormq.NewGetPackageObjectECData(t.packageID))
getObjECDataResp, err := coorCli.GetPackageObjectECData(coormq.NewGetPackageObjectECData(t.packageID))
if err != nil { if err != nil {
return nil, fmt.Errorf("getting package object ec data: %w", err) return nil, fmt.Errorf("getting package object ec data: %w", err)
} }
@@ -93,16 +107,16 @@ func (t *DownloadPackage) downloadEC(ctx TaskContext, pkg model.Package) (iterat
return nil, fmt.Errorf("get ec redundancy info: %w", err) return nil, fmt.Errorf("get ec redundancy info: %w", err)
} }


getECResp, err := ctx.Coordinator().GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
if err != nil { if err != nil {
return nil, fmt.Errorf("getting ec: %w", err) return nil, fmt.Errorf("getting ec: %w", err)
} }


iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, ctx.Coordinator(), svc.distlock, getECResp.Config, config.Cfg().ECPacketSize, myos.DownloadConfig{
LocalIPFS: svc.ipfs,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
iter := iterator.NewECObjectIterator(getObjsResp.Objects, getObjECDataResp.Data, getECResp.Config, &iterator.ECDownloadContext{
DownloadContext: &iterator.DownloadContext{
Distlock: ctx.Distlock,
},
ECPacketSize: ctx.ECPacketSize,
}) })


return iter, nil return iter, nil
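
Callers drive the download the same way: an explicit context replaces the old TaskContext/CompleteFn pair. A minimal sketch (the IDs, output path and packet size are illustrative; the distlock service is whatever the caller already holds):

import (
	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
	"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
)

func downloadExample(distlock *distsvc.Service) error {
	t := cmd.NewDownloadPackage(1, 100, "./download-output")
	return t.Execute(&cmd.DownloadPackageContext{
		Distlock:     distlock,
		ECPacketSize: 1 << 20, // illustrative; the real value comes from service config
	})
}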


pkgs/cmd/update_ec_package.go (+29, -35)

@@ -2,51 +2,44 @@ package cmd


import ( import (
"fmt" "fmt"
"time"


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
mysort "gitlink.org.cn/cloudream/common/utils/sort" mysort "gitlink.org.cn/cloudream/common/utils/sort"


"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type UpdateECPackage struct { type UpdateECPackage struct {
userID int64
packageID int64
objectIter iterator.UploadingObjectIterator
ecPacketSize int64
uploadConfig UploadConfig

Result UpdateECPackageResult
userID int64
packageID int64
objectIter iterator.UploadingObjectIterator
} }


type UpdateECPackageResult struct { type UpdateECPackageResult struct {
ObjectResults []ECObjectUploadResult ObjectResults []ECObjectUploadResult
} }


func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator, ecPacketSize int64, uploadConfig UploadConfig) *UpdateECPackage {
func NewUpdateECPackage(userID int64, packageID int64, objIter iterator.UploadingObjectIterator) *UpdateECPackage {
return &UpdateECPackage{ return &UpdateECPackage{
userID: userID,
packageID: packageID,
objectIter: objIter,
ecPacketSize: ecPacketSize,
uploadConfig: uploadConfig,
userID: userID,
packageID: packageID,
objectIter: objIter,
} }
} }


func (t *UpdateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
t.objectIter.Close()
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackageResult, error) {
defer t.objectIter.Close()
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}


func (t *UpdateECPackage) do(ctx TaskContext) error {
/* /*
TODO2 TODO2
reqBlder := reqbuilder.NewBuilder() reqBlder := reqbuilder.NewBuilder()
@@ -78,19 +71,19 @@ func (t *UpdateECPackage) do(ctx TaskContext) error {
defer mutex.Unlock() defer mutex.Unlock()
*/ */


getPkgResp, err := ctx.Coordinator().GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
if err != nil { if err != nil {
return fmt.Errorf("getting package: %w", err)
return nil, fmt.Errorf("getting package: %w", err)
} }


getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID))
getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil { if err != nil {
return fmt.Errorf("getting user nodes: %w", err)
return nil, fmt.Errorf("getting user nodes: %w", err)
} }


findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP))
findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP))
if err != nil { if err != nil {
return fmt.Errorf("finding client location: %w", err)
return nil, fmt.Errorf("finding client location: %w", err)
} }


nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo { nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
@@ -102,12 +95,12 @@ func (t *UpdateECPackage) do(ctx TaskContext) error {


var ecRed models.ECRedundancyInfo var ecRed models.ECRedundancyInfo
if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil { if err := serder.AnyToAny(getPkgResp.Package.Redundancy.Info, &ecRed); err != nil {
return fmt.Errorf("get ec redundancy info: %w", err)
return nil, fmt.Errorf("get ec redundancy info: %w", err)
} }


getECResp, err := ctx.Coordinator().GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
getECResp, err := coorCli.GetECConfig(coormq.NewGetECConfig(ecRed.ECName))
if err != nil { if err != nil {
return fmt.Errorf("getting ec: %w", err)
return nil, fmt.Errorf("getting ec: %w", err)
} }


/* /*
@@ -122,13 +115,14 @@ func (t *UpdateECPackage) do(ctx TaskContext) error {
defer mutex2.Unlock() defer mutex2.Unlock()
*/ */


rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config, t.ecPacketSize, t.uploadConfig)
rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config)
if err != nil { if err != nil {
return err
return nil, err
} }


t.Result.ObjectResults = rets
return nil
return &UpdateECPackageResult{
ObjectResults: rets,
}, nil
} }


// chooseUploadNode 选择一个上传文件的节点 // chooseUploadNode 选择一个上传文件的节点


pkgs/cmd/update_rep_package.go (+25, -30)

@@ -2,24 +2,21 @@ package cmd


import ( import (
"fmt" "fmt"
"time"


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
mysort "gitlink.org.cn/cloudream/common/utils/sort" mysort "gitlink.org.cn/cloudream/common/utils/sort"


"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
) )


type UpdateRepPackage struct { type UpdateRepPackage struct {
userID int64
packageID int64
objectIter iterator.UploadingObjectIterator
uploadConfig UploadConfig

Result UpdateRepPackageResult
userID int64
packageID int64
objectIter iterator.UploadingObjectIterator
} }


type UpdateNodeInfo struct { type UpdateNodeInfo struct {
@@ -31,24 +28,21 @@ type UpdateRepPackageResult struct {
ObjectResults []RepObjectUploadResult ObjectResults []RepObjectUploadResult
} }


func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator, uploadConfig UploadConfig) *UpdateRepPackage {
func NewUpdateRepPackage(userID int64, packageID int64, objectIter iterator.UploadingObjectIterator) *UpdateRepPackage {
return &UpdateRepPackage{ return &UpdateRepPackage{
userID: userID,
packageID: packageID,
objectIter: objectIter,
uploadConfig: uploadConfig,
userID: userID,
packageID: packageID,
objectIter: objectIter,
} }
} }


func (t *UpdateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
t.objectIter.Close()
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}
func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackageResult, error) {
defer t.objectIter.Close()


func (t *UpdateRepPackage) do(ctx TaskContext) error {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
/* /*
TODO2 TODO2
reqBlder := reqbuilder.NewBuilder() reqBlder := reqbuilder.NewBuilder()
@@ -79,14 +73,14 @@ func (t *UpdateRepPackage) do(ctx TaskContext) error {
} }
defer mutex.Unlock() defer mutex.Unlock()
*/ */
getUserNodesResp, err := ctx.Coordinator().GetUserNodes(coormq.NewGetUserNodes(t.userID))
getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
if err != nil { if err != nil {
return fmt.Errorf("getting user nodes: %w", err)
return nil, fmt.Errorf("getting user nodes: %w", err)
} }


findCliLocResp, err := ctx.Coordinator().FindClientLocation(coormq.NewFindClientLocation(t.uploadConfig.ExternalIP))
findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP))
if err != nil { if err != nil {
return fmt.Errorf("finding client location: %w", err)
return nil, fmt.Errorf("finding client location: %w", err)
} }


nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UpdateNodeInfo { nodeInfos := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UpdateNodeInfo {
@@ -108,19 +102,20 @@ func (t *UpdateRepPackage) do(ctx TaskContext) error {
// 防止上传的副本被清除 // 防止上传的副本被清除
mutex2, err := reqbuilder.NewBuilder(). mutex2, err := reqbuilder.NewBuilder().
IPFS().CreateAnyRep(uploadNode.Node.NodeID). IPFS().CreateAnyRep(uploadNode.Node.NodeID).
MutexLock(ctx.DistLock())
MutexLock(ctx.Distlock)
if err != nil { if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
} }
defer mutex2.Unlock() defer mutex2.Unlock()


rets, err := uploadAndUpdateRepPackage(ctx, t.packageID, t.objectIter, uploadNode.UploadNodeInfo, t.uploadConfig)
rets, err := uploadAndUpdateRepPackage(t.packageID, t.objectIter, uploadNode.UploadNodeInfo)
if err != nil { if err != nil {
return err
return nil, err
} }


t.Result.ObjectResults = rets
return nil
return &UpdateRepPackageResult{
ObjectResults: rets,
}, nil
} }


// chooseUploadNode 选择一个上传文件的节点 // chooseUploadNode 选择一个上传文件的节点


pkgs/grpc/agent/agent.pb.go (+345, -0)

@@ -0,0 +1,345 @@
// 使用的语法版本

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v4.22.3
// source: pkgs/grpc/agent/agent.proto

package agent

import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)

const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type FileDataPacketType int32

const (
FileDataPacketType_Data FileDataPacketType = 0
FileDataPacketType_EOF FileDataPacketType = 1
)

// Enum value maps for FileDataPacketType.
var (
FileDataPacketType_name = map[int32]string{
0: "Data",
1: "EOF",
}
FileDataPacketType_value = map[string]int32{
"Data": 0,
"EOF": 1,
}
)

func (x FileDataPacketType) Enum() *FileDataPacketType {
p := new(FileDataPacketType)
*p = x
return p
}

func (x FileDataPacketType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}

func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor {
return file_pkgs_grpc_agent_agent_proto_enumTypes[0].Descriptor()
}

func (FileDataPacketType) Type() protoreflect.EnumType {
return &file_pkgs_grpc_agent_agent_proto_enumTypes[0]
}

func (x FileDataPacketType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}

// Deprecated: Use FileDataPacketType.Descriptor instead.
func (FileDataPacketType) EnumDescriptor() ([]byte, []int) {
return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0}
}

// 文件数据。注意:只在Type为Data的时候,Data字段才能有数据
type FileDataPacket struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
}

func (x *FileDataPacket) Reset() {
*x = FileDataPacket{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *FileDataPacket) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*FileDataPacket) ProtoMessage() {}

func (x *FileDataPacket) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead.
func (*FileDataPacket) Descriptor() ([]byte, []int) {
return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0}
}

func (x *FileDataPacket) GetType() FileDataPacketType {
if x != nil {
return x.Type
}
return FileDataPacketType_Data
}

func (x *FileDataPacket) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}

type SendIPFSFileResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"`
}

func (x *SendIPFSFileResp) Reset() {
*x = SendIPFSFileResp{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *SendIPFSFileResp) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*SendIPFSFileResp) ProtoMessage() {}

func (x *SendIPFSFileResp) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use SendIPFSFileResp.ProtoReflect.Descriptor instead.
func (*SendIPFSFileResp) Descriptor() ([]byte, []int) {
return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{1}
}

func (x *SendIPFSFileResp) GetFileHash() string {
if x != nil {
return x.FileHash
}
return ""
}

type GetIPFSFileReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"`
}

func (x *GetIPFSFileReq) Reset() {
*x = GetIPFSFileReq{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *GetIPFSFileReq) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*GetIPFSFileReq) ProtoMessage() {}

func (x *GetIPFSFileReq) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use GetIPFSFileReq.ProtoReflect.Descriptor instead.
func (*GetIPFSFileReq) Descriptor() ([]byte, []int) {
return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{2}
}

func (x *GetIPFSFileReq) GetFileHash() string {
if x != nil {
return x.FileHash
}
return ""
}

var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor

var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{
0x0a, 0x1b, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x67, 0x65, 0x6e,
0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a,
0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12,
0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e,
0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79,
0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2e, 0x0a, 0x10,
0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70,
0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2c, 0x0a, 0x0e,
0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1a,
0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69,
0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65,
0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f,
0x46, 0x10, 0x01, 0x32, 0x74, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c,
0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46,
0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e,
0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70,
0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46,
0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c,
0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50,
0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61,
0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
file_pkgs_grpc_agent_agent_proto_rawDescOnce sync.Once
file_pkgs_grpc_agent_agent_proto_rawDescData = file_pkgs_grpc_agent_agent_proto_rawDesc
)

func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte {
file_pkgs_grpc_agent_agent_proto_rawDescOnce.Do(func() {
file_pkgs_grpc_agent_agent_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_grpc_agent_agent_proto_rawDescData)
})
return file_pkgs_grpc_agent_agent_proto_rawDescData
}

var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{
(FileDataPacketType)(0), // 0: FileDataPacketType
(*FileDataPacket)(nil), // 1: FileDataPacket
(*SendIPFSFileResp)(nil), // 2: SendIPFSFileResp
(*GetIPFSFileReq)(nil), // 3: GetIPFSFileReq
}
var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{
0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType
1, // 1: Agent.SendIPFSFile:input_type -> FileDataPacket
3, // 2: Agent.GetIPFSFile:input_type -> GetIPFSFileReq
2, // 3: Agent.SendIPFSFile:output_type -> SendIPFSFileResp
1, // 4: Agent.GetIPFSFile:output_type -> FileDataPacket
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}

func init() { file_pkgs_grpc_agent_agent_proto_init() }
func file_pkgs_grpc_agent_agent_proto_init() {
if File_pkgs_grpc_agent_agent_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkgs_grpc_agent_agent_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FileDataPacket); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkgs_grpc_agent_agent_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SendIPFSFileResp); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkgs_grpc_agent_agent_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetIPFSFileReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc,
NumEnums: 1,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkgs_grpc_agent_agent_proto_goTypes,
DependencyIndexes: file_pkgs_grpc_agent_agent_proto_depIdxs,
EnumInfos: file_pkgs_grpc_agent_agent_proto_enumTypes,
MessageInfos: file_pkgs_grpc_agent_agent_proto_msgTypes,
}.Build()
File_pkgs_grpc_agent_agent_proto = out.File
file_pkgs_grpc_agent_agent_proto_rawDesc = nil
file_pkgs_grpc_agent_agent_proto_goTypes = nil
file_pkgs_grpc_agent_agent_proto_depIdxs = nil
}

pkgs/proto/file_transport.proto → pkgs/grpc/agent/agent.proto View File

@@ -2,7 +2,7 @@
syntax = "proto3";

// Package for the generated Go files
option go_package = "../proto;proto"; // this takes effect for grpc
option go_package = ".;agent"; // this takes effect for grpc

enum FileDataPacketType {
@@ -15,16 +15,16 @@ message FileDataPacket {
	bytes Data = 2;
}

message SendResp {
message SendIPFSFileResp {
	string FileHash = 1;
}

message GetReq {
message GetIPFSFileReq {
	string FileHash = 1;
}

service FileTransport {
	rpc SendFile(stream FileDataPacket)returns(SendResp){}
	rpc GetFile(GetReq)returns(stream FileDataPacket){}
service Agent {
	rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){}
	rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){}
}



+ 209
- 0
pkgs/grpc/agent/agent_grpc.pb.go View File

@@ -0,0 +1,209 @@
// Syntax version in use

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.22.3
// source: pkgs/grpc/agent/agent.proto

package agent

import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

const (
Agent_SendIPFSFile_FullMethodName = "/Agent/SendIPFSFile"
Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile"
)

// AgentClient is the client API for Agent service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type AgentClient interface {
SendIPFSFile(ctx context.Context, opts ...grpc.CallOption) (Agent_SendIPFSFileClient, error)
GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error)
}

type agentClient struct {
cc grpc.ClientConnInterface
}

func NewAgentClient(cc grpc.ClientConnInterface) AgentClient {
return &agentClient{cc}
}

func (c *agentClient) SendIPFSFile(ctx context.Context, opts ...grpc.CallOption) (Agent_SendIPFSFileClient, error) {
stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[0], Agent_SendIPFSFile_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &agentSendIPFSFileClient{stream}
return x, nil
}

type Agent_SendIPFSFileClient interface {
Send(*FileDataPacket) error
CloseAndRecv() (*SendIPFSFileResp, error)
grpc.ClientStream
}

type agentSendIPFSFileClient struct {
grpc.ClientStream
}

func (x *agentSendIPFSFileClient) Send(m *FileDataPacket) error {
return x.ClientStream.SendMsg(m)
}

func (x *agentSendIPFSFileClient) CloseAndRecv() (*SendIPFSFileResp, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(SendIPFSFileResp)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

func (c *agentClient) GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) {
stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[1], Agent_GetIPFSFile_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &agentGetIPFSFileClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}

type Agent_GetIPFSFileClient interface {
Recv() (*FileDataPacket, error)
grpc.ClientStream
}

type agentGetIPFSFileClient struct {
grpc.ClientStream
}

func (x *agentGetIPFSFileClient) Recv() (*FileDataPacket, error) {
m := new(FileDataPacket)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// AgentServer is the server API for Agent service.
// All implementations must embed UnimplementedAgentServer
// for forward compatibility
type AgentServer interface {
SendIPFSFile(Agent_SendIPFSFileServer) error
GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error
mustEmbedUnimplementedAgentServer()
}

// UnimplementedAgentServer must be embedded to have forward compatible implementations.
type UnimplementedAgentServer struct {
}

func (UnimplementedAgentServer) SendIPFSFile(Agent_SendIPFSFileServer) error {
return status.Errorf(codes.Unimplemented, "method SendIPFSFile not implemented")
}
func (UnimplementedAgentServer) GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error {
return status.Errorf(codes.Unimplemented, "method GetIPFSFile not implemented")
}
func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {}

// UnsafeAgentServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to AgentServer will
// result in compilation errors.
type UnsafeAgentServer interface {
mustEmbedUnimplementedAgentServer()
}

func RegisterAgentServer(s grpc.ServiceRegistrar, srv AgentServer) {
s.RegisterService(&Agent_ServiceDesc, srv)
}

func _Agent_SendIPFSFile_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(AgentServer).SendIPFSFile(&agentSendIPFSFileServer{stream})
}

type Agent_SendIPFSFileServer interface {
SendAndClose(*SendIPFSFileResp) error
Recv() (*FileDataPacket, error)
grpc.ServerStream
}

type agentSendIPFSFileServer struct {
grpc.ServerStream
}

func (x *agentSendIPFSFileServer) SendAndClose(m *SendIPFSFileResp) error {
return x.ServerStream.SendMsg(m)
}

func (x *agentSendIPFSFileServer) Recv() (*FileDataPacket, error) {
m := new(FileDataPacket)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

func _Agent_GetIPFSFile_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(GetIPFSFileReq)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(AgentServer).GetIPFSFile(m, &agentGetIPFSFileServer{stream})
}

type Agent_GetIPFSFileServer interface {
Send(*FileDataPacket) error
grpc.ServerStream
}

type agentGetIPFSFileServer struct {
grpc.ServerStream
}

func (x *agentGetIPFSFileServer) Send(m *FileDataPacket) error {
return x.ServerStream.SendMsg(m)
}

// Agent_ServiceDesc is the grpc.ServiceDesc for Agent service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Agent_ServiceDesc = grpc.ServiceDesc{
ServiceName: "Agent",
HandlerType: (*AgentServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SendIPFSFile",
Handler: _Agent_SendIPFSFile_Handler,
ClientStreams: true,
},
{
StreamName: "GetIPFSFile",
Handler: _Agent_GetIPFSFile_Handler,
ServerStreams: true,
},
},
Metadata: "pkgs/grpc/agent/agent.proto",
}
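For orientation only (not part of this commit): a minimal sketch of how the generated Agent service could be wired on the server side. The listen port and the returned file hash are placeholders, and a real agent would write the received stream into IPFS instead of discarding it.

package main

import (
    "log"
    "net"

    agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
    "google.golang.org/grpc"
)

type agentService struct {
    agtrpc.UnimplementedAgentServer
}

// SendIPFSFile drains the client stream until the EOF packet arrives, then
// replies once with the resulting file hash (stubbed here).
func (s *agentService) SendIPFSFile(stream agtrpc.Agent_SendIPFSFileServer) error {
    for {
        pkt, err := stream.Recv()
        if err != nil {
            return err
        }
        // A real agent would append pkt.Data to an IPFS write session here.
        if pkt.Type == agtrpc.FileDataPacketType_EOF {
            return stream.SendAndClose(&agtrpc.SendIPFSFileResp{FileHash: "Qm_placeholder"})
        }
    }
}

// GetIPFSFile streams the file back as Data packets and ends with an EOF packet.
func (s *agentService) GetIPFSFile(req *agtrpc.GetIPFSFileReq, stream agtrpc.Agent_GetIPFSFileServer) error {
    payload := []byte("placeholder content for " + req.FileHash)
    if err := stream.Send(&agtrpc.FileDataPacket{Type: agtrpc.FileDataPacketType_Data, Data: payload}); err != nil {
        return err
    }
    return stream.Send(&agtrpc.FileDataPacket{Type: agtrpc.FileDataPacketType_EOF})
}

func main() {
    lis, err := net.Listen("tcp", ":5010") // placeholder port
    if err != nil {
        log.Fatal(err)
    }
    srv := grpc.NewServer()
    agtrpc.RegisterAgentServer(srv, &agentService{})
    log.Fatal(srv.Serve(lis))
}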

+ 131
- 0
pkgs/grpc/agent/client.go View File

@@ -0,0 +1,131 @@
package agent

import (
"context"
"fmt"
"io"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Client struct {
con *grpc.ClientConn
cli AgentClient
}

func NewClient(addr string) (*Client, error) {
con, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}

return &Client{
con: con,
cli: NewAgentClient(con),
}, nil
}

func (c *Client) SendIPFSFile(file io.Reader) (string, error) {
sendCli, err := c.cli.SendIPFSFile(context.Background())
if err != nil {
return "", err
}

buf := make([]byte, 4096)
for {
rd, err := file.Read(buf)
if err == io.EOF {
err := sendCli.Send(&FileDataPacket{
Type: FileDataPacketType_EOF,
Data: buf[:rd],
})
if err != nil {
return "", fmt.Errorf("sending EOF packet: %w", err)
}

resp, err := sendCli.CloseAndRecv()
if err != nil {
return "", fmt.Errorf("receiving response: %w", err)
}

return resp.FileHash, nil
}

if err != nil {
return "", fmt.Errorf("reading file data: %w", err)
}

err = sendCli.Send(&FileDataPacket{
Type: FileDataPacketType_Data,
Data: buf[:rd],
})
if err != nil {
return "", fmt.Errorf("sending data packet: %w", err)
}
}
}

type fileReadCloser struct {
io.ReadCloser
stream Agent_GetIPFSFileClient
cancelFn context.CancelFunc
readingData []byte
recvEOF bool
}

func (s *fileReadCloser) Read(p []byte) (int, error) {
if len(s.readingData) == 0 && !s.recvEOF {
resp, err := s.stream.Recv()
if err != nil {
return 0, err
}

if resp.Type == FileDataPacketType_Data {
s.readingData = resp.Data

} else if resp.Type == FileDataPacketType_EOF {
s.readingData = resp.Data
s.recvEOF = true

} else {
return 0, fmt.Errorf("unsupported packet type: %v", resp.Type)
}
}

cnt := copy(p, s.readingData)
s.readingData = s.readingData[cnt:]

if len(s.readingData) == 0 && s.recvEOF {
return cnt, io.EOF
}

return cnt, nil
}

func (s *fileReadCloser) Close() error {
s.cancelFn()

return nil
}

func (c *Client) GetIPFSFile(fileHash string) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(context.Background())

stream, err := c.cli.GetIPFSFile(ctx, &GetIPFSFileReq{
FileHash: fileHash,
})
if err != nil {
cancel()
return nil, fmt.Errorf("request grpc failed, err: %w", err)
}

return &fileReadCloser{
stream: stream,
cancelFn: cancel,
}, nil
}

func (c *Client) Close() {
c.con.Close()
}
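For reference, a rough usage sketch of the Client above; the address and file names are placeholders and error handling is trimmed for brevity.

package main

import (
    "io"
    "log"
    "os"

    agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
)

func main() {
    cli, err := agtrpc.NewClient("127.0.0.1:5010") // placeholder address
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // Upload: SendIPFSFile streams the reader in 4KB packets and returns the file hash.
    f, err := os.Open("example.bin")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    hash, err := cli.SendIPFSFile(f)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("uploaded, hash=%s", hash)

    // Download: GetIPFSFile returns an io.ReadCloser backed by the gRPC stream.
    rd, err := cli.GetIPFSFile(hash)
    if err != nil {
        log.Fatal(err)
    }
    defer rd.Close()

    out, err := os.Create("example.out")
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    if _, err := io.Copy(out, rd); err != nil {
        log.Fatal(err)
    }
}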

+ 43
- 0
pkgs/grpc/agent/pool.go View File

@@ -0,0 +1,43 @@
package agent

import (
"fmt"
)

type PoolConfig struct {
Port int `json:"port"`
}

type PoolClient struct {
*Client
owner *Pool
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
grpcCfg *PoolConfig
}

func NewPool(grpcCfg *PoolConfig) *Pool {
return &Pool{
grpcCfg: grpcCfg,
}
}
func (p *Pool) Acquire(ip string) (*PoolClient, error) {
cli, err := NewClient(fmt.Sprintf("%s:%d", ip, p.grpcCfg.Port))
if err != nil {
return nil, err
}

return &PoolClient{
Client: cli,
owner: p,
}, nil
}

func (p *Pool) Release(cli *PoolClient) {
	cli.Client.Close()
}
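The pool currently opens a fresh connection per Acquire and closes it on Release. A sketch of the intended calling pattern (not part of this commit; the port and IP below are placeholders):

package main

import (
    "fmt"
    "io"
    "strings"

    agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
)

// uploadViaPool shows why the pool removes the config dependency at call
// sites: only the node IP is needed, the gRPC port comes from the PoolConfig.
func uploadViaPool(pool *agtrpc.Pool, nodeIP string, file io.Reader) (string, error) {
    cli, err := pool.Acquire(nodeIP)
    if err != nil {
        return "", fmt.Errorf("acquire agent client: %w", err)
    }
    // PoolClient.Close hands the client back to the pool for release.
    defer cli.Close()

    return cli.SendIPFSFile(file)
}

func main() {
    pool := agtrpc.NewPool(&agtrpc.PoolConfig{Port: 5010}) // placeholder port
    hash, err := uploadViaPool(pool, "192.168.0.10", strings.NewReader("hello"))
    fmt.Println(hash, err)
}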

+ 12
- 0
pkgs/grpc/config.go View File

@@ -0,0 +1,12 @@
package grpc

import "fmt"

type Config struct {
IP string `json:"ip"`
Port int `json:"port"`
}

func (c *Config) MakeListenAddress() string {
return fmt.Sprintf("%s:%d", c.IP, c.Port)
}

+ 38
- 153
pkgs/iterator/ec_object_iterator.go View File

@@ -7,19 +7,12 @@ import (
	"os"

	"github.com/samber/lo"
	"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/storage-common/globals"
	"gitlink.org.cn/cloudream/storage-common/models"
	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
	"gitlink.org.cn/cloudream/storage-common/pkgs/ec"
	coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	myio "gitlink.org.cn/cloudream/common/utils/io"
	agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
	mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"
)

type ECObjectIterator struct {
@@ -28,31 +21,36 @@ type ECObjectIterator struct {
	currentIndex int
	inited bool

	coorCli *coormq.Client
	distlock *distsvc.Service
	ec model.Ec
	ecPacketSize int64
	downloadConfig DownloadConfig
	cliLocation model.Location
	ec model.Ec
	downloadCtx *ECDownloadContext
	cliLocation model.Location
}

type ECDownloadContext struct {
	*DownloadContext
	ECPacketSize int64
}

func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECData, coorCli *coormq.Client, distlock *distsvc.Service, ec model.Ec, ecPacketSize int64, downloadConfig DownloadConfig) *ECObjectIterator {
func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECData, ec model.Ec, downloadCtx *ECDownloadContext) *ECObjectIterator {
	return &ECObjectIterator{
		objects: objects,
		objectECData: objectECData,
		coorCli: coorCli,
		distlock: distlock,
		ec: ec,
		ecPacketSize: ecPacketSize,
		downloadConfig: downloadConfig,
		objects: objects,
		objectECData: objectECData,
		ec: ec,
		downloadCtx: downloadCtx,
	}
}


func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) {
	coorCli, err := globals.CoordinatorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new coordinator client: %w", err)
	}
	defer coorCli.Close()

	if !i.inited {
		i.inited = true

		findCliLocResp, err := i.coorCli.FindClientLocation(coormq.NewFindClientLocation(i.downloadConfig.ExternalIP))
		findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP))
		if err != nil {
			return nil, fmt.Errorf("finding client location: %w", err)
		}
@@ -63,12 +61,12 @@ func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) {
		return nil, ErrNoMoreItem
	}

	item, err := i.doMove()
	item, err := i.doMove(coorCli)
	i.currentIndex++
	return item, err
}


func (iter *ECObjectIterator) doMove() (*IterDownloadingObject, error) {
func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadingObject, error) {
	obj := iter.objects[iter.currentIndex]
	ecData := iter.objectECData[iter.currentIndex]

@@ -82,7 +80,7 @@ func (iter *ECObjectIterator) doMove() (*IterDownloadingObject, error) {
	for i := 0; i < ecK; i++ {
		hashs[i] = blocks[i].FileHash

		getNodesResp, err := iter.coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs))
		getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs))
		if err != nil {
			return nil, fmt.Errorf("getting nodes: %w", err)
		}
@@ -140,25 +138,10 @@ func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downlo
	return entries[rand.Intn(len(entries))]
}


func (i *ECObjectIterator) downloadObject(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
if i.downloadConfig.LocalIPFS != nil {
logger.Infof("try to use local IPFS to download file")

reader, err := i.downloadFromLocalIPFS(fileHash)
if err == nil {
return reader, nil
}

logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
}

return i.downloadFromNode(nodeID, nodeIP, fileHash)
}

func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) {
	// TODO zkx: implement the logic synchronously first, with proper error handling; this also makes it easier to reuse uploadToNode and uploadToLocalIPFS below to clean up the code structure
	//wg := sync.WaitGroup{}
	numPacket := (fileSize + int64(ecK)*iter.ecPacketSize - 1) / (int64(ecK) * iter.ecPacketSize)
	numPacket := (fileSize + int64(ecK)*iter.downloadCtx.ECPacketSize - 1) / (int64(ecK) * iter.downloadCtx.ECPacketSize)
	getBufs := make([]chan []byte, ecN)
	decodeBufs := make([]chan []byte, ecK)
	for i := 0; i < ecN; i++ {
@@ -167,8 +150,19 @@ func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int,
	for i := 0; i < ecK; i++ {
		decodeBufs[i] = make(chan []byte)
	}
	for i := 0; i < len(blockIDs); i++ {
		go iter.get(hashs[i], nodeIPs[i], getBufs[blockIDs[i]], numPacket)
	for idx := 0; idx < len(blockIDs); idx++ {
		i := idx
		go func() {
			// TODO handle the error
			file, _ := downloadFile(iter.downloadCtx.DownloadContext, nodeIDs[i], nodeIPs[i], hashs[i])

			for p := int64(0); p < numPacket; p++ {
				buf := make([]byte, iter.downloadCtx.ECPacketSize)
				// TODO handle the error
				io.ReadFull(file, buf)
				getBufs[blockIDs[i]] <- buf
			}
		}()
	}
	print(numPacket)
	go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket)
@@ -189,74 +183,6 @@ func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int,
	return r, nil
}


func (iter *ECObjectIterator) get(fileHash string, nodeIP string, getBuf chan []byte, numPacket int64) error {
downloadFromAgent := false
	// fetch via the local IPFS daemon
if iter.downloadConfig.LocalIPFS != nil {
logger.Infof("try to use local IPFS to download file")
		// get a reader from IPFS
reader, err := iter.downloadFromLocalIPFS(fileHash)
if err != nil {
downloadFromAgent = true
fmt.Errorf("read ipfs block failed, err: %w", err)
}
defer reader.Close()
for i := 0; int64(i) < numPacket; i++ {
buf := make([]byte, iter.ecPacketSize)
_, err := io.ReadFull(reader, buf)
if err != nil {
downloadFromAgent = true
fmt.Errorf("read file falied, err:%w", err)
}
getBuf <- buf
}
if downloadFromAgent == false {
close(getBuf)
return nil
}
} else {
downloadFromAgent = true
}
	// fetch from the agent
	if downloadFromAgent == true {
		/*// acquire the lock a second time
		mutex, err := reqbuilder.NewBuilder().
			// for downloading the file from IPFS
IPFS().ReadOneRep(nodeID, fileHash).
MutexLock(svc.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()
*/
		// connect to the gRPC server
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, iter.downloadConfig.GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}
		// download the file
client := agentcaller.NewFileTransportClient(conn)
reader, err := mygrpc.GetFileAsStream(client, fileHash)
if err != nil {
conn.Close()
return fmt.Errorf("request to get file failed, err: %w", err)
}
for index := 0; int64(index) < numPacket; index++ {
buf := make([]byte, iter.ecPacketSize)
_, _ = reader.Read(buf)
fmt.Println(buf)
fmt.Println(numPacket, "\n")
getBuf <- buf
}
close(getBuf)
reader.Close()
return nil
}
return nil

}

func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
	fmt.Println("decode ")
	var tmpIn [][]byte
@@ -296,44 +222,3 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int
		close(outBufs[i])
	}
}

func (i *ECObjectIterator) downloadFromNode(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
// 二次获取锁
mutex, err := reqbuilder.NewBuilder().
// 用于从IPFS下载文件
IPFS().ReadOneRep(nodeID, fileHash).
MutexLock(i.distlock)
if err != nil {
return nil, fmt.Errorf("acquire locks failed, err: %w", err)
}

	// connect to the gRPC server
grpcAddr := fmt.Sprintf("%s:%d", nodeIP, i.downloadConfig.GRPCPort)
conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
}

	// download the file
client := agentcaller.NewFileTransportClient(conn)
reader, err := mygrpc.GetFileAsStream(client, fileHash)
if err != nil {
conn.Close()
return nil, fmt.Errorf("request to get file failed, err: %w", err)
}

reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
conn.Close()
mutex.Unlock()
})
return reader, nil
}

func (i *ECObjectIterator) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
reader, err := i.downloadConfig.LocalIPFS.OpenRead(fileHash)
if err != nil {
return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
}

return reader, nil
}

+ 38
- 46
pkgs/iterator/rep_object_iterator.go View File

@@ -9,17 +9,11 @@ import (
	"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/common/utils/ipfs"
	myio "gitlink.org.cn/cloudream/common/utils/io"
	"gitlink.org.cn/cloudream/storage-common/globals"
	"gitlink.org.cn/cloudream/storage-common/models"
	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
	mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
	coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	myio "gitlink.org.cn/cloudream/common/utils/io"
	agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
	mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc"
)

type DownloadingObjectIterator = Iterator[*IterDownloadingObject]
@@ -30,10 +24,8 @@ type RepObjectIterator struct {
	currentIndex int
	inited bool

	coorCli *coormq.Client
	distlock *distsvc.Service
	downloadConfig DownloadConfig
	cliLocation model.Location
	downloadCtx *DownloadContext
	cliLocation model.Location
}

type IterDownloadingObject struct {
@@ -46,29 +38,29 @@ type DownloadNodeInfo struct {
	IsSameLocation bool
}

type DownloadConfig struct {
	LocalIPFS *ipfs.IPFS
	LocalNodeID *int64
	ExternalIP string
	GRPCPort int
	MQ *mymq.Config
type DownloadContext struct {
	Distlock *distsvc.Service
}

func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectRepData, coorCli *coormq.Client, distlock *distsvc.Service, downloadConfig DownloadConfig) *RepObjectIterator {
func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectRepData, downloadCtx *DownloadContext) *RepObjectIterator {
	return &RepObjectIterator{
		objects: objects,
		objectRepData: objectRepData,
		coorCli: coorCli,
		distlock: distlock,
		downloadConfig: downloadConfig,
		objects: objects,
		objectRepData: objectRepData,
		downloadCtx: downloadCtx,
	}
}


func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) {
	coorCli, err := globals.CoordinatorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new coordinator client: %w", err)
	}
	defer coorCli.Close()

	if !i.inited {
		i.inited = true

		findCliLocResp, err := i.coorCli.FindClientLocation(coormq.NewFindClientLocation(i.downloadConfig.ExternalIP))
		findCliLocResp, err := coorCli.FindClientLocation(coormq.NewFindClientLocation(globals.Local.ExternalIP))
		if err != nil {
			return nil, fmt.Errorf("finding client location: %w", err)
		}
@@ -79,18 +71,18 @@ func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) {
		return nil, ErrNoMoreItem
	}

	item, err := i.doMove()
	item, err := i.doMove(coorCli)
	i.currentIndex++
	return item, err
}


func (i *RepObjectIterator) doMove() (*IterDownloadingObject, error) {
func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadingObject, error) {
	repData := i.objectRepData[i.currentIndex]
	if len(repData.NodeIDs) == 0 {
		return nil, fmt.Errorf("no node has this file %s", repData.FileHash)
	}

	getNodesResp, err := i.coorCli.GetNodes(coormq.NewGetNodes(repData.NodeIDs))
	getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(repData.NodeIDs))
	if err != nil {
		return nil, fmt.Errorf("getting nodes: %w", err)
	}
@@ -113,7 +105,7 @@ func (i *RepObjectIterator) doMove() (*IterDownloadingObject, error) {
		logger.Infof("client and node %d are at the same location, use local ip\n", downloadNode.Node.NodeID)
	}

	reader, err := i.downloadObject(downloadNode.Node.NodeID, nodeIP, repData.FileHash)
	reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, repData.FileHash)
	if err != nil {
		return nil, fmt.Errorf("rep read failed, err: %w", err)
	}
@@ -139,11 +131,11 @@ func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downl
	return entries[rand.Intn(len(entries))]
}


func (i *RepObjectIterator) downloadObject(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
	if i.downloadConfig.LocalIPFS != nil {
func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
	if globals.IPFSPool != nil {
		logger.Infof("try to use local IPFS to download file")

		reader, err := i.downloadFromLocalIPFS(fileHash)
		reader, err := downloadFromLocalIPFS(fileHash)
		if err == nil {
			return reader, nil
		}
@@ -151,43 +143,43 @@ func (i *RepObjectIterator) downloadObject(nodeID int64, nodeIP string, fileHash
		logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
	}

	return i.downloadFromNode(nodeID, nodeIP, fileHash)
	return downloadFromNode(ctx, nodeID, nodeIP, fileHash)
}


func (i *RepObjectIterator) downloadFromNode(nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash string) (io.ReadCloser, error) {
	// acquire the lock a second time
	mutex, err := reqbuilder.NewBuilder().
		// for downloading the file from IPFS
		IPFS().ReadOneRep(nodeID, fileHash).
		MutexLock(i.distlock)
		MutexLock(ctx.Distlock)
	if err != nil {
		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
	}

	// connect to the gRPC server
	grpcAddr := fmt.Sprintf("%s:%d", nodeIP, i.downloadConfig.GRPCPort)
	conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	agtCli, err := globals.AgentRPCPool.Acquire(nodeIP)
	if err != nil {
		return nil, fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err)
		return nil, fmt.Errorf("new agent grpc client: %w", err)
	}

	// download the file
	client := agentcaller.NewFileTransportClient(conn)
	reader, err := mygrpc.GetFileAsStream(client, fileHash)
	reader, err := agtCli.GetIPFSFile(fileHash)
	if err != nil {
		conn.Close()
		return nil, fmt.Errorf("request to get file failed, err: %w", err)
		return nil, fmt.Errorf("getting ipfs file: %w", err)
	}

	reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
		conn.Close()
		mutex.Unlock()
	})
	return reader, nil
}


func (i *RepObjectIterator) downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
	reader, err := i.downloadConfig.LocalIPFS.OpenRead(fileHash)
func downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
	ipfsCli, err := globals.IPFSPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new ipfs client: %w", err)
	}

	reader, err := ipfsCli.OpenRead(fileHash)
	if err != nil {
		return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
	}
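A sketch of how the reworked iterator might be driven, assuming the object list and rep data were already fetched from the coordinator and the MQ/IPFS/agent pools were initialized elsewhere. The iterator import path and the ErrNoMoreItem sentinel comparison are inferred from the code above, not confirmed by this commit.

package example

import (
    "errors"

    distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
    "gitlink.org.cn/cloudream/storage-common/models"
    "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
    "gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

// downloadAll drains a RepObjectIterator until it reports ErrNoMoreItem.
func downloadAll(objects []model.Object, repData []models.ObjectRepData, distlock *distsvc.Service) error {
    iter := iterator.NewRepObjectIterator(objects, repData, &iterator.DownloadContext{
        Distlock: distlock,
    })

    for {
        obj, err := iter.MoveNext()
        if err != nil {
            if errors.Is(err, iterator.ErrNoMoreItem) {
                return nil
            }
            return err
        }
        _ = obj // consume the downloaded object (e.g. copy its reader to disk)
    }
}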


+ 37
- 3
pkgs/mq/agent/client.go View File

@@ -2,7 +2,7 @@ package agent


import (
	"gitlink.org.cn/cloudream/common/pkgs/mq"
	mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
	stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Client struct {
@@ -10,8 +10,8 @@ type Client struct {
	id int64
}

func NewClient(id int64, cfg *mymq.Config) (*Client, error) {
	rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.MakeAgentQueueName(id), "")
func NewClient(id int64, cfg *stgmq.Config) (*Client, error) {
	rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.MakeAgentQueueName(id), "")
	if err != nil {
		return nil, err
	}
@@ -25,3 +25,37 @@ func NewClient(id int64, cfg *mymq.Config) (*Client, error) {
func (c *Client) Close() {
	c.rabbitCli.Close()
}

type PoolClient struct {
*Client
owner *Pool
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
mqcfg *stgmq.Config
}

func NewPool(mqcfg *stgmq.Config) *Pool {
return &Pool{
mqcfg: mqcfg,
}
}
func (p *Pool) Acquire(id int64) (*PoolClient, error) {
cli, err := NewClient(id, p.mqcfg)
if err != nil {
return nil, err
}

return &PoolClient{
Client: cli,
owner: p,
}, nil
}

func (p *Pool) Release(cli *PoolClient) {
cli.Client.Close()
}

+ 37
- 3
pkgs/mq/coordinator/client.go View File

@@ -2,15 +2,15 @@ package coordinator


import (
	"gitlink.org.cn/cloudream/common/pkgs/mq"
	mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
	stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Client struct {
	rabbitCli *mq.RabbitMQClient
}

func NewClient(cfg *mymq.Config) (*Client, error) {
	rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.COORDINATOR_QUEUE_NAME, "")
func NewClient(cfg *stgmq.Config) (*Client, error) {
	rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.COORDINATOR_QUEUE_NAME, "")
	if err != nil {
		return nil, err
	}
@@ -23,3 +23,37 @@ func NewClient(cfg *mymq.Config) (*Client, error) {
func (c *Client) Close() {
	c.rabbitCli.Close()
}

type PoolClient struct {
*Client
owner *Pool
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
mqcfg *stgmq.Config
}

func NewPool(mqcfg *stgmq.Config) *Pool {
return &Pool{
mqcfg: mqcfg,
}
}
func (p *Pool) Acquire() (*PoolClient, error) {
cli, err := NewClient(p.mqcfg)
if err != nil {
return nil, err
}

return &PoolClient{
Client: cli,
owner: p,
}, nil
}

func (p *Pool) Release(cli *PoolClient) {
cli.Client.Close()
}

+ 37
- 3
pkgs/mq/scanner/client.go View File

@@ -2,15 +2,15 @@ package scanner


import (
	"gitlink.org.cn/cloudream/common/pkgs/mq"
	mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
	stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Client struct {
	rabbitCli *mq.RabbitMQClient
}

func NewClient(cfg *mymq.Config) (*Client, error) {
	rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), mymq.SCANNER_QUEUE_NAME, "")
func NewClient(cfg *stgmq.Config) (*Client, error) {
	rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), stgmq.SCANNER_QUEUE_NAME, "")
	if err != nil {
		return nil, err
	}
@@ -23,3 +23,37 @@ func NewClient(cfg *mymq.Config) (*Client, error) {
func (c *Client) Close() {
	c.rabbitCli.Close()
}

type PoolClient struct {
*Client
owner *Pool
}

func (c *PoolClient) Close() {
c.owner.Release(c)
}

type Pool struct {
mqcfg *stgmq.Config
}

func NewPool(mqcfg *stgmq.Config) *Pool {
return &Pool{
mqcfg: mqcfg,
}
}
func (p *Pool) Acquire() (*PoolClient, error) {
cli, err := NewClient(p.mqcfg)
if err != nil {
return nil, err
}

return &PoolClient{
Client: cli,
owner: p,
}, nil
}

func (p *Pool) Release(cli *PoolClient) {
cli.Client.Close()
}

+ 0
- 2
pkgs/proto/Makefile View File

@@ -1,2 +0,0 @@
protoc:
protoc --go_out=. --go-grpc_out=. .\file_transport.proto

+ 0
- 343
pkgs/proto/file_transport.pb.go View File

@@ -1,343 +0,0 @@
// Syntax version in use

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v4.22.3
// source: file_transport.proto

package proto

import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)

const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type FileDataPacketType int32

const (
FileDataPacketType_Data FileDataPacketType = 0
FileDataPacketType_EOF FileDataPacketType = 1
)

// Enum value maps for FileDataPacketType.
var (
FileDataPacketType_name = map[int32]string{
0: "Data",
1: "EOF",
}
FileDataPacketType_value = map[string]int32{
"Data": 0,
"EOF": 1,
}
)

func (x FileDataPacketType) Enum() *FileDataPacketType {
p := new(FileDataPacketType)
*p = x
return p
}

func (x FileDataPacketType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}

func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor {
return file_file_transport_proto_enumTypes[0].Descriptor()
}

func (FileDataPacketType) Type() protoreflect.EnumType {
return &file_file_transport_proto_enumTypes[0]
}

func (x FileDataPacketType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}

// Deprecated: Use FileDataPacketType.Descriptor instead.
func (FileDataPacketType) EnumDescriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{0}
}

// File data. Note: the Data field carries data only when Type is Data
type FileDataPacket struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
}

func (x *FileDataPacket) Reset() {
*x = FileDataPacket{}
if protoimpl.UnsafeEnabled {
mi := &file_file_transport_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *FileDataPacket) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*FileDataPacket) ProtoMessage() {}

func (x *FileDataPacket) ProtoReflect() protoreflect.Message {
mi := &file_file_transport_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use FileDataPacket.ProtoReflect.Descriptor instead.
func (*FileDataPacket) Descriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{0}
}

func (x *FileDataPacket) GetType() FileDataPacketType {
if x != nil {
return x.Type
}
return FileDataPacketType_Data
}

func (x *FileDataPacket) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}

type SendResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"`
}

func (x *SendResp) Reset() {
*x = SendResp{}
if protoimpl.UnsafeEnabled {
mi := &file_file_transport_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *SendResp) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*SendResp) ProtoMessage() {}

func (x *SendResp) ProtoReflect() protoreflect.Message {
mi := &file_file_transport_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use SendResp.ProtoReflect.Descriptor instead.
func (*SendResp) Descriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{1}
}

func (x *SendResp) GetFileHash() string {
if x != nil {
return x.FileHash
}
return ""
}

type GetReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FileHash string `protobuf:"bytes,1,opt,name=FileHash,proto3" json:"FileHash,omitempty"`
}

func (x *GetReq) Reset() {
*x = GetReq{}
if protoimpl.UnsafeEnabled {
mi := &file_file_transport_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *GetReq) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*GetReq) ProtoMessage() {}

func (x *GetReq) ProtoReflect() protoreflect.Message {
mi := &file_file_transport_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use GetReq.ProtoReflect.Descriptor instead.
func (*GetReq) Descriptor() ([]byte, []int) {
return file_file_transport_proto_rawDescGZIP(), []int{2}
}

func (x *GetReq) GetFileHash() string {
if x != nil {
return x.FileHash
}
return ""
}

var File_file_transport_proto protoreflect.FileDescriptor

var file_file_transport_proto_rawDesc = []byte{
0x0a, 0x14, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61,
0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74,
0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70,
0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x26, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x52, 0x65, 0x73,
0x70, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x24, 0x0a,
0x06, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48,
0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48,
0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50,
0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74,
0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x01, 0x32, 0x64, 0x0a, 0x0d,
0x46, 0x69, 0x6c, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x2a, 0x0a,
0x08, 0x53, 0x65, 0x6e, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65,
0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x09, 0x2e, 0x53, 0x65, 0x6e,
0x64, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x27, 0x0a, 0x07, 0x47, 0x65, 0x74,
0x46, 0x69, 0x6c, 0x65, 0x12, 0x07, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e,
0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00,
0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x2e, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x3b, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
file_file_transport_proto_rawDescOnce sync.Once
file_file_transport_proto_rawDescData = file_file_transport_proto_rawDesc
)

func file_file_transport_proto_rawDescGZIP() []byte {
file_file_transport_proto_rawDescOnce.Do(func() {
file_file_transport_proto_rawDescData = protoimpl.X.CompressGZIP(file_file_transport_proto_rawDescData)
})
return file_file_transport_proto_rawDescData
}

var file_file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_file_transport_proto_goTypes = []interface{}{
(FileDataPacketType)(0), // 0: FileDataPacketType
(*FileDataPacket)(nil), // 1: FileDataPacket
(*SendResp)(nil), // 2: SendResp
(*GetReq)(nil), // 3: GetReq
}
var file_file_transport_proto_depIdxs = []int32{
0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType
1, // 1: FileTransport.SendFile:input_type -> FileDataPacket
3, // 2: FileTransport.GetFile:input_type -> GetReq
2, // 3: FileTransport.SendFile:output_type -> SendResp
1, // 4: FileTransport.GetFile:output_type -> FileDataPacket
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}

func init() { file_file_transport_proto_init() }
func file_file_transport_proto_init() {
if File_file_transport_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_file_transport_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FileDataPacket); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_file_transport_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SendResp); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_file_transport_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_file_transport_proto_rawDesc,
NumEnums: 1,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_file_transport_proto_goTypes,
DependencyIndexes: file_file_transport_proto_depIdxs,
EnumInfos: file_file_transport_proto_enumTypes,
MessageInfos: file_file_transport_proto_msgTypes,
}.Build()
File_file_transport_proto = out.File
file_file_transport_proto_rawDesc = nil
file_file_transport_proto_goTypes = nil
file_file_transport_proto_depIdxs = nil
}

+ 0
- 209
pkgs/proto/file_transport_grpc.pb.go View File

@@ -1,209 +0,0 @@
// Syntax version in use

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.22.3
// source: file_transport.proto

package proto

import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

const (
FileTransport_SendFile_FullMethodName = "/FileTransport/SendFile"
FileTransport_GetFile_FullMethodName = "/FileTransport/GetFile"
)

// FileTransportClient is the client API for FileTransport service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type FileTransportClient interface {
SendFile(ctx context.Context, opts ...grpc.CallOption) (FileTransport_SendFileClient, error)
GetFile(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (FileTransport_GetFileClient, error)
}

type fileTransportClient struct {
cc grpc.ClientConnInterface
}

func NewFileTransportClient(cc grpc.ClientConnInterface) FileTransportClient {
return &fileTransportClient{cc}
}

func (c *fileTransportClient) SendFile(ctx context.Context, opts ...grpc.CallOption) (FileTransport_SendFileClient, error) {
stream, err := c.cc.NewStream(ctx, &FileTransport_ServiceDesc.Streams[0], FileTransport_SendFile_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &fileTransportSendFileClient{stream}
return x, nil
}

type FileTransport_SendFileClient interface {
Send(*FileDataPacket) error
CloseAndRecv() (*SendResp, error)
grpc.ClientStream
}

type fileTransportSendFileClient struct {
grpc.ClientStream
}

func (x *fileTransportSendFileClient) Send(m *FileDataPacket) error {
return x.ClientStream.SendMsg(m)
}

func (x *fileTransportSendFileClient) CloseAndRecv() (*SendResp, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(SendResp)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

func (c *fileTransportClient) GetFile(ctx context.Context, in *GetReq, opts ...grpc.CallOption) (FileTransport_GetFileClient, error) {
stream, err := c.cc.NewStream(ctx, &FileTransport_ServiceDesc.Streams[1], FileTransport_GetFile_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &fileTransportGetFileClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}

type FileTransport_GetFileClient interface {
Recv() (*FileDataPacket, error)
grpc.ClientStream
}

type fileTransportGetFileClient struct {
grpc.ClientStream
}

func (x *fileTransportGetFileClient) Recv() (*FileDataPacket, error) {
m := new(FileDataPacket)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// FileTransportServer is the server API for FileTransport service.
// All implementations must embed UnimplementedFileTransportServer
// for forward compatibility
type FileTransportServer interface {
SendFile(FileTransport_SendFileServer) error
GetFile(*GetReq, FileTransport_GetFileServer) error
mustEmbedUnimplementedFileTransportServer()
}

// UnimplementedFileTransportServer must be embedded to have forward compatible implementations.
type UnimplementedFileTransportServer struct {
}

func (UnimplementedFileTransportServer) SendFile(FileTransport_SendFileServer) error {
return status.Errorf(codes.Unimplemented, "method SendFile not implemented")
}
func (UnimplementedFileTransportServer) GetFile(*GetReq, FileTransport_GetFileServer) error {
return status.Errorf(codes.Unimplemented, "method GetFile not implemented")
}
func (UnimplementedFileTransportServer) mustEmbedUnimplementedFileTransportServer() {}

// UnsafeFileTransportServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to FileTransportServer will
// result in compilation errors.
type UnsafeFileTransportServer interface {
mustEmbedUnimplementedFileTransportServer()
}

func RegisterFileTransportServer(s grpc.ServiceRegistrar, srv FileTransportServer) {
s.RegisterService(&FileTransport_ServiceDesc, srv)
}

func _FileTransport_SendFile_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(FileTransportServer).SendFile(&fileTransportSendFileServer{stream})
}

type FileTransport_SendFileServer interface {
SendAndClose(*SendResp) error
Recv() (*FileDataPacket, error)
grpc.ServerStream
}

type fileTransportSendFileServer struct {
grpc.ServerStream
}

func (x *fileTransportSendFileServer) SendAndClose(m *SendResp) error {
return x.ServerStream.SendMsg(m)
}

func (x *fileTransportSendFileServer) Recv() (*FileDataPacket, error) {
m := new(FileDataPacket)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

func _FileTransport_GetFile_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(GetReq)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(FileTransportServer).GetFile(m, &fileTransportGetFileServer{stream})
}

type FileTransport_GetFileServer interface {
Send(*FileDataPacket) error
grpc.ServerStream
}

type fileTransportGetFileServer struct {
grpc.ServerStream
}

func (x *fileTransportGetFileServer) Send(m *FileDataPacket) error {
return x.ServerStream.SendMsg(m)
}

// FileTransport_ServiceDesc is the grpc.ServiceDesc for FileTransport service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var FileTransport_ServiceDesc = grpc.ServiceDesc{
ServiceName: "FileTransport",
HandlerType: (*FileTransportServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SendFile",
Handler: _FileTransport_SendFile_Handler,
ClientStreams: true,
},
{
StreamName: "GetFile",
Handler: _FileTransport_GetFile_Handler,
ServerStreams: true,
},
},
Metadata: "file_transport.proto",
}

+ 0
- 120
utils/grpc/file_transport.go View File

@@ -1,120 +0,0 @@
package grpc

import (
"context"
"fmt"
"io"

myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/storage-common/pkgs/proto"
)

type fileReadCloser struct {
io.ReadCloser
stream proto.FileTransport_GetFileClient
cancelFn context.CancelFunc
readData []byte
}

func (s *fileReadCloser) Read(p []byte) (int, error) {

if s.readData == nil {
resp, err := s.stream.Recv()
if err != nil {
return 0, err
}

if resp.Type == proto.FileDataPacketType_Data {
s.readData = resp.Data

} else if resp.Type == proto.FileDataPacketType_EOF {
return 0, io.EOF

} else {
return 0, fmt.Errorf("unsuppoted packt type: %v", resp.Type)
}
}

cnt := copy(p, s.readData)

if len(s.readData) == cnt {
s.readData = nil
} else {
s.readData = s.readData[cnt:]
}

return cnt, nil
}

func (s *fileReadCloser) Close() error {
s.cancelFn()

return nil
}

func GetFileAsStream(client proto.FileTransportClient, fileHash string) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(context.Background())

stream, err := client.GetFile(ctx, &proto.GetReq{
FileHash: fileHash,
})
if err != nil {
cancel()
return nil, fmt.Errorf("request grpc failed, err: %w", err)
}

return &fileReadCloser{
stream: stream,
cancelFn: cancel,
}, nil
}

type fileWriteCloser struct {
myio.PromiseWriteCloser[string]
stream proto.FileTransport_SendFileClient
}

func (s *fileWriteCloser) Write(p []byte) (int, error) {
err := s.stream.Send(&proto.FileDataPacket{
Type: proto.FileDataPacketType_Data,
Data: p,
})

if err != nil {
return 0, err
}

return len(p), nil
}

func (s *fileWriteCloser) Abort(err error) {
s.stream.CloseSend()
}

func (s *fileWriteCloser) Finish() (string, error) {
err := s.stream.Send(&proto.FileDataPacket{
Type: proto.FileDataPacketType_EOF,
})

if err != nil {
return "", fmt.Errorf("send EOF packet failed, err: %w", err)
}

resp, err := s.stream.CloseAndRecv()
if err != nil {
return "", fmt.Errorf("receive response failed, err: %w", err)
}

return resp.FileHash, nil
}

func SendFileAsStream(client proto.FileTransportClient) (myio.PromiseWriteCloser[string], error) {
stream, err := client.SendFile(context.Background())
if err != nil {
return nil, err
}

return &fileWriteCloser{
stream: stream,
}, nil
}
