diff --git a/internal/cmdline/object.go b/internal/cmdline/object.go index 819b5b7..7575995 100644 --- a/internal/cmdline/object.go +++ b/internal/cmdline/object.go @@ -62,7 +62,6 @@ func ObjectDownloadObject(ctx CommandContext, localFilePath string, objectID int return nil } - func ObjectDownloadObjectDir(ctx CommandContext, outputBaseDir string, dirName string) error { // 创建本地文件夹 err := os.MkdirAll(outputBaseDir, os.ModePerm) @@ -161,8 +160,7 @@ func ObjectUploadRepObject(ctx CommandContext, localFilePath string, bucketID in } func ObjectUploadEcObject(ctx CommandContext, localFilePath string, bucketID int64, objectName string, ecName string) error { - // TODO - //panic("not implement yet") + // TODO 参考rep的,改成异步流程 file, err := os.Open(localFilePath) if err != nil { return fmt.Errorf("open file %s failed, err: %w", localFilePath, err) diff --git a/internal/config/config.go b/internal/config/config.go index acba712..92526eb 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,15 +9,15 @@ import ( ) type Config struct { - GRPCPort int `json:"grpcPort"` - EcPacketSize int64 `json:"ecPacketSize"` - MaxRepCount int `json:"maxRepCount"` - LocalIP string `json:"localIP"` - ExternalIP string `json:"externalIP"` - Logger logger.Config `json:"logger"` - RabbitMQ racfg.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + GRPCPort int `json:"grpcPort"` + ECPacketSize int64 `json:"ecPacketSize"` + MaxRepCount int `json:"maxRepCount"` + LocalIP string `json:"localIP"` + ExternalIP string `json:"externalIP"` + Logger logger.Config `json:"logger"` + RabbitMQ racfg.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/internal/services/client_command_ec.go b/internal/services/client_command_ec.go index 31f6f4e..fcc3e2b 100644 --- a/internal/services/client_command_ec.go +++ b/internal/services/client_command_ec.go @@ -29,7 +29,8 @@ import ( ) func (svc *ObjectService) UploadEcObject(userID int64, bucketID int64, objectName string, file io.ReadCloser, fileSize int64, ecName string) error { - + // TODO 需要加锁 + /*reqBlder := reqbuilder.NewBuilder() for _, uploadObject := range t.Objects { reqBlder.Metadata(). @@ -37,21 +38,21 @@ func (svc *ObjectService) UploadEcObject(userID int64, bucketID int64, objectNam Object().CreateOne(t.bucketID, uploadObject.ObjectName) }*/ /* - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有桶的权限 - UserBucket().ReadOne(userID, bucketID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于设置Rep配置 - ObjectRep().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.DistLock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() + mutex, err := reqBlder. + Metadata(). + // 用于判断用户是否有桶的权限 + UserBucket().ReadOne(userID, bucketID). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于设置Rep配置 + ObjectRep().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(ctx.DistLock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() */ //发送写请求,请求Coor分配写入节点Ip ecWriteResp, err := svc.coordinator.PreUploadEcObject(coormsg.NewPreUploadEcObject(bucketID, objectName, fileSize, ecName, userID, config.Cfg().ExternalIP)) @@ -93,7 +94,7 @@ func (svc *ObjectService) ecWrite(file io.ReadCloser, fileSize int64, ecK int, e var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN //计算每个块的packet数 - numPacket := (fileSize + int64(ecK)*config.Cfg().EcPacketSize - 1) / (int64(ecK) * config.Cfg().EcPacketSize) + numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize) //fmt.Println(numPacket) //创建channel loadBufs := make([]chan []byte, ecN) @@ -121,7 +122,7 @@ func (svc *ObjectService) ecWrite(file io.ReadCloser, fileSize int64, ecK int, e defer mutex.Unlock() */ for i := 0; i < ecN; i++ { - go svc.Send(nodes[i], encodeBufs[i], numPacket, &wg, hashs, i) + go svc.send(nodes[i], encodeBufs[i], numPacket, &wg, hashs, i) } wg.Wait() @@ -129,9 +130,10 @@ func (svc *ObjectService) ecWrite(file io.ReadCloser, fileSize int64, ecK int, e } -func (svc *ObjectService) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error){ +func (svc *ObjectService) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, hashs []string) (io.ReadCloser, error) { + // TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构 //wg := sync.WaitGroup{} - numPacket := (fileSize + int64(ecK)*config.Cfg().EcPacketSize - 1) / (int64(ecK) * config.Cfg().EcPacketSize) + numPacket := (fileSize + int64(ecK)*config.Cfg().ECPacketSize - 1) / (int64(ecK) * config.Cfg().ECPacketSize) getBufs := make([]chan []byte, ecN) decodeBufs := make([]chan []byte, ecK) for i := 0; i < ecN; i++ { @@ -145,10 +147,9 @@ func (svc *ObjectService) downloadEcObject(fileSize int64, ecK int, ecN int, blo } print(numPacket) go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket) - r,w := io.Pipe() - print("11111") + r, w := io.Pipe() //persist函数,将解码得到的文件写入pipe - go func(){ + go func() { for i := 0; int64(i) < numPacket; i++ { for j := 0; j < len(decodeBufs); j++ { tmp := <-decodeBufs[j] @@ -166,17 +167,17 @@ func (svc *ObjectService) downloadEcObject(fileSize int64, ecK int, ecN int, blo func (svc *ObjectService) get(blockHash string, nodeIP string, getBuf chan []byte, numPacket int64) error { downloadFromAgent := false //使用本地IPFS获取 - if svc.ipfs != nil{ + if svc.ipfs != nil { log.Infof("try to use local IPFS to download file") //获取IPFS的reader - reader,err := svc.downloadFromLocalIPFS(blockHash) + reader, err := svc.downloadFromLocalIPFS(blockHash) if err != nil { downloadFromAgent = true fmt.Errorf("read ipfs block failed, err: %w", err) } defer reader.Close() - for i:=0; int64(i)