Browse Source

Add Close receiver function

tags/1.0.0
shaozhuguang 5 years ago
parent
commit
a0a68fe933
5 changed files with 231 additions and 1 deletions
  1. +8
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java
  2. +9
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java
  3. +75
    -0
      source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java
  4. +43
    -0
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java
  5. +96
    -0
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java

+ 8
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java View File

@@ -8,7 +8,6 @@
*/ */
package com.jd.blockchain.stp.communication.manager; package com.jd.blockchain.stp.communication.manager;


import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.callback.CallBackLauncher; import com.jd.blockchain.stp.communication.callback.CallBackLauncher;
import com.jd.blockchain.stp.communication.connection.Receiver; import com.jd.blockchain.stp.communication.connection.Receiver;
import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.connection.Connection;
@@ -145,6 +144,14 @@ public class ConnectionManager {
return connectionMap.get(remoteNode); return connectionMap.get(remoteNode);
} }


/**
* 关闭Receiver
*
*/
public void closeReceiver() {
this.receiver.close();
}

private Connection init(RemoteNode remoteNode, String messageExecutorClass) { private Connection init(RemoteNode remoteNode, String messageExecutorClass) {


// 初始化Connection // 初始化Connection


+ 9
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java View File

@@ -167,6 +167,15 @@ public class RemoteSessionManager {
return remoteSessionList.toArray(remoteSessions); return remoteSessionList.toArray(remoteSessions);
} }


/**
* 返回底层通信管理器
*
* @return
*/
public ConnectionManager connectionManager() {
return this.connectionManager;
}

private void check() { private void check() {
// 要求端口范围:1~65535,messageExecuteClass不能为null // 要求端口范围:1~65535,messageExecuteClass不能为null
int listenPort = this.localNode.getPort(); int listenPort = this.localNode.getPort();


+ 75
- 0
source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java View File

@@ -0,0 +1,75 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpReceiversBoot
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/18 下午3:44
* Description:
*/
package com.jd.blockchain.stp.commucation;

import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.LocalNode;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
*
* @author shaozhuguang
* @create 2019/4/18
* @since 1.0.0
*/

public class StpReceiversBoot {

private int[] listenPorts;

private final String remoteHost = "127.0.0.1";

private ExecutorService threadPool;

public StpReceiversBoot(int... ports) {
listenPorts = ports;
threadPool = Executors.newFixedThreadPool(ports.length + 2);
}

public RemoteSessionManager[] start(MessageExecutor messageExecutor) {

final int totalSessionSize = listenPorts.length;

CountDownLatch countDownLatch = new CountDownLatch(totalSessionSize);

RemoteSessionManager[] sessionManagers = new RemoteSessionManager[totalSessionSize];
for (int i = 0; i < totalSessionSize; i++) {
final int port = listenPorts[i], index = i;
threadPool.execute(() -> {
// 创建本地节点
final LocalNode localNode = new LocalNode(remoteHost, port, messageExecutor);
try {
// 启动当前节点
RemoteSessionManager sessionManager = new RemoteSessionManager(localNode);
sessionManagers[index] = sessionManager;
System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}

// 等待所有节点启动完成
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}

return sessionManagers;
}


}

+ 43
- 0
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java View File

@@ -0,0 +1,43 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpReceiversBootTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/18 下午3:53
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.commucation.StpReceiversBoot;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import org.junit.Test;

/**
*
* @author shaozhuguang
* @create 2019/4/18
* @since 1.0.0
*/

public class StpReceiversBootTest {

public static final int[] localPorts = new int[]{9900, 9901};

@Test
public void test() {
StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(9900, 9901);
RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor());

try {
Thread.sleep(10000);

// 关闭所有的监听器
for (RemoteSessionManager sessionManager : sessionManagers) {
sessionManager.connectionManager().closeReceiver();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

+ 96
- 0
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java View File

@@ -0,0 +1,96 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpSenderTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/18 下午3:56
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.commucation.StpReceiversBoot;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.junit.Test;

import java.util.Iterator;
import java.util.LinkedList;

/**
*
* @author shaozhuguang
* @create 2019/4/18
* @since 1.0.0
*/

public class StpSenderTest {

// 本地的端口
private static final int localPort = 9800;

// 连接的远端端口集合
private static final int[] remotePorts = StpReceiversBootTest.localPorts;

// 本地节点信息
private static final String localHost = "127.0.0.1";

@Test
public void test() {
// 首先启动本地节点
StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(localPort);
RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor());

// 本地节点启动完成后
if (sessionManagers != null && sessionManagers.length > 0) {
RemoteSessionManager localSessionManager = sessionManagers[0];

// 连接远端的两个节点
RemoteNode[] remoteNodes = new RemoteNode[]{
new RemoteNode(localHost, remotePorts[0]),
new RemoteNode(localHost, remotePorts[1])
};

RemoteSession[] remoteSessions = localSessionManager.newSessions(remoteNodes);

// 生成请求对象
LoadMessage loadMessage = new StpTest.StpLoadMessage(localHost + ":" + localPort);

// 异步发送处理过程
CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000);

// 发送请求至remotes
LinkedList<CallBackDataListener> responses = new LinkedList<>();
for (RemoteSession remoteSession : remoteSessions) {
CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier);
responses.addLast(response);
}

// 超时判断
try {
if (callBackBarrier.tryCall()) {

// 说明结果已经全部返回
// 打印出所有的结果
// 通过迭代器遍历链表
Iterator<CallBackDataListener> iterator = responses.iterator();
while (iterator.hasNext()) {
CallBackDataListener response = iterator.next();
// 判断是否已完成,对于没有完成的直接放弃(因为已经超时)
if (response.isDone()) {
System.out.printf("Receive Response {%s} {%s} \r\n",
response.remoteNode().toString(), new String(response.getCallBackData()));
}
}
}
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

Loading…
Cancel
Save