diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bb9d0f7 --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +OUTPUT_BINARY_NAME = "cloud-agent" +OUTPUT_DIR_NAME = "cloud-agent" + + +ASSETS_DIR_NAME = "assets" +BUILD_DIR = "../../build" + +build: + go build -o ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME} + if [ -d ${ASSETS_DIR_NAME} ];then \ + cp -r ${ASSETS_DIR_NAME}/* ${BUILD_DIR}/${OUTPUT_DIR_NAME}/; \ + fi + +clean: + rm -f ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME} \ No newline at end of file diff --git a/assets/config/config.json b/assets/config/config.json new file mode 100644 index 0000000..9eb1498 --- /dev/null +++ b/assets/config/config.json @@ -0,0 +1,10 @@ +{ + "grpcPort": 5010, + "grpcPacketSize": 10, + "localIP": "localhost", + "logger": { + "outputFileName": "cloud-agent", + "outputDirectory": "log", + "level": "debug" + } +} \ No newline at end of file diff --git a/command_service.go b/command_service.go index cb7cb89..0e143f7 100644 --- a/command_service.go +++ b/command_service.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" + "gitlink.org.cn/cloudream/agent/config" "gitlink.org.cn/cloudream/ec" "gitlink.org.cn/cloudream/utils" @@ -48,13 +49,13 @@ func (service *CommandService) RepMove(msg *ramsg.RepMoveCommand) ramsg.AgentMov fmt.Println(fURL) fileInfo, _ := inFile.Stat() fileSizeInBytes := fileInfo.Size() - numWholePacket := fileSizeInBytes / packetSizeInBytes - lastPacketInBytes := fileSizeInBytes % packetSizeInBytes + numWholePacket := fileSizeInBytes / config.Cfg().GRCPPacketSize + lastPacketInBytes := fileSizeInBytes % config.Cfg().GRCPPacketSize fmt.Println(fileSizeInBytes) fmt.Println(numWholePacket) fmt.Println(lastPacketInBytes) for i := 0; int64(i) < numWholePacket; i++ { - buf := make([]byte, packetSizeInBytes) + buf := make([]byte, config.Cfg().GRCPPacketSize) inFile.Read(buf) outFile.Write(buf) } @@ -107,17 +108,17 @@ func (service *CommandService) RepMove(msg *ramsg.RepMoveCommand) ramsg.AgentMov //源文件 data := CatIPFS(hashs[0]) - numWholePacket := fileSizeInBytes / packetSizeInBytes - lastPacketInBytes := fileSizeInBytes % packetSizeInBytes + numWholePacket := fileSizeInBytes / int64(config.Cfg().GRCPPacketSize) + lastPacketInBytes := fileSizeInBytes % int64(config.Cfg().GRCPPacketSize) fmt.Println(fileSizeInBytes) fmt.Println(numWholePacket) fmt.Println(lastPacketInBytes) for i := 0; int64(i) < numWholePacket; i++ { - buf := []byte(data[i*packetSizeInBytes : i*packetSizeInBytes+packetSizeInBytes]) + buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize]) outFile.Write(buf) } if lastPacketInBytes > 0 { - buf := []byte(data[numWholePacket*packetSizeInBytes : numWholePacket*packetSizeInBytes+lastPacketInBytes]) + buf := []byte(data[numWholePacket*int64(config.Cfg().GRCPPacketSize) : numWholePacket*int64(config.Cfg().GRCPPacketSize)+lastPacketInBytes]) outFile.Write(buf) } outFile.Close() @@ -129,7 +130,7 @@ func (service *CommandService) RepMove(msg *ramsg.RepMoveCommand) ramsg.AgentMov return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed")) } defer coorClient.Close() - coorClient.TempCacheReport(LocalIp, hashs) + coorClient.TempCacheReport(config.Cfg().LocalIP, hashs) return ramsg.NewAgentMoveRespOK() } @@ -147,7 +148,7 @@ func (service *CommandService) ECMove(msg *ramsg.ECMoveCommand) ramsg.AgentMoveR ecPolicy := ecPolicies[ecName] ecK := ecPolicy.GetK() ecN := ecPolicy.GetN() - numPacket := (fileSizeInBytes + int64(ecK)*packetSizeInBytes - 1) / (int64(ecK) * packetSizeInBytes) + numPacket := (fileSizeInBytes + int64(ecK)*int64(config.Cfg().GRCPPacketSize) - 1) / (int64(ecK) * int64(config.Cfg().GRCPPacketSize)) getBufs := make([]chan []byte, ecN) decodeBufs := make([]chan []byte, ecK) @@ -175,7 +176,7 @@ func (service *CommandService) ECMove(msg *ramsg.ECMoveCommand) ramsg.AgentMoveR return ramsg.NewAgentMoveRespFailed(errorcode.OPERATION_FAILED, fmt.Sprintf("create coordinator client failed")) } defer coorClient.Close() - coorClient.TempCacheReport(LocalIp, hashs) + coorClient.TempCacheReport(config.Cfg().LocalIP, hashs) return ramsg.NewAgentMoveRespOK() } @@ -227,7 +228,7 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int func get(blockHash string, getBuf chan []byte, numPacket int64) { data := CatIPFS(blockHash) for i := 0; int64(i) < numPacket; i++ { - buf := []byte(data[i*packetSizeInBytes : i*packetSizeInBytes+packetSizeInBytes]) + buf := []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize]) getBuf <- buf } close(getBuf) diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..cd59f53 --- /dev/null +++ b/config/config.go @@ -0,0 +1,23 @@ +package config + +import ( + c "gitlink.org.cn/cloudream/utils/config" + log "gitlink.org.cn/cloudream/utils/logger" +) + +type Config struct { + GRPCPort int `json:"grpcPort"` + GRCPPacketSize int `json:"grpcPacketSize"` + LocalIP string `json:"localIP"` + Logger log.Config `json:"logger"` +} + +var cfg Config + +func Init() error { + return c.DefaultLoad(&cfg) +} + +func Cfg() *Config { + return &cfg +} diff --git a/dataServer.go b/dataServer.go index ee7d7db..f45838e 100644 --- a/dataServer.go +++ b/dataServer.go @@ -11,6 +11,7 @@ import ( "bytes" "io/ioutil" + "gitlink.org.cn/cloudream/agent/config" agentserver "gitlink.org.cn/cloudream/proto" shell "github.com/ipfs/go-ipfs-api" @@ -177,8 +178,8 @@ func (s *anyOne) GetBlockOrReplica(req *agentserver.GetReq, server agentserver.T fSize := len(data) fmt.Println(fSize) - numPacket := fSize / packetSizeInBytes - lastPacketInBytes := fSize % packetSizeInBytes + numPacket := fSize / config.Cfg().GRCPPacketSize + lastPacketInBytes := fSize % config.Cfg().GRCPPacketSize if lastPacketInBytes > 0 { numPacket++ } @@ -188,10 +189,10 @@ func (s *anyOne) GetBlockOrReplica(req *agentserver.GetReq, server agentserver.T if i == numPacket-1 && lastPacketInBytes > 0 { buf = make([]byte, lastPacketInBytes) } else { - buf = make([]byte, packetSizeInBytes) + buf = make([]byte, config.Cfg().GRCPPacketSize) } fmt.Println(len(buf)) - buf = []byte(data[i*packetSizeInBytes : i*packetSizeInBytes+packetSizeInBytes]) + buf = []byte(data[i*config.Cfg().GRCPPacketSize : i*config.Cfg().GRCPPacketSize+config.Cfg().GRCPPacketSize]) fmt.Println(buf) print("#@#@#@#@#") diff --git a/heartReporter.go b/heartReporter.go index a92f3ee..5c36014 100644 --- a/heartReporter.go +++ b/heartReporter.go @@ -1,10 +1,11 @@ package main import ( - "fmt" "sync" "time" + log "github.com/sirupsen/logrus" + "gitlink.org.cn/cloudream/agent/config" racli "gitlink.org.cn/cloudream/rabbitmq/client" "gitlink.org.cn/cloudream/utils" "gitlink.org.cn/cloudream/utils/consts" @@ -14,7 +15,7 @@ func reportStatus(wg *sync.WaitGroup) { coorCli, err := racli.NewCoordinatorClient() if err != nil { wg.Done() - // TODO 日志 + log.Error("new coordinator client failed, err: %w", err) return } @@ -27,28 +28,32 @@ func reportStatus(wg *sync.WaitGroup) { waitG.Add(len(ips)) for i := 0; i < len(ips); i++ { go func(i int, wg *sync.WaitGroup) { - connStatus := utils.GetConnStatus(ips[i]) - fmt.Println(connStatus) + connStatus, err := utils.GetConnStatus(ips[i]) + if err != nil { + wg.Done() + log.Warnf("ping %s failed, err: %s", ips[i], err.Error()) + return + } + + log.Debugf("connection status to %s: %+v", ips[i], connStatus) + if connStatus.IsReachable { agentDelay[i] = int(connStatus.Delay.Milliseconds()) + 1 } else { agentDelay[i] = -1 } - print(agentDelay[i]) - //wg.Wait() wg.Done() }(i, &waitG) } waitG.Wait() - fmt.Println(agentDelay) //TODO: 查看本地IPFS daemon是否正常,记录到ipfsStatus ipfsStatus := consts.IPFS_STATUS_OK //TODO:访问自身资源目录(配置文件中获取路径),记录是否正常,记录到localDirStatus localDirStatus := consts.LOCAL_DIR_STATUS_OK //发送心跳 - coorCli.AgentStatusReport("localhost", agentDelay, ipfsStatus, localDirStatus) + coorCli.AgentStatusReport(config.Cfg().LocalIP, agentDelay, ipfsStatus, localDirStatus) time.Sleep(time.Minute * 5) } diff --git a/main.go b/main.go index 512adb2..8841415 100644 --- a/main.go +++ b/main.go @@ -1,34 +1,43 @@ package main import ( + "fmt" "net" + "os" "sync" + "gitlink.org.cn/cloudream/agent/config" agentserver "gitlink.org.cn/cloudream/proto" + "gitlink.org.cn/cloudream/utils/logger" "google.golang.org/grpc" rasvr "gitlink.org.cn/cloudream/rabbitmq/server" ) -//TODO xh: 读取配置文件,初始化变量,获取packetSizeInBytes、grpc port、ipfs port、最大副本数、本机公网Ip等信息,参照src/utils/config.go - -const ( - Port = ":5010" - packetSizeInBytes = 10 - LocalIp = "localhost" -) - +// TODO 此数据是否在运行时会发生变化? var AgentIpList []string func main() { + err := config.Init() + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + AgentIpList = []string{"pcm01", "pcm1", "pcm2"} //处置协调端、客户端命令(可多建几个) wg := sync.WaitGroup{} wg.Add(2) // 启动命令服务器 - cmdSvr, err := rasvr.NewAgentServer(&CommandService{}, LocalIp) + cmdSvr, err := rasvr.NewAgentServer(&CommandService{}, config.Cfg().LocalIP) if err != nil { // TODO 错误日志 return @@ -38,7 +47,7 @@ func main() { go reportStatus(&wg) //网络延迟感知 //面向客户端收发数据 - lis, err := net.Listen("tcp", Port) + lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", config.Cfg().GRPCPort)) if err != nil { panic(err) }