diff --git a/clientCommand.go b/clientCommand.go index fcc6cd7..59aec46 100644 --- a/clientCommand.go +++ b/clientCommand.go @@ -2,22 +2,24 @@ package main import ( "context" - "encoding/json" "fmt" "io" "os" "path/filepath" agentcaller "proto" - "rabbitmq" "strconv" "sync" - "time" + + raclient "gitlink.org.cn/cloudream/rabbitmq/client" + "gitlink.org.cn/cloudream/utils" + "gitlink.org.cn/cloudream/utils/consts" + "gitlink.org.cn/cloudream/utils/consts/errorcode" + myio "gitlink.org.cn/cloudream/utils/io" //"reflect" //"github.com/pborman/uuid" //"github.com/streadway/amqp" "ec" - "utils" "google.golang.org/grpc" @@ -25,317 +27,154 @@ import ( ) const ( + // TODO2 改为配置文件读取 port = ":5010" packetSizeInBytes = 10 ) -//TODO xh:调整函数顺序,以及函数名大小写 -func Move(bucketName string, objectName string, destination string) { +func Move(bucketName string, objectName string, destination string) error { //将bucketName, objectName, destination发给协调端 fmt.Println("move " + bucketName + "/" + objectName + " to " + destination) + userId := 0 + //获取块hash,ip,序号,编码参数等 //发送写请求,分配写入节点Ip - userId := 0 - command1 := rabbitmq.MoveCommand{ - BucketName: bucketName, - ObjectName: objectName, - UserId: userId, - Destination: destination, - } - c1, _ := json.Marshal(command1) - b1 := append([]byte("05"), c1...) - fmt.Println(string(b1)) - rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue") - rabbit1.PublishSimple(b1) - - //接收消息,赋值给ip, repHash, fileSizeInBytes - var res1 rabbitmq.MoveRes - var redundancy string - var hashs []string - var fileSizeInBytes int64 - var ecName string - var ids []int - queueName := "coorClientQueue" + strconv.Itoa(userId) - rabbit2 := rabbitmq.NewRabbitMQSimple(queueName) - msgs := rabbit2.ConsumeSimple(time.Millisecond, true) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - for d := range msgs { - _ = json.Unmarshal(d.Body, &res1) - redundancy = res1.Redundancy - ids = res1.Ids - hashs = res1.Hashs - fileSizeInBytes = res1.FileSizeInBytes - ecName = res1.EcName - wg.Done() + // 先向协调端请求文件相关的数据 + coorClient, err := raclient.NewCoordinatorClient() + if err != nil { + return fmt.Errorf("create coordinator client failed, err: %w", err) + } + defer coorClient.Close() + + moveResp, err := coorClient.Move(bucketName, objectName, userId, destination) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if moveResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator operation failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.Message) + } + + // 然后向代理端发送移动文件的请求 + agentClient, err := raclient.NewAgentClient(destination) + if err != nil { + return fmt.Errorf("create agent client to %s failed, err: %w", destination, err) + } + defer agentClient.Close() + + switch moveResp.Redundancy { + case consts.REDUNDANCY_REP: + agentMoveResp, err := agentClient.RepMove(moveResp.Hashes, bucketName, objectName, userId, moveResp.FileSizeInBytes) + if err != nil { + return fmt.Errorf("request to agent %s failed, err: %w", destination, err) } - }() - wg.Wait() - fmt.Println(redundancy) - fmt.Println(hashs) - fmt.Println(ids) - fmt.Println(fileSizeInBytes) - fmt.Println(ecName) - //根据redundancy调用repMove和ecMove - - rabbit3 := rabbitmq.NewRabbitMQSimple("agentQueue" + destination) - var b2 []byte - switch redundancy { - case "rep": - command2 := rabbitmq.RepMoveCommand{ - Hashs: hashs, - BucketName: bucketName, - ObjectName: objectName, - UserId: userId, - FileSizeInBytes: fileSizeInBytes, + if agentMoveResp.ErrorCode != errorcode.OK { + return fmt.Errorf("agent %s operation failed, code: %s, messsage: %s", destination, agentMoveResp.ErrorCode, agentMoveResp.Message) } - c2, _ := json.Marshal(command2) - b2 = append([]byte("00"), c2...) - case "ec": - command2 := rabbitmq.EcMoveCommand{ - Hashs: hashs, - Ids: ids, - EcName: ecName, - BucketName: bucketName, - ObjectName: objectName, - UserId: userId, - FileSizeInBytes: fileSizeInBytes, + + case consts.REDUNDANCY_EC: + agentMoveResp, err := agentClient.ECMove(moveResp.Hashes, moveResp.IDs, moveResp.ECName, bucketName, objectName, userId, moveResp.FileSizeInBytes) + if err != nil { + return fmt.Errorf("request to agent %s failed, err: %w", destination, err) } - c2, _ := json.Marshal(command2) - b2 = append([]byte("01"), c2...) - } - fmt.Println(b2) - rabbit3.PublishSimple(b2) - //接受调度成功与否的消息 - //接受第二轮通讯结果 - var res2 rabbitmq.AgentMoveRes - queueName = "agentClientQueue" + strconv.Itoa(userId) - rabbit4 := rabbitmq.NewRabbitMQSimple(queueName) - msgs = rabbit4.ConsumeSimple(time.Millisecond, true) - wg.Add(1) - go func() { - for d := range msgs { - _ = json.Unmarshal(d.Body, &res2) - if res2.MoveCode == 0 { - wg.Done() - fmt.Println("Move Success") - } + if agentMoveResp.ErrorCode != errorcode.OK { + return fmt.Errorf("agent %s operation failed, code: %s, messsage: %s", destination, agentMoveResp.ErrorCode, agentMoveResp.Message) } - }() - wg.Wait() + } - rabbit1.Destroy() - rabbit2.Destroy() - rabbit3.Destroy() - rabbit4.Destroy() + return nil } -func Read(localFilePath string, bucketName string, objectName string) { +func Read(localFilePath string, bucketName string, objectName string) error { fmt.Println("read " + bucketName + "/" + objectName + " to " + localFilePath) //获取块hash,ip,序号,编码参数等 //发送写请求,分配写入节点Ip userId := 0 - command1 := rabbitmq.ReadCommand{ - BucketName: bucketName, - ObjectName: objectName, - UserId: userId, - } - c1, _ := json.Marshal(command1) - //TODO xh: 用常量定义"02"等 - b1 := append([]byte("02"), c1...) - fmt.Println(b1) - rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue") - rabbit1.PublishSimple(b1) - - //接收消息,赋值给ip, repHash, fileSizeInBytes - var res1 rabbitmq.ReadRes - var hashs []string - var ips []string - var fileSizeInBytes int64 - var ecName string - var ids []int - var redundancy string - queueName := "coorClientQueue" + strconv.Itoa(userId) - rabbit2 := rabbitmq.NewRabbitMQSimple(queueName) - msgs := rabbit2.ConsumeSimple(time.Millisecond, true) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - //TODO xh: 增加消息队列等待超时机制(配置文件指定最长等待时间,超时报错返回) - for d := range msgs { - _ = json.Unmarshal(d.Body, &res1) - ips = res1.Ips - hashs = res1.Hashs - ids = res1.BlockIds - ecName = res1.EcName - fileSizeInBytes = res1.FileSizeInBytes - redundancy = res1.Redundancy - wg.Done() - } - }() - wg.Wait() - fmt.Println(redundancy) - fmt.Println(ips) - fmt.Println(hashs) - fmt.Println(ids) - fmt.Println(ecName) - fmt.Println(fileSizeInBytes) - rabbit1.Destroy() - rabbit2.Destroy() - switch redundancy { - //TODO xh: redundancy换为bool型,用常量EC表示ec,REP表示rep + + // 先向协调端请求文件相关的数据 + coorClient, err := raclient.NewCoordinatorClient() + if err != nil { + return fmt.Errorf("create coordinator client failed, err: %w", err) + } + defer coorClient.Close() + + readResp, err := coorClient.Read(bucketName, objectName, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if readResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator operation failed, code: %s, message: %s", readResp.ErrorCode, readResp.Message) + } + + switch readResp.Redundancy { case "rep": - repRead(fileSizeInBytes, ips[0], hashs[0], localFilePath) + err = repRead(readResp.FileSizeInBytes, readResp.IPs[0], readResp.Hashes[0], localFilePath) + if err != nil { + return fmt.Errorf("rep read failed, err: %w", err) + } + case "ec": - ecRead(fileSizeInBytes, ips, hashs, ids, ecName, localFilePath) + // TODO EC部分的代码要考虑重构 + ecRead(readResp.FileSizeInBytes, readResp.IPs, readResp.Hashes, readResp.BlockIDs, readResp.ECName, localFilePath) } + return nil } -func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string) { - numPacket := (fileSizeInBytes + packetSizeInBytes - 1) / (packetSizeInBytes) - fmt.Println(numPacket) - //rpc相关 - conn, err := grpc.Dial(ip+port, grpc.WithInsecure()) +func repRead(fileSizeInBytes int64, ip string, repHash string, localFilePath string) error { + grpcAddr := ip + port + conn, err := grpc.Dial(grpcAddr, grpc.WithInsecure()) if err != nil { - panic(err) + return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) } + defer conn.Close() + client := agentcaller.NewTranBlockOrReplicaClient(conn) - fDir, err := os.Executable() + curExecPath, err := os.Executable() if err != nil { - panic(err) + return fmt.Errorf("get executable directory failed, err: %w", err) } - //TODO xh:删除assets,改为读到当前目录与localFilePath的叠加路径 - fURL := filepath.Join(filepath.Dir(fDir), "assets") - _, err = os.Stat(fURL) - if os.IsNotExist(err) { - os.MkdirAll(fURL, os.ModePerm) + + outputFilePath := filepath.Join(filepath.Dir(curExecPath), localFilePath) + outputFileDir := filepath.Dir(outputFilePath) + + err = os.MkdirAll(outputFileDir, os.ModePerm) + if err != nil { + return fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) } - file, err := os.Create(filepath.Join(fURL, localFilePath)) + outputFile, err := os.Create(outputFilePath) if err != nil { - return + return fmt.Errorf("create output file %s failed, err: %w", outputFilePath, err) } + defer outputFile.Close() + /* TO DO: 判断本地有没有ipfs daemon、能否获取相应对象的cid 如果本地有ipfs daemon且能获取相应对象的cid,则获取对象cid对应的ipfsblock的cid,通过ipfs网络获取这些ipfsblock 否则,像目前一样,使用grpc向指定节点获取 */ - stream, _ := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{ + stream, err := client.GetBlockOrReplica(context.Background(), &agentcaller.GetReq{ BlockOrReplicaHash: repHash, }) - fmt.Println(numPacket) - for i := 0; int64(i) < numPacket; i++ { - fmt.Println(i) - res, _ := stream.Recv() - fmt.Println(res.BlockOrReplicaData) - file.Write(res.BlockOrReplicaData) - } - file.Close() - conn.Close() -} - -func RepWrite(localFilePath string, bucketName string, objectName string, numRep int) { - userId := 0 - //获取文件大小 - fileInfo, _ := os.Stat(localFilePath) - fileSizeInBytes := fileInfo.Size() - fmt.Println(fileSizeInBytes) - - //写入对象的packet数 - numWholePacket := fileSizeInBytes / packetSizeInBytes - lastPacketInBytes := fileSizeInBytes % packetSizeInBytes - numPacket := numWholePacket - if lastPacketInBytes > 0 { - numPacket++ + if err != nil { + return fmt.Errorf("request grpc failed, err: %w", err) } - //发送写请求,请求Coor分配写入节点Ip - //TO DO: 加入两个字段,本机IP和当前进程号 - command1 := rabbitmq.RepWriteCommand{ - BucketName: bucketName, - ObjectName: objectName, - FileSizeInBytes: fileSizeInBytes, - NumRep: numRep, - UserId: userId, - } - c1, _ := json.Marshal(command1) - b1 := append([]byte("03"), c1...) - fmt.Println(b1) - rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue") - rabbit1.PublishSimple(b1) - - var res1 rabbitmq.WriteRes - var ips []string - /* - TODO xh: 判断writeRes里的状态码 - 如果有错,就报错返回,结束程序 - 如果没错,就把得到的IP值赋给ips - */ - - //TODO xh: queueName调整:coorClientQueue+"_"+"本机Ip"+"_"+"进程号" - queueName := "coorClientQueue" + strconv.Itoa(userId) - rabbit2 := rabbitmq.NewRabbitMQSimple(queueName) - msgs := rabbit2.ConsumeSimple(time.Millisecond, true) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - for d := range msgs { - _ = json.Unmarshal(d.Body, &res1) - ips = res1.Ips + numPacket := (fileSizeInBytes + packetSizeInBytes - 1) / (packetSizeInBytes) + for i := int64(0); i < numPacket; i++ { + resp, err := stream.Recv() + if err != nil { + return fmt.Errorf("read file data on grpc stream failed, err: %w", err) } - wg.Done() - }() - wg.Wait() - //创建channel - loadDistributeBufs := make([]chan []byte, numRep) - for i := 0; i < numRep; i++ { - loadDistributeBufs[i] = make(chan []byte) - } - - //正式开始写入 - hashs := make([]string, numRep) - go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes) //从本地文件系统加载数据 - wg.Add(numRep) - for i := 0; i < numRep; i++ { - //TODO xh: send的第一个参数不需要了 - go send("rep.json"+strconv.Itoa(i), ips[i], loadDistributeBufs[i], numPacket, &wg, hashs, i) //"block1.json"这样参数不需要 + err = myio.WriteAll(outputFile, resp.BlockOrReplicaData) + if err != nil { + return fmt.Errorf("write file data to local file failed, err: %w", err) + } } - wg.Wait() - //第二轮通讯:插入元数据hashs - //TODO xh: 加入pid字段 - command2 := rabbitmq.WriteHashCommand{ - BucketName: bucketName, - ObjectName: objectName, - Hashs: hashs, - Ips: ips, - UserId: userId, - } - c1, _ = json.Marshal(command2) - b1 = append([]byte("04"), c1...) - rabbit1.PublishSimple(b1) - - //接受第二轮通讯结果 - var res2 rabbitmq.WriteHashRes - msgs = rabbit2.ConsumeSimple(time.Millisecond, true) - wg.Add(1) - go func() { - for d := range msgs { - _ = json.Unmarshal(d.Body, &res2) - if res2.MetaCode == 0 { - wg.Done() - } - //TODO xh: MetaCode不为零,代表插入出错,需输出错误 - } - }() - wg.Wait() - rabbit1.Destroy() - rabbit2.Destroy() - // + return nil } func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) { @@ -369,15 +208,80 @@ func ecRead(fileSizeInBytes int64, ips []string, blockHashs []string, blockIds [ go decode(getBufs[:], decodeBufs[:], blockSeq, ecK, coefs, numPacket) go persist(decodeBufs[:], numPacket, localFilePath, &wg) wg.Wait() +} + +func RepWrite(localFilePath string, bucketName string, objectName string, numRep int) error { + userId := 0 + //获取文件大小 + fileInfo, err := os.Stat(localFilePath) + if err != nil { + return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err) + } + fileSizeInBytes := fileInfo.Size() + + //写入对象的packet数 + numWholePacket := fileSizeInBytes / packetSizeInBytes + lastPacketInBytes := fileSizeInBytes % packetSizeInBytes + numPacket := numWholePacket + if lastPacketInBytes > 0 { + numPacket++ + } + + coorClient, err := raclient.NewCoordinatorClient() + if err != nil { + return fmt.Errorf("create coordinator client failed, err: %w", err) + } + defer coorClient.Close() + + //发送写请求,请求Coor分配写入节点Ip + repWriteResp, err := coorClient.RepWrite(bucketName, objectName, fileSizeInBytes, numRep, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if repWriteResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator RepWrite failed, err: %w", err) + } + + //创建channel + loadDistributeBufs := make([]chan []byte, numRep) + for i := 0; i < numRep; i++ { + loadDistributeBufs[i] = make(chan []byte) + } + + //正式开始写入 + hashs := make([]string, numRep) + go loadDistribute(localFilePath, loadDistributeBufs[:], numWholePacket, lastPacketInBytes) //从本地文件系统加载数据 + var wg sync.WaitGroup + wg.Add(numRep) + for i := 0; i < numRep; i++ { + //TODO xh: send的第一个参数不需要了 + // TODO2 见上 + go send("rep.json"+strconv.Itoa(i), repWriteResp.IPs[i], loadDistributeBufs[i], numPacket, &wg, hashs, i) //"block1.json"这样参数不需要 + } + wg.Wait() + // 记录写入的文件的Hash + writeRepHashResp, err := coorClient.WriteRepHash(bucketName, objectName, hashs, repWriteResp.IPs, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if writeRepHashResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator WriteRepHash failed, err: %w", err) + } + + return nil } -func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) { +func EcWrite(localFilePath string, bucketName string, objectName string, ecName string) error { fmt.Println("write " + localFilePath + " as " + bucketName + "/" + objectName) + //获取文件大小 - fileInfo, _ := os.Stat(localFilePath) + fileInfo, err := os.Stat(localFilePath) + if err != nil { + return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err) + } fileSizeInBytes := fileInfo.Size() - fmt.Println(fileSizeInBytes) + //调用纠删码库,获取编码参数及生成矩阵 ecPolicies := *utils.GetEcPolicy() ecPolicy := ecPolicies[ecName] @@ -398,38 +302,21 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName numPacket := (fileSizeInBytes + int64(ecK)*packetSizeInBytes - 1) / (int64(ecK) * packetSizeInBytes) fmt.Println(numPacket) - //发送写请求,分配写入节点 userId := 0 - //发送写请求,分配写入节点Ip - command1 := rabbitmq.EcWriteCommand{ - BucketName: bucketName, - ObjectName: objectName, - FileSizeInBytes: fileSizeInBytes, - EcName: ecName, - UserId: userId, - } // - c1, _ := json.Marshal(command1) - b1 := append([]byte("00"), c1...) // - fmt.Println(b1) - rabbit1 := rabbitmq.NewRabbitMQSimple("coorQueue") - rabbit1.PublishSimple(b1) - - //接收消息,赋值给ips - var res1 rabbitmq.WriteRes - var ips []string - queueName := "coorClientQueue" + strconv.Itoa(userId) - rabbit2 := rabbitmq.NewRabbitMQSimple(queueName) - msgs := rabbit2.ConsumeSimple(time.Millisecond, true) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - for d := range msgs { - _ = json.Unmarshal(d.Body, &res1) - ips = res1.Ips - wg.Done() - } - }() - wg.Wait() + coorClient, err := raclient.NewCoordinatorClient() + if err != nil { + return fmt.Errorf("create coordinator client failed, err: %w", err) + } + defer coorClient.Close() + + //发送写请求,请求Coor分配写入节点Ip + ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if ecWriteResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator ECWrite failed, err: %w", err) + } //创建channel loadBufs := make([]chan []byte, ecN) @@ -451,41 +338,24 @@ func EcWrite(localFilePath string, bucketName string, objectName string, ecName go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据 go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) + var wg sync.WaitGroup wg.Add(ecN) for i := 0; i < ecN; i++ { - go send(blockNames[i], ips[i], encodeBufs[i], numPacket, &wg, hashs, i) + go send(blockNames[i], ecWriteResp.IPs[i], encodeBufs[i], numPacket, &wg, hashs, i) } wg.Wait() - //fmt.Println(hashs) + //第二轮通讯:插入元数据hashs - command2 := rabbitmq.WriteHashCommand{ - BucketName: bucketName, - ObjectName: objectName, - Hashs: hashs, - Ips: ips, - UserId: userId, - } - c1, _ = json.Marshal(command2) - b1 = append([]byte("01"), c1...) - rabbit1.PublishSimple(b1) - - //接受第二轮通讯结果 - var res2 rabbitmq.WriteHashRes - msgs = rabbit2.ConsumeSimple(time.Millisecond, true) - wg.Add(1) - go func() { - for d := range msgs { - _ = json.Unmarshal(d.Body, &res2) - if res2.MetaCode == 0 { - wg.Done() - } - } - }() - wg.Wait() - rabbit1.Destroy() - rabbit2.Destroy() - // + writeRepHashResp, err := coorClient.WriteECHash(bucketName, objectName, hashs, ecWriteResp.IPs, userId) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if writeRepHashResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator WriteECHash failed, err: %w", err) + } + + return nil } func repMove(ip string, hash string) { diff --git a/go.mod b/go.mod index aa58fe3..60850f1 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,11 @@ -module client +module gitlink.org.cn/cloudream/client go 1.18 -require google.golang.org/grpc v1.53.0 +require ( + google.golang.org/grpc v1.53.0 + gitlink.org.cn/cloudream/rabbitmq v0.0.0 +) require ( github.com/golang/protobuf v1.5.2 // indirect @@ -13,3 +16,5 @@ require ( google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect google.golang.org/protobuf v1.28.1 // indirect ) + +replace gitlink.org.cn/cloudream/rabbitmq => ../rabbitmq diff --git a/go.sum b/go.sum index c5ea9bc..64953be 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,29 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= -google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= -google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= -google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= +google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd h1:sLpv7bNL1AsX3fdnWh9WVh7ejIzXdOc1RRHGeAmeStU= +google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/main.go b/main.go index 239ec33..86e44ba 100644 --- a/main.go +++ b/main.go @@ -1,24 +1,15 @@ package main import ( - //"context" - //"io" "fmt" "os" - //"path/filepath" - //"sync" - "strconv" - //agentcaller "proto" - - //"github.com/pborman/uuid" - //"github.com/streadway/amqp" - - //"google.golang.org/grpc" - _ "google.golang.org/grpc/balancer/grpclb" + + "strconv" ) +// TODO2 配置读取 //TODO xh: 读取配置文件,初始化变量,获取packetSizeInBytes、grpc port、ipfs port、最大副本数、本机公网Ip等信息,参照src/utils/config.go func main() { @@ -30,19 +21,27 @@ func main() { switch args[1] { case "ecWrite": - EcWrite(args[2], args[3], args[4], args[5]) //TODO: 写入对象时,Coor判断对象是否已存在,如果存在,则直接返回 + if err := EcWrite(args[2], args[3], args[4], args[5]); err != nil { + fmt.Printf("ec write failed, err: %s", err.Error()) + } case "write": numRep, _ := strconv.Atoi(args[5]) if numRep <= 0 || numRep > 10 { //TODO xh:10改为从配置文件中读出的最大副本数 print("write::InputError!") //TODO xh:优化提示语 } else { - RepWrite(args[2], args[3], args[4], numRep) + if err := RepWrite(args[2], args[3], args[4], numRep); err != nil { + fmt.Printf("rep write failed, err: %s", err.Error()) + } } case "read": - Read(args[2], args[3], args[4]) + if err := Read(args[2], args[3], args[4]); err != nil { + fmt.Printf("read failed, err: %s", err.Error()) + } case "move": - Move(args[2], args[3], args[4]) //bucket object destination + if err := Move(args[2], args[3], args[4]); err != nil { + fmt.Printf("move failed, err: %s", err.Error()) + } } /* TO DO future: