Browse Source

增加检测节点间延迟的机制

gitlink
Sydonian 1 year ago
parent
commit
2166767aae
23 changed files with 660 additions and 64 deletions
  1. +2
    -0
      agent/internal/config/config.go
  2. +11
    -0
      agent/internal/grpc/ping.go
  3. +2
    -1
      agent/internal/task/create_package.go
  4. +8
    -5
      agent/internal/task/task.go
  5. +39
    -3
      agent/main.go
  6. +8
    -6
      client/internal/config/config.go
  7. +6
    -3
      client/internal/task/task.go
  8. +2
    -1
      client/internal/task/upload_objects.go
  9. +6
    -1
      client/main.go
  10. +3
    -0
      common/assets/confs/agent.config.json
  11. +3
    -0
      common/assets/confs/client.config.json
  12. +7
    -6
      common/assets/scripts/create_database.sql
  13. +23
    -3
      common/pkgs/cmd/upload_objects.go
  14. +198
    -0
      common/pkgs/connectivity/collector.go
  15. +5
    -0
      common/pkgs/connectivity/config.go
  16. +2
    -6
      common/pkgs/db/model/model.go
  17. +32
    -0
      common/pkgs/db/node_connectivity.go
  18. +147
    -28
      common/pkgs/grpc/agent/agent.pb.go
  19. +7
    -0
      common/pkgs/grpc/agent/agent.proto
  20. +39
    -1
      common/pkgs/grpc/agent/agent_grpc.pb.go
  21. +8
    -0
      common/pkgs/grpc/agent/client.go
  22. +53
    -0
      common/pkgs/mq/coordinator/node.go
  23. +49
    -0
      coordinator/internal/mq/node.go

+ 2
- 0
agent/internal/config/config.go View File

@@ -6,6 +6,7 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/grpc"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)
@@ -19,6 +20,7 @@ type Config struct {
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
}

var cfg Config


+ 11
- 0
agent/internal/grpc/ping.go View File

@@ -0,0 +1,11 @@
package grpc

import (
"context"

agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
)

func (s *Service) Ping(context.Context, *agtrpc.PingReq) (*agtrpc.PingResp, error) {
return &agtrpc.PingResp{}, nil
}

+ 2
- 1
agent/internal/task/create_package.go View File

@@ -64,7 +64,8 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c
}

uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{
Distlock: ctx.distlock,
Distlock: ctx.distlock,
Connectivity: ctx.connectivity,
})
if err != nil {
err = fmt.Errorf("uploading objects: %w", err)


+ 8
- 5
agent/internal/task/task.go View File

@@ -3,12 +3,14 @@ package task
import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type TaskContext struct {
distlock *distlock.Service
sw *ioswitch.Switch
distlock *distlock.Service
sw *ioswitch.Switch
connectivity *connectivity.Collector
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -23,9 +25,10 @@ type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, sw *ioswitch.Switch) Manager {
func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
sw: sw,
distlock: distlock,
sw: sw,
connectivity: connectivity,
})
}

+ 39
- 3
agent/main.go View File

@@ -7,9 +7,11 @@ import (
"sync"

log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
@@ -20,6 +22,7 @@ import (
"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq"
@@ -49,6 +52,41 @@ func main() {
stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})
stgglb.InitIPFSPool(&config.Cfg().IPFS)

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) {
log := log.WithField("Connectivity", "")

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
log.Warnf("acquire coordinator mq failed, err: %s", err.Error())
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

cons := collector.GetAll()
nodeCons := make([]cdssdk.NodeConnectivity, 0, len(cons))
for _, con := range cons {
var delay *float32
if con.Delay != nil {
v := float32(con.Delay.Microseconds()) / 1000
delay = &v
}

nodeCons = append(nodeCons, cdssdk.NodeConnectivity{
FromNodeID: *stgglb.Local.NodeID,
ToNodeID: con.ToNodeID,
Delay: delay,
TestTime: con.TestTime,
})
}

_, err = coorCli.UpdateNodeConnectivities(coormq.ReqUpdateNodeConnectivities(nodeCons))
if err != nil {
log.Warnf("update node connectivities: %v", err)
}
})
conCol.CollectInPlace()

distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
@@ -60,7 +98,7 @@ func main() {
wg := sync.WaitGroup{}
wg.Add(4)

taskMgr := task.NewManager(distlock, &sw)
taskMgr := task.NewManager(distlock, &sw, &conCol)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
@@ -74,8 +112,6 @@ func main() {

go serveAgentServer(agtSvr, &wg)

// go reportStatus(&wg) //网络延迟感知

//面向客户端收发数据
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)


+ 8
- 6
client/internal/config/config.go View File

@@ -6,17 +6,19 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

type Config struct {
Local stgmodels.LocalMachineInfo `json:"local"`
AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"`
Logger logger.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"`
Local stgmodels.LocalMachineInfo `json:"local"`
AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"`
Logger logger.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
}

var cfg Config


+ 6
- 3
client/internal/task/task.go View File

@@ -3,10 +3,12 @@ package task
import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
)

type TaskContext struct {
distlock *distlock.Service
distlock *distlock.Service
connectivity *connectivity.Collector
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -21,8 +23,9 @@ type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
distlock: distlock,
connectivity: connectivity,
})
}

+ 2
- 1
client/internal/task/upload_objects.go View File

@@ -25,7 +25,8 @@ func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIt

func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{
Distlock: ctx.distlock,
Distlock: ctx.distlock,
Connectivity: ctx.connectivity,
})

t.Result = ret


+ 6
- 1
client/main.go View File

@@ -12,6 +12,7 @@ import (
"gitlink.org.cn/cloudream/storage/client/internal/services"
"gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
)

@@ -37,6 +38,10 @@ func main() {
stgglb.InitIPFSPool(config.Cfg().IPFS)
}

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
conCol.CollectInPlace()

distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Warnf("new distlock service failed, err: %s", err.Error())
@@ -44,7 +49,7 @@ func main() {
}
go serveDistLock(distlockSvc)

taskMgr := task.NewManager(distlockSvc)
taskMgr := task.NewManager(distlockSvc, &conCol)

svc, err := services.NewService(distlockSvc, &taskMgr)
if err != nil {


+ 3
- 0
common/assets/confs/agent.config.json View File

@@ -32,5 +32,8 @@
"etcdLockLeaseTimeSec": 5,
"randomReleasingDelayMs": 3000,
"serviceDescription": "I am a agent"
},
"connectivity": {
"testInterval": 300
}
}

+ 3
- 0
common/assets/confs/client.config.json View File

@@ -25,5 +25,8 @@
"etcdLockLeaseTimeSec": 5,
"randomReleasingDelayMs": 3000,
"serviceDescription": "I am a client"
},
"connectivity": {
"testInterval": 300
}
}

+ 7
- 6
common/assets/scripts/create_database.sql View File

@@ -52,12 +52,13 @@ insert into
values
(1, "HuaWei-Cloud", 1, "/", "Online");

create table NodeDelay (
SourceNodeID int not null comment '发起检测的节点ID',
DestinationNodeID int not null comment '被检测节点的ID',
DelayInMs int not null comment '发起节点与被检测节点间延迟(毫秒)',
primary key(SourceNodeID, DestinationNodeID)
) comment = '节点延迟表';
create table NodeConnectivity (
FromNodeID int not null comment '发起检测的节点ID',
ToNodeID int not null comment '被检测节点的ID',
Delay float comment '发起节点与被检测节点间延迟(毫秒),为null代表节点不可达',
TestTime timestamp comment '进行连通性测试的时间',
primary key(FromNodeID, ToNodeID)
) comment = '节点连通性表';

create table User (
UserID int not null primary key comment '用户ID',


+ 23
- 3
common/pkgs/cmd/upload_objects.go View File

@@ -3,6 +3,7 @@ package cmd
import (
"fmt"
"io"
"math"
"math/rand"
"time"

@@ -11,8 +12,10 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sort2"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -39,11 +42,13 @@ type ObjectUploadResult struct {

type UploadNodeInfo struct {
Node cdssdk.Node
Delay time.Duration
IsSameLocation bool
}

type UploadObjectsContext struct {
Distlock *distlock.Service
Distlock *distlock.Service
Connectivity *connectivity.Collector
}

func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects {
@@ -68,12 +73,24 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult
return nil, fmt.Errorf("getting user nodes: %w", err)
}

cons := ctx.Connectivity.GetAll()
userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo {
delay := time.Duration(math.MaxInt64)

con, ok := cons[node.NodeID]
if ok && con.Delay != nil {
delay = *con.Delay
}

return UploadNodeInfo{
Node: node,
Delay: delay,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
}
})
if len(userNodes) == 0 {
return nil, fmt.Errorf("user no available nodes")
}

// 给上传节点的IPFS加锁
ipfsReqBlder := reqbuilder.NewBuilder()
@@ -109,7 +126,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult
// chooseUploadNode 选择一个上传文件的节点
// 1. 选择设置了亲和性的节点
// 2. 从与当前客户端相同地域的节点中随机选一个
// 3. 没有用的话从所有节点中随机选一个
// 3. 没有的话从所有节点选择延迟最低的节点
func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) UploadNodeInfo {
if nodeAffinity != nil {
aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity })
@@ -123,7 +140,10 @@ func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) Uploa
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
}

return nodes[rand.Intn(len(nodes))]
// 选择延迟最低的节点
nodes = sort2.Sort(nodes, func(e1, e2 UploadNodeInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })

return nodes[0]
}

func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {


+ 198
- 0
common/pkgs/connectivity/collector.go View File

@@ -0,0 +1,198 @@
package connectivity

import (
"math/rand"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type Connectivity struct {
ToNodeID cdssdk.NodeID
Delay *time.Duration
TestTime time.Time
}

type Collector struct {
cfg *Config
onCollected func(collector *Collector)
collectNow chan any
close chan any
connectivities map[cdssdk.NodeID]Connectivity
lock *sync.RWMutex
}

func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector {
rpt := Collector{
cfg: cfg,
collectNow: make(chan any),
close: make(chan any),
connectivities: make(map[cdssdk.NodeID]Connectivity),
lock: &sync.RWMutex{},
onCollected: onCollected,
}
go rpt.serve()
return rpt
}

func (r *Collector) Get(nodeID cdssdk.NodeID) *Connectivity {
r.lock.RLock()
defer r.lock.RUnlock()

con, ok := r.connectivities[nodeID]
if ok {
return &con
}

return nil
}
func (r *Collector) GetAll() map[cdssdk.NodeID]Connectivity {
r.lock.RLock()
defer r.lock.RUnlock()

ret := make(map[cdssdk.NodeID]Connectivity)
for k, v := range r.connectivities {
ret[k] = v
}

return ret
}

// 启动一次收集
func (r *Collector) CollecNow() {
select {
case r.collectNow <- nil:
default:
}
}

// 就地进行收集,会阻塞当前线程
func (r *Collector) CollectInPlace() {
r.testing()
}

func (r *Collector) Close() {
select {
case r.close <- nil:
default:
}
}

func (r *Collector) serve() {
log := logger.WithType[Collector]("")
log.Info("start connectivity reporter")

// 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机
startup := true
firstReportDelay := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64())
ticker := time.NewTicker(firstReportDelay)

loop:
for {
select {
case <-ticker.C:
r.testing()
if startup {
startup = false
ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second)
}

case <-r.collectNow:
r.testing()

case <-r.close:
ticker.Stop()
break loop
}
}

log.Info("stop connectivity reporter")
}

func (r *Collector) testing() {
log := logger.WithType[Collector]("")
log.Debug("do testing")

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(nil))
if err != nil {
return
}

wg := sync.WaitGroup{}
cons := make([]Connectivity, len(getNodeResp.Nodes))
for i, node := range getNodeResp.Nodes {
tmpIdx := i
tmpNode := node

wg.Add(1)
go func() {
defer wg.Done()
cons[tmpIdx] = r.ping(tmpNode)
}()
}

wg.Wait()

r.lock.Lock()
// 删除所有node的记录,然后重建,避免node数量变化时导致残余数据
r.connectivities = make(map[cdssdk.NodeID]Connectivity)
for _, con := range cons {
r.connectivities[con.ToNodeID] = con
}
r.lock.Unlock()

if r.onCollected != nil {
r.onCollected(r)
}
}

func (r *Collector) ping(node cdssdk.Node) Connectivity {
log := logger.WithType[Collector]("").WithField("NodeID", node.NodeID)

ip := node.ExternalIP
port := node.ExternalGRPCPort
if node.LocationID == stgglb.Local.LocationID {
ip = node.LocalIP
port = node.LocalGRPCPort
}

agtCli, err := stgglb.AgentRPCPool.Acquire(ip, port)
if err != nil {
log.Warnf("new agent %v:%v rpc client: %w", ip, port, err)
return Connectivity{
ToNodeID: node.NodeID,
Delay: nil,
TestTime: time.Now(),
}
}
defer stgglb.AgentRPCPool.Release(agtCli)

start := time.Now()
err = agtCli.Ping(*stgglb.Local.NodeID)
if err != nil {
log.Warnf("ping: %v", err)
return Connectivity{
ToNodeID: node.NodeID,
Delay: nil,
TestTime: time.Now(),
}
}

// 此时间差为一个来回的时间,因此单程延迟需要除以2
delay := time.Since(start) / 2
return Connectivity{
ToNodeID: node.NodeID,
Delay: &delay,
TestTime: time.Now(),
}
}

+ 5
- 0
common/pkgs/connectivity/config.go View File

@@ -0,0 +1,5 @@
package connectivity

type Config struct {
TestInterval int `json:"testInterval"` // 进行测试的间隔
}

+ 2
- 6
common/pkgs/db/model/model.go View File

@@ -20,12 +20,6 @@ type Storage struct {
State string `db:"State" json:"state"`
}

type NodeDelay struct {
SourceNodeID int64 `db:"SourceNodeID"`
DestinationNodeID int64 `db:"DestinationNodeID"`
DelayInMs int `db:"DelayInMs"`
}

type User struct {
UserID cdssdk.UserID `db:"UserID" json:"userID"`
Password string `db:"PassWord" json:"password"`
@@ -52,6 +46,8 @@ type Package = cdssdk.Package

type Object = cdssdk.Object

type NodeConnectivity = cdssdk.NodeConnectivity

// 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject,
// 再.ToObject()转成Object
type TempObject struct {


+ 32
- 0
common/pkgs/db/node_connectivity.go View File

@@ -0,0 +1,32 @@
package db

import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type NodeConnectivityDB struct {
*DB
}

func (db *DB) NodeConnectivity() *NodeConnectivityDB {
return &NodeConnectivityDB{DB: db}
}

func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) {
var ret []model.NodeConnectivity

sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs)
if err != nil {
return nil, err
}

return ret, sqlx.Select(ctx, &ret, sql, args...)
}

func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error {
return BatchNamedExec(ctx,
"insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+
" on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil)
}

+ 147
- 28
common/pkgs/grpc/agent/agent.pb.go View File

@@ -386,6 +386,91 @@ func (x *FetchStreamReq) GetStreamID() string {
return ""
}

type PingReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

FromNodeID int64 `protobuf:"varint,1,opt,name=FromNodeID,proto3" json:"FromNodeID,omitempty"`
}

func (x *PingReq) Reset() {
*x = PingReq{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *PingReq) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*PingReq) ProtoMessage() {}

func (x *PingReq) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use PingReq.ProtoReflect.Descriptor instead.
func (*PingReq) Descriptor() ([]byte, []int) {
return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{6}
}

func (x *PingReq) GetFromNodeID() int64 {
if x != nil {
return x.FromNodeID
}
return 0
}

type PingResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}

func (x *PingResp) Reset() {
*x = PingResp{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *PingResp) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*PingResp) ProtoMessage() {}

func (x *PingResp) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use PingResp.ProtoReflect.Descriptor instead.
func (*PingResp) Descriptor() ([]byte, []int) {
return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{7}
}

var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor

var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{
@@ -415,26 +500,32 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{
0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e,
0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44,
0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x2a, 0x37, 0x0a, 0x14,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74,
0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a,
0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41,
0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0xe1, 0x01, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12,
0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12,
0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74,
0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52,
0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50,
0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53,
0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61,
0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a,
0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e,
0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00,
0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52,
0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50,
0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61,
0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x22, 0x29, 0x0a, 0x07,
0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72, 0x6f, 0x6d, 0x4e,
0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x46, 0x72, 0x6f,
0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52,
0x65, 0x73, 0x70, 0x2a, 0x37, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74,
0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45,
0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c,
0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0x80, 0x02, 0x0a,
0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50,
0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74,
0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50,
0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33,
0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e,
0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f,
0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22,
0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61,
0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74,
0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01,
0x12, 0x1d, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52,
0x65, 0x71, 0x1a, 0x09, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42,
0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}

var (
@@ -450,7 +541,7 @@ func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte {
}

var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{
(StreamDataPacketType)(0), // 0: StreamDataPacketType
(*FileDataPacket)(nil), // 1: FileDataPacket
@@ -459,6 +550,8 @@ var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{
(*StreamDataPacket)(nil), // 4: StreamDataPacket
(*SendStreamResp)(nil), // 5: SendStreamResp
(*FetchStreamReq)(nil), // 6: FetchStreamReq
(*PingReq)(nil), // 7: PingReq
(*PingResp)(nil), // 8: PingResp
}
var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{
0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType
@@ -467,12 +560,14 @@ var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{
3, // 3: Agent.GetIPFSFile:input_type -> GetIPFSFileReq
4, // 4: Agent.SendStream:input_type -> StreamDataPacket
6, // 5: Agent.FetchStream:input_type -> FetchStreamReq
2, // 6: Agent.SendIPFSFile:output_type -> SendIPFSFileResp
1, // 7: Agent.GetIPFSFile:output_type -> FileDataPacket
5, // 8: Agent.SendStream:output_type -> SendStreamResp
4, // 9: Agent.FetchStream:output_type -> StreamDataPacket
6, // [6:10] is the sub-list for method output_type
2, // [2:6] is the sub-list for method input_type
7, // 6: Agent.Ping:input_type -> PingReq
2, // 7: Agent.SendIPFSFile:output_type -> SendIPFSFileResp
1, // 8: Agent.GetIPFSFile:output_type -> FileDataPacket
5, // 9: Agent.SendStream:output_type -> SendStreamResp
4, // 10: Agent.FetchStream:output_type -> StreamDataPacket
8, // 11: Agent.Ping:output_type -> PingResp
7, // [7:12] is the sub-list for method output_type
2, // [2:7] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
@@ -556,6 +651,30 @@ func file_pkgs_grpc_agent_agent_proto_init() {
return nil
}
}
file_pkgs_grpc_agent_agent_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PingReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkgs_grpc_agent_agent_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PingResp); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -563,7 +682,7 @@ func file_pkgs_grpc_agent_agent_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc,
NumEnums: 1,
NumMessages: 6,
NumMessages: 8,
NumExtensions: 0,
NumServices: 1,
},


+ 7
- 0
common/pkgs/grpc/agent/agent.proto View File

@@ -40,11 +40,18 @@ message FetchStreamReq {
string StreamID = 2;
}

message PingReq {
int64 FromNodeID = 1;
}
message PingResp {}

service Agent {
rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){}
rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){}

rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){}
rpc FetchStream(FetchStreamReq)returns(stream StreamDataPacket){}

rpc Ping(PingReq) returns(PingResp){}
}


+ 39
- 1
common/pkgs/grpc/agent/agent_grpc.pb.go View File

@@ -25,6 +25,7 @@ const (
Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile"
Agent_SendStream_FullMethodName = "/Agent/SendStream"
Agent_FetchStream_FullMethodName = "/Agent/FetchStream"
Agent_Ping_FullMethodName = "/Agent/Ping"
)

// AgentClient is the client API for Agent service.
@@ -35,6 +36,7 @@ type AgentClient interface {
GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error)
SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error)
FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error)
Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error)
}

type agentClient struct {
@@ -177,6 +179,15 @@ func (x *agentFetchStreamClient) Recv() (*StreamDataPacket, error) {
return m, nil
}

func (c *agentClient) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) {
out := new(PingResp)
err := c.cc.Invoke(ctx, Agent_Ping_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

// AgentServer is the server API for Agent service.
// All implementations must embed UnimplementedAgentServer
// for forward compatibility
@@ -185,6 +196,7 @@ type AgentServer interface {
GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error
SendStream(Agent_SendStreamServer) error
FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error
Ping(context.Context, *PingReq) (*PingResp, error)
mustEmbedUnimplementedAgentServer()
}

@@ -204,6 +216,9 @@ func (UnimplementedAgentServer) SendStream(Agent_SendStreamServer) error {
func (UnimplementedAgentServer) FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error {
return status.Errorf(codes.Unimplemented, "method FetchStream not implemented")
}
func (UnimplementedAgentServer) Ping(context.Context, *PingReq) (*PingResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {}

// UnsafeAgentServer may be embedded to opt out of forward compatibility for this service.
@@ -311,13 +326,36 @@ func (x *agentFetchStreamServer) Send(m *StreamDataPacket) error {
return x.ServerStream.SendMsg(m)
}

func _Agent_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PingReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgentServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Agent_Ping_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgentServer).Ping(ctx, req.(*PingReq))
}
return interceptor(ctx, in, info, handler)
}

// Agent_ServiceDesc is the grpc.ServiceDesc for Agent service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Agent_ServiceDesc = grpc.ServiceDesc{
ServiceName: "Agent",
HandlerType: (*AgentServer)(nil),
Methods: []grpc.MethodDesc{},
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Agent_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "SendIPFSFile",


+ 8
- 0
common/pkgs/grpc/agent/client.go View File

@@ -5,6 +5,7 @@ import (
"fmt"
"io"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -209,6 +210,13 @@ func (c *Client) FetchStream(planID ioswitch.PlanID, streamID ioswitch.StreamID)
}, nil
}

func (c *Client) Ping(fromNodeID cdssdk.NodeID) error {
_, err := c.cli.Ping(context.Background(), &PingReq{
FromNodeID: int64(fromNodeID),
})
return err
}

func (c *Client) Close() {
c.con.Close()
}

+ 53
- 0
common/pkgs/mq/coordinator/node.go View File

@@ -9,6 +9,10 @@ type NodeService interface {
GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, *mq.CodeMessage)

GetNodes(msg *GetNodes) (*GetNodesResp, *mq.CodeMessage)

GetNodeConnectivities(msg *GetNodeConnectivities) (*GetNodeConnectivitiesResp, *mq.CodeMessage)

UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, *mq.CodeMessage)
}

// 查询用户可用的节点
@@ -71,3 +75,52 @@ func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *cdssdk.Node {
func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) {
return mq.Request(Service.GetNodes, client.rabbitCli, msg)
}

// 获取节点连通性信息
var _ = Register(Service.GetNodeConnectivities)

type GetNodeConnectivities struct {
mq.MessageBodyBase
NodeIDs []cdssdk.NodeID `json:"nodeIDs"`
}
type GetNodeConnectivitiesResp struct {
mq.MessageBodyBase
Connectivities []cdssdk.NodeConnectivity `json:"nodes"`
}

func ReqGetNodeConnectivities(nodeIDs []cdssdk.NodeID) *GetNodeConnectivities {
return &GetNodeConnectivities{
NodeIDs: nodeIDs,
}
}
func RespGetNodeConnectivities(cons []cdssdk.NodeConnectivity) *GetNodeConnectivitiesResp {
return &GetNodeConnectivitiesResp{
Connectivities: cons,
}
}
func (client *Client) GetNodeConnectivities(msg *GetNodeConnectivities) (*GetNodeConnectivitiesResp, error) {
return mq.Request(Service.GetNodeConnectivities, client.rabbitCli, msg)
}

// 批量更新节点连通性信息
var _ = Register(Service.UpdateNodeConnectivities)

type UpdateNodeConnectivities struct {
mq.MessageBodyBase
Connectivities []cdssdk.NodeConnectivity `json:"connectivities"`
}
type UpdateNodeConnectivitiesResp struct {
mq.MessageBodyBase
}

func ReqUpdateNodeConnectivities(cons []cdssdk.NodeConnectivity) *UpdateNodeConnectivities {
return &UpdateNodeConnectivities{
Connectivities: cons,
}
}
func RespUpdateNodeConnectivities() *UpdateNodeConnectivitiesResp {
return &UpdateNodeConnectivitiesResp{}
}
func (client *Client) UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, error) {
return mq.Request(Service.UpdateNodeConnectivities, client.rabbitCli, msg)
}

+ 49
- 0
coordinator/internal/mq/node.go View File

@@ -1,6 +1,10 @@
package mq

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
@@ -46,3 +50,48 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co

return mq.ReplyOK(coormq.NewGetNodesResp(nodes))
}

func (svc *Service) GetNodeConnectivities(msg *coormq.GetNodeConnectivities) (*coormq.GetNodeConnectivitiesResp, *mq.CodeMessage) {
cons, err := svc.db.NodeConnectivity().BatchGetByFromNode(svc.db.SQLCtx(), msg.NodeIDs)
if err != nil {
logger.Warnf("batch get node connectivities by from node: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch get node connectivities by from node failed")
}

return mq.ReplyOK(coormq.RespGetNodeConnectivities(cons))
}

func (svc *Service) UpdateNodeConnectivities(msg *coormq.UpdateNodeConnectivities) (*coormq.UpdateNodeConnectivitiesResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
// 只有发起节点和目的节点都存在,才能插入这条记录到数据库
allNodes, err := svc.db.Node().GetAllNodes(tx)
if err != nil {
return fmt.Errorf("getting all nodes: %w", err)
}

allNodeID := make(map[cdssdk.NodeID]bool)
for _, node := range allNodes {
allNodeID[node.NodeID] = true
}

var avaiCons []cdssdk.NodeConnectivity
for _, con := range msg.Connectivities {
if allNodeID[con.FromNodeID] && allNodeID[con.ToNodeID] {
avaiCons = append(avaiCons, con)
}
}

err = svc.db.NodeConnectivity().BatchUpdateOrCreate(tx, avaiCons)
if err != nil {
return fmt.Errorf("batch update or create node connectivities: %s", err)
}

return nil
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.RespUpdateNodeConnectivities())
}

Loading…
Cancel
Save