Browse Source

拆分common库

pull/4/head
Sydonian 2 years ago
parent
commit
d18491cffe
8 changed files with 65 additions and 364 deletions
  1. +0
    -26
      consts/consts.go
  2. +1
    -1
      go.mod
  3. +1
    -37
      models/models.go
  4. +63
    -0
      pkg/mq/message_dispatcher.go
  5. +0
    -74
      utils/config.go
  6. +0
    -123
      utils/grpc/file_transport.go
  7. +0
    -82
      utils/ping.go
  8. +0
    -21
      utils/utils.go

+ 0
- 26
consts/consts.go View File

@@ -1,26 +0,0 @@
package consts

const (
IPFSStateOK = "OK"

StorageDirectoryStateOK = "OK"

NodeStateNormal = "Normal"
NodeStateUnavailable = "Unavailable"
)

const (
ObjectStateNormal = "Normal"
ObjectStateDeleted = "Deleted"
)

const (
StorageObjectStateNormal = "Normal"
StorageObjectStateDeleted = "Deleted"
StorageObjectStateOutdated = "Outdated"
)

const (
CacheStatePinned = "Pinned"
CacheStateTemp = "Temp"
)

+ 1
- 1
go.mod View File

@@ -1,6 +1,6 @@
module gitlink.org.cn/cloudream/common

go 1.18
go 1.20

require (
github.com/antonfisher/nested-logrus-formatter v1.3.1


+ 1
- 37
models/models.go View File

@@ -16,40 +16,4 @@ type RepRedundancyConfig struct {
}

type ECRedundancyConfig struct {
}

type RedundancyDataTypes interface{}
type RedundancyDataTypesConst interface {
RepRedundancyData | ECRedundancyData
}
type RepRedundancyData struct {
FileHash string `json:"fileHash"`
}

func NewRedundancyRepData(fileHash string) RepRedundancyData {
return RepRedundancyData{
FileHash: fileHash,
}
}

type ECRedundancyData struct {
Blocks []ObjectBlock `json:"blocks"`
}

func NewECRedundancyData(blocks []ObjectBlock) ECRedundancyData {
return ECRedundancyData{
Blocks: blocks,
}
}

type ObjectBlock struct {
Index int `json:"index"`
FileHash string `json:"fileHash"`
}

func NewObjectBlock(index int, fileHash string) ObjectBlock {
return ObjectBlock{
Index: index,
FileHash: fileHash,
}
}
}

+ 63
- 0
pkg/mq/message_dispatcher.go View File

@@ -0,0 +1,63 @@
package mq

import (
"fmt"

myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)

type HandlerFn func(svcBase any, msg *Message) (*Message, error)

type MessageDispatcher struct {
Handlers map[myreflect.Type]HandlerFn
}

func NewMessageDispatcher() MessageDispatcher {
return MessageDispatcher{
Handlers: make(map[myreflect.Type]HandlerFn),
}
}

func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) {
h.Handlers[typ] = handler
}

func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) {
typ := myreflect.TypeOfValue(msg.Body)
fn, ok := h.Handlers[typ]
if !ok {
return nil, fmt.Errorf("unsupported message type: %s", typ.Name())
}

return fn(svcBase, msg)
}

// 将Service中的一个接口函数作为指定类型消息的处理函数
func AddServiceFn[TSvc any, TReq any, TResp any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq) (*TResp, *CodeMessage)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {

reqMsgBody := reqMsg.Body.(TReq)
ret, codeMsg := svcFn(svcBase.(TSvc), &reqMsgBody)

var body MessageBodyTypes
if ret != nil {
body = *ret
}

respMsg := MakeMessage(body)
respMsg.SetCodeMessage(codeMsg.Code, codeMsg.Message)

return &respMsg, nil
})
}

// 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
func AddNoRespServiceFn[TSvc any, TReq any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {

reqMsgBody := reqMsg.Body.(TReq)
svcFn(svcBase.(TSvc), &reqMsgBody)

return nil, nil
})
}

+ 0
- 74
utils/config.go View File

@@ -1,74 +0,0 @@
package utils

import (
"fmt"
"regexp"
"strconv"

"github.com/beevik/etree"
)

type EcConfig struct {
ecid string `xml:"ecid"`
class string `xml:"class"`
n int `xml:"n"`
k int `xml:"k"`
w int `xml:"w"`
opt int `xml:"opt"`
}

func (r *EcConfig) GetK() int {
return r.k
}

func (r *EcConfig) GetN() int {
return r.n
}

func GetEcPolicy() *map[string]EcConfig {
doc := etree.NewDocument()
if err := doc.ReadFromFile("../conf/sysSetting.xml"); err != nil {
panic(err)
}
ecMap := make(map[string]EcConfig, 20)
root := doc.SelectElement("setting")
for _, attr := range root.SelectElements("attribute") {
if name := attr.SelectElement("name"); name.Text() == "ec.policy" {
for _, eci := range attr.SelectElements("value") {
tt := EcConfig{}
tt.ecid = eci.SelectElement("ecid").Text()
tt.class = eci.SelectElement("class").Text()
tt.n, _ = strconv.Atoi(eci.SelectElement("n").Text())
tt.k, _ = strconv.Atoi(eci.SelectElement("k").Text())
tt.w, _ = strconv.Atoi(eci.SelectElement("w").Text())
tt.opt, _ = strconv.Atoi(eci.SelectElement("opt").Text())
ecMap[tt.ecid] = tt
}
}
}
fmt.Println(ecMap)
return &ecMap
//
}

func GetAgentIps() []string {
doc := etree.NewDocument()
if err := doc.ReadFromFile("../conf/sysSetting.xml"); err != nil {
panic(err)
}
root := doc.SelectElement("setting")
var ips []string // 定义存储 IP 的字符串切片

for _, attr := range root.SelectElements("attribute") {
if name := attr.SelectElement("name"); name.Text() == "agents.addr" {
for _, ip := range attr.SelectElements("value") {
ipRegex := regexp.MustCompile(`\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b`)
match := ipRegex.FindString(ip.Text())
print(match)
ips = append(ips, match)
}
}
}

return ips
}

+ 0
- 123
utils/grpc/file_transport.go View File

@@ -1,123 +0,0 @@
package grpc

// TODO 拆分到存储服务的common包里去
/*
import (
"context"
"fmt"
"io"

myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/proto"
)

type fileReadCloser struct {
io.ReadCloser
stream proto.FileTransport_GetFileClient
cancelFn context.CancelFunc
readData []byte
}

func (s *fileReadCloser) Read(p []byte) (int, error) {

if s.readData == nil {
resp, err := s.stream.Recv()
if err != nil {
return 0, err
}

if resp.Type == proto.FileDataPacketType_Data {
s.readData = resp.Data

} else if resp.Type == proto.FileDataPacketType_EOF {
return 0, io.EOF

} else {
return 0, fmt.Errorf("unsuppoted packt type: %v", resp.Type)
}
}

cnt := copy(p, s.readData)

if len(s.readData) == cnt {
s.readData = nil
} else {
s.readData = s.readData[cnt:]
}

return cnt, nil
}

func (s *fileReadCloser) Close() error {
s.cancelFn()

return nil
}

func GetFileAsStream(client proto.FileTransportClient, fileHash string) (io.ReadCloser, error) {
ctx, cancel := context.WithCancel(context.Background())

stream, err := client.GetFile(ctx, &proto.GetReq{
FileHash: fileHash,
})
if err != nil {
cancel()
return nil, fmt.Errorf("request grpc failed, err: %w", err)
}

return &fileReadCloser{
stream: stream,
cancelFn: cancel,
}, nil
}

type fileWriteCloser struct {
myio.PromiseWriteCloser[string]
stream proto.FileTransport_SendFileClient
}

func (s *fileWriteCloser) Write(p []byte) (int, error) {
err := s.stream.Send(&proto.FileDataPacket{
Type: proto.FileDataPacketType_Data,
Data: p,
})

if err != nil {
return 0, err
}

return len(p), nil
}

func (s *fileWriteCloser) Abort(err error) {
s.stream.CloseSend()
}

func (s *fileWriteCloser) Finish() (string, error) {
err := s.stream.Send(&proto.FileDataPacket{
Type: proto.FileDataPacketType_EOF,
})

if err != nil {
return "", fmt.Errorf("send EOF packet failed, err: %w", err)
}

resp, err := s.stream.CloseAndRecv()
if err != nil {
return "", fmt.Errorf("receive response failed, err: %w", err)
}

return resp.FileHash, nil
}

func SendFileAsStream(client proto.FileTransportClient) (myio.PromiseWriteCloser[string], error) {
stream, err := client.SendFile(context.Background())
if err != nil {
return nil, err
}

return &fileWriteCloser{
stream: stream,
}, nil
}
*/

+ 0
- 82
utils/ping.go View File

@@ -1,82 +0,0 @@
package utils

import (
//"fmt"
"github.com/go-ping/ping"
//"net"
"io/ioutil"
"net/http"
"strings"
"time"
)

type ConnStatus struct {
Addr string
IsReachable bool
Delay time.Duration
TTL int
}

// 获取本地主机 IP 地址
func getLocalIP() string {
resp, err := http.Get("https://api.ipify.org")
if err != nil {
panic(err)
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}

ip := strings.TrimSpace(string(body))
return ip
}

func GetConnStatus(remoteIP string) (*ConnStatus, error) {
// 本地主机 IP 地址
//localIP := getLocalIP()
//print("!@#@#!")
//print(localIP)
conn := ConnStatus{
Addr: remoteIP,
IsReachable: false,
}
pinger, err := ping.NewPinger(remoteIP)

if err != nil {
return nil, err
}
pinger.Count = 5 // 设置 ping 次数为 5
// pinger.Interval = 1 // 设置 ping 时间间隔为 1 秒
//pinger.Timeout = 2 // 设置 ping 超时时间为 2 秒
//pinger.SetPrivileged(true) // 设置使用特权模式以获取 TTL 值
pinger.OnRecv = func(pkt *ping.Packet) {
//fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n",
// pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
conn.TTL = pkt.Ttl
}

/*pinger.OnDuplicateRecv = func(pkt *ping.Packet) {
fmt.Printf("%d bytes from %s: icmp_seq=%d time=%v ttl=%v (DUP!)\n",
pkt.Nbytes, pkt.IPAddr, pkt.Seq, pkt.Rtt, pkt.Ttl)
}*/

pinger.OnFinish = func(stats *ping.Statistics) {
//fmt.Printf("\n--- %s ping statistics ---\n", stats.Addr)
//fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n",
// stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss)
//fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n",
// stats.MinRtt, stats.AvgRtt, stats.MaxRtt, stats.StdDevRtt)
if stats.PacketLoss == 0.0 {
conn.IsReachable = true
}
conn.Delay = stats.AvgRtt
}
err = pinger.Run() // Blocks until finished.
if err != nil {
return nil, err
}
return &conn, nil
}

+ 0
- 21
utils/utils.go View File

@@ -1,21 +0,0 @@
package utils

import (
"fmt"
"strings"
)

// MakeMoveOperationFileName Move操作时,写入的文件的名称
func MakeMoveOperationFileName(objectID int64, userID int64) string {
return fmt.Sprintf("%d-%d", objectID, userID)
}

// GetDirectoryName 根据objectName获取所属的文件夹名
func GetDirectoryName(objectName string) string {
parts := strings.Split(objectName, "/")
//若为文件,dirName设置为空
if len(parts) == 1 {
return ""
}
return parts[0]
}

Loading…
Cancel
Save