From d18491cffe2b37e6ef92b997e4212881368d1b05 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 14 Aug 2023 14:53:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86common=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consts/consts.go | 26 -------- go.mod | 2 +- models/models.go | 38 +---------- pkg/mq/message_dispatcher.go | 63 ++++++++++++++++++ utils/config.go | 74 --------------------- utils/grpc/file_transport.go | 123 ----------------------------------- utils/ping.go | 82 ----------------------- utils/utils.go | 21 ------ 8 files changed, 65 insertions(+), 364 deletions(-) delete mode 100644 consts/consts.go create mode 100644 pkg/mq/message_dispatcher.go delete mode 100644 utils/config.go delete mode 100644 utils/grpc/file_transport.go delete mode 100644 utils/ping.go delete mode 100644 utils/utils.go diff --git a/consts/consts.go b/consts/consts.go deleted file mode 100644 index dd9f224..0000000 --- a/consts/consts.go +++ /dev/null @@ -1,26 +0,0 @@ -package consts - -const ( - IPFSStateOK = "OK" - - StorageDirectoryStateOK = "OK" - - NodeStateNormal = "Normal" - NodeStateUnavailable = "Unavailable" -) - -const ( - ObjectStateNormal = "Normal" - ObjectStateDeleted = "Deleted" -) - -const ( - StorageObjectStateNormal = "Normal" - StorageObjectStateDeleted = "Deleted" - StorageObjectStateOutdated = "Outdated" -) - -const ( - CacheStatePinned = "Pinned" - CacheStateTemp = "Temp" -) diff --git a/go.mod b/go.mod index bdedf25..40b043b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module gitlink.org.cn/cloudream/common -go 1.18 +go 1.20 require ( github.com/antonfisher/nested-logrus-formatter v1.3.1 diff --git a/models/models.go b/models/models.go index 10d8a61..ee6364d 100644 --- a/models/models.go +++ b/models/models.go @@ -16,40 +16,4 @@ type RepRedundancyConfig struct { } type ECRedundancyConfig struct { -} - -type RedundancyDataTypes interface{} -type RedundancyDataTypesConst interface { - RepRedundancyData | ECRedundancyData -} -type RepRedundancyData struct { - FileHash string `json:"fileHash"` -} - -func NewRedundancyRepData(fileHash string) RepRedundancyData { - return RepRedundancyData{ - FileHash: fileHash, - } -} - -type ECRedundancyData struct { - Blocks []ObjectBlock `json:"blocks"` -} - -func NewECRedundancyData(blocks []ObjectBlock) ECRedundancyData { - return ECRedundancyData{ - Blocks: blocks, - } -} - -type ObjectBlock struct { - Index int `json:"index"` - FileHash string `json:"fileHash"` -} - -func NewObjectBlock(index int, fileHash string) ObjectBlock { - return ObjectBlock{ - Index: index, - FileHash: fileHash, - } -} +} \ No newline at end of file diff --git a/pkg/mq/message_dispatcher.go b/pkg/mq/message_dispatcher.go new file mode 100644 index 0000000..0a78c6d --- /dev/null +++ b/pkg/mq/message_dispatcher.go @@ -0,0 +1,63 @@ +package mq + +import ( + "fmt" + + myreflect "gitlink.org.cn/cloudream/common/utils/reflect" +) + +type HandlerFn func(svcBase any, msg *Message) (*Message, error) + +type MessageDispatcher struct { + Handlers map[myreflect.Type]HandlerFn +} + +func NewMessageDispatcher() MessageDispatcher { + return MessageDispatcher{ + Handlers: make(map[myreflect.Type]HandlerFn), + } +} + +func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) { + h.Handlers[typ] = handler +} + +func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) { + typ := myreflect.TypeOfValue(msg.Body) + fn, ok := h.Handlers[typ] + if !ok { + return nil, fmt.Errorf("unsupported message type: %s", typ.Name()) + } + + return fn(svcBase, msg) +} + +// 将Service中的一个接口函数作为指定类型消息的处理函数 +func AddServiceFn[TSvc any, TReq any, TResp any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq) (*TResp, *CodeMessage)) { + dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { + + reqMsgBody := reqMsg.Body.(TReq) + ret, codeMsg := svcFn(svcBase.(TSvc), &reqMsgBody) + + var body MessageBodyTypes + if ret != nil { + body = *ret + } + + respMsg := MakeMessage(body) + respMsg.SetCodeMessage(codeMsg.Code, codeMsg.Message) + + return &respMsg, nil + }) +} + +// 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数 +func AddNoRespServiceFn[TSvc any, TReq any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq)) { + dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { + + reqMsgBody := reqMsg.Body.(TReq) + svcFn(svcBase.(TSvc), &reqMsgBody) + + return nil, nil + }) +} diff --git a/utils/config.go b/utils/config.go deleted file mode 100644 index d69134e..0000000 --- a/utils/config.go +++ /dev/null @@ -1,74 +0,0 @@ -package utils - -import ( - "fmt" - "regexp" - "strconv" - - "github.com/beevik/etree" -) - -type EcConfig struct { - ecid string `xml:"ecid"` - class string `xml:"class"` - n int `xml:"n"` - k int `xml:"k"` - w int `xml:"w"` - opt int `xml:"opt"` -} - -func (r *EcConfig) GetK() int { - return r.k -} - -func (r *EcConfig) GetN() int { - return r.n -} - -func GetEcPolicy() *map[string]EcConfig { - doc := etree.NewDocument() - if err := doc.ReadFromFile("../conf/sysSetting.xml"); err != nil { - panic(err) - } - ecMap := make(map[string]EcConfig, 20) - root := doc.SelectElement("setting") - for _, attr := range root.SelectElements("attribute") { - if name := attr.SelectElement("name"); name.Text() == "ec.policy" { - for _, eci := range attr.SelectElements("value") { - tt := EcConfig{} - tt.ecid = eci.SelectElement("ecid").Text() - tt.class = eci.SelectElement("class").Text() - tt.n, _ = strconv.Atoi(eci.SelectElement("n").Text()) - tt.k, _ = strconv.Atoi(eci.SelectElement("k").Text()) - tt.w, _ = strconv.Atoi(eci.SelectElement("w").Text()) - tt.opt, _ = strconv.Atoi(eci.SelectElement("opt").Text()) - ecMap[tt.ecid] = tt - } - } - } - fmt.Println(ecMap) - return &ecMap - // -} - -func GetAgentIps() []string { - doc := etree.NewDocument() - if err := doc.ReadFromFile("../conf/sysSetting.xml"); err != nil { - panic(err) - } - root := doc.SelectElement("setting") - var ips []string // 定义存储 IP 的字符串切片 - - for _, attr := range root.SelectElements("attribute") { - if name := attr.SelectElement("name"); name.Text() == "agents.addr" { - for _, ip := range attr.SelectElements("value") { - ipRegex := regexp.MustCompile(`\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b`) - match := ipRegex.FindString(ip.Text()) - print(match) - ips = append(ips, match) - } - } - } - - return ips -} diff --git a/utils/grpc/file_transport.go b/utils/grpc/file_transport.go deleted file mode 100644 index e43d986..0000000 --- a/utils/grpc/file_transport.go +++ /dev/null @@ -1,123 +0,0 @@ -package grpc - -// TODO 拆分到存储服务的common包里去 -/* -import ( - "context" - "fmt" - "io" - - myio "gitlink.org.cn/cloudream/common/utils/io" - "gitlink.org.cn/cloudream/proto" -) - -type fileReadCloser struct { - io.ReadCloser - stream proto.FileTransport_GetFileClient - cancelFn context.CancelFunc - readData []byte -} - -func (s *fileReadCloser) Read(p []byte) (int, error) { - - if s.readData == nil { - resp, err := s.stream.Recv() - if err != nil { - return 0, err - } - - if resp.Type == proto.FileDataPacketType_Data { - s.readData = resp.Data - - } else if resp.Type == proto.FileDataPacketType_EOF { - return 0, io.EOF - - } else { - return 0, fmt.Errorf("unsuppoted packt type: %v", resp.Type) - } - } - - cnt := copy(p, s.readData) - - if len(s.readData) == cnt { - s.readData = nil - } else { - s.readData = s.readData[cnt:] - } - - return cnt, nil -} - -func (s *fileReadCloser) Close() error { - s.cancelFn() - - return nil -} - -func GetFileAsStream(client proto.FileTransportClient, fileHash string) (io.ReadCloser, error) { - ctx, cancel := context.WithCancel(context.Background()) - - stream, err := client.GetFile(ctx, &proto.GetReq{ - FileHash: fileHash, - }) - if err != nil { - cancel() - return nil, fmt.Errorf("request grpc failed, err: %w", err) - } - - return &fileReadCloser{ - stream: stream, - cancelFn: cancel, - }, nil -} - -type fileWriteCloser struct { - myio.PromiseWriteCloser[string] - stream proto.FileTransport_SendFileClient -} - -func (s *fileWriteCloser) Write(p []byte) (int, error) { - err := s.stream.Send(&proto.FileDataPacket{ - Type: proto.FileDataPacketType_Data, - Data: p, - }) - - if err != nil { - return 0, err - } - - return len(p), nil -} - -func (s *fileWriteCloser) Abort(err error) { - s.stream.CloseSend() -} - -func (s *fileWriteCloser) Finish() (string, error) { - err := s.stream.Send(&proto.FileDataPacket{ - Type: proto.FileDataPacketType_EOF, - }) - - if err != nil { - return "", fmt.Errorf("send EOF packet failed, err: %w", err) - } - - resp, err := s.stream.CloseAndRecv() - if err != nil { - return "", fmt.Errorf("receive response failed, err: %w", err) - } - - return resp.FileHash, nil -} - -func SendFileAsStream(client proto.FileTransportClient) (myio.PromiseWriteCloser[string], error) { - stream, err := client.SendFile(context.Background()) - if err != nil { - return nil, err - } - - return &fileWriteCloser{ - stream: stream, - }, nil -} -*/ diff --git a/utils/ping.go b/utils/ping.go deleted file mode 100644 index 85532d3..0000000 --- a/utils/ping.go +++ /dev/null @@ -1,82 +0,0 @@ -package utils - -import ( - //"fmt" - "github.com/go-ping/ping" - //"net" - "io/ioutil" - "net/http" - "strings" - "time" -) - -type ConnStatus struct { - Addr string - IsReachable bool - Delay time.Duration - TTL int -} - -// 获取本地主机 IP 地址 -func getLocalIP() string { - resp, err := http.Get("https://api.ipify.org") - if err != nil { - panic(err) - } - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - panic(err) - } - - ip := strings.TrimSpace(string(body)) - return ip -} - -func GetConnStatus(remoteIP string) (*ConnStatus, error) { - // 本地主机 IP 地址 - //localIP := getLocalIP() - //print("!@#@#!") - //print(localIP) - conn := ConnStatus{ - Addr: remoteIP, - IsReachable: false, - } - pinger, err := ping.NewPinger(remoteIP) - - if err != nil { - return nil, err - } - pinger.Count = 5 // 设置 ping 次数为 5 - // pinger.Interval = 1 // 设置 ping 时间间隔为 1 秒 - //pinger.Timeout = 2 // 设置 ping 超时时间为 2 秒 - //pinger.SetPrivileged(true) // 设置使用特权模式以获取 TTL 值 - pinger.OnRecv = func(pkt *ping.Packet) { - //fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n", - // pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl) - conn.TTL = pkt.Ttl - } - - /*pinger.OnDuplicateRecv = func(pkt *ping.Packet) { - fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n", - pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl) - }*/ - - pinger.OnFinish = func(stats *ping.Statistics) { - //fmt.Printf("\n--- %s ping statistics ---\n", stats.Addr) - //fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n", - // stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss) - //fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n", - // stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt) - if stats.PacketLoss == 0.0 { - conn.IsReachable = true - } - conn.Delay = stats.AvgRtt - } - err = pinger.Run() // Blocks until finished. - if err != nil { - return nil, err - } - return &conn, nil -} diff --git a/utils/utils.go b/utils/utils.go deleted file mode 100644 index 1717250..0000000 --- a/utils/utils.go +++ /dev/null @@ -1,21 +0,0 @@ -package utils - -import ( - "fmt" - "strings" -) - -// MakeMoveOperationFileName Move操作时,写入的文件的名称 -func MakeMoveOperationFileName(objectID int64, userID int64) string { - return fmt.Sprintf("%d-%d", objectID, userID) -} - -// GetDirectoryName 根据objectName获取所属的文件夹名 -func GetDirectoryName(objectName string) string { - parts := strings.Split(objectName, "/") - //若为文件,dirName设置为空 - if len(parts) == 1 { - return "" - } - return parts[0] -}