diff --git a/agent/internal/services/grpc/io.go b/agent/internal/grpc/io.go similarity index 100% rename from agent/internal/services/grpc/io.go rename to agent/internal/grpc/io.go diff --git a/agent/internal/services/grpc/service.go b/agent/internal/grpc/service.go similarity index 100% rename from agent/internal/services/grpc/service.go rename to agent/internal/grpc/service.go diff --git a/agent/internal/services/mq/agent.go b/agent/internal/mq/agent.go similarity index 100% rename from agent/internal/services/mq/agent.go rename to agent/internal/mq/agent.go diff --git a/agent/internal/services/mq/cache.go b/agent/internal/mq/cache.go similarity index 100% rename from agent/internal/services/mq/cache.go rename to agent/internal/mq/cache.go diff --git a/agent/internal/services/mq/io.go b/agent/internal/mq/io.go similarity index 100% rename from agent/internal/services/mq/io.go rename to agent/internal/mq/io.go diff --git a/agent/internal/services/mq/object.go b/agent/internal/mq/object.go similarity index 100% rename from agent/internal/services/mq/object.go rename to agent/internal/mq/object.go diff --git a/agent/internal/services/mq/service.go b/agent/internal/mq/service.go similarity index 100% rename from agent/internal/services/mq/service.go rename to agent/internal/mq/service.go diff --git a/agent/internal/services/mq/storage.go b/agent/internal/mq/storage.go similarity index 100% rename from agent/internal/services/mq/storage.go rename to agent/internal/mq/storage.go diff --git a/agent/main.go b/agent/main.go index e7c5258..1949f69 100644 --- a/agent/main.go +++ b/agent/main.go @@ -5,7 +5,6 @@ import ( "net" "os" "sync" - "time" log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage/agent/internal/config" @@ -20,14 +19,10 @@ import ( "google.golang.org/grpc" - "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/utils" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/services/grpc" - cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/services/mq" + grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc" + cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq" ) // TODO 此数据是否在运行时会发生变化? @@ -136,57 +131,3 @@ func serveDistLock(svc *distlock.Service) { log.Info("distlock stopped") } - -func reportStatus(wg *sync.WaitGroup) { - coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) - if err != nil { - wg.Done() - log.Error("new coordinator client failed, err: %w", err) - return - } - - // TODO 增加退出死循环的方法 - for { - //挨个ping其他agent(AgentIpList),记录延迟到AgentDelay - // TODO AgentIP考虑放到配置文件里或者启动时从coor获取 - ips := utils.GetAgentIps() - agentDelay := make([]int, len(ips)) - waitG := sync.WaitGroup{} - waitG.Add(len(ips)) - for i := 0; i < len(ips); i++ { - go func(i int, wg *sync.WaitGroup) { - connStatus, err := utils.GetConnStatus(ips[i]) - if err != nil { - wg.Done() - log.Warnf("ping %s failed, err: %s", ips[i], err.Error()) - return - } - - log.Debugf("connection status to %s: %+v", ips[i], connStatus) - - if connStatus.IsReachable { - agentDelay[i] = int(connStatus.Delay.Milliseconds()) + 1 - } else { - agentDelay[i] = -1 - } - - wg.Done() - }(i, &waitG) - } - waitG.Wait() - //TODO: 查看本地IPFS daemon是否正常,记录到ipfsStatus - ipfsStatus := consts.IPFSStateOK - //TODO:访问自身资源目录(配置文件中获取路径),记录是否正常,记录到localDirStatus - localDirStatus := consts.StorageDirectoryStateOK - - //发送心跳 - // TODO 由于数据结构未定,暂时不发送真实数据 - coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) - - time.Sleep(time.Minute * 5) - } - - coorCli.Close() - - wg.Done() -} diff --git a/common/utils/config.go b/common/utils/config.go deleted file mode 100644 index 332b851..0000000 --- a/common/utils/config.go +++ /dev/null @@ -1,74 +0,0 @@ -package utils - -import ( - "fmt" - "regexp" - "strconv" - - "github.com/beevik/etree" -) - -type EcConfig struct { - ecid string `xml:"ecid"` - class string `xml:"class"` - n int `xml:"n"` - k int `xml:"k"` - w int `xml:"w"` - opt int `xml:"opt"` -} - -func (r *EcConfig) GetK() int { - return r.k -} - -func (r *EcConfig) GetN() int { - return r.n -} - -func GetEcPolicy() *map[string]EcConfig { - doc := etree.NewDocument() - if err := doc.ReadFromFile("../confs/sysSetting.xml"); err != nil { - panic(err) - } - ecMap := make(map[string]EcConfig, 20) - root := doc.SelectElement("setting") - for _, attr := range root.SelectElements("attribute") { - if name := attr.SelectElement("name"); name.Text() == "ec.policy" { - for _, eci := range attr.SelectElements("value") { - tt := EcConfig{} - tt.ecid = eci.SelectElement("ecid").Text() - tt.class = eci.SelectElement("class").Text() - tt.n, _ = strconv.Atoi(eci.SelectElement("n").Text()) - tt.k, _ = strconv.Atoi(eci.SelectElement("k").Text()) - tt.w, _ = strconv.Atoi(eci.SelectElement("w").Text()) - tt.opt, _ = strconv.Atoi(eci.SelectElement("opt").Text()) - ecMap[tt.ecid] = tt - } - } - } - fmt.Println(ecMap) - return &ecMap - // -} - -func GetAgentIps() []string { - doc := etree.NewDocument() - if err := doc.ReadFromFile("../confs/sysSetting.xml"); err != nil { - panic(err) - } - root := doc.SelectElement("setting") - var ips []string // 定义存储 IP 的字符串切片 - - for _, attr := range root.SelectElements("attribute") { - if name := attr.SelectElement("name"); name.Text() == "agents.addr" { - for _, ip := range attr.SelectElements("value") { - ipRegex := regexp.MustCompile(`\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b`) - match := ipRegex.FindString(ip.Text()) - print(match) - ips = append(ips, match) - } - } - } - - return ips -} diff --git a/common/utils/ping.go b/common/utils/ping.go deleted file mode 100644 index 85532d3..0000000 --- a/common/utils/ping.go +++ /dev/null @@ -1,82 +0,0 @@ -package utils - -import ( - //"fmt" - "github.com/go-ping/ping" - //"net" - "io/ioutil" - "net/http" - "strings" - "time" -) - -type ConnStatus struct { - Addr string - IsReachable bool - Delay time.Duration - TTL int -} - -// 获取本地主机 IP 地址 -func getLocalIP() string { - resp, err := http.Get("https://api.ipify.org") - if err != nil { - panic(err) - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err) - } - - ip := strings.TrimSpace(string(body)) - return ip -} - -func GetConnStatus(remoteIP string) (*ConnStatus, error) { - // 本地主机 IP 地址 - //localIP := getLocalIP() - //print("!@#@#!") - //print(localIP) - conn := ConnStatus{ - Addr: remoteIP, - IsReachable: false, - } - pinger, err := ping.NewPinger(remoteIP) - - if err != nil { - return nil, err - } - pinger.Count = 5 // 设置 ping 次数为 5 - // pinger.Interval = 1 // 设置 ping 时间间隔为 1 秒 - //pinger.Timeout = 2 // 设置 ping 超时时间为 2 秒 - //pinger.SetPrivileged(true) // 设置使用特权模式以获取 TTL 值 - pinger.OnRecv = func(pkt *ping.Packet) { - //fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n", - // pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl) - conn.TTL = pkt.Ttl - } - - /*pinger.OnDuplicateRecv = func(pkt *ping.Packet) { - fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n", - pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl) - }*/ - - pinger.OnFinish = func(stats *ping.Statistics) { - //fmt.Printf("\n--- %s ping statistics ---\n", stats.Addr) - //fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n", - // stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss) - //fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n", - // stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt) - if stats.PacketLoss == 0.0 { - conn.IsReachable = true - } - conn.Delay = stats.AvgRtt - } - err = pinger.Run() // Blocks until finished. - if err != nil { - return nil, err - } - return &conn, nil -} diff --git a/coordinator/internal/services/agent.go b/coordinator/internal/mq/agent.go similarity index 97% rename from coordinator/internal/services/agent.go rename to coordinator/internal/mq/agent.go index fa4137c..f782325 100644 --- a/coordinator/internal/services/agent.go +++ b/coordinator/internal/mq/agent.go @@ -1,4 +1,4 @@ -package services +package mq import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" diff --git a/coordinator/internal/services/bucket.go b/coordinator/internal/mq/bucket.go similarity index 99% rename from coordinator/internal/services/bucket.go rename to coordinator/internal/mq/bucket.go index 564b105..78e5b41 100644 --- a/coordinator/internal/services/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/cache.go b/coordinator/internal/mq/cache.go similarity index 98% rename from coordinator/internal/services/cache.go rename to coordinator/internal/mq/cache.go index 9df882c..215f8a0 100644 --- a/coordinator/internal/services/cache.go +++ b/coordinator/internal/mq/cache.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/node.go b/coordinator/internal/mq/node.go similarity index 98% rename from coordinator/internal/services/node.go rename to coordinator/internal/mq/node.go index 0614264..fb6e13b 100644 --- a/coordinator/internal/services/node.go +++ b/coordinator/internal/mq/node.go @@ -1,4 +1,4 @@ -package services +package mq import ( "gitlink.org.cn/cloudream/common/consts/errorcode" diff --git a/coordinator/internal/services/object.go b/coordinator/internal/mq/object.go similarity index 99% rename from coordinator/internal/services/object.go rename to coordinator/internal/mq/object.go index d901c88..876644c 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/mq/object.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/package.go b/coordinator/internal/mq/package.go similarity index 99% rename from coordinator/internal/services/package.go rename to coordinator/internal/mq/package.go index b4822e9..d7bf332 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/mq/package.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/internal/services/service.go b/coordinator/internal/mq/service.go similarity index 94% rename from coordinator/internal/services/service.go rename to coordinator/internal/mq/service.go index 7d7db1d..1b00486 100644 --- a/coordinator/internal/services/service.go +++ b/coordinator/internal/mq/service.go @@ -1,4 +1,4 @@ -package services +package mq import ( mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" diff --git a/coordinator/internal/services/storage.go b/coordinator/internal/mq/storage.go similarity index 98% rename from coordinator/internal/services/storage.go rename to coordinator/internal/mq/storage.go index abdaa6a..08fe3e4 100644 --- a/coordinator/internal/services/storage.go +++ b/coordinator/internal/mq/storage.go @@ -1,4 +1,4 @@ -package services +package mq import ( "database/sql" diff --git a/coordinator/main.go b/coordinator/main.go index ced3f98..182e847 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -9,7 +9,7 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/coordinator/internal/config" - "gitlink.org.cn/cloudream/storage/coordinator/internal/services" + "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" ) func main() { @@ -35,7 +35,7 @@ func main() { logger.Fatalf("new scanner client failed, err: %s", err.Error()) } - coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) + coorSvr, err := coormq.NewServer(mq.NewService(db, scanner), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } diff --git a/scanner/internal/services/event.go b/scanner/internal/mq/event.go similarity index 96% rename from scanner/internal/services/event.go rename to scanner/internal/mq/event.go index 9333c13..e6f83c0 100644 --- a/scanner/internal/services/event.go +++ b/scanner/internal/mq/event.go @@ -1,4 +1,4 @@ -package services +package mq import ( "gitlink.org.cn/cloudream/common/pkgs/logger" diff --git a/scanner/internal/services/service.go b/scanner/internal/mq/service.go similarity index 93% rename from scanner/internal/services/service.go rename to scanner/internal/mq/service.go index e87a8d2..0a234d3 100644 --- a/scanner/internal/services/service.go +++ b/scanner/internal/mq/service.go @@ -1,4 +1,4 @@ -package services +package mq import ( "gitlink.org.cn/cloudream/storage/scanner/internal/event" diff --git a/scanner/main.go b/scanner/main.go index a80d1b4..f3aee3d 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -12,7 +12,7 @@ import ( scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/scanner/internal/config" "gitlink.org.cn/cloudream/storage/scanner/internal/event" - "gitlink.org.cn/cloudream/storage/scanner/internal/services" + "gitlink.org.cn/cloudream/storage/scanner/internal/mq" "gitlink.org.cn/cloudream/storage/scanner/internal/tickevent" ) @@ -49,7 +49,7 @@ func main() { eventExecutor := event.NewExecutor(db, distlockSvc) go serveEventExecutor(&eventExecutor, &wg) - agtSvr, err := scmq.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) + agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) }