From a0a68fe933a996d220ffb03642aab246d6a44f51 Mon Sep 17 00:00:00 2001 From: shaozhuguang Date: Thu, 18 Apr 2019 17:48:53 +0800 Subject: [PATCH] Add Close receiver function --- .../manager/ConnectionManager.java | 9 +- .../manager/RemoteSessionManager.java | 9 ++ .../stp/commucation/StpReceiversBoot.java | 75 +++++++++++++++ .../jd/blockchain/StpReceiversBootTest.java | 43 +++++++++ .../java/com/jd/blockchain/StpSenderTest.java | 96 +++++++++++++++++++ 5 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java create mode 100644 source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java create mode 100644 source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java index 3f322e97..0db2cdfe 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java @@ -8,7 +8,6 @@ */ package com.jd.blockchain.stp.communication.manager; -import com.jd.blockchain.stp.communication.MessageExecutor; import com.jd.blockchain.stp.communication.callback.CallBackLauncher; import com.jd.blockchain.stp.communication.connection.Receiver; import com.jd.blockchain.stp.communication.connection.Connection; @@ -145,6 +144,14 @@ public class ConnectionManager { return connectionMap.get(remoteNode); } + /** + * 关闭Receiver + * + */ + public void closeReceiver() { + this.receiver.close(); + } + private Connection init(RemoteNode remoteNode, String messageExecutorClass) { // 初始化Connection diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java index 3403d2a0..50adda54 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java @@ -167,6 +167,15 @@ public class RemoteSessionManager { return remoteSessionList.toArray(remoteSessions); } + /** + * 返回底层通信管理器 + * + * @return + */ + public ConnectionManager connectionManager() { + return this.connectionManager; + } + private void check() { // 要求端口范围:1~65535,messageExecuteClass不能为null int listenPort = this.localNode.getPort(); diff --git a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java new file mode 100644 index 00000000..e725d77b --- /dev/null +++ b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java @@ -0,0 +1,75 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.StpReceiversBoot + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/18 下午3:44 + * Description: + */ +package com.jd.blockchain.stp.commucation; + +import com.jd.blockchain.stp.communication.MessageExecutor; +import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; +import com.jd.blockchain.stp.communication.node.LocalNode; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * + * @author shaozhuguang + * @create 2019/4/18 + * @since 1.0.0 + */ + +public class StpReceiversBoot { + + private int[] listenPorts; + + private final String remoteHost = "127.0.0.1"; + + private ExecutorService threadPool; + + public StpReceiversBoot(int... ports) { + listenPorts = ports; + threadPool = Executors.newFixedThreadPool(ports.length + 2); + } + + public RemoteSessionManager[] start(MessageExecutor messageExecutor) { + + final int totalSessionSize = listenPorts.length; + + CountDownLatch countDownLatch = new CountDownLatch(totalSessionSize); + + RemoteSessionManager[] sessionManagers = new RemoteSessionManager[totalSessionSize]; + for (int i = 0; i < totalSessionSize; i++) { + final int port = listenPorts[i], index = i; + threadPool.execute(() -> { + // 创建本地节点 + final LocalNode localNode = new LocalNode(remoteHost, port, messageExecutor); + try { + // 启动当前节点 + RemoteSessionManager sessionManager = new RemoteSessionManager(localNode); + sessionManagers[index] = sessionManager; + System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + countDownLatch.countDown(); + } + }); + } + + // 等待所有节点启动完成 + try { + countDownLatch.await(); + } catch (Exception e) { + e.printStackTrace(); + } + + return sessionManagers; + } + + +} \ No newline at end of file diff --git a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java new file mode 100644 index 00000000..1cd6e985 --- /dev/null +++ b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java @@ -0,0 +1,43 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.StpReceiversBootTest + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/18 下午3:53 + * Description: + */ +package com.jd.blockchain; + +import com.jd.blockchain.stp.commucation.MyMessageExecutor; +import com.jd.blockchain.stp.commucation.StpReceiversBoot; +import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; +import org.junit.Test; + +/** + * + * @author shaozhuguang + * @create 2019/4/18 + * @since 1.0.0 + */ + +public class StpReceiversBootTest { + + public static final int[] localPorts = new int[]{9900, 9901}; + + @Test + public void test() { + StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(9900, 9901); + RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor()); + + try { + Thread.sleep(10000); + + // 关闭所有的监听器 + for (RemoteSessionManager sessionManager : sessionManagers) { + sessionManager.connectionManager().closeReceiver(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java new file mode 100644 index 00000000..29741ae0 --- /dev/null +++ b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java @@ -0,0 +1,96 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.StpSenderTest + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/18 下午3:56 + * Description: + */ +package com.jd.blockchain; + +import com.jd.blockchain.stp.commucation.MyMessageExecutor; +import com.jd.blockchain.stp.commucation.StpReceiversBoot; +import com.jd.blockchain.stp.communication.RemoteSession; +import com.jd.blockchain.stp.communication.callback.CallBackBarrier; +import com.jd.blockchain.stp.communication.callback.CallBackDataListener; +import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; +import com.jd.blockchain.stp.communication.message.LoadMessage; +import com.jd.blockchain.stp.communication.node.RemoteNode; +import org.junit.Test; + +import java.util.Iterator; +import java.util.LinkedList; + +/** + * + * @author shaozhuguang + * @create 2019/4/18 + * @since 1.0.0 + */ + +public class StpSenderTest { + + // 本地的端口 + private static final int localPort = 9800; + + // 连接的远端端口集合 + private static final int[] remotePorts = StpReceiversBootTest.localPorts; + + // 本地节点信息 + private static final String localHost = "127.0.0.1"; + + @Test + public void test() { + // 首先启动本地节点 + StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(localPort); + RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor()); + + // 本地节点启动完成后 + if (sessionManagers != null && sessionManagers.length > 0) { + RemoteSessionManager localSessionManager = sessionManagers[0]; + + // 连接远端的两个节点 + RemoteNode[] remoteNodes = new RemoteNode[]{ + new RemoteNode(localHost, remotePorts[0]), + new RemoteNode(localHost, remotePorts[1]) + }; + + RemoteSession[] remoteSessions = localSessionManager.newSessions(remoteNodes); + + // 生成请求对象 + LoadMessage loadMessage = new StpTest.StpLoadMessage(localHost + ":" + localPort); + + // 异步发送处理过程 + CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000); + + // 发送请求至remotes + LinkedList responses = new LinkedList<>(); + for (RemoteSession remoteSession : remoteSessions) { + CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier); + responses.addLast(response); + } + + // 超时判断 + try { + if (callBackBarrier.tryCall()) { + + // 说明结果已经全部返回 + // 打印出所有的结果 + // 通过迭代器遍历链表 + Iterator iterator = responses.iterator(); + while (iterator.hasNext()) { + CallBackDataListener response = iterator.next(); + // 判断是否已完成,对于没有完成的直接放弃(因为已经超时) + if (response.isDone()) { + System.out.printf("Receive Response {%s} {%s} \r\n", + response.remoteNode().toString(), new String(response.getCallBackData())); + } + } + } + Thread.sleep(Integer.MAX_VALUE); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file