diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go
index f699534..f8c04a9 100644
--- a/agent/internal/config/config.go
+++ b/agent/internal/config/config.go
@@ -14,7 +14,6 @@ type Config struct {
 	ID               int64                      `json:"id"`
 	Local            stgmodels.LocalMachineInfo `json:"local"`
 	GRPC             *grpc.Config               `json:"grpc"`
-	ECPacketSize     int64                      `json:"ecPacketSize"`
 	TempFileLifetime int                        `json:"tempFileLifetime"` // how long a replica may stay in the temp state, in seconds
 	Logger           log.Config                 `json:"logger"`
 	RabbitMQ         stgmq.Config               `json:"rabbitMQ"`
diff --git a/agent/internal/services/grpc/service.go b/agent/internal/services/grpc/service.go
index bff1bb8..2e61eb3 100644
--- a/agent/internal/services/grpc/service.go
+++ b/agent/internal/services/grpc/service.go
@@ -80,6 +80,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro
 			return fmt.Errorf("send response failed, err: %w", err)
 		}
 
+		log.Debugf("%d bytes received", recvSize)
 		return nil
 	}
 }
diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go
index 46aa0f6..a30d3e3 100644
--- a/client/internal/cmdline/package.go
+++ b/client/internal/cmdline/package.go
@@ -9,7 +9,6 @@ import (
 	"github.com/jedib0t/go-pretty/v6/table"
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
 
-	"gitlink.org.cn/cloudream/storage/client/internal/config"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
 )
@@ -186,7 +185,9 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin
 	}
 }
 
-func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, nodeAffinity []int64) error {
+func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int64, nodeAffinity []int64) error {
+	rootPath = filepath.Clean(rootPath)
+
 	var uploadFilePathes []string
 	err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error {
 		if err != nil {
@@ -209,14 +210,14 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64,
 	}
 
 	objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes)
-	taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff)
+	taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, chunkSize), nodeAff)
 	if err != nil {
 		return fmt.Errorf("upload file data failed, err: %w", err)
 	}
 
 	for {
-		complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5)
+		complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5)
 		if complete {
 			if err != nil {
 				return fmt.Errorf("uploading ec package: %w", err)
@@ -224,12 +225,11 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64,
 
 			tb := table.NewWriter()
-			tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"})
+			tb.AppendHeader(table.Row{"Path", "ObjectID"})
 			for i := 0; i < len(uploadObjectResult.ObjectResults); i++ {
 				tb.AppendRow(table.Row{
 					uploadObjectResult.ObjectResults[i].Info.Path,
 					uploadObjectResult.ObjectResults[i].ObjectID,
-					uploadObjectResult.ObjectResults[i].FileHash,
 				})
 			}
 			fmt.Print(tb.Render())
@@ -319,9 +319,9 @@ func init() {
 
 	commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep")
 
-	commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "ec")
+	commands.MustAdd(PackageUploadECPackage, "pkg", "new", "ec")
 
-	commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "ec")
+	commands.MustAdd(PackageUpdateECPackage, "pkg", "update", "ec")
 
 	commands.MustAdd(PackageDeletePackage, "pkg", "delete")
diff --git a/client/internal/config/config.go b/client/internal/config/config.go
index b8aa80d..107f7c1 100644
--- a/client/internal/config/config.go
+++ b/client/internal/config/config.go
@@ -11,14 +11,13 @@ import (
 )
 
 type Config struct {
-	Local        stgmodels.LocalMachineInfo `json:"local"`
-	AgentGRPC    agtrpc.PoolConfig          `json:"agentGRPC"`
-	ECPacketSize int64                      `json:"ecPacketSize"`
-	MaxRepCount  int                        `json:"maxRepCount"`
-	Logger       logger.Config              `json:"logger"`
-	RabbitMQ     stgmq.Config               `json:"rabbitMQ"`
-	IPFS         *ipfs.Config               `json:"ipfs"` // a non-nil value means an ipfs daemon runs on the client
-	DistLock     distlock.Config            `json:"distlock"`
+	Local       stgmodels.LocalMachineInfo `json:"local"`
+	AgentGRPC   agtrpc.PoolConfig          `json:"agentGRPC"`
+	MaxRepCount int                        `json:"maxRepCount"`
+	Logger      logger.Config              `json:"logger"`
+	RabbitMQ    stgmq.Config               `json:"rabbitMQ"`
+	IPFS        *ipfs.Config               `json:"ipfs"` // a non-nil value means an ipfs daemon runs on the client
+	DistLock    distlock.Config            `json:"distlock"`
 }
 
 var cfg Config
diff --git a/client/internal/services/package.go b/client/internal/services/package.go
index b02a771..2677252 100644
--- a/client/internal/services/package.go
+++ b/client/internal/services/package.go
@@ -172,10 +172,10 @@ func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64,
 	return tsk.ID(), nil
 }
 
-func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) {
+func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateECPackageResult, error) {
 	tsk := svc.TaskMgr.FindByID(taskID)
 	if tsk.WaitTimeout(waitTimeout) {
-		cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage)
-		return true, cteatePkgTask.Result, tsk.Error()
+		createPkgTask := tsk.Body().(*mytask.CreateECPackage)
+		return true, createPkgTask.Result, tsk.Error()
 	}
 	return false, nil, nil
diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json
index 7e1db00..b340adf 100644
--- a/common/assets/confs/agent.config.json
+++ b/common/assets/confs/agent.config.json
@@ -9,7 +9,6 @@
         "ip": "127.0.0.1",
         "port": 5010
     },
-    "ecPacketSize": 10,
    "tempFileLifetime": 3600,
     "logger": {
         "output": "file",
diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json
index cdc5dd2..81a9ea6 100644
--- a/common/assets/confs/client.config.json
+++ b/common/assets/confs/client.config.json
@@ -6,7 +6,6 @@
     "agentGRPC": {
         "port": 5010
     },
-    "ecPacketSize": 10,
     "maxRepCount": 10,
     "logger": {
         "output": "stdout",
diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql
index b2be415..2fdcdfe 100644
--- a/common/assets/scripts/create_database.sql
+++ b/common/assets/scripts/create_database.sql
@@ -17,16 +17,35 @@ create table Node (
 ) comment = 'node table';
 
 insert into
-    Node (NodeID, Name, LocalIP, ExternalIP, LocalGRPCPort, ExternalGRPCPort, LocationID, State)
+    Node (
+        NodeID,
+        Name,
+        LocalIP,
+        ExternalIP,
+        LocalGRPCPort,
+        ExternalGRPCPort,
+        LocationID,
+        State
+    )
 values
-    (1, "localhost", "localhost", "localhost", 5010, 5010, 1, "alive")
-create table Storage (
-    StorageID int not null auto_increment primary key comment 'storage service ID',
-    Name varchar(100) not null comment 'storage service name',
-    NodeID int not null comment 'ID of the node hosting the storage service',
-    Directory varchar(4096) not null comment 'directory on the hosting node',
-    State varchar(100) comment 'state'
-) comment = "storage service table";
+    (
+        1,
+        "localhost",
+        "localhost",
+        "localhost",
+        5010,
+        5010,
+        1,
+        "alive"
+    );
+
+create table Storage (
+    StorageID int not null auto_increment primary key comment 'storage service ID',
+    Name varchar(100) not null comment 'storage service name',
+    NodeID int not null comment 'ID of the node hosting the storage service',
+    Directory varchar(4096) not null comment 'directory on the hosting node',
+    State varchar(100) comment 'state'
+) comment = "storage service table";
 
 insert into
     Storage (StorageID, Name, NodeID, Directory, State)
@@ -145,7 +164,7 @@ values
     (1, "Local");
 
 create table Ec (
-    EcID int not null comment 'erasure code ID',
+    EcID int not null primary key comment 'erasure code ID',
     Name varchar(128) not null comment 'erasure code name',
     EcK int not null comment 'ecK',
     EcN int not null comment 'ecN'
diff --git a/common/models/models.go b/common/models/models.go
index bb945a8..6296953 100644
--- a/common/models/models.go
+++ b/common/models/models.go
@@ -4,32 +4,6 @@ import "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
 
 /// TODO consolidate the shared struct definitions scattered across the codebase here
 
-type RedundancyData interface{}
-type RedundancyDataConst interface {
-	RepRedundancyData | ECRedundancyData | RedundancyData
-}
-type RepRedundancyData struct {
-	FileHash string `json:"fileHash"`
-}
-
-func NewRedundancyRepData(fileHash string) RepRedundancyData {
-	return RepRedundancyData{
-		FileHash: fileHash,
-	}
-}
-
-type ECRedundancyData struct {
-	Ec     EC                `json:"ec"`
-	Blocks []ObjectBlockData `json:"blocks"`
-}
-
-func NewRedundancyEcData(ec EC, blocks []ObjectBlockData) ECRedundancyData {
-	return ECRedundancyData{
-		Ec:     ec,
-		Blocks: blocks,
-	}
-}
-
 type EC struct {
 	ID        int64 `json:"id"`
 	K         int   `json:"k"`
@@ -37,6 +11,15 @@ type EC struct {
 	ChunkSize int64 `json:"chunkSize"`
 }
 
+func NewEc(id int64, k int, n int, chunkSize int64) EC {
+	return EC{
+		ID:        id,
+		K:         k,
+		N:         n,
+		ChunkSize: chunkSize,
+	}
+}
+
 type ObjectBlockData struct {
 	Index    int      `json:"index"`
 	FileHash string   `json:"fileHash"`
@@ -51,15 +34,6 @@ func NewObjectBlockData(index int, fileHash string, nodeIDs []int64) ObjectBlock
 	}
 }
 
-func NewEc(id int64, k int, n int, chunkSize int64) EC {
-	return EC{
-		ID:        id,
-		K:         k,
-		N:         n,
-		ChunkSize: chunkSize,
-	}
-}
-
 type ObjectRepData struct {
 	Object   model.Object `json:"object"`
 	FileHash string       `json:"fileHash"`
diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go
index 1e7743d..fd77010 100644
--- a/common/pkgs/cmd/create_ec_package.go
+++ b/common/pkgs/cmd/create_ec_package.go
@@ -4,14 +4,13 @@ import (
 	"fmt"
 	"io"
 	"math/rand"
-	"os"
-	"path/filepath"
 	"sync"
 
 	"github.com/samber/lo"
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+	myio "gitlink.org.cn/cloudream/common/utils/io"
 	stgglb "gitlink.org.cn/cloudream/storage/common/globals"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
@@ -185,175 +184,65 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje
 }
 
 // upload the file
-func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) {
-	// generate the sequence of nodes to write the erasure-coded blocks to
-	nodes := make([]UploadNodeInfo, ec.EcN)
-	numNodes := len(uploadNodes)
-	startWriteNodeID := rand.Intn(numNodes)
-	for i := 0; i < ec.EcN; i++ {
-		nodes[i] = uploadNodes[(startWriteNodeID+i)%numNodes]
-	}
+func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ecMod model.Ec) ([]string, []int64, error) {
+	uploadNodes = shuffleNodes(uploadNodes, ecMod.EcN)
 
-	hashs, err := ecWrite(obj.File, obj.Size, ecInfo.PacketSize, ec.EcK, ec.EcN, nodes)
+	rs, err := ec.NewRs(ecMod.EcK, ecMod.EcN, ecInfo.ChunkSize)
 	if err != nil {
-		return nil, nil, fmt.Errorf("EcWrite failed, err: %w", err)
+		return nil, nil, err
 	}
 
-	nodeIDs := make([]int64, len(nodes))
-	for i := 0; i < len(nodes); i++ {
-		nodeIDs[i] = nodes[i].Node.NodeID
-	}
+	outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{
+		FillZeros: true,
+	})
+	var readers []io.Reader
+	for _, o := range outputs {
+		readers = append(readers, o)
+	}
+	defer func() {
+		for _, o := range outputs {
+			o.Close()
+		}
+	}()
 
-	return hashs, nodeIDs, nil
-}
+	encStrs := rs.EncodeAll(readers)
 
-// chooseUploadNode picks a node to upload the file to:
-// 1. randomly pick one of the nodes in the same region as the current client
-// 2. failing that, randomly pick one among all nodes
-func (t *CreateECPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInfo {
-	sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation })
-	if len(sameLocationNodes) > 0 {
-		return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
-	}
+	wg := sync.WaitGroup{}
 
-	return nodes[rand.Intn(len(nodes))]
-}
+	nodeIDs := make([]int64, ecMod.EcN)
+	fileHashes := make([]string, ecMod.EcN)
+	anyErrs := make([]error, ecMod.EcN)
 
-func ecWrite(file io.ReadCloser, fileSize int64, packetSize int64, ecK int, ecN int, nodes []UploadNodeInfo) ([]string, error) {
-	// TODO error handling should follow the logic of the RepWrite function
-	// get the file size
-
-	var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} // 2 should be replaced with ecK, 3 with ecN
-	// compute the number of packets per block
-	numPacket := (fileSize + int64(ecK)*packetSize - 1) / (int64(ecK) * packetSize)
-	//fmt.Println(numPacket)
-	// create the channels
-	loadBufs := make([]chan []byte, ecN)
-	encodeBufs := make([]chan []byte, ecN)
-	for i := 0; i < ecN; i++ {
-		loadBufs[i] = make(chan []byte)
-	}
-	for i := 0; i < ecN; i++ {
-		encodeBufs[i] = make(chan []byte)
-	}
-	hashs := make([]string, ecN)
-	// start writing
-	go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), packetSize) // load data from the local filesystem
-	go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)
-
-	var wg sync.WaitGroup
-	wg.Add(ecN)
-
-	for idx := 0; idx < ecN; idx++ {
-		i := idx
-		reader := channelBytesReader{
-			channel:     encodeBufs[idx],
-			packetCount: numPacket,
-		}
+	for i := range encStrs {
+		idx := i
+		wg.Add(1)
+		nodeIDs[idx] = uploadNodes[idx].Node.NodeID
 		go func() {
-			// TODO handle the error
-			fileHash, _ := uploadFile(&reader, nodes[i])
-			hashs[i] = fileHash
-			wg.Done()
+			defer wg.Done()
+			fileHashes[idx], anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx])
 		}()
 	}
-	wg.Wait()
 
-	return hashs, nil
-}
+	wg.Wait()
 
-func load(file io.ReadCloser, loadBufs []chan []byte, ecK int, totalNumPacket int64, ecPacketSize int64) error {
-
-	for i := 0; int64(i) < totalNumPacket; i++ {
-
-		buf := make([]byte, ecPacketSize)
-		idx := i % ecK
-		_, err := file.Read(buf)
-		if err != nil {
-			return fmt.Errorf("read file falied, err:%w", err)
-		}
-		loadBufs[idx] <- buf
-
-		if idx == ecK-1 {
-			for j := ecK; j < len(loadBufs); j++ {
-				zeroPkt := make([]byte, ecPacketSize)
-				loadBufs[j] <- zeroPkt
-			}
-		}
-		if err != nil && err != io.EOF {
-			return fmt.Errorf("load file to buf failed, err:%w", err)
-		}
-	}
-	for i := 0; i < len(loadBufs); i++ {
-
-		close(loadBufs[i])
-	}
-	file.Close()
-	return nil
-}
-
-func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int64, numPacket int64) {
-	var tmpIn [][]byte
-	tmpIn = make([][]byte, len(outBufs))
-	enc := ec.NewRsEnc(ecK, len(outBufs))
-	for i := 0; int64(i) < numPacket; i++ {
-		for j := 0; j < len(outBufs); j++ {
-			tmpIn[j] = <-inBufs[j]
-		}
-		enc.Encode(tmpIn)
-		for j := 0; j < len(outBufs); j++ {
-			outBufs[j] <- tmpIn[j]
-		}
-	}
-	for i := 0; i < len(outBufs); i++ {
-		close(outBufs[i])
-	}
-}
-
-type channelBytesReader struct {
-	channel     chan []byte
-	packetCount int64
-	readingData []byte
-}
-
-func (r *channelBytesReader) Read(buf []byte) (int, error) {
-	if len(r.readingData) == 0 {
-		if r.packetCount == 0 {
-			return 0, io.EOF
-		}
-
-		r.readingData = <-r.channel
-		r.packetCount--
-	}
-
-	len := copy(buf, r.readingData)
-	r.readingData = r.readingData[:len]
-
-	return len, nil
-}
-
-func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) {
-	fDir, err := os.Executable()
-	if err != nil {
-		panic(err)
-	}
-	fURL := filepath.Join(filepath.Dir(fDir), "assets")
-	_, err = os.Stat(fURL)
-	if os.IsNotExist(err) {
-		os.MkdirAll(fURL, os.ModePerm)
-	}
-	file, err := os.Create(filepath.Join(fURL, localFilePath))
-	if err != nil {
-		return
-	}
-	for i := 0; int64(i) < numPacket; i++ {
-		for j := 0; j < len(inBuf); j++ {
-			tmp := <-inBuf[j]
-			fmt.Println(tmp)
-			file.Write(tmp)
-		}
-	}
-	file.Close()
-	wg.Done()
-}
+	for i, e := range anyErrs {
+		if e != nil {
+			return nil, nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e)
+		}
+	}
+
+	return fileHashes, nodeIDs, nil
+}
+
+func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo {
+	for i := len(uploadNodes); i < extendTo; i++ {
+		uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))])
+	}
+
+	// shuffle the upload nodes
+	rand.Shuffle(len(uploadNodes), func(i, j int) {
+		uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i]
+	})
+
+	return uploadNodes
+}
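Note: the goroutine-per-stream pattern in `uploadECObject` is load-bearing, not cosmetic. `EncodeAll` returns `io.Pipe`-backed readers (see `stream_rs.go` below), and an unbuffered pipe blocks every `Write` until a matching `Read` happens, so draining the EcN outputs one after another would deadlock after the first chunk. A minimal sketch of the required consumption pattern, assuming only the `Rs` API from this PR (`io.Copy` stands in for `uploadFile`):

```go
outputs := rs.EncodeAll(readers) // one pipe-backed stream per block

var wg sync.WaitGroup
for i := range outputs {
	idx := i
	wg.Add(1)
	go func() {
		// every stream needs its own consumer; otherwise the encoder's
		// next Write blocks and all the other streams starve
		defer wg.Done()
		_, _ = io.Copy(io.Discard, outputs[idx])
	}()
}
wg.Wait()
```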
diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go
index d35bedf..55ee3e5 100644
--- a/common/pkgs/db/object_block.go
+++ b/common/pkgs/db/object_block.go
@@ -19,7 +19,7 @@ func (db *DB) ObjectBlock() *ObjectBlockDB {
 }
 
 func (db *ObjectBlockDB) Create(ctx SQLContext, objectID int64, index int, fileHash string) error {
-	_, err := ctx.Exec("insert into ObjectBlock(ObjectID, Index, FileHash) values(?,?,?)", objectID, index, fileHash)
+	_, err := ctx.Exec("insert into ObjectBlock values(?,?,?)", objectID, index, fileHash)
 	return err
 }
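Dropping the column list makes the insert depend on the table's column order. If the motivation was that `Index` is a reserved word in MySQL, a backtick-quoted column list would keep the statement order-independent. A sketch of that hypothetical variant (assumes MySQL identifier quoting):

```go
// name the columns explicitly and quote the reserved word `Index`
_, err := ctx.Exec(
	"insert into ObjectBlock(ObjectID, `Index`, FileHash) values(?,?,?)",
	objectID, index, fileHash,
)
```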
"github.com/smartystreets/goconvey/convey" ) -func test_Encode(t *testing.T) { - enc, _ := NewRs(3, 5, 10) - rc := make([]io.ReadCloser, 3) - rc[0] = ioutil.NopCloser(bytes.NewBufferString("11111111")) - rc[1] = ioutil.NopCloser(bytes.NewBufferString("22222222")) - rc[2] = ioutil.NopCloser(bytes.NewBufferString("33333333")) - /*rc[0].Close() - rc[1].Close() - rc[2].Close()*/ - print("#$$$$$$$$$$$") - out, _ := enc.ReconstructData(rc, []int{0, 1, 2}) - //out, _ := enc.Encode(rc) - buf := make([]byte, 100) - out[0].Read(buf) - fmt.Println(buf) - out[1].Read(buf) - fmt.Println(buf) - t.Logf(string(buf)) - t.Log(buf) -} +func Test_EncodeReconstruct(t *testing.T) { + Convey("编码后使用校验块重建数据", t, func() { + rs, err := NewRs(2, 3, 5) + So(err, ShouldBeNil) -/* ------------------------------------------------- -hash:QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6 -内容:1111122222233333333334444444445663454543534534 - -hash:QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW -(5,3),chunkSize:6 -data1:QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56 -data2:QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P -data3:QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn -内容:qqqqqqqqwwwwwwwwwwwwwweeeeeeeeeeeeerrrrrrrrrrr ------------------------------------------------------ -*/ -func test_Fetch(t *testing.T) { - - blkReader, _ := NewBlockReader() - /*****************************FetchBlock*************************/ - /*r, _ := blkReader.FetchBLock("QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6") - data, _ := ioutil.ReadAll(r) - t.Logf(string(data))*/ - - /**********************FetchBlocks************************************ - hashs := []string{"QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", "QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6"} - rs, _ := blkReader.FetchBLocks(hashs) - data1, _ := ioutil.ReadAll(rs[0]) - data2, _ := ioutil.ReadAll(rs[1]) - t.Logf(string(data1)) - t.Logf(string(data2)) - /*************************JumpFetchBlock*********************************/ - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(6) - r, _ := blkReader.JumpFetchBlock(1) - data, _ := ioutil.ReadAll(r) - t.Logf(string(data)) -} -func test_Fetch_and_Encode(t *testing.T) { - chunkSize := int64(6) - blkReader, _ := NewBlockReader() - defer blkReader.Close() - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(int64(chunkSize)) - dataBlocks := make([]io.ReadCloser, 3) - for i := range dataBlocks { - dataBlocks[i], _ = blkReader.JumpFetchBlock(i) - } - enc, _ := NewRs(3, 5, chunkSize) - parityBlocks, _ := enc.Encode(dataBlocks) - - parityData := make([]string, 2) - finished := false - for { - if finished { - break + outputs := rs.EncodeAll([]io.Reader{ + bytes.NewReader([]byte{1, 2, 3, 4, 5}), + bytes.NewReader([]byte{6, 7, 8, 9, 10}), + }) + + var outputData = [][]byte{ + make([]byte, 5), + make([]byte, 5), + make([]byte, 5), } - buf := make([]byte, chunkSize) - for i, pipe := range parityBlocks { - _, err := pipe.Read(buf) - if err != nil { - finished = true - break - } - parityData[i] = parityData[i] + string(buf) + + { // 编码所有块 + errs := make([]error, 3) + + wg := sync.WaitGroup{} + for i := range outputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(outputs[idx], outputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(outputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) + So(outputData[1], ShouldResemble, 
[]byte{6, 7, 8, 9, 10}) } - } - t.Logf(parityData[0]) - t.Logf(parityData[1]) -} + { // 重建所有数据块 + recOutputs := rs.ReconstructData([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}) + + recOutputData := [][]byte{ + make([]byte, 5), + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() -func test_Fetch_and_Encode_and_Degraded(t *testing.T) { - chunkSize := int64(6) - blkReader, _ := NewBlockReader() - defer blkReader.Close() - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(int64(chunkSize)) - dataBlocks := make([]io.ReadCloser, 3) - for i := range dataBlocks { - dataBlocks[i], _ = blkReader.JumpFetchBlock(i) - } - enc, _ := NewRs(3, 5, chunkSize) - parityBlocks, _ := enc.Encode(dataBlocks) - go func() { - ioutil.ReadAll(parityBlocks[0]) - }() - degradedBlocks := make([]io.ReadCloser, 3) - degradedBlocks[0], _ = blkReader.JumpFetchBlock(1) - degradedBlocks[1], _ = blkReader.JumpFetchBlock(2) - degradedBlocks[2] = parityBlocks[1] - newDataBlocks, _ := enc.ReconstructData(degradedBlocks, []int{1, 2, 4}) - newData := make([]string, 3) - finished := false - for { - if finished { - break + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) + So(recOutputData[1], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - buf := make([]byte, chunkSize) - for i, pipe := range newDataBlocks { - _, err := pipe.Read(buf) - if err != nil { - finished = true - break - } - newData[i] = newData[i] + string(buf) + + { // 重建指定的数据块 + recOutputs := rs.ReconstructSome([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}, []int{0, 1}) + + recOutputData := [][]byte{ + make([]byte, 5), + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) + So(recOutputData[1], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - } - t.Logf(newData[0]) - t.Logf(newData[1]) - t.Logf(newData[2]) -} + { // 重建指定的数据块 + recOutputs := rs.ReconstructSome([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}, []int{0}) -func test_pin_data_blocks(t *testing.T) { - chunkSize := int64(6) - blkReader, _ := NewBlockReader() - defer blkReader.Close() - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(int64(chunkSize)) - dataBlocks := make([]io.ReadCloser, 3) - ipfsclient, _ := stgglb.IPFSPool.Acquire() - for i := range dataBlocks { - dataBlocks[i], _ = blkReader.JumpFetchBlock(i) - hash, _ := ipfsclient.CreateFile(dataBlocks[i]) - t.Logf(hash) - } + recOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) -} + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() -func print_ioreaders(t *testing.T, readers 
[]io.ReadCloser, chunkSize int64) { - newData := make([]string, len(readers)) - finished := false - for { - if finished { - break + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) } - buf := make([]byte, chunkSize) - for i, pipe := range readers { - _, err := pipe.Read(buf) - if err != nil { - finished = true - break - } - newData[i] = newData[i] + string(buf) + + { // 重建指定的数据块 + recOutputs := rs.ReconstructSome([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}, []int{1}) + + recOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - } - for _, data := range newData { - t.Logf(data) - } -} -func test_reconstructData(t *testing.T) { - /* - blkReader, _ := NewBlockReader() - defer blkReader.Close() - hashs := []string{"QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56", "QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P", "QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn"} - dataBlocks, _ := blkReader.FetchBLocks(hashs) - chunkSize := int64(6) - enc, _ := NewRs(3, 5, chunkSize) - print("@@@@@@@@@") - newDataBlocks, _ := enc.ReconstructSome(dataBlocks, []int{0, 1, 2}, []int{3, 4}) - print("!!!!!!!!!") - print_ioreaders(t, newDataBlocks, chunkSize) - */ -} -func Test_main(t *testing.T) { - //test_Encode(t) - //stgglb.InitLocal(&config.Cfg().Local) - stgglb.InitIPFSPool(&ipfs.Config{Port: 5001}) - //test_Fetch(t) - //test_Fetch_and_Encode(t) - //test_Fetch_and_Encode_and_Degraded(t) - //test_pin_data_blocks(t) - test_reconstructData(t) -} + { // 单独产生校验块 + encOutputs := rs.Encode([]io.Reader{ + bytes.NewBuffer(outputData[0]), + bytes.NewBuffer(outputData[1]), + }) + + encOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range encOutputs { + idx := i -/* -func Test_Fetch_Encode_ReconstructData(t *testing.T) { - inFileName := "test.txt" - enc, _ := NewRs(3, 5, 10) - file, err := os.Open(inFileName) - if err != nil { - t.Error(err) - } - var data io.ReadCloser - data = file - //enc.Encode(data) -}*/ + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(encOutputs[idx], encOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(encOutputData[0], ShouldResemble, outputData[2]) + } + + { // 使用ReconstructAny单独重建校验块 + encOutputs := rs.ReconstructAny([]io.Reader{ + bytes.NewBuffer(outputData[0]), + bytes.NewBuffer(outputData[1]), + }, []int{0, 1}, []int{2}) + + encOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range encOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(encOutputs[idx], encOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(encOutputData[0], ShouldResemble, outputData[2]) + } + }) +} diff --git a/common/pkgs/ec/stream_rs.go b/common/pkgs/ec/stream_rs.go index c683468..c1d68e1 100644 --- a/common/pkgs/ec/stream_rs.go +++ 
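The seven subtests above repeat the same concurrent read-drain loop. A small helper could fold those copies into one place; a sketch using only the packages the test already imports:

```go
// drainAll reads exactly size bytes from every stream concurrently and
// returns the data plus the per-stream error (io.EOF counts as clean).
func drainAll(streams []io.ReadCloser, size int) ([][]byte, []error) {
	data := make([][]byte, len(streams))
	errs := make([]error, len(streams))

	wg := sync.WaitGroup{}
	for i := range streams {
		idx := i
		data[idx] = make([]byte, size)

		wg.Add(1)
		go func() {
			defer wg.Done()
			_, errs[idx] = io.ReadFull(streams[idx], data[idx])
		}()
	}
	wg.Wait()

	return data, errs
}
```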
diff --git a/common/pkgs/ec/stream_rs.go b/common/pkgs/ec/stream_rs.go
index c683468..c1d68e1 100644
--- a/common/pkgs/ec/stream_rs.go
+++ b/common/pkgs/ec/stream_rs.go
@@ -27,122 +27,117 @@ func NewRs(k int, n int, chunkSize int64) (*Rs, error) {
 	return &enc, err
 }
 
-// encode
-func (r *Rs) Encode(data []io.ReadCloser) ([]io.ReadCloser, error) {
-	output := make([]io.ReadCloser, r.ecP)
-	parity := make([]*io.PipeWriter, r.ecP)
-	for i := range output {
-		var reader *io.PipeReader
-		reader, parity[i] = io.Pipe()
-		output[i] = reader
+// Encode: outputs only the parity blocks
+func (r *Rs) Encode(input []io.Reader) []io.ReadCloser {
+	outReaders := make([]io.ReadCloser, r.ecP)
+	outWriters := make([]*io.PipeWriter, r.ecP)
+	for i := 0; i < r.ecP; i++ {
+		outReaders[i], outWriters[i] = io.Pipe()
 	}
+
 	go func() {
 		chunks := make([][]byte, r.ecN)
-		for i := range chunks {
-			chunks[i] = make([]byte, r.chunkSize)
+		for idx := 0; idx < r.ecN; idx++ {
+			chunks[idx] = make([]byte, r.chunkSize)
 		}
+
+		var closeErr error
+	loop:
 		for {
-			finished := false
-			// read the data blocks into the buffers
+			// read the blocks into the buffers
 			for i := 0; i < r.ecK; i++ {
-				_, err := data[i].Read(chunks[i])
+				_, err := io.ReadFull(input[i], chunks[i])
 				if err != nil {
-					finished = true
-					break
+					closeErr = err
+					break loop
 				}
 			}
-			if finished {
-				break
-			}
-			// encode
+
 			err := r.encoder.Encode(chunks)
 			if err != nil {
 				return
 			}
-			// write to the writers
-			for i := r.ecK; i < r.ecN; i++ {
-				parity[i-r.ecK].Write(chunks[i])
+
+			// write to the outWriters
+			for i := range outWriters {
+				err := myio.WriteAll(outWriters[i], chunks[i+r.ecK])
+				if err != nil {
+					closeErr = err
+					break loop
+				}
 			}
 		}
-		for i := range data {
-			data[i].Close()
-		}
-		for i := range parity {
-			parity[i].Close()
+
+		for i := range outWriters {
+			outWriters[i].CloseWithError(closeErr)
 		}
 	}()
-	return output, nil
+
+	return outReaders
 }
 
-// degraded read: recover the original data blocks from any k blocks
-func (r *Rs) ReconstructData(input []io.ReadCloser, inBlockIdx []int) ([]io.ReadCloser, error) {
-	dataReader := make([]io.ReadCloser, r.ecK)
-	dataWriter := make([]*io.PipeWriter, r.ecK)
-	for i := 0; i < r.ecK; i++ {
-		var reader *io.PipeReader
-		reader, dataWriter[i] = io.Pipe()
-		dataReader[i] = reader
+// Encode: outputs all blocks, the data blocks as well as the parity blocks
+func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser {
+	outReaders := make([]io.ReadCloser, r.ecN)
+	outWriters := make([]*io.PipeWriter, r.ecN)
+	for i := 0; i < r.ecN; i++ {
+		outReaders[i], outWriters[i] = io.Pipe()
 	}
+
 	go func() {
 		chunks := make([][]byte, r.ecN)
-		for i := range chunks {
-			chunks[i] = make([]byte, r.chunkSize)
-		}
-		constructIdx := make([]bool, r.ecN)
-		for i := 0; i < r.ecN; i++ {
-			constructIdx[i] = false
-		}
-		for i := 0; i < r.ecK; i++ {
-			constructIdx[inBlockIdx[i]] = true
-		}
-		nilIdx := make([]int, r.ecP)
-		ct := 0
-		for i := 0; i < r.ecN; i++ {
-			if !constructIdx[i] {
-				nilIdx[ct] = i
-				ct++
-			}
+		for idx := 0; idx < r.ecN; idx++ {
+			chunks[idx] = make([]byte, r.chunkSize)
 		}
+
+		var closeErr error
+	loop:
 		for {
-			finished := false
-
-			// read the data blocks into the buffers
+			// read the blocks into the buffers
 			for i := 0; i < r.ecK; i++ {
-				_, err := input[i].Read(chunks[inBlockIdx[i]])
+				_, err := io.ReadFull(input[i], chunks[i])
 				if err != nil {
-					finished = true
-					break
+					closeErr = err
+					break loop
 				}
 			}
-			for i := 0; i < r.ecP; i++ {
-				chunks[nilIdx[i]] = nil
-			}
-			if finished {
-				break
-			}
-			// decode
-			err := r.encoder.ReconstructData(chunks)
+
+			err := r.encoder.Encode(chunks)
 			if err != nil {
 				return
 			}
-			// write to the writers
-			for i := 0; i < r.ecK; i++ {
-				dataWriter[i].Write(chunks[i])
+
+			// write to the outWriters
+			for i := range outWriters {
+				err := myio.WriteAll(outWriters[i], chunks[i])
+				if err != nil {
+					closeErr = err
+					break loop
+				}
 			}
 		}
-		for i := range input {
-			input[i].Close()
-		}
-		for i := range dataWriter {
-			dataWriter[i].Close()
+
+		for i := range outWriters {
+			outWriters[i].CloseWithError(closeErr)
 		}
 	}()
-	return dataReader, nil
+
+	return outReaders
+}
+
+// Degraded read: recover all the original data blocks from any k blocks.
+func (r *Rs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser {
+	outIndexes := make([]int, r.ecK)
+	for i := 0; i < r.ecK; i++ {
+		outIndexes[i] = i
+	}
+
+	return r.ReconstructSome(input, inBlockIdx, outIndexes)
 }
 
-// repair: recover any wanted blocks from any k blocks. The caller must ensure every input stream has the same length, a whole multiple of chunkSize
-func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) ([]io.ReadCloser, error) {
+// Repair: recover the specified data blocks from any k blocks.
+// The caller must ensure that every input stream has the same length, a whole multiple of chunkSize.
+func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser {
 	outReaders := make([]io.ReadCloser, len(outBlockIdx))
 	outWriters := make([]*io.PipeWriter, len(outBlockIdx))
 	for i := 0; i < len(outBlockIdx); i++ {
@@ -151,6 +146,7 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []
 
 	go func() {
 		chunks := make([][]byte, r.ecN)
+		// only allocate buffers for the input blocks; the reconstruct call creates the output buffers automatically
 		for _, idx := range inBlockIdx {
 			chunks[idx] = make([]byte, r.chunkSize)
 		}
@@ -161,6 +157,11 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []
 			outBools[idx] = true
 		}
 
+		inBools := make([]bool, r.ecN)
+		for _, idx := range inBlockIdx {
+			inBools[idx] = true
+		}
+
 		var closeErr error
 	loop:
 		for {
@@ -185,6 +186,80 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []
 					closeErr = err
 					break loop
 				}
+
+				// truncate the buffer to length 0; cap is unaffected.
+				// note: a block that is both an input and an output must not be cleared
+				if !inBools[outBlockIdx[i]] {
+					chunks[outBlockIdx[i]] = chunks[outBlockIdx[i]][:0]
+				}
+			}
+		}
+
+		for i := range outWriters {
+			outWriters[i].CloseWithError(closeErr)
+		}
+	}()
+
+	return outReaders
+}
+
+// Reconstruct arbitrary blocks, data blocks as well as parity blocks.
+// The current implementation rebuilds even the blocks that are not requested, so avoid this function where possible.
+func (r *Rs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser {
+	outReaders := make([]io.ReadCloser, len(outBlockIdxes))
+	outWriters := make([]*io.PipeWriter, len(outBlockIdxes))
+	for i := 0; i < len(outBlockIdxes); i++ {
+		outReaders[i], outWriters[i] = io.Pipe()
+	}
+
+	go func() {
+		chunks := make([][]byte, r.ecN)
+		// only allocate buffers for the input blocks; the reconstruct call creates the output buffers automatically
+		for _, idx := range inBlockIdxes {
+			chunks[idx] = make([]byte, r.chunkSize)
+		}
+
+		// outBools: marks the indexes of the blocks to output
+		outBools := make([]bool, r.ecN)
+		for _, idx := range outBlockIdxes {
+			outBools[idx] = true
+		}
+
+		inBools := make([]bool, r.ecN)
+		for _, idx := range inBlockIdxes {
+			inBools[idx] = true
+		}
+
+		var closeErr error
+	loop:
+		for {
+			// read the blocks into the buffers
+			for i := 0; i < r.ecK; i++ {
+				_, err := io.ReadFull(input[i], chunks[inBlockIdxes[i]])
+				if err != nil {
+					closeErr = err
+					break loop
+				}
+			}
+
+			err := r.encoder.Reconstruct(chunks)
+			if err != nil {
+				return
+			}
+
+			// write to the outWriters
+			for i := range outBlockIdxes {
+				outIndex := outBlockIdxes[i]
+
+				err := myio.WriteAll(outWriters[i], chunks[outIndex])
+				if err != nil {
+					closeErr = err
+					break loop
+				}
+
+				// truncate the buffer to length 0; cap is unaffected.
+				// note: a block that is both an input and an output must not be cleared
+				if !inBools[outIndex] {
+					chunks[outIndex] = chunks[outIndex][:0]
+				}
 			}
 		}
@@ -193,5 +268,5 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []
 		}
 	}()
 
-	return outReaders, nil
+	return outReaders
 }
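Putting the new streaming API together, a degraded read looks like this end to end. A sketch mirroring the test above (k=2, n=3, chunkSize 5; error handling elided; inputs must be whole multiples of chunkSize, which `EncodeAll` guarantees for its own outputs):

```go
rs, _ := ec.NewRs(2, 3, 5)

// encode: data blocks 0-1 pass through, block 2 is the parity
encoded := rs.EncodeAll([]io.Reader{
	bytes.NewReader([]byte("hello")),
	bytes.NewReader([]byte("world")),
})

blocks := make([][]byte, 3)
var wg sync.WaitGroup
for i := range encoded {
	idx := i
	wg.Add(1)
	go func() { // drain concurrently: the streams are pipe-backed
		defer wg.Done()
		blocks[idx], _ = io.ReadAll(encoded[idx])
	}()
}
wg.Wait()

// block 0 is lost; rebuild both data blocks from block 1 and the parity.
// the rebuilt streams need the same concurrent drain as above.
rebuilt := rs.ReconstructData([]io.Reader{
	bytes.NewReader(blocks[1]),
	bytes.NewReader(blocks[2]),
}, []int{1, 2})

data := make([][]byte, 2)
for i := range rebuilt {
	idx := i
	wg.Add(1)
	go func() {
		defer wg.Done()
		data[idx], _ = io.ReadAll(rebuilt[idx])
	}()
}
wg.Wait() // data[0] == "hello", data[1] == "world"
```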
"gitlink.org.cn/cloudream/common/pkgs/types" myio "gitlink.org.cn/cloudream/common/utils/io" - mymath "gitlink.org.cn/cloudream/common/utils/math" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -25,7 +24,7 @@ var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitc (*GRPCSend)(nil), (*GRPCFetch)(nil), (*ECCompute)(nil), - (*Combine)(nil), + (*Join)(nil), ))) type IPFSRead struct { @@ -192,10 +191,7 @@ func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { inputs = append(inputs, s.Stream) } - outputs, err := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) - if err != nil { - return fmt.Errorf("reconstructing: %w", err) - } + outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) wg := sync.WaitGroup{} for i, id := range o.OutputIDs { @@ -209,62 +205,37 @@ func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { return nil } -type Combine struct { +type Join struct { InputIDs []ioswitch.StreamID `json:"inputIDs"` OutputID ioswitch.StreamID `json:"outputID"` Length int64 `json:"length"` } -func (o *Combine) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { +func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { strs, err := sw.WaitStreams(planID, o.InputIDs...) if err != nil { return err } + + var strReaders []io.Reader + for _, s := range strs { + strReaders = append(strReaders, s.Stream) + } defer func() { for _, str := range strs { str.Stream.Close() } }() - length := o.Length - - pr, pw := io.Pipe() - sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, pr)) - - buf := make([]byte, 4096) - for _, str := range strs { - for { - bufLen := mymath.Min(length, int64(len(buf))) - if bufLen == 0 { - return nil - } - - rd, err := str.Stream.Read(buf[:bufLen]) - if err != nil { - if err != io.EOF { - return err - } - - length -= int64(rd) - err = myio.WriteAll(pw, buf[:rd]) - if err != nil { - return err - } - - break - } - - length -= int64(rd) - err = myio.WriteAll(pw, buf[:rd]) - if err != nil { - return err - } - } - } - - if length > 0 { - return fmt.Errorf("want %d bytes, but only get %d bytes", o.Length, o.Length-length) - } + fut := future.NewSetVoid() + sw.StreamReady(planID, + ioswitch.NewStream(o.OutputID, + myio.AfterReadClosed(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) { + fut.SetVoid() + }), + ), + ) + fut.Wait(context.TODO()) return nil } diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index 9caea68..8d0ab13 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -189,7 +189,7 @@ func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBloc return mstr } -func (b *AgentPlanBuilder) Combine(length int64, streams ...*AgentStream) *AgentStream { +func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream { agtStr := &AgentStream{ owner: b, info: b.owner.newStream(), @@ -200,7 +200,7 @@ func (b *AgentPlanBuilder) Combine(length int64, streams ...*AgentStream) *Agent inputStrIDs = append(inputStrIDs, str.info.ID) } - b.ops = append(b.ops, &ops.Combine{ + b.ops = append(b.ops, &ops.Join{ InputIDs: inputStrIDs, OutputID: agtStr.info.ID, Length: length, diff --git a/common/pkgs/iterator/ec_object_iterator.go 
diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go
index 5f34e08..97b9ae1 100644
--- a/common/pkgs/iterator/ec_object_iterator.go
+++ b/common/pkgs/iterator/ec_object_iterator.go
@@ -4,13 +4,12 @@ import (
 	"fmt"
 	"io"
 	"math/rand"
-	"os"
 
 	"github.com/samber/lo"
-	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+	myio "gitlink.org.cn/cloudream/common/utils/io"
 	stgglb "gitlink.org.cn/cloudream/storage/common/globals"
 	stgmodels "gitlink.org.cn/cloudream/storage/common/models"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
@@ -73,19 +72,23 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb
 
 	obj := iter.objects[iter.currentIndex]
 	ecData := iter.objectECData[iter.currentIndex]
-	blocks := ecData.Blocks
-	ec := iter.ec
-	ecK := ec.EcK
-	ecN := ec.EcN
 
 	// read directly, preferring intranet nodes
-	hashs := make([]string, ecK)
-	nds := make([]DownloadNodeInfo, ecK)
-	for i := 0; i < ecK; i++ {
-		hashs[i] = blocks[i].FileHash
+	var chosenNodes []DownloadNodeInfo
+	var chosenBlocks []stgmodels.ObjectBlockData
+	for i := range ecData.Blocks {
+		if len(chosenBlocks) == iter.ec.EcK {
+			break
+		}
+
+		// it's fine if a block is cached on no node, or if fetching its info fails,
+		// as long as we end up with info for k blocks
+		if len(ecData.Blocks[i].NodeIDs) == 0 {
+			continue
+		}
 
-		getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs))
+		getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(ecData.Blocks[i].NodeIDs))
 		if err != nil {
-			return nil, fmt.Errorf("getting nodes: %w", err)
+			continue
 		}
 
 		downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
@@ -95,36 +98,23 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb
 			}
 		})
 
-		nds[i] = iter.chooseDownloadNode(downloadNodes)
-	}
+		chosenBlocks = append(chosenBlocks, ecData.Blocks[i])
+		chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
 
-	// nodeIDs and nodeIPs are laid out in block order, 1 through ecK
-	nodeIDs := make([]int64, ecK)
-	nodeIPs := make([]string, ecK)
-	grpcPorts := make([]int, ecK)
-	for i := 0; i < ecK; i++ {
-		nodeIDs[i] = nds[i].Node.NodeID
-		nodeIPs[i] = nds[i].Node.ExternalIP
-		grpcPorts[i] = nds[i].Node.ExternalGRPCPort
-		if nds[i].IsSameLocation {
-			nodeIPs[i] = nds[i].Node.LocalIP
-			grpcPorts[i] = nds[i].Node.LocalGRPCPort
-			logger.Infof("client and node %d are at the same location, use local ip", nds[i].Node.NodeID)
-		}
 	}
 
-	fileSize := obj.Size
-	blockIDs := make([]int, ecK)
-	for i := 0; i < ecK; i++ {
-		blockIDs[i] = i
+	if len(chosenBlocks) < iter.ec.EcK {
+		return nil, fmt.Errorf("not enough blocks to reconstruct the file, want %d, get only %d", iter.ec.EcK, len(chosenBlocks))
 	}
 
-	reader, err := iter.downloadEcObject(fileSize, ecK, ecN, blockIDs, nodeIDs, nodeIPs, grpcPorts, hashs)
+	reader, err := iter.downloadEcObject(iter.downloadCtx, obj.Size, chosenNodes, chosenBlocks)
 	if err != nil {
 		return nil, fmt.Errorf("ec read failed, err: %w", err)
 	}
 
 	return &IterDownloadingObject{
-		File: reader,
+		Object: obj,
+		File:   reader,
 	}, nil
 }
@@ -146,87 +136,36 @@ func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downlo
 	return entries[rand.Intn(len(entries))]
 }
 
-func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, grpcPorts []int, hashs []string) (io.ReadCloser, error) {
-	// TODO zkx implement the logic synchronously for now and handle errors properly; this also makes it
-	// easier to reuse uploadToNode and uploadToLocalIPFS below to clean up the code structure
-	//wg := sync.WaitGroup{}
-	numPacket := (fileSize + int64(ecK)*iter.ecInfo.PacketSize - 1) / (int64(ecK) * iter.ecInfo.PacketSize)
-	getBufs := make([]chan []byte, ecN)
-	decodeBufs := make([]chan []byte, ecK)
-	for i := 0; i < ecN; i++ {
-		getBufs[i] = make(chan []byte)
-	}
-	for i := 0; i < ecK; i++ {
-		decodeBufs[i] = make(chan []byte)
-	}
-	for idx := 0; idx < len(blockIDs); idx++ {
-		i := idx
-		go func() {
-			// TODO handle the error
-			file, _ := downloadFile(iter.downloadCtx, nodeIDs[i], nodeIPs[i], grpcPorts[i], hashs[i])
-
-			for p := int64(0); p < numPacket; p++ {
-				buf := make([]byte, iter.ecInfo.PacketSize)
-				// TODO handle the error
-				io.ReadFull(file, buf)
-				getBufs[blockIDs[i]] <- buf
-			}
-		}()
-	}
-	print(numPacket)
-	go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket)
-	r, w := io.Pipe()
-	// persist: write the decoded file into the pipe
-	go func() {
-		for i := 0; int64(i) < numPacket; i++ {
-			for j := 0; j < len(decodeBufs); j++ {
-				tmp := <-decodeBufs[j]
-				_, err := w.Write(tmp)
-				if err != nil {
-					fmt.Errorf("persist file falied, err:%w", err)
-				}
-			}
-		}
-		w.Close()
-	}()
-	return r, nil
-}
+func (iter *ECObjectIterator) downloadEcObject(ctx *DownloadContext, fileSize int64, nodes []DownloadNodeInfo, blocks []stgmodels.ObjectBlockData) (io.ReadCloser, error) {
+	var fileStrs []io.ReadCloser
 
-func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) {
-	fmt.Println("decode ")
-	var tmpIn [][]byte
-	var zeroPkt []byte
-	tmpIn = make([][]byte, len(inBufs))
-	hasBlock := map[int]bool{}
-	for j := 0; j < len(blockSeq); j++ {
-		hasBlock[blockSeq[j]] = true
-	}
-	needRepair := false // check whether all the data blocks were provided
-	for j := 0; j < len(outBufs); j++ {
-		if blockSeq[j] != j {
-			needRepair = true
-		}
+	rs, err := ec.NewRs(iter.ec.EcK, iter.ec.EcN, iter.ecInfo.ChunkSize)
+	if err != nil {
+		return nil, fmt.Errorf("new rs: %w", err)
 	}
-	enc := ec.NewRsEnc(ecK, len(inBufs))
-	for i := 0; int64(i) < numPacket; i++ {
-		print("!!!!!")
-		for j := 0; j < len(inBufs); j++ {
-			if hasBlock[j] {
-				tmpIn[j] = <-inBufs[j]
-			} else {
-				tmpIn[j] = zeroPkt
-			}
-		}
-		if needRepair {
-			err := enc.Repair(tmpIn)
-			if err != nil {
-				fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error())
+
+	for i := range blocks {
+		str, err := downloadFile(ctx, nodes[i], blocks[i].FileHash)
+		if err != nil {
+			for i -= 1; i >= 0; i-- {
+				fileStrs[i].Close()
 			}
+			return nil, fmt.Errorf("downloading file: %w", err)
 		}
-		for j := 0; j < len(outBufs); j++ {
-			outBufs[j] <- tmpIn[j]
-		}
+
+		fileStrs = append(fileStrs, str)
 	}
-	for i := 0; i < len(outBufs); i++ {
-		close(outBufs[i])
+
+	fileReaders, filesCloser := myio.ToReaders(fileStrs)
+
+	var indexes []int
+	for _, b := range blocks {
+		indexes = append(indexes, b.Index)
 	}
+
+	outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
+	return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(iter.ecInfo.ChunkSize)), fileSize), func(c io.ReadCloser) {
+		filesCloser()
+		outputsCloser()
+	}), nil
 }
diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go
index f1ac0b2..de2504f 100644
--- a/common/pkgs/iterator/rep_object_iterator.go
+++ b/common/pkgs/iterator/rep_object_iterator.go
@@ -99,20 +99,7 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObje
 		}
 	})
 
-	// pick a download node
-	downloadNode := i.chooseDownloadNode(downloadNodes)
-
-	// if the client and the node are in the same region, connect via the intranet address
-	nodeIP := downloadNode.Node.ExternalIP
-	grpcPort := downloadNode.Node.ExternalGRPCPort
-	if downloadNode.IsSameLocation {
-		nodeIP = downloadNode.Node.LocalIP
-		grpcPort = downloadNode.Node.LocalGRPCPort
-
-		logger.Infof("client and node %d are at the same location, use local ip", downloadNode.Node.NodeID)
-	}
-
-	reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, grpcPort, repData.FileHash)
+	reader, err := downloadFile(i.downloadCtx, i.chooseDownloadNode(downloadNodes), repData.FileHash)
 	if err != nil {
 		return nil, fmt.Errorf("rep read failed, err: %w", err)
 	}
@@ -140,7 +127,17 @@ func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downl
 	return entries[rand.Intn(len(entries))]
 }
 
-func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
+func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) {
+	// if the client and the node are in the same region, connect via the intranet address
+	nodeIP := node.Node.ExternalIP
+	grpcPort := node.Node.ExternalGRPCPort
+	if node.IsSameLocation {
+		nodeIP = node.Node.LocalIP
+		grpcPort = node.Node.LocalGRPCPort
+
+		logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID)
+	}
+
 	if stgglb.IPFSPool != nil {
 		logger.Infof("try to use local IPFS to download file")
 
@@ -152,7 +149,7 @@ func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort in
 		logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
 	}
 
-	return downloadFromNode(ctx, nodeID, nodeIP, grpcPort, fileHash)
+	return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash)
 }
 
 func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {