diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index f8c04a9..141d5de 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -6,6 +6,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/grpc" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -19,6 +20,7 @@ type Config struct { RabbitMQ stgmq.Config `json:"rabbitMQ"` IPFS ipfs.Config `json:"ipfs"` DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` } var cfg Config diff --git a/agent/internal/grpc/ping.go b/agent/internal/grpc/ping.go new file mode 100644 index 0000000..ff0fd20 --- /dev/null +++ b/agent/internal/grpc/ping.go @@ -0,0 +1,11 @@ +package grpc + +import ( + "context" + + agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" +) + +func (s *Service) Ping(context.Context, *agtrpc.PingReq) (*agtrpc.PingResp, error) { + return &agtrpc.PingResp{}, nil +} diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 275d522..1ddd848 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -64,7 +64,8 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c } uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ - Distlock: ctx.distlock, + Distlock: ctx.distlock, + Connectivity: ctx.connectivity, }) if err != nil { err = fmt.Errorf("uploading objects: %w", err) diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index f41112c..469c0d0 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -3,12 +3,14 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type TaskContext struct { - distlock *distlock.Service - sw *ioswitch.Switch + distlock *distlock.Service + sw *ioswitch.Switch + connectivity *connectivity.Collector } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -23,9 +25,10 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, sw *ioswitch.Switch) Manager { +func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector) Manager { return task.NewManager(TaskContext{ - distlock: distlock, - sw: sw, + distlock: distlock, + sw: sw, + connectivity: connectivity, }) } diff --git a/agent/main.go b/agent/main.go index 1949f69..b1aa82b 100644 --- a/agent/main.go +++ b/agent/main.go @@ -7,9 +7,11 @@ import ( "sync" log "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/agent/internal/config" "gitlink.org.cn/cloudream/storage/agent/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" @@ -20,6 +22,7 @@ import ( "google.golang.org/grpc" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc" cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq" @@ -49,6 +52,41 @@ func main() { stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) stgglb.InitIPFSPool(&config.Cfg().IPFS) + // 启动网络连通性检测,并就地检测一次 + conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { + log := log.WithField("Connectivity", "") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + cons := collector.GetAll() + nodeCons := make([]cdssdk.NodeConnectivity, 0, len(cons)) + for _, con := range cons { + var delay *float32 + if con.Delay != nil { + v := float32(con.Delay.Microseconds()) / 1000 + delay = &v + } + + nodeCons = append(nodeCons, cdssdk.NodeConnectivity{ + FromNodeID: *stgglb.Local.NodeID, + ToNodeID: con.ToNodeID, + Delay: delay, + TestTime: con.TestTime, + }) + } + + _, err = coorCli.UpdateNodeConnectivities(coormq.ReqUpdateNodeConnectivities(nodeCons)) + if err != nil { + log.Warnf("update node connectivities: %v", err) + } + }) + conCol.CollectInPlace() + distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { log.Fatalf("new ipfs failed, err: %s", err.Error()) @@ -60,7 +98,7 @@ func main() { wg := sync.WaitGroup{} wg.Add(4) - taskMgr := task.NewManager(distlock, &sw) + taskMgr := task.NewManager(distlock, &sw, &conCol) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 @@ -74,8 +112,6 @@ func main() { go serveAgentServer(agtSvr, &wg) - // go reportStatus(&wg) //网络延迟感知 - //面向客户端收发数据 listenAddr := config.Cfg().GRPC.MakeListenAddress() lis, err := net.Listen("tcp", listenAddr) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index c08d366..cf6d64a 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -6,17 +6,19 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) type Config struct { - Local stgmodels.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` - Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + Local stgmodels.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + Logger logger.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` } var cfg Config diff --git a/client/internal/task/task.go b/client/internal/task/task.go index 7f0a27a..a1ec9f6 100644 --- a/client/internal/task/task.go +++ b/client/internal/task/task.go @@ -3,10 +3,12 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" ) type TaskContext struct { - distlock *distlock.Service + distlock *distlock.Service + connectivity *connectivity.Collector } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -21,8 +23,9 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector) Manager { return task.NewManager(TaskContext{ - distlock: distlock, + distlock: distlock, + connectivity: connectivity, }) } diff --git a/client/internal/task/upload_objects.go b/client/internal/task/upload_objects.go index b138add..ca0f8a6 100644 --- a/client/internal/task/upload_objects.go +++ b/client/internal/task/upload_objects.go @@ -25,7 +25,8 @@ func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objectIt func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{ - Distlock: ctx.distlock, + Distlock: ctx.distlock, + Connectivity: ctx.connectivity, }) t.Result = ret diff --git a/client/main.go b/client/main.go index 9fe03d5..101b013 100644 --- a/client/main.go +++ b/client/main.go @@ -12,6 +12,7 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/services" "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" ) @@ -37,6 +38,10 @@ func main() { stgglb.InitIPFSPool(config.Cfg().IPFS) } + // 启动网络连通性检测,并就地检测一次 + conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) + conCol.CollectInPlace() + distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { logger.Warnf("new distlock service failed, err: %s", err.Error()) @@ -44,7 +49,7 @@ func main() { } go serveDistLock(distlockSvc) - taskMgr := task.NewManager(distlockSvc) + taskMgr := task.NewManager(distlockSvc, &conCol) svc, err := services.NewService(distlockSvc, &taskMgr) if err != nil { diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index 226711e..e54c953 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -32,5 +32,8 @@ "etcdLockLeaseTimeSec": 5, "randomReleasingDelayMs": 3000, "serviceDescription": "I am a agent" + }, + "connectivity": { + "testInterval": 300 } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index d7ce483..9791f83 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -25,5 +25,8 @@ "etcdLockLeaseTimeSec": 5, "randomReleasingDelayMs": 3000, "serviceDescription": "I am a client" + }, + "connectivity": { + "testInterval": 300 } } \ No newline at end of file diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 415b0bf..42f3249 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -52,12 +52,13 @@ insert into values (1, "HuaWei-Cloud", 1, "/", "Online"); -create table NodeDelay ( - SourceNodeID int not null comment '发起检测的节点ID', - DestinationNodeID int not null comment '被检测节点的ID', - DelayInMs int not null comment '发起节点与被检测节点间延迟(毫秒)', - primary key(SourceNodeID, DestinationNodeID) -) comment = '节点延迟表'; +create table NodeConnectivity ( + FromNodeID int not null comment '发起检测的节点ID', + ToNodeID int not null comment '被检测节点的ID', + Delay float comment '发起节点与被检测节点间延迟(毫秒),为null代表节点不可达', + TestTime timestamp comment '进行连通性测试的时间', + primary key(FromNodeID, ToNodeID) +) comment = '节点连通性表'; create table User ( UserID int not null primary key comment '用户ID', diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index b4234f3..59a66bc 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "io" + "math" "math/rand" "time" @@ -11,8 +12,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -39,11 +42,13 @@ type ObjectUploadResult struct { type UploadNodeInfo struct { Node cdssdk.Node + Delay time.Duration IsSameLocation bool } type UploadObjectsContext struct { - Distlock *distlock.Service + Distlock *distlock.Service + Connectivity *connectivity.Collector } func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects { @@ -68,12 +73,24 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult return nil, fmt.Errorf("getting user nodes: %w", err) } + cons := ctx.Connectivity.GetAll() userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo { + delay := time.Duration(math.MaxInt64) + + con, ok := cons[node.NodeID] + if ok && con.Delay != nil { + delay = *con.Delay + } + return UploadNodeInfo{ Node: node, + Delay: delay, IsSameLocation: node.LocationID == stgglb.Local.LocationID, } }) + if len(userNodes) == 0 { + return nil, fmt.Errorf("user no available nodes") + } // 给上传节点的IPFS加锁 ipfsReqBlder := reqbuilder.NewBuilder() @@ -109,7 +126,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult // chooseUploadNode 选择一个上传文件的节点 // 1. 选择设置了亲和性的节点 // 2. 从与当前客户端相同地域的节点中随机选一个 -// 3. 没有用的话从所有节点中随机选一个 +// 3. 没有的话从所有节点选择延迟最低的节点 func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) UploadNodeInfo { if nodeAffinity != nil { aff, ok := lo.Find(nodes, func(node UploadNodeInfo) bool { return node.Node.NodeID == *nodeAffinity }) @@ -123,7 +140,10 @@ func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) Uploa return sameLocationNodes[rand.Intn(len(sameLocationNodes))] } - return nodes[rand.Intn(len(nodes))] + // 选择延迟最低的节点 + nodes = sort2.Sort(nodes, func(e1, e2 UploadNodeInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) + + return nodes[0] } func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) { diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go new file mode 100644 index 0000000..20894d7 --- /dev/null +++ b/common/pkgs/connectivity/collector.go @@ -0,0 +1,198 @@ +package connectivity + +import ( + "math/rand" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type Connectivity struct { + ToNodeID cdssdk.NodeID + Delay *time.Duration + TestTime time.Time +} + +type Collector struct { + cfg *Config + onCollected func(collector *Collector) + collectNow chan any + close chan any + connectivities map[cdssdk.NodeID]Connectivity + lock *sync.RWMutex +} + +func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector { + rpt := Collector{ + cfg: cfg, + collectNow: make(chan any), + close: make(chan any), + connectivities: make(map[cdssdk.NodeID]Connectivity), + lock: &sync.RWMutex{}, + onCollected: onCollected, + } + go rpt.serve() + return rpt +} + +func (r *Collector) Get(nodeID cdssdk.NodeID) *Connectivity { + r.lock.RLock() + defer r.lock.RUnlock() + + con, ok := r.connectivities[nodeID] + if ok { + return &con + } + + return nil +} +func (r *Collector) GetAll() map[cdssdk.NodeID]Connectivity { + r.lock.RLock() + defer r.lock.RUnlock() + + ret := make(map[cdssdk.NodeID]Connectivity) + for k, v := range r.connectivities { + ret[k] = v + } + + return ret +} + +// 启动一次收集 +func (r *Collector) CollecNow() { + select { + case r.collectNow <- nil: + default: + } +} + +// 就地进行收集,会阻塞当前线程 +func (r *Collector) CollectInPlace() { + r.testing() +} + +func (r *Collector) Close() { + select { + case r.close <- nil: + default: + } +} + +func (r *Collector) serve() { + log := logger.WithType[Collector]("") + log.Info("start connectivity reporter") + + // 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机 + startup := true + firstReportDelay := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64()) + ticker := time.NewTicker(firstReportDelay) + +loop: + for { + select { + case <-ticker.C: + r.testing() + if startup { + startup = false + ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second) + } + + case <-r.collectNow: + r.testing() + + case <-r.close: + ticker.Stop() + break loop + } + } + + log.Info("stop connectivity reporter") +} + +func (r *Collector) testing() { + log := logger.WithType[Collector]("") + log.Debug("do testing") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getNodeResp, err := coorCli.GetNodes(coormq.NewGetNodes(nil)) + if err != nil { + return + } + + wg := sync.WaitGroup{} + cons := make([]Connectivity, len(getNodeResp.Nodes)) + for i, node := range getNodeResp.Nodes { + tmpIdx := i + tmpNode := node + + wg.Add(1) + go func() { + defer wg.Done() + cons[tmpIdx] = r.ping(tmpNode) + }() + } + + wg.Wait() + + r.lock.Lock() + // 删除所有node的记录,然后重建,避免node数量变化时导致残余数据 + r.connectivities = make(map[cdssdk.NodeID]Connectivity) + for _, con := range cons { + r.connectivities[con.ToNodeID] = con + } + r.lock.Unlock() + + if r.onCollected != nil { + r.onCollected(r) + } +} + +func (r *Collector) ping(node cdssdk.Node) Connectivity { + log := logger.WithType[Collector]("").WithField("NodeID", node.NodeID) + + ip := node.ExternalIP + port := node.ExternalGRPCPort + if node.LocationID == stgglb.Local.LocationID { + ip = node.LocalIP + port = node.LocalGRPCPort + } + + agtCli, err := stgglb.AgentRPCPool.Acquire(ip, port) + if err != nil { + log.Warnf("new agent %v:%v rpc client: %w", ip, port, err) + return Connectivity{ + ToNodeID: node.NodeID, + Delay: nil, + TestTime: time.Now(), + } + } + defer stgglb.AgentRPCPool.Release(agtCli) + + start := time.Now() + err = agtCli.Ping(*stgglb.Local.NodeID) + if err != nil { + log.Warnf("ping: %v", err) + return Connectivity{ + ToNodeID: node.NodeID, + Delay: nil, + TestTime: time.Now(), + } + } + + // 此时间差为一个来回的时间,因此单程延迟需要除以2 + delay := time.Since(start) / 2 + return Connectivity{ + ToNodeID: node.NodeID, + Delay: &delay, + TestTime: time.Now(), + } +} diff --git a/common/pkgs/connectivity/config.go b/common/pkgs/connectivity/config.go new file mode 100644 index 0000000..c839fcb --- /dev/null +++ b/common/pkgs/connectivity/config.go @@ -0,0 +1,5 @@ +package connectivity + +type Config struct { + TestInterval int `json:"testInterval"` // 进行测试的间隔 +} diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 7b5ee66..a48946f 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -20,12 +20,6 @@ type Storage struct { State string `db:"State" json:"state"` } -type NodeDelay struct { - SourceNodeID int64 `db:"SourceNodeID"` - DestinationNodeID int64 `db:"DestinationNodeID"` - DelayInMs int `db:"DelayInMs"` -} - type User struct { UserID cdssdk.UserID `db:"UserID" json:"userID"` Password string `db:"PassWord" json:"password"` @@ -52,6 +46,8 @@ type Package = cdssdk.Package type Object = cdssdk.Object +type NodeConnectivity = cdssdk.NodeConnectivity + // 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject, // 再.ToObject()转成Object type TempObject struct { diff --git a/common/pkgs/db/node_connectivity.go b/common/pkgs/db/node_connectivity.go new file mode 100644 index 0000000..ffc3307 --- /dev/null +++ b/common/pkgs/db/node_connectivity.go @@ -0,0 +1,32 @@ +package db + +import ( + "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" +) + +type NodeConnectivityDB struct { + *DB +} + +func (db *DB) NodeConnectivity() *NodeConnectivityDB { + return &NodeConnectivityDB{DB: db} +} + +func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { + var ret []model.NodeConnectivity + + sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs) + if err != nil { + return nil, err + } + + return ret, sqlx.Select(ctx, &ret, sql, args...) +} + +func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error { + return BatchNamedExec(ctx, + "insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+ + " on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil) +} diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index c5f933b..1e70002 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -386,6 +386,91 @@ func (x *FetchStreamReq) GetStreamID() string { return "" } +type PingReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + FromNodeID int64 `protobuf:"varint,1,opt,name=FromNodeID,proto3" json:"FromNodeID,omitempty"` +} + +func (x *PingReq) Reset() { + *x = PingReq{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingReq) ProtoMessage() {} + +func (x *PingReq) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingReq.ProtoReflect.Descriptor instead. +func (*PingReq) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{6} +} + +func (x *PingReq) GetFromNodeID() int64 { + if x != nil { + return x.FromNodeID + } + return 0 +} + +type PingResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PingResp) Reset() { + *x = PingResp{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingResp) ProtoMessage() {} + +func (x *PingResp) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingResp.ProtoReflect.Descriptor instead. +func (*PingResp) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{7} +} + var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ @@ -415,26 +500,32 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x2a, 0x37, 0x0a, 0x14, - 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, - 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, - 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0xe1, 0x01, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, - 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, - 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, - 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, - 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, - 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, - 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, - 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, - 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, - 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, - 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x22, 0x29, 0x0a, 0x07, + 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x12, 0x1e, 0x0a, 0x0a, 0x46, 0x72, 0x6f, 0x6d, 0x4e, + 0x6f, 0x64, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x46, 0x72, 0x6f, + 0x6d, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x44, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x73, 0x70, 0x2a, 0x37, 0x0a, 0x14, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, + 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, + 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, + 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0x80, 0x02, 0x0a, + 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, + 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, + 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, + 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, + 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, + 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, + 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, + 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, + 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, + 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, + 0x12, 0x1d, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x08, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x71, 0x1a, 0x09, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, 0x42, + 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -450,7 +541,7 @@ func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { } var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ (StreamDataPacketType)(0), // 0: StreamDataPacketType (*FileDataPacket)(nil), // 1: FileDataPacket @@ -459,6 +550,8 @@ var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ (*StreamDataPacket)(nil), // 4: StreamDataPacket (*SendStreamResp)(nil), // 5: SendStreamResp (*FetchStreamReq)(nil), // 6: FetchStreamReq + (*PingReq)(nil), // 7: PingReq + (*PingResp)(nil), // 8: PingResp } var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ 0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType @@ -467,12 +560,14 @@ var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ 3, // 3: Agent.GetIPFSFile:input_type -> GetIPFSFileReq 4, // 4: Agent.SendStream:input_type -> StreamDataPacket 6, // 5: Agent.FetchStream:input_type -> FetchStreamReq - 2, // 6: Agent.SendIPFSFile:output_type -> SendIPFSFileResp - 1, // 7: Agent.GetIPFSFile:output_type -> FileDataPacket - 5, // 8: Agent.SendStream:output_type -> SendStreamResp - 4, // 9: Agent.FetchStream:output_type -> StreamDataPacket - 6, // [6:10] is the sub-list for method output_type - 2, // [2:6] is the sub-list for method input_type + 7, // 6: Agent.Ping:input_type -> PingReq + 2, // 7: Agent.SendIPFSFile:output_type -> SendIPFSFileResp + 1, // 8: Agent.GetIPFSFile:output_type -> FileDataPacket + 5, // 9: Agent.SendStream:output_type -> SendStreamResp + 4, // 10: Agent.FetchStream:output_type -> StreamDataPacket + 8, // 11: Agent.Ping:output_type -> PingResp + 7, // [7:12] is the sub-list for method output_type + 2, // [2:7] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name @@ -556,6 +651,30 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } + file_pkgs_grpc_agent_agent_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -563,7 +682,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc, NumEnums: 1, - NumMessages: 6, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index f756fec..7f08bde 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -40,11 +40,18 @@ message FetchStreamReq { string StreamID = 2; } +message PingReq { + int64 FromNodeID = 1; +} +message PingResp {} + service Agent { rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){} rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){} rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){} rpc FetchStream(FetchStreamReq)returns(stream StreamDataPacket){} + + rpc Ping(PingReq) returns(PingResp){} } diff --git a/common/pkgs/grpc/agent/agent_grpc.pb.go b/common/pkgs/grpc/agent/agent_grpc.pb.go index 95b2f92..81d5910 100644 --- a/common/pkgs/grpc/agent/agent_grpc.pb.go +++ b/common/pkgs/grpc/agent/agent_grpc.pb.go @@ -25,6 +25,7 @@ const ( Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile" Agent_SendStream_FullMethodName = "/Agent/SendStream" Agent_FetchStream_FullMethodName = "/Agent/FetchStream" + Agent_Ping_FullMethodName = "/Agent/Ping" ) // AgentClient is the client API for Agent service. @@ -35,6 +36,7 @@ type AgentClient interface { GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error) + Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) } type agentClient struct { @@ -177,6 +179,15 @@ func (x *agentFetchStreamClient) Recv() (*StreamDataPacket, error) { return m, nil } +func (c *agentClient) Ping(ctx context.Context, in *PingReq, opts ...grpc.CallOption) (*PingResp, error) { + out := new(PingResp) + err := c.cc.Invoke(ctx, Agent_Ping_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // AgentServer is the server API for Agent service. // All implementations must embed UnimplementedAgentServer // for forward compatibility @@ -185,6 +196,7 @@ type AgentServer interface { GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error SendStream(Agent_SendStreamServer) error FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error + Ping(context.Context, *PingReq) (*PingResp, error) mustEmbedUnimplementedAgentServer() } @@ -204,6 +216,9 @@ func (UnimplementedAgentServer) SendStream(Agent_SendStreamServer) error { func (UnimplementedAgentServer) FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error { return status.Errorf(codes.Unimplemented, "method FetchStream not implemented") } +func (UnimplementedAgentServer) Ping(context.Context, *PingReq) (*PingResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {} // UnsafeAgentServer may be embedded to opt out of forward compatibility for this service. @@ -311,13 +326,36 @@ func (x *agentFetchStreamServer) Send(m *StreamDataPacket) error { return x.ServerStream.SendMsg(m) } +func _Agent_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AgentServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Agent_Ping_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AgentServer).Ping(ctx, req.(*PingReq)) + } + return interceptor(ctx, in, info, handler) +} + // Agent_ServiceDesc is the grpc.ServiceDesc for Agent service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var Agent_ServiceDesc = grpc.ServiceDesc{ ServiceName: "Agent", HandlerType: (*AgentServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "Ping", + Handler: _Agent_Ping_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "SendIPFSFile", diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index 440e49a..74ea749 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -5,6 +5,7 @@ import ( "fmt" "io" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -209,6 +210,13 @@ func (c *Client) FetchStream(planID ioswitch.PlanID, streamID ioswitch.StreamID) }, nil } +func (c *Client) Ping(fromNodeID cdssdk.NodeID) error { + _, err := c.cli.Ping(context.Background(), &PingReq{ + FromNodeID: int64(fromNodeID), + }) + return err +} + func (c *Client) Close() { c.con.Close() } diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index 558949a..53c0ddd 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -9,6 +9,10 @@ type NodeService interface { GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, *mq.CodeMessage) GetNodes(msg *GetNodes) (*GetNodesResp, *mq.CodeMessage) + + GetNodeConnectivities(msg *GetNodeConnectivities) (*GetNodeConnectivitiesResp, *mq.CodeMessage) + + UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, *mq.CodeMessage) } // 查询用户可用的节点 @@ -71,3 +75,52 @@ func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *cdssdk.Node { func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) { return mq.Request(Service.GetNodes, client.rabbitCli, msg) } + +// 获取节点连通性信息 +var _ = Register(Service.GetNodeConnectivities) + +type GetNodeConnectivities struct { + mq.MessageBodyBase + NodeIDs []cdssdk.NodeID `json:"nodeIDs"` +} +type GetNodeConnectivitiesResp struct { + mq.MessageBodyBase + Connectivities []cdssdk.NodeConnectivity `json:"nodes"` +} + +func ReqGetNodeConnectivities(nodeIDs []cdssdk.NodeID) *GetNodeConnectivities { + return &GetNodeConnectivities{ + NodeIDs: nodeIDs, + } +} +func RespGetNodeConnectivities(cons []cdssdk.NodeConnectivity) *GetNodeConnectivitiesResp { + return &GetNodeConnectivitiesResp{ + Connectivities: cons, + } +} +func (client *Client) GetNodeConnectivities(msg *GetNodeConnectivities) (*GetNodeConnectivitiesResp, error) { + return mq.Request(Service.GetNodeConnectivities, client.rabbitCli, msg) +} + +// 批量更新节点连通性信息 +var _ = Register(Service.UpdateNodeConnectivities) + +type UpdateNodeConnectivities struct { + mq.MessageBodyBase + Connectivities []cdssdk.NodeConnectivity `json:"connectivities"` +} +type UpdateNodeConnectivitiesResp struct { + mq.MessageBodyBase +} + +func ReqUpdateNodeConnectivities(cons []cdssdk.NodeConnectivity) *UpdateNodeConnectivities { + return &UpdateNodeConnectivities{ + Connectivities: cons, + } +} +func RespUpdateNodeConnectivities() *UpdateNodeConnectivitiesResp { + return &UpdateNodeConnectivitiesResp{} +} +func (client *Client) UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, error) { + return mq.Request(Service.UpdateNodeConnectivities, client.rabbitCli, msg) +} diff --git a/coordinator/internal/mq/node.go b/coordinator/internal/mq/node.go index 778c956..0b51245 100644 --- a/coordinator/internal/mq/node.go +++ b/coordinator/internal/mq/node.go @@ -1,6 +1,10 @@ package mq import ( + "database/sql" + "fmt" + + "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" @@ -46,3 +50,48 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co return mq.ReplyOK(coormq.NewGetNodesResp(nodes)) } + +func (svc *Service) GetNodeConnectivities(msg *coormq.GetNodeConnectivities) (*coormq.GetNodeConnectivitiesResp, *mq.CodeMessage) { + cons, err := svc.db.NodeConnectivity().BatchGetByFromNode(svc.db.SQLCtx(), msg.NodeIDs) + if err != nil { + logger.Warnf("batch get node connectivities by from node: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch get node connectivities by from node failed") + } + + return mq.ReplyOK(coormq.RespGetNodeConnectivities(cons)) +} + +func (svc *Service) UpdateNodeConnectivities(msg *coormq.UpdateNodeConnectivities) (*coormq.UpdateNodeConnectivitiesResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + // 只有发起节点和目的节点都存在,才能插入这条记录到数据库 + allNodes, err := svc.db.Node().GetAllNodes(tx) + if err != nil { + return fmt.Errorf("getting all nodes: %w", err) + } + + allNodeID := make(map[cdssdk.NodeID]bool) + for _, node := range allNodes { + allNodeID[node.NodeID] = true + } + + var avaiCons []cdssdk.NodeConnectivity + for _, con := range msg.Connectivities { + if allNodeID[con.FromNodeID] && allNodeID[con.ToNodeID] { + avaiCons = append(avaiCons, con) + } + } + + err = svc.db.NodeConnectivity().BatchUpdateOrCreate(tx, avaiCons) + if err != nil { + return fmt.Errorf("batch update or create node connectivities: %s", err) + } + + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + return mq.ReplyOK(coormq.RespUpdateNodeConnectivities()) +}