Browse Source

Add Explanatory Notes for SFP-Communication Module.

tags/1.0.0
shaozhuguang 5 years ago
parent
commit
a4ae5ab97b
33 changed files with 1100 additions and 259 deletions
  1. +1
    -1
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java
  2. +2
    -2
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java
  3. +4
    -2
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecutor.java
  4. +96
    -7
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java
  5. +20
    -2
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java
  6. +44
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java
  7. +79
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackLauncher.java
  8. +28
    -8
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java
  9. +13
    -2
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java
  10. +94
    -7
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java
  11. +43
    -14
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java
  12. +25
    -9
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java
  13. +3
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java
  14. +3
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java
  15. +1
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java
  16. +3
    -3
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java
  17. +158
    -62
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java
  18. +1
    -4
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java
  19. +63
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java
  20. +16
    -3
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java
  21. +76
    -18
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java
  22. +122
    -51
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java
  23. +1
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java
  24. +23
    -4
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java
  25. +9
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java
  26. +6
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java
  27. +34
    -17
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java
  28. +32
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java
  29. +55
    -20
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java
  30. +11
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java
  31. +1
    -1
      source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java
  32. +2
    -2
      source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java
  33. +31
    -10
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java

+ 1
- 1
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java View File

@@ -69,7 +69,7 @@ public class DSTransferProcess {


for (int i = 0; i < remoteSessions.length; i++) { for (int i = 0; i < remoteSessions.length; i++) {
DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter); DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter);
remoteSessions[i].initExecute(msgHandle);
remoteSessions[i].initExecutor(msgHandle);
remoteSessions[i].init(); remoteSessions[i].init();
} }
} }


+ 2
- 2
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java View File

@@ -1,6 +1,6 @@
package com.jd.blockchain.statetransfer; package com.jd.blockchain.statetransfer;


import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;


/** /**
@@ -9,7 +9,7 @@ import com.jd.blockchain.stp.communication.RemoteSession;
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
*/ */
public class DataSequenceMsgHandle implements MessageExecute {
public class DataSequenceMsgHandle implements MessageExecutor {


DataSequenceReader dsReader; DataSequenceReader dsReader;
DataSequenceWriter dsWriter; DataSequenceWriter dsWriter;


source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecute.java → source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecutor.java View File

@@ -9,13 +9,15 @@
package com.jd.blockchain.stp.communication; package com.jd.blockchain.stp.communication;


/** /**
*
* 消息执行器
* 该执行器由其他应用实现
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
* @date 2019-04-18 15:29
*/ */


public interface MessageExecute {
public interface MessageExecutor {


byte[] receive(String key, byte[] data, RemoteSession session); byte[] receive(String key, byte[] data, RemoteSession session);



+ 96
- 7
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java View File

@@ -13,6 +13,8 @@ import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.message.LoadMessage; import com.jd.blockchain.stp.communication.message.LoadMessage;


import java.util.concurrent.TimeUnit;



/** /**
* *
@@ -23,38 +25,125 @@ import com.jd.blockchain.stp.communication.message.LoadMessage;


public class RemoteSession { public class RemoteSession {


/**
* 远端节点ID
*/
private String id; private String id;


/**
* 远端连接
*/
private Connection connection; private Connection connection;


private MessageExecute messageExecute;
/**
* 对应远端节点消息的处理器
* 该处理器若为NULL,则使用当前节点默认处理器
*/
private MessageExecutor messageExecutor;


/**
* 构造器
* @param id
* 远端节点ID
* @param connection
* 对应连接
*/
public RemoteSession(String id, Connection connection) {
this(id, connection, null);
}


public RemoteSession(String id, Connection connection, MessageExecute messageExecute) {
/**
* 构造器
* @param id
* 远端ID
* @param connection
* 对应连接
* @param messageExecutor
* 对应远端消息处理器
*/
public RemoteSession(String id, Connection connection, MessageExecutor messageExecutor) {
this.id = id; this.id = id;
this.connection = connection; this.connection = connection;
this.messageExecute = messageExecute;
this.messageExecutor = messageExecutor;
} }


public void init() { public void init() {
connection.initSession(this); connection.initSession(this);
} }


public void initExecute(MessageExecute messageExecute) {
this.messageExecute = messageExecute;
public void initExecutor(MessageExecutor messageExecutor) {
this.messageExecutor = messageExecutor;
} }


/**
* 同步请求
* 该请求会阻塞原线程
*
* @param loadMessage
* 要请求的负载消息
* @return
* 应答,直到有消息应答或出现异常
* @throws Exception
*/
public byte[] request(LoadMessage loadMessage) throws Exception { public byte[] request(LoadMessage loadMessage) throws Exception {
return this.connection.request(this.id, loadMessage, null).getCallBackData(); return this.connection.request(this.id, loadMessage, null).getCallBackData();
} }


/**
* 同步请求
* 该请求会阻塞原线程
*
* @param loadMessage
* 要请求的负载消息
* @param time
* 请求的最长等待时间
* @param timeUnit
* 请求的最长等待单位
* @return
* 应答,直到有消息或时间截止或出现异常
* @throws Exception
*/
public byte[] request(LoadMessage loadMessage, long time, TimeUnit timeUnit) throws Exception {
return this.connection.request(this.id, loadMessage, null).getCallBackData(time, timeUnit);
}

/**
* 异步请求
* 不会阻塞调用线程
*
* @param loadMessage
* 要发送的负载消息
* @return
* 应答,需要调用者从Listener中获取结果
*/
public CallBackDataListener asyncRequest(LoadMessage loadMessage) { public CallBackDataListener asyncRequest(LoadMessage loadMessage) {
return asyncRequest(loadMessage, null); return asyncRequest(loadMessage, null);
} }


/**
* 异步请求
* 不会阻塞调用线程
*
* @param loadMessage
* 要请求的负载消息
* @param callBackBarrier
* 回调栅栏(用于多个请求时进行统一阻拦)
* @return
* 应答,需要调用者从Listener中获取结果
*/
public CallBackDataListener asyncRequest(LoadMessage loadMessage, CallBackBarrier callBackBarrier) { public CallBackDataListener asyncRequest(LoadMessage loadMessage, CallBackBarrier callBackBarrier) {
return this.connection.request(this.id, loadMessage, callBackBarrier); return this.connection.request(this.id, loadMessage, callBackBarrier);
} }


/**
* 应答
*
* @param key
* 请求消息的Key
* @param loadMessage
* 需要应答的负载消息
*/
public void reply(String key, LoadMessage loadMessage) { public void reply(String key, LoadMessage loadMessage) {
this.connection.reply(this.id, key, loadMessage); this.connection.reply(this.id, key, loadMessage);
} }
@@ -75,7 +164,7 @@ public class RemoteSession {
return id; return id;
} }


public MessageExecute messageExecute() {
return this.messageExecute;
public MessageExecutor messageExecutor() {
return this.messageExecutor;
} }
} }

+ 20
- 2
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java View File

@@ -8,12 +8,12 @@
*/ */
package com.jd.blockchain.stp.communication.callback; package com.jd.blockchain.stp.communication.callback;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


/** /**
*
* 回调栅栏
* 用于对批量请求的应答回调处理
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/12 * @create 2019/4/12
* @since 1.0.0 * @since 1.0.0
@@ -23,12 +23,30 @@ public class CallBackBarrier {


private CountDownLatch countDownLatch; private CountDownLatch countDownLatch;


/**
* 默认最大尝试调用时间(单位:毫秒)
*/
private long maxTryCallMillSeconds = 2000; private long maxTryCallMillSeconds = 2000;



/**
* 静态构造器
* @param barrierLength
* 请求的远端数量
* @return
*/
public static final CallBackBarrier newCallBackBarrier(int barrierLength) { public static final CallBackBarrier newCallBackBarrier(int barrierLength) {
return new CallBackBarrier(barrierLength); return new CallBackBarrier(barrierLength);
} }


/**
* 静态构造器
* @param barrierLength
* 请求的远端数量
* @param maxTryCallMillSeconds
* 最大尝试的时间,单位:毫秒
* @return
*/
public static final CallBackBarrier newCallBackBarrier(int barrierLength, long maxTryCallMillSeconds) { public static final CallBackBarrier newCallBackBarrier(int barrierLength, long maxTryCallMillSeconds) {
return new CallBackBarrier(barrierLength, maxTryCallMillSeconds); return new CallBackBarrier(barrierLength, maxTryCallMillSeconds);
} }


+ 44
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java View File

@@ -18,7 +18,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;


/** /**
*
* 数据回调监听器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/15 * @create 2019/4/15
* @since 1.0.0 * @since 1.0.0
@@ -26,30 +26,69 @@ import java.util.concurrent.locks.ReentrantLock;


public class CallBackDataListener { public class CallBackDataListener {


/**
* Future
*/
private CompletableFuture<byte[]> future = new CompletableFuture<>(); private CompletableFuture<byte[]> future = new CompletableFuture<>();


/**
* 远端节点
*/
private RemoteNode remoteNode; private RemoteNode remoteNode;


private boolean isFill = false; private boolean isFill = false;


private Lock lock = new ReentrantLock(); private Lock lock = new ReentrantLock();



/**
* 构造器
* @param remoteNode
* 远端节点信息
*/
public CallBackDataListener(RemoteNode remoteNode) { public CallBackDataListener(RemoteNode remoteNode) {
this.remoteNode = remoteNode; this.remoteNode = remoteNode;
} }


/**
* 获取返回的数据
* 调用该方法会阻塞当前线程,直到有数据返回或出现异常
* @return
* 应答结果
* @throws InterruptedException
* @throws ExecutionException
*/
public byte[] getCallBackData() throws InterruptedException, ExecutionException { public byte[] getCallBackData() throws InterruptedException, ExecutionException {
return future.get(); return future.get();
} }


/**
* 指定时间内获取返回的数据
* 调用该方法会阻塞当前线程,直到时间到达或有数据返回或出现异常
* @param time
* 超时时间
* @param timeUnit
* 超时单位
* @return
* 应答结果,若指定时间内没有数据,则返回null
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public byte[] getCallBackData(long time, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { public byte[] getCallBackData(long time, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(time, timeUnit); return future.get(time, timeUnit);
} }


/**
* 设置返回的数据
* @param data
*/
public void setCallBackData(byte[] data) { public void setCallBackData(byte[] data) {
// 防止数据多次设置
if (!isFill) { if (!isFill) {
try { try {
lock.lock(); lock.lock();
// Double Check
if (!isFill) { if (!isFill) {
future.complete(data); future.complete(data);
isFill = true; isFill = true;
@@ -64,6 +103,10 @@ public class CallBackDataListener {
return this.remoteNode; return this.remoteNode;
} }


/**
* 判断是否异步操作完成
* @return
*/
public boolean isDone() { public boolean isDone() {
return future.isDone(); return future.isDone();
} }

+ 79
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackLauncher.java View File

@@ -0,0 +1,79 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.callback.CallBackLauncher
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 下午6:27
* Description:
*/
package com.jd.blockchain.stp.communication.callback;

import java.util.concurrent.Semaphore;

/**
* 启动器回调
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public class CallBackLauncher {

/**
* 是否启动成功
*/
private boolean isBootSuccess = false;

/**
* 信号量
*/
private Semaphore isBooted = new Semaphore(0, true);

/**
* 异常
*/
private Exception exception;

/**
* 标识当前启动成功
*/
public void bootSuccess() {
isBootSuccess = true;
release();
}

/**
* 标识当前启动失败
* @param e
* 导致失败的异常信息
*/
public void bootFail(Exception e) {
this.exception = e;
isBootSuccess = false;
release();
}

/**
* 等待启动完成
* 调用该方法会阻塞当前线程,知道启动完成或发生异常
* @return
* 当前对象
* @throws InterruptedException
*/
public CallBackLauncher waitingBooted() throws InterruptedException {
this.isBooted.acquire();
return this;
}

public boolean isBootSuccess() {
return isBootSuccess;
}

public Exception exception() {
return exception;
}

private void release() {
this.isBooted.release();
}
}

+ 28
- 8
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java View File

@@ -9,11 +9,12 @@
package com.jd.blockchain.stp.communication.connection; package com.jd.blockchain.stp.communication.connection;


import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.blockchain.stp.communication.callback.CallBackLauncher;


import java.util.concurrent.*; import java.util.concurrent.*;


/** /**
*
* 抽象异步执行器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/17 * @create 2019/4/17
* @since 1.0.0 * @since 1.0.0
@@ -21,19 +22,27 @@ import java.util.concurrent.*;


public abstract class AbstractAsyncExecutor implements AsyncExecutor{ public abstract class AbstractAsyncExecutor implements AsyncExecutor{


/**
* 线程池可处理队列的容量
*/
private static final int QUEUE_CAPACITY = 1024; private static final int QUEUE_CAPACITY = 1024;


protected final Semaphore isStarted = new Semaphore(0, true);

protected boolean isStartSuccess = false;
/**
* 回调执行器
*/
protected final CallBackLauncher callBackLauncher = new CallBackLauncher();


/**
* 默认提供的初始化活跃线程调度器
* @return
*/
@Override @Override
public ThreadPoolExecutor initRunThread() { public ThreadPoolExecutor initRunThread() {
ThreadFactory timerFactory = new ThreadFactoryBuilder() ThreadFactory timerFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNameFormat()).build(); .setNameFormat(threadNameFormat()).build();


ThreadPoolExecutor runThread = new ThreadPoolExecutor(1, 1, ThreadPoolExecutor runThread = new ThreadPoolExecutor(1, 1,
0, TimeUnit.MILLISECONDS,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY), new LinkedBlockingQueue<>(QUEUE_CAPACITY),
timerFactory, timerFactory,
new ThreadPoolExecutor.AbortPolicy()); new ThreadPoolExecutor.AbortPolicy());
@@ -41,11 +50,22 @@ public abstract class AbstractAsyncExecutor implements AsyncExecutor{
return runThread; return runThread;
} }


/**
* 启动完成后回调
* 该调用会阻塞当前线程,直到启动完成,无论是成功或失败
* @return
* 回调执行器
* 成功或失败会在回调执行器中有所体现
* @throws InterruptedException
*/
@Override @Override
public boolean waitStarted() throws InterruptedException {
this.isStarted.acquire();
return this.isStartSuccess;
public CallBackLauncher waitBooted() throws InterruptedException {
return callBackLauncher.waitingBooted();
} }


/**
* 线程池中的线程命名格式
* @return
*/
public abstract String threadNameFormat(); public abstract String threadNameFormat();
} }

+ 13
- 2
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java View File

@@ -8,10 +8,12 @@
*/ */
package com.jd.blockchain.stp.communication.connection; package com.jd.blockchain.stp.communication.connection;


import com.jd.blockchain.stp.communication.callback.CallBackLauncher;

import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;


/** /**
*
* 异步执行器接口
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/17 * @create 2019/4/17
* @since 1.0.0 * @since 1.0.0
@@ -19,7 +21,16 @@ import java.util.concurrent.ThreadPoolExecutor;


public interface AsyncExecutor { public interface AsyncExecutor {


/**
* 初始化运行线程
* @return
*/
ThreadPoolExecutor initRunThread(); ThreadPoolExecutor initRunThread();


boolean waitStarted() throws InterruptedException;
/**
* 启动完成后返回调度执行器
* @return
* @throws InterruptedException
*/
CallBackLauncher waitBooted() throws InterruptedException;
} }

+ 94
- 7
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java View File

@@ -11,6 +11,7 @@ package com.jd.blockchain.stp.communication.connection;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier; import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener; import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.callback.CallBackLauncher;
import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
import com.jd.blockchain.stp.communication.message.LoadMessage; import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.message.SessionMessage; import com.jd.blockchain.stp.communication.message.SessionMessage;
@@ -21,35 +22,85 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;


/** /**
*
* 统一连接对象
* 该对象中有两个对象Receiver和Sender
* Receiver为复用对象(每个端口监听产生的Receiver只有一个)
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
* @date 2019-04-18 14:49
*/ */

public class Connection { public class Connection {


/**
* 远端节点
*/
private RemoteNode remoteNode; private RemoteNode remoteNode;


/**
* 接收器
*/
private Receiver receiver; private Receiver receiver;


/**
* 发送器
*/
private Sender sender; private Sender sender;


/**
* 构造器
*
* @param receiver
*/
public Connection(Receiver receiver) { public Connection(Receiver receiver) {
this.receiver = receiver; this.receiver = receiver;
} }


/**
* 初始化RemoteSession
*
* @param remoteSession
*/
public void initSession(RemoteSession remoteSession) { public void initSession(RemoteSession remoteSession) {
this.receiver.initRemoteSession(remoteSession.sessionId(), remoteSession); this.receiver.initRemoteSession(remoteSession.sessionId(), remoteSession);
} }


public boolean connect(RemoteNode remoteNode, String messageExecuteClass) throws InterruptedException {
/**
* 连接远端
*
* @param remoteNode
* 远端节点
* @param messageExecutorClass
* 希望远端节点处理本地节点消息时的消息处理器
* @return
* 回调执行器
* @throws InterruptedException
*/
public CallBackLauncher connect(RemoteNode remoteNode, String messageExecutorClass) throws InterruptedException {
this.remoteNode = remoteNode; this.remoteNode = remoteNode;
this.sender = new Sender(this.remoteNode, sessionMessage(messageExecuteClass));
this.sender = new Sender(this.remoteNode, sessionMessage(messageExecutorClass));
this.sender.connect(); this.sender.connect();
return this.sender.waitStarted();
return this.sender.waitBooted();
} }


/**
* 发送请求
*
* 处理过程简述如下:
* 1、生成底层消息(TransferMessage),其中消息类型为请求,用于描述本次发送的消息是用于请求应答;
* 2、根据消息的唯一Key,生成listenKey,并生成应答监听器
* 3、将应答监听器添加到Receiver中(Receiver中是以Map存储)
* 4、调用Sender发送消息至对端节点
* 5、返回应答监听器的回调数据监听对象
*
* @param sessionId
* 当前SessionId
* @param loadMessage
* 载体消息
* @param callBackBarrier
* 回调栅栏
* @return
*/
public CallBackDataListener request(String sessionId, LoadMessage loadMessage, CallBackBarrier callBackBarrier) { public CallBackDataListener request(String sessionId, LoadMessage loadMessage, CallBackBarrier callBackBarrier) {


TransferMessage transferMessage = transferMessage(sessionId, null, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_REQUEST); TransferMessage transferMessage = transferMessage(sessionId, null, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_REQUEST);
@@ -69,6 +120,16 @@ public class Connection {
return replyListener.callBackDataListener(); return replyListener.callBackDataListener();
} }


/**
* 发送应答
*
* @param sessionId
* 当前SessionID
* @param key
* 请求消息的Key,用于描述对应的请求
* @param loadMessage
* 应答的载体消息
*/
public void reply(String sessionId, String key, LoadMessage loadMessage) { public void reply(String sessionId, String key, LoadMessage loadMessage) {
TransferMessage transferMessage = transferMessage(sessionId, key, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_RESPONSE); TransferMessage transferMessage = transferMessage(sessionId, key, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_RESPONSE);


@@ -76,6 +137,12 @@ public class Connection {
this.sender.send(transferMessage); this.sender.send(transferMessage);
} }


/**
* 生成载体消息的Key
*
* @param loadMessage
* @return
*/
private String loadKey(LoadMessage loadMessage) { private String loadKey(LoadMessage loadMessage) {
// 使用Sha256求Hash // 使用Sha256求Hash
byte[] sha256Bytes = DigestUtils.sha256(loadMessage.toBytes()); byte[] sha256Bytes = DigestUtils.sha256(loadMessage.toBytes());
@@ -83,6 +150,19 @@ public class Connection {
return Base64.encodeBase64String(sha256Bytes); return Base64.encodeBase64String(sha256Bytes);
} }


/**
* 生成TransferMessage
*
* @param sessionId
* 节点ID
* @param key
* 消息Key
* @param loadMessage
* 载体消息
* @param messageType
* 消息类型
* @return
*/
private TransferMessage transferMessage(String sessionId, String key, LoadMessage loadMessage, TransferMessage.MESSAGE_TYPE messageType) { private TransferMessage transferMessage(String sessionId, String key, LoadMessage loadMessage, TransferMessage.MESSAGE_TYPE messageType) {


if (key == null || key.length() == 0) { if (key == null || key.length() == 0) {
@@ -95,12 +175,19 @@ public class Connection {
return transferMessage; return transferMessage;
} }


private SessionMessage sessionMessage(String messageExecuteClass) {
/**
* 生成SessionMessage
*
* @param messageExecutorClass
*
* @return
*/
private SessionMessage sessionMessage(String messageExecutorClass) {


LocalNode localNode = this.receiver.localNode(); LocalNode localNode = this.receiver.localNode();


SessionMessage sessionMessage = new SessionMessage( SessionMessage sessionMessage = new SessionMessage(
localNode.getHostName(), localNode.getPort(), messageExecuteClass);
localNode.getHostName(), localNode.getPort(), messageExecutorClass);


return sessionMessage; return sessionMessage;
} }


+ 43
- 14
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java View File

@@ -14,7 +14,6 @@ import com.jd.blockchain.stp.communication.connection.handler.HeartBeatReceiverT
import com.jd.blockchain.stp.communication.connection.handler.ReceiverHandler; import com.jd.blockchain.stp.communication.connection.handler.ReceiverHandler;
import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
import com.jd.blockchain.stp.communication.manager.ConnectionManager; import com.jd.blockchain.stp.communication.manager.ConnectionManager;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.LocalNode;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@@ -26,19 +25,14 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;


import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;


/** /**
*
* 接收器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -46,18 +40,33 @@ import java.util.concurrent.*;


public class Receiver extends AbstractAsyncExecutor implements Closeable { public class Receiver extends AbstractAsyncExecutor implements Closeable {


/**
* Netty中的BOSS线程
*/
private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup bossGroup = new NioEventLoopGroup();


/**
* Netty中的Worker线程
*/
private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup();


/**
* 本地节点
*/
private LocalNode localNode; private LocalNode localNode;


/**
* 消息接收Handler
*/
private ReceiverHandler receiverHandler; private ReceiverHandler receiverHandler;


public Receiver(LocalNode localNode) { public Receiver(LocalNode localNode) {
this.localNode = localNode; this.localNode = localNode;
} }


/**
* 启动监听
*/
public void startListen() { public void startListen() {
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();


@@ -80,14 +89,14 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable {
} }
}); });


// 由单独的线程启动
// 由单独的线程启动,防止外部调用线程阻塞
ThreadPoolExecutor runThread = initRunThread(); ThreadPoolExecutor runThread = initRunThread();
runThread.execute(() -> { runThread.execute(() -> {
try { try {
ChannelFuture f = bootstrap.bind().sync(); ChannelFuture f = bootstrap.bind().sync();
super.isStartSuccess = f.isSuccess();
super.isStarted.release();
if (super.isStartSuccess) {
boolean isStartSuccess = f.isSuccess();
if (isStartSuccess) {
super.callBackLauncher.bootSuccess();
// 启动成功 // 启动成功
f.channel().closeFuture().sync(); f.channel().closeFuture().sync();
} else { } else {
@@ -95,7 +104,7 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable {
throw new Exception("Receiver start fail :" + f.cause().getMessage() + " !!!"); throw new Exception("Receiver start fail :" + f.cause().getMessage() + " !!!");
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e);
super.callBackLauncher.bootFail(e);
} finally { } finally {
close(); close();
} }
@@ -107,14 +116,34 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable {
return "receiver-pool-%d"; return "receiver-pool-%d";
} }


public void initReceiverHandler(ConnectionManager connectionManager, String messageExecuteClass) {
receiverHandler = new ReceiverHandler(connectionManager, messageExecuteClass);
/**
* 初始化ReceiverHandler
*
* @param connectionManager
* 连接管理器
* @param messageExecutorClass
* 当前节点的消息处理Class
*/
public void initReceiverHandler(ConnectionManager connectionManager, String messageExecutorClass) {
receiverHandler = new ReceiverHandler(connectionManager, messageExecutorClass, this.localNode.defaultMessageExecutor());
} }


/**
* 初始化远端Session
*
* @param sessionId
*
* @param remoteSession
*/
public void initRemoteSession(String sessionId, RemoteSession remoteSession) { public void initRemoteSession(String sessionId, RemoteSession remoteSession) {
receiverHandler.putRemoteSession(sessionId, remoteSession); receiverHandler.putRemoteSession(sessionId, remoteSession);
} }


/**
* 添加监听器
*
* @param replyListener
*/
public void addListener(ReplyListener replyListener) { public void addListener(ReplyListener replyListener) {
receiverHandler.addListener(replyListener); receiverHandler.addListener(replyListener);
} }


+ 25
- 9
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java View File

@@ -13,30 +13,27 @@ import com.jd.blockchain.stp.communication.message.IMessage;
import com.jd.blockchain.stp.communication.message.SessionMessage; import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode; import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;


import java.io.Closeable; import java.io.Closeable;
import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;


/** /**
* 发送器
* *
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
* @date 2019-04-18 15:08
*/ */

public class Sender extends AbstractAsyncExecutor implements Closeable { public class Sender extends AbstractAsyncExecutor implements Closeable {


private final EventLoopGroup loopGroup = new NioEventLoopGroup(); private final EventLoopGroup loopGroup = new NioEventLoopGroup();
@@ -45,12 +42,24 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {


private ChannelFuture channelFuture; private ChannelFuture channelFuture;


/**
* 当前节点的SessionMessage
*/
private SessionMessage sessionMessage; private SessionMessage sessionMessage;


/**
* 远端HOST
*/
private String remoteHost; private String remoteHost;


/**
* 远端端口
*/
private int remotePort; private int remotePort;


/**
* 监听Handler(重连Handler)
*/
private WatchDogHandler watchDogHandler; private WatchDogHandler watchDogHandler;


public Sender(RemoteNode remoteNode, SessionMessage sessionMessage) { public Sender(RemoteNode remoteNode, SessionMessage sessionMessage) {
@@ -61,6 +70,9 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
init(remoteHost, remotePort, sessionMessage); init(remoteHost, remotePort, sessionMessage);
} }


/**
* 连接
*/
public void connect() { public void connect() {
watchDogHandler = new WatchDogHandler(this.remoteHost, this.remotePort, bootstrap); watchDogHandler = new WatchDogHandler(this.remoteHost, this.remotePort, bootstrap);


@@ -93,13 +105,13 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
try { try {
// 发起连接请求 // 发起连接请求
channelFuture = bootstrap.connect(this.remoteHost, this.remotePort).sync(); channelFuture = bootstrap.connect(this.remoteHost, this.remotePort).sync();

isStartSuccess = channelFuture.isSuccess();
isStarted.release();
boolean isStartSuccess = channelFuture.isSuccess();
if (isStartSuccess) { if (isStartSuccess) {
// 启动成功 // 启动成功
// 设置ChannelFuture对象,以便于发送的连接状态处理 // 设置ChannelFuture对象,以便于发送的连接状态处理
watchDogHandler.initChannelFuture(channelFuture); watchDogHandler.initChannelFuture(channelFuture);
// 释放等待
super.callBackLauncher.bootSuccess();
// 等待客户端关闭连接 // 等待客户端关闭连接
channelFuture.channel().closeFuture().sync(); channelFuture.channel().closeFuture().sync();
} else { } else {
@@ -107,7 +119,7 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
throw new Exception("Sender start fail :" + channelFuture.cause().getMessage() + " !!!"); throw new Exception("Sender start fail :" + channelFuture.cause().getMessage() + " !!!");
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e);
super.callBackLauncher.bootFail(e);
} finally { } finally {
close(); close();
} }
@@ -135,6 +147,10 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
return "sender-pool-%d"; return "sender-pool-%d";
} }


/**
* 发送消息
* @param message
*/
public void send(IMessage message) { public void send(IMessage message) {
watchDogHandler.channelFuture().channel().writeAndFlush(message.toTransferByteBuf()); watchDogHandler.channelFuture().channel().writeAndFlush(message.toTransferByteBuf());
} }


+ 3
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java View File

@@ -14,7 +14,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;


/** /**
*
* 心跳接收Handler
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/15 * @create 2019/4/15
* @since 1.0.0 * @since 1.0.0
@@ -24,11 +24,13 @@ public class HeartBeatReceiverHandler extends ChannelInboundHandlerAdapter {


@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断当前收到的信息是否为心跳信息
if (HeartBeatMessage.isHeartBeat(msg)) { if (HeartBeatMessage.isHeartBeat(msg)) {
// 收到的消息是心跳消息,此时需要回复一个心跳消息 // 收到的消息是心跳消息,此时需要回复一个心跳消息
HeartBeatMessage.write(ctx); HeartBeatMessage.write(ctx);
System.out.println("Receive HeartBeat Request Message -> " + msg.toString()); System.out.println("Receive HeartBeat Request Message -> " + msg.toString());
} else { } else {
// 非心跳信息的情况下交由其他Handler继续处理
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
} }
} }


+ 3
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java View File

@@ -15,7 +15,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;


/** /**
*
* 心跳接收触发器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/15 * @create 2019/4/15
* @since 1.0.0 * @since 1.0.0
@@ -28,12 +28,14 @@ public class HeartBeatReceiverTrigger extends ChannelInboundHandlerAdapter {
// 服务端只会接收心跳数据后应答,而不会主动应答 // 服务端只会接收心跳数据后应答,而不会主动应答
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleState idleState = ((IdleStateEvent) evt).state(); IdleState idleState = ((IdleStateEvent) evt).state();
// 读请求超时表示很久没有收到客户端请求
if (idleState.equals(IdleState.READER_IDLE)) { if (idleState.equals(IdleState.READER_IDLE)) {
// 长时间未收到客户端请求,则关闭连接 // 长时间未收到客户端请求,则关闭连接
System.out.println("Long Time UnReceive HeartBeat Request, Close Connection !!!"); System.out.println("Long Time UnReceive HeartBeat Request, Close Connection !!!");
ctx.close(); ctx.close();
} }
} else { } else {
// 非空闲状态事件,由其他Handler处理
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }
} }

+ 1
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java View File

@@ -14,7 +14,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;


/** /**
*
* 心跳发送Handler
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/15 * @create 2019/4/15
* @since 1.0.0 * @since 1.0.0


+ 3
- 3
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java View File

@@ -16,7 +16,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;


/** /**
*
* 心跳发送触发器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/15 * @create 2019/4/15
* @since 1.0.0 * @since 1.0.0
@@ -27,7 +27,7 @@ public class HeartBeatSenderTrigger extends ChannelInboundHandlerAdapter {
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {


// 心跳事件
// 心跳事件(状态空闲事件)
if (evt instanceof IdleStateEvent) { if (evt instanceof IdleStateEvent) {
IdleState idleState = ((IdleStateEvent) evt).state(); IdleState idleState = ((IdleStateEvent) evt).state();
if (idleState.equals(IdleState.READER_IDLE)) { if (idleState.equals(IdleState.READER_IDLE)) {
@@ -40,7 +40,7 @@ public class HeartBeatSenderTrigger extends ChannelInboundHandlerAdapter {
System.out.println("Read TimeOut Trigger, Send HeartBeat Request !!!"); System.out.println("Read TimeOut Trigger, Send HeartBeat Request !!!");
HeartBeatMessage.write(ctx); HeartBeatMessage.write(ctx);
} }
// 还有一种情况是读写超时,该情况暂不处理
// TODO 还有一种情况是读写超时,该情况暂不处理
} else { } else {
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }


+ 158
- 62
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java View File

@@ -9,7 +9,7 @@
package com.jd.blockchain.stp.communication.connection.handler; package com.jd.blockchain.stp.communication.connection.handler;


import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
@@ -28,7 +28,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;


/** /**
*
* 接收者消息处理Handler
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/12 * @create 2019/4/12
* @since 1.0.0 * @since 1.0.0
@@ -36,25 +36,62 @@ import java.util.concurrent.locks.ReentrantLock;
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Closeable { public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Closeable {


// 队列的最大容量为256K(防止队列溢出)
/**
* 队列的最大容量设置,默认为256K(防止队列溢出)
*/
private static final int QUEUE_CAPACITY = 256 * 1024; private static final int QUEUE_CAPACITY = 256 * 1024;


/**
* 远端RemoteSession信息集合
* Key为SessionId
* Sender发送的消息中会携带SessionId
* ReceiverHandler会根据不同的SessionId采用不同的MessageExecutor处理策略
*/
private final Map<String, RemoteSession> remoteSessions = new ConcurrentHashMap<>(); private final Map<String, RemoteSession> remoteSessions = new ConcurrentHashMap<>();


/**
* 监听器集合
* 对应Sender在发送请求之前会设置ReplyListener
* Key为每个请求消息的Hash,用于描述消息的唯一性
* 应答一方会在应答中加入对应的key,用于消息的映射
*/
private final Map<String, ReplyListener> allReplyListeners = new ConcurrentHashMap<>(); private final Map<String, ReplyListener> allReplyListeners = new ConcurrentHashMap<>();


private final Lock lock = new ReentrantLock();

private String messageExecuteClass;

/**
* session控制锁
* 用于防止对统一RemoteSession对象进行重复设置
*/
private final Lock sessionLock = new ReentrantLock();

/**
* 当前节点(本地节点)的消息处理器对应Class
* 该信息用于发送至其他节点,向其他节点通知遇到本节点请求时该如何处理
*/
private String localMsgExecutorClass;

/**
* 连接控制器,用于与远端节点连接
*/
private ConnectionManager connectionManager; private ConnectionManager connectionManager;


private ExecutorService msgExecutePool;
/**
* 消息处理执行线程池
* 防止执行内容过长,导致阻塞
*/
private ExecutorService msgExecutorPool;


public ReceiverHandler(ConnectionManager connectionManager, String messageExecuteClass) {
/**
* 默认消息处理器
* 当对应session获取到的RemoteSession中没有获取到指定MessageExecutor时,短时间内由其进行处理
*/
private MessageExecutor defaultMessageExecutor;

public ReceiverHandler(ConnectionManager connectionManager, String localMsgExecutorClass,
MessageExecutor defaultMessageExecutor) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.messageExecuteClass = messageExecuteClass;
init();
this.localMsgExecutorClass = localMsgExecutorClass;
this.defaultMessageExecutor = defaultMessageExecutor;
initMsgExecutorPool();
} }


public void putRemoteSession(String sessionId, RemoteSession remoteSession) { public void putRemoteSession(String sessionId, RemoteSession remoteSession) {
@@ -75,10 +112,10 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
System.out.println("Receive Biz Message -> " + msg.toString()); System.out.println("Receive Biz Message -> " + msg.toString());
// 有数据接入 // 有数据接入
// 首先判断数据是否TransferMessage,当前Handler不处理非TransferMessage // 首先判断数据是否TransferMessage,当前Handler不处理非TransferMessage
TransferMessage tm = TransferMessage.toTransferMessageObj(msg);
TransferMessage tm = TransferMessage.toTransferMessage(msg);
if (tm == null) { if (tm == null) {
// 判断是否是SessionMessage // 判断是否是SessionMessage
SessionMessage sm = SessionMessage.toNodeSessionMessage(msg);
SessionMessage sm = SessionMessage.toSessionMessage(msg);
if (sm != null) { if (sm != null) {
executeSessionMessage(sm); executeSessionMessage(sm);
} else { } else {
@@ -106,36 +143,48 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
ctx.close(); ctx.close();
} }


// 防止消息的处理过程阻塞主进程
/**
* 处理请求消息
*
* @param transferMessage
* 接收到的请求消息
*/
private void executeRequest(final TransferMessage transferMessage) { private void executeRequest(final TransferMessage transferMessage) {
msgExecutePool.execute(() -> {
msgExecutorPool.execute(() -> {
RemoteSession remoteSession = remoteSessions.get(transferMessage.getSessionId()); RemoteSession remoteSession = remoteSessions.get(transferMessage.getSessionId());
if (remoteSession != null) { if (remoteSession != null) {
MessageExecute messageExecute = remoteSession.messageExecute();
if (messageExecute != null) {
MessageExecute.REPLY replyType = messageExecute.replyType();
if (replyType != null) {
switch (messageExecute.replyType()) {
case MANUAL:
messageExecute.receive(transferMessage.loadKey(), transferMessage.load(), remoteSession);
break;
case AUTO:
String requestKey = transferMessage.loadKey();
byte[] replyMsg = messageExecute.receive(requestKey, transferMessage.load(), remoteSession);
// 应答
remoteSession.reply(requestKey, () -> replyMsg);
break;
default:
break;
}
MessageExecutor messageExecutor = remoteSession.messageExecutor();
if (messageExecutor == null) {
// 采用默认处理器进行处理
messageExecutor = defaultMessageExecutor;
}
MessageExecutor.REPLY replyType = messageExecutor.replyType();
if (replyType != null) {
switch (replyType) {
case MANUAL:
messageExecutor.receive(transferMessage.loadKey(), transferMessage.load(), remoteSession);
break;
case AUTO:
String requestKey = transferMessage.loadKey();
byte[] replyMsg = messageExecutor.receive(requestKey, transferMessage.load(), remoteSession);
// 应答
remoteSession.reply(requestKey, () -> replyMsg);
break;
default:
break;
} }
} }
} }
}); });
} }


/**
* 处理应答消息
* @param transferMessage
* 接收到的应答消息
*/
private void executeResponse(final TransferMessage transferMessage) { private void executeResponse(final TransferMessage transferMessage) {
msgExecutePool.execute(() -> {
msgExecutorPool.execute(() -> {
// listenKey和msgKey是不一致的 // listenKey和msgKey是不一致的
// msgKey是对消息本身设置key,listenKey是对整个消息(包括session信息) // msgKey是对消息本身设置key,listenKey是对整个消息(包括session信息)
String listenKey = transferMessage.toListenKey(); String listenKey = transferMessage.toListenKey();
@@ -165,52 +214,99 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
}); });
} }


/**
* 处理SessionMessage
* @param sessionMessage
* 描述Session的消息对象
*/
private void executeSessionMessage(SessionMessage sessionMessage) { private void executeSessionMessage(SessionMessage sessionMessage) {
// 处理SessionMessage // 处理SessionMessage
String sessionId = sessionMessage.sessionId(); String sessionId = sessionMessage.sessionId();
if (sessionId != null && !remoteSessions.containsKey(sessionId)) {

try {
lock.lock();
// 生成对应的MessageExecute对象
String messageExecuteClass = sessionMessage.getMessageExecute();
MessageExecute messageExecute = null;
if (messageExecuteClass != null && messageExecuteClass.length() > 0) {
try {
Class<?> clazz = Class.forName(messageExecuteClass);
messageExecute = (MessageExecute) clazz.newInstance();
} catch (Exception e) {
// TODO 打印日志
e.printStackTrace();
}
}
if (sessionId != null) {
// 对于含有的RemoteSession的Map,需要判断其MessageExecutor是否为NULL
RemoteSession remoteSession = remoteSessions.get(sessionId);
if (remoteSession == null) {
try {
sessionLock.lock();
// 生成对应的MessageExecute对象
String meClass = sessionMessage.getMessageExecutor();
MessageExecutor messageExecutor = initMessageExecutor(meClass);


// 必须保证该对象不为空
if (messageExecute != null) {
// 说明尚未和请求来的客户端建立连接,需要建立连接 // 说明尚未和请求来的客户端建立连接,需要建立连接
Connection remoteConnection = this.connectionManager.connect(new RemoteNode( Connection remoteConnection = this.connectionManager.connect(new RemoteNode(
sessionMessage.getLocalHost(), sessionMessage.getListenPort()), sessionMessage.getLocalHost(), sessionMessage.getListenPort()),
this.messageExecuteClass);
RemoteSession remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecute);
this.localMsgExecutorClass);
// 假设连接失败的话,返回的Connection对象为null,此时不放入Map,等后续再处理
if (remoteConnection != null) {

remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecutor);


// Double check !!!
if (!remoteSessions.containsKey(sessionId)) {
remoteSessions.put(sessionId, remoteSession);
// Double check !!!
if (!remoteSessions.containsKey(sessionId)) {
remoteSessions.put(sessionId, remoteSession);
}
} }
} finally {
sessionLock.unlock();
} }
} finally {
lock.unlock();
} else {
// 需要判断MessageExecutor
MessageExecutor me = remoteSession.messageExecutor();
if (me == null) {
try {
sessionLock.lock();
// Double Check !!!
if (remoteSession.messageExecutor() == null) {
// 表明上次存储的MessageExecutor未创建成功,本次进行更新
String meClass = sessionMessage.getMessageExecutor();
MessageExecutor messageExecutor = initMessageExecutor(meClass);

// 防止NULL将其他的进行覆盖
if (messageExecutor != null) {
remoteSession.initExecutor(messageExecutor);
}
}
} finally {
sessionLock.unlock();
}
}
}
}
}

/**
* 初始化消息执行器
* 根据消息执行器的Class字符串生成对应的消息处理对象
* @param messageExecutorClass
* 消息执行器的Class字符串
* @return
* 对应的消息处理对象,产生任何异常都返回NULL
*/
private MessageExecutor initMessageExecutor(String messageExecutorClass) {
// 生成对应的MessageExecute对象
MessageExecutor messageExecutor = null;
if (messageExecutorClass != null && messageExecutorClass.length() > 0) {
try {
Class<?> clazz = Class.forName(messageExecutorClass);
messageExecutor = (MessageExecutor) clazz.newInstance();
} catch (Exception e) {
e.printStackTrace();
return null;
} }
} }
return messageExecutor;
} }


private void init() {
/**
* 初始化消息处理线程池
*/
private void initMsgExecutorPool() {


ThreadFactory msgExecuteThreadFactory = new ThreadFactoryBuilder() ThreadFactory msgExecuteThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("msg-execute-pool-%d").build();
.setNameFormat("msg-executor-pool-%d").build();


//Common Thread Pool //Common Thread Pool
msgExecutePool = new ThreadPoolExecutor(5, 10,
msgExecutorPool = new ThreadPoolExecutor(5, 10,
60, TimeUnit.SECONDS, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY), new LinkedBlockingQueue<>(QUEUE_CAPACITY),
msgExecuteThreadFactory, new ThreadPoolExecutor.AbortPolicy()); msgExecuteThreadFactory, new ThreadPoolExecutor.AbortPolicy());
@@ -218,6 +314,6 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo


@Override @Override
public void close() { public void close() {
msgExecutePool.shutdown();
msgExecutorPool.shutdown();
} }
} }

+ 1
- 4
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java View File

@@ -9,16 +9,13 @@
package com.jd.blockchain.stp.communication.connection.handler; package com.jd.blockchain.stp.communication.connection.handler;


import com.jd.blockchain.stp.communication.message.SessionMessage; import com.jd.blockchain.stp.communication.message.SessionMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;


import java.util.concurrent.Executors;


/** /**
*
* Sender对应Handler
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/16 * @create 2019/4/16
* @since 1.0.0 * @since 1.0.0


+ 63
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java View File

@@ -23,7 +23,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;


/** /**
*
* 连接监听器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/12 * @create 2019/4/12
* @since 1.0.0 * @since 1.0.0
@@ -31,8 +31,15 @@ import java.util.concurrent.locks.ReentrantLock;
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Runnable, Closeable { public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Runnable, Closeable {


/**
* 当前连接活跃状态
*/
private final AtomicBoolean currentActive = new AtomicBoolean(false); private final AtomicBoolean currentActive = new AtomicBoolean(false);


/**
* 重连的控制锁
* 防止重连过程中重复多次调用
*/
private final Lock reconnectLock = new ReentrantLock(); private final Lock reconnectLock = new ReentrantLock();


// 默认的最多重连次数 // 默认的最多重连次数
@@ -44,30 +51,70 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run
// 标识是否正常工作中,假设不再工作则不再重连 // 标识是否正常工作中,假设不再工作则不再重连
private boolean isWorking = true; private boolean isWorking = true;


/**
* 重连调度器
*/
private ScheduledExecutorService reconnectTimer; private ScheduledExecutorService reconnectTimer;


/**
* 远端的IP(域名)信息
*/
private String hostName; private String hostName;


/**
* 远端的端口
*/
private int port; private int port;


private Bootstrap bootstrap; private Bootstrap bootstrap;


/**
* 第一组Handler数组
*/
private ChannelHandler[] frontHandlers; private ChannelHandler[] frontHandlers;


/**
* 后一组Handler数组
*/
private ChannelHandler[] afterHandlers; private ChannelHandler[] afterHandlers;


/**
* 用于重连时对象重置
*/
private ChannelFuture channelFuture; private ChannelFuture channelFuture;


/**
* 构造器
* @param hostName
* 远端Host
* @param port
* 远端端口
* @param bootstrap
* Netty工作启动器
*/
public WatchDogHandler(String hostName, int port, Bootstrap bootstrap) { public WatchDogHandler(String hostName, int port, Bootstrap bootstrap) {
this.hostName = hostName; this.hostName = hostName;
this.port = port; this.port = port;
this.bootstrap = bootstrap; this.bootstrap = bootstrap;
} }


/**
* 构造器
* @param remoteNode
* 远端节点
* @param bootstrap
* Netty工作启动器
*/
public WatchDogHandler(RemoteNode remoteNode, Bootstrap bootstrap) { public WatchDogHandler(RemoteNode remoteNode, Bootstrap bootstrap) {
this(remoteNode.getHostName(), remoteNode.getPort(), bootstrap); this(remoteNode.getHostName(), remoteNode.getPort(), bootstrap);
} }


/**
* 配置重连需要的Handler
* 主要是为了对象的复用,同时有些Handler无法复用,对于每次连接请求必须要new新的对象
* @param frontHandlers
* @param afterHandlers
*/
public void init(ChannelHandler[] frontHandlers, ChannelHandler[] afterHandlers) { public void init(ChannelHandler[] frontHandlers, ChannelHandler[] afterHandlers) {
this.frontHandlers = frontHandlers; this.frontHandlers = frontHandlers;
this.afterHandlers = afterHandlers; this.afterHandlers = afterHandlers;
@@ -87,6 +134,12 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run
} }
} }


/**
* 连接成功调用
* 该连接成功表示完全连接成功,对于TCP而言就是三次握手成功
* @param ctx
* @throws Exception
*/
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 调用该方法表示连接成功 // 调用该方法表示连接成功
@@ -98,6 +151,12 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run
ctx.fireChannelActive(); ctx.fireChannelActive();
} }


/**
* 连接失败时调用
* 此处是触发重连的入口
* @param ctx
* @throws Exception
*/
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {


@@ -174,6 +233,9 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run
this.reconnectTimer.shutdown(); this.reconnectTimer.shutdown();
} }


/**
* 设置调度器
*/
private void initTimer() { private void initTimer() {
ThreadFactory timerFactory = new ThreadFactoryBuilder() ThreadFactory timerFactory = new ThreadFactoryBuilder()
.setNameFormat("reconnect-pool-%d").build(); .setNameFormat("reconnect-pool-%d").build();


+ 16
- 3
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java View File

@@ -14,7 +14,7 @@ import com.jd.blockchain.stp.communication.node.RemoteNode;




/** /**
*
* 应答监听器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/12 * @create 2019/4/12
* @since 1.0.0 * @since 1.0.0
@@ -22,17 +22,30 @@ import com.jd.blockchain.stp.communication.node.RemoteNode;


public class ReplyListener { public class ReplyListener {


/**
* 监听的Key,通常用于描述唯一的请求
*/
private String listenKey; private String listenKey;


/**
* 消息处理类型
* REMOVE:表示处理完该对象之后从缓存中清除
* HOLD:表示处理完该对象之后仍在缓存中保存
*/
private MANAGE_TYPE manageType = MANAGE_TYPE.REMOVE; private MANAGE_TYPE manageType = MANAGE_TYPE.REMOVE;


/**
* 数据回调监听器
*/
private CallBackDataListener callBackDataListener; private CallBackDataListener callBackDataListener;


/**
* 回调栅栏
*/
private CallBackBarrier callBackBarrier; private CallBackBarrier callBackBarrier;


public ReplyListener(String listenKey, RemoteNode remoteNode) { public ReplyListener(String listenKey, RemoteNode remoteNode) {
this.listenKey = listenKey;
this.callBackDataListener = new CallBackDataListener(remoteNode);
this(listenKey, remoteNode, null);
} }


public ReplyListener(String listenKey, RemoteNode remoteNode, CallBackBarrier callBackBarrier) { public ReplyListener(String listenKey, RemoteNode remoteNode, CallBackBarrier callBackBarrier) {


+ 76
- 18
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java View File

@@ -8,7 +8,8 @@
*/ */
package com.jd.blockchain.stp.communication.manager; package com.jd.blockchain.stp.communication.manager;


import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.callback.CallBackLauncher;
import com.jd.blockchain.stp.communication.connection.Receiver; import com.jd.blockchain.stp.communication.connection.Receiver;
import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.LocalNode;
@@ -20,24 +21,50 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;


/** /**
*
* 连接管理器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
* @date 2019-04-18 15:11
*/ */


public class ConnectionManager { public class ConnectionManager {


/**
* Connection对应Map
* RemoteNode唯一性:IP(HOST)+PORT
*/
private static final Map<RemoteNode, Connection> connectionMap = new ConcurrentHashMap<>(); private static final Map<RemoteNode, Connection> connectionMap = new ConcurrentHashMap<>();


/**
* 连接管理器对应MAP
* 以监听端口(int)作为Key,进行唯一性约束
*/
private static final Map<Integer, ConnectionManager> connectionManagerMap = new ConcurrentHashMap<>(); private static final Map<Integer, ConnectionManager> connectionManagerMap = new ConcurrentHashMap<>();


/**
* connectionManagerMap控制锁
*/
private static final Lock managerLock = new ReentrantLock(); private static final Lock managerLock = new ReentrantLock();


/**
* connectionMap控制锁
*/
private static final Lock connectionLock = new ReentrantLock(); private static final Lock connectionLock = new ReentrantLock();


/**
* 当前ConnectionManager对应的Receiver
*/
private Receiver receiver; private Receiver receiver;


/**
* 静态ConnectionManager构造器
*
* @param localNode
* 本地节点
* @return
* 优先返回Map中的对象
*/
public static final ConnectionManager newConnectionManager(LocalNode localNode) { public static final ConnectionManager newConnectionManager(LocalNode localNode) {
int listenPort = localNode.getPort(); int listenPort = localNode.getPort();
if (!connectionManagerMap.containsKey(listenPort)) { if (!connectionManagerMap.containsKey(listenPort)) {
@@ -55,32 +82,61 @@ public class ConnectionManager {
return connectionManagerMap.get(listenPort); return connectionManagerMap.get(listenPort);
} }


/**
* 内部调用的静态构造器
*
* @param localNode
* 本地节点
* @return
*/
private static final ConnectionManager newInstance(LocalNode localNode) { private static final ConnectionManager newInstance(LocalNode localNode) {
return new ConnectionManager(new Receiver(localNode)); return new ConnectionManager(new Receiver(localNode));
} }


public final boolean start(String messageExecuteClass) throws InterruptedException {
receiver.initReceiverHandler(this, messageExecuteClass);
/**
* 启动
* 该启动是启动Receiver,返回启动的状态
*
* @param messageExecutorClass
* 当前节点希望其他节点收到该节点信息时的处理Handler
* @return
* 回调执行器
* @throws InterruptedException
*/
public final CallBackLauncher start(String messageExecutorClass) throws InterruptedException {
receiver.initReceiverHandler(this, messageExecutorClass);
receiver.startListen(); receiver.startListen();
// 判断是否启动完成,启动完成后再返回 // 判断是否启动完成,启动完成后再返回
return receiver.waitStarted();
return receiver.waitBooted();
} }


private ConnectionManager(Receiver receiver) { private ConnectionManager(Receiver receiver) {
this.receiver = receiver; this.receiver = receiver;
} }


public Connection connect(RemoteNode remoteNode, MessageExecute messageExecute) {
return connect(remoteNode, messageExecute.getClass().toString());
}

public Connection connect(RemoteNode remoteNode, String messageExecuteClass) {
/**
* 连接远端节点
*
* @param remoteNode
* 远端节点信息
* @param messageExecutorClass
* 希望远端节点接收到本节点消息时的处理Handler
* @return
*/
public Connection connect(RemoteNode remoteNode, String messageExecutorClass) {
if (!connectionMap.containsKey(remoteNode)) { if (!connectionMap.containsKey(remoteNode)) {
try { try {
connectionLock.lock(); connectionLock.lock();
if (!connectionMap.containsKey(remoteNode)) { if (!connectionMap.containsKey(remoteNode)) {
Connection connection = init(remoteNode, messageExecuteClass);
connectionMap.put(remoteNode, connection);
Connection connection = init(remoteNode, messageExecutorClass);
if (connection != null) {
// 保证都是连接成功的
connectionMap.put(remoteNode, connection);
return connection;
} else {
// 连接失败返回null
return null;
}
} }
} finally { } finally {
connectionLock.unlock(); connectionLock.unlock();
@@ -89,22 +145,24 @@ public class ConnectionManager {
return connectionMap.get(remoteNode); return connectionMap.get(remoteNode);
} }


private Connection init(RemoteNode remoteNode, String messageExecuteClass) {
private Connection init(RemoteNode remoteNode, String messageExecutorClass) {


// 初始化Connection // 初始化Connection
Connection remoteConnection = new Connection(this.receiver); Connection remoteConnection = new Connection(this.receiver);


try { try {
// 连接远端
boolean isSuccess = remoteConnection.connect(remoteNode, messageExecuteClass);
if (!isSuccess) {
throw new RuntimeException(String.format("RemoteNode {%s} Connect Fail !!!", remoteNode.toString()));
// 连接远端,需要发送当前节点处理的MessageExecuteClass
CallBackLauncher callBackLauncher = remoteConnection.connect(remoteNode, messageExecutorClass);
if (!callBackLauncher.isBootSuccess()) {
// TODO 打印错误日志
callBackLauncher.exception().printStackTrace();
return null;
} }
return remoteConnection;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} }
return remoteConnection;
} }
} }

+ 122
- 51
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java View File

@@ -9,16 +9,24 @@
package com.jd.blockchain.stp.communication.manager; package com.jd.blockchain.stp.communication.manager;




import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackLauncher;
import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode; import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.binary.Hex;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;



/** /**
*
* 远端Session管理器
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -26,84 +34,158 @@ import org.apache.commons.codec.binary.Hex;


public class RemoteSessionManager { public class RemoteSessionManager {


/**
* 可监听的最大端口
*/
private static final int MAX_PORT = 65535;

/**
* 节点Session的集合信息
*/
private Map<RemoteNode, RemoteSession> nodeRemoteSessionMap = new ConcurrentHashMap<>();

/**
* nodeRemoteSessionMap的控制锁
*/
private Lock lock = new ReentrantLock();

/**
* 连接管理器
* 用于管理底层的通信连接
*/
private ConnectionManager connectionManager; private ConnectionManager connectionManager;


/**
* 本地节点信息
*/
private LocalNode localNode; private LocalNode localNode;


/**
* 构造器
* @param localNode
* 本地节点信息
*/
public RemoteSessionManager(LocalNode localNode) { public RemoteSessionManager(LocalNode localNode) {
this.localNode = localNode; this.localNode = localNode;
// 校验本地节点的配置,防止异常
check(); check();
this.connectionManager = ConnectionManager.newConnectionManager(this.localNode); this.connectionManager = ConnectionManager.newConnectionManager(this.localNode);
try { try {
boolean isStartedSuccess = start(this.localNode.messageExecuteClass());
if (!isStartedSuccess) {
throw new RuntimeException(String.format("LocalNode {%s} Start Receiver Fail !!!", this.localNode.toString()));
CallBackLauncher callBackLauncher = start();
if (!callBackLauncher.isBootSuccess()) {
// 启动当前端口连接必须要成功,否则则退出,交由应用程序处理
throw new RuntimeException(callBackLauncher.exception());
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} }

}

private void check() {
// 要求端口范围:1~65535,messageExecuteClass不能为null
int listenPort = this.localNode.getPort();
if (listenPort <= 0 || listenPort > 65535) {
throw new IllegalArgumentException("Illegal Local Listen Port, Please Check !!!");
}

String messageExecuteClass = this.localNode.messageExecuteClass();
if (messageExecuteClass == null) {
throw new IllegalArgumentException("Illegal MessageExecute Class, Please Check !!!");
}
}

private boolean start(String messageExecuteClass) throws InterruptedException {
return this.connectionManager.start(messageExecuteClass);
} }


/**
* 生成新的Session
* @param remoteNode
* @return
*/
public RemoteSession newSession(RemoteNode remoteNode) { public RemoteSession newSession(RemoteNode remoteNode) {
return newSession(null, remoteNode); return newSession(null, remoteNode);
} }


/**
* RemoteSession对象生成器
* @param sessionId
* RemoteSession的Key
* @param remoteNode
* 远端节点信息
* @return
*/
public RemoteSession newSession(String sessionId, RemoteNode remoteNode) { public RemoteSession newSession(String sessionId, RemoteNode remoteNode) {
return newSession(sessionId, remoteNode, null);
}


public RemoteSession newSession(RemoteNode remoteNode, MessageExecute messageExecute) {
return newSession(null, remoteNode, messageExecute);
}
RemoteSession remoteSession = nodeRemoteSessionMap.get(remoteNode);


public RemoteSession newSession(String sessionId, RemoteNode remoteNode, MessageExecute messageExecute) {
if (sessionId == null) {
sessionId = toSessionId(localNode);
}
Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecuteClass());
if (remoteSession != null) {
return remoteSession;
} else {
try {
lock.lock();

// Double Check !!!
if (!nodeRemoteSessionMap.containsKey(remoteNode)) {
if (sessionId == null) {
sessionId = sessionId(localNode);
}
Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecutorClass());

if (remoteConnection == null) {
return null;
}


RemoteSession remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecute);
remoteSession = new RemoteSession(sessionId, remoteConnection);


remoteSession.init();
remoteSession.init();


nodeRemoteSessionMap.put(remoteNode, remoteSession);

return remoteSession;
}
} finally {
lock.unlock();
}
}
return remoteSession; return remoteSession;
} }


public RemoteSession[] newSessions(RemoteNode[] remoteNodes) { public RemoteSession[] newSessions(RemoteNode[] remoteNodes) {

return newSessions(null, remoteNodes); return newSessions(null, remoteNodes);
} }


public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes) { public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes) {
checkSessions(sessionIds, remoteNodes);

List<RemoteSession> remoteSessionList = new ArrayList<>();

for (int i = 0; i < remoteNodes.length; i++) {
RemoteSession remoteSession;
if (sessionIds == null) {
remoteSession = newSession(remoteNodes[i]);
} else {
remoteSession = newSession(sessionIds[i], remoteNodes[i]);
}
if (remoteSession != null) {
remoteSessionList.add(remoteSession);
}
}


return newSessions(sessionIds, remoteNodes, null);
if (remoteSessionList.isEmpty()) {
return null;
}

RemoteSession[] remoteSessions = new RemoteSession[remoteSessionList.size()];

return remoteSessionList.toArray(remoteSessions);
} }


public RemoteSession[] newSessions(RemoteNode[] remoteNodes, MessageExecute messageExecute) {
private void check() {
// 要求端口范围:1~65535,messageExecuteClass不能为null
int listenPort = this.localNode.getPort();
if (listenPort <= 0 || listenPort > MAX_PORT) {
throw new IllegalArgumentException("Illegal Local Listen Port, Please Check !!!");
}

// 默认处理器必须包含,可不包含本机需要对端知晓的处理器
MessageExecutor defaultMessageExecutor = this.localNode.defaultMessageExecutor();
if (defaultMessageExecutor == null) {
throw new IllegalArgumentException("Illegal Default MessageExecutor, Please Check !!!");
}
}


return newSessions(null, remoteNodes, messageExecute);
private CallBackLauncher start() throws InterruptedException {
return this.connectionManager.start(this.localNode.messageExecutorClass());
} }


public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes, MessageExecute messageExecute) {
private void checkSessions(String[] sessionIds, RemoteNode[] remoteNodes) {
if (remoteNodes == null || remoteNodes.length <= 0) { if (remoteNodes == null || remoteNodes.length <= 0) {
throw new IllegalArgumentException("RemoteNodes is empty !!!"); throw new IllegalArgumentException("RemoteNodes is empty !!!");
} }
@@ -113,20 +195,9 @@ public class RemoteSessionManager {
throw new IllegalArgumentException("RemoteNodes and sessionIds are different in length !!!"); throw new IllegalArgumentException("RemoteNodes and sessionIds are different in length !!!");
} }
} }

RemoteSession[] remoteSessions = new RemoteSession[remoteNodes.length];

for (int i = 0; i < remoteNodes.length; i++) {
if (sessionIds == null) {
remoteSessions[i] = newSession(remoteNodes[i], messageExecute);
} else {
remoteSessions[i] = newSession(sessionIds[i], remoteNodes[i], messageExecute);
}
}
return remoteSessions;
} }


public String toSessionId(RemoteNode remoteNode) {
private String sessionId(RemoteNode remoteNode) {
return Hex.encodeHexString(remoteNode.toString().getBytes()); return Hex.encodeHexString(remoteNode.toString().getBytes());
} }
} }

+ 1
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java View File

@@ -12,7 +12,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;


/** /**
*
* 抽象消息
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/17 * @create 2019/4/17
* @since 1.0.0 * @since 1.0.0


+ 23
- 4
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java View File

@@ -22,22 +22,41 @@ import io.netty.util.CharsetUtil;


public class HeartBeatMessage implements IMessage { public class HeartBeatMessage implements IMessage {


/**
* 统一的心跳信息字符串
*/
private static final String HEARTBEAT_STRING = "JDChainHeartBeat"; private static final String HEARTBEAT_STRING = "JDChainHeartBeat";


/**
* 统一的心跳消息字符串对一个的ByteBuf
*/
private static final ByteBuf HEARTBEAT_MESSAGE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(HEARTBEAT_STRING + "\r\n", private static final ByteBuf HEARTBEAT_MESSAGE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(HEARTBEAT_STRING + "\r\n",
CharsetUtil.UTF_8)); CharsetUtil.UTF_8));


/**
* 将心跳消息写入Ctx
* @param ctx
*/
public static final void write(ChannelHandlerContext ctx) { public static final void write(ChannelHandlerContext ctx) {
ctx.writeAndFlush(HEARTBEAT_MESSAGE.duplicate()); ctx.writeAndFlush(HEARTBEAT_MESSAGE.duplicate());
} }


/**
* 判断接收的消息是否为心跳消息
*
* @param msg
* @return
*/
public static final boolean isHeartBeat(Object msg) { public static final boolean isHeartBeat(Object msg) {
if (msg instanceof String) {
return isHeartBeat((String) msg);
}
return false;
return isHeartBeat(msg.toString());
} }


/**
* 判断接收的消息是否为心跳消息
*
* @param msg
* @return
*/
public static final boolean isHeartBeat(String msg) { public static final boolean isHeartBeat(String msg) {
if (HEARTBEAT_STRING.equals(msg)) { if (HEARTBEAT_STRING.equals(msg)) {
return true; return true;


+ 9
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java View File

@@ -11,7 +11,7 @@ package com.jd.blockchain.stp.communication.message;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;


/** /**
*
* 消息接口
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/16 * @create 2019/4/16
* @since 1.0.0 * @since 1.0.0
@@ -19,7 +19,15 @@ import io.netty.buffer.ByteBuf;


public interface IMessage { public interface IMessage {


/**
* 消息转换为字符串
* @return
*/
String toTransfer(); String toTransfer();


/**
* 消息转换为ByteBuf
* @return
*/
ByteBuf toTransferByteBuf(); ByteBuf toTransferByteBuf();
} }

+ 6
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java View File

@@ -9,7 +9,8 @@
package com.jd.blockchain.stp.communication.message; package com.jd.blockchain.stp.communication.message;


/** /**
*
* 负载消息
* 该接口用于应用实现
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -17,5 +18,9 @@ package com.jd.blockchain.stp.communication.message;


public interface LoadMessage { public interface LoadMessage {


/**
* 将负载消息转换为字节数组
* @return
*/
byte[] toBytes(); byte[] toBytes();
} }

+ 34
- 17
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java View File

@@ -8,13 +8,11 @@
*/ */
package com.jd.blockchain.stp.communication.message; package com.jd.blockchain.stp.communication.message;


import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.binary.Hex;


/** /**
*
* Session消息
* 该消息用于发送至远端节点,告诉远端节点本地的信息
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/16 * @create 2019/4/16
* @since 1.0.0 * @since 1.0.0
@@ -22,19 +20,28 @@ import org.apache.commons.codec.binary.Hex;


public class SessionMessage extends AbstractMessage implements IMessage { public class SessionMessage extends AbstractMessage implements IMessage {


/**
* 本地节点HOST
*/
private String localHost; private String localHost;


/**
* 本地节点监听端口
*/
private int listenPort; private int listenPort;


private String messageExecute;
/**
* 远端接收到本地节点信息时处理的Class
*/
private String messageExecutor;


public SessionMessage() { public SessionMessage() {
} }


public SessionMessage(String localHost, int listenPort, String messageExecute) {
public SessionMessage(String localHost, int listenPort, String messageExecutor) {
this.localHost = localHost; this.localHost = localHost;
this.listenPort = listenPort; this.listenPort = listenPort;
this.messageExecute = messageExecute;
this.messageExecutor = messageExecutor;
} }


public String getLocalHost() { public String getLocalHost() {
@@ -54,30 +61,37 @@ public class SessionMessage extends AbstractMessage implements IMessage {
return listenPort; return listenPort;
} }


public String getMessageExecute() {
return messageExecute;
public String getMessageExecutor() {
return messageExecutor;
} }


public void setMessageExecute(String messageExecute) {
this.messageExecute = messageExecute;
public void setMessageExecutor(String messageExecutor) {
this.messageExecutor = messageExecutor;
} }


public String sessionId() { public String sessionId() {
return Hex.encodeHexString((this.localHost + ":" + this.listenPort).getBytes()); return Hex.encodeHexString((this.localHost + ":" + this.listenPort).getBytes());
} }


public static SessionMessage toNodeSessionMessage(Object msg) {
/**
* 将对象(或者说接收到的消息)转换为SessionMessage
* @param msg
* 接收到的消息对象
* @return
* 可正确解析则返回,否则返回NULL
*/
public static SessionMessage toSessionMessage(Object msg) {
String msgString = msg.toString(); String msgString = msg.toString();
try { try {
String[] msgArray = msgString.split("\\|"); String[] msgArray = msgString.split("\\|");
if (msgArray.length == 2 || msgArray.length == 3) { if (msgArray.length == 2 || msgArray.length == 3) {
String host = msgArray[0]; String host = msgArray[0];
int port = Integer.parseInt(msgArray[1]); int port = Integer.parseInt(msgArray[1]);
String msgExecuteClass = null;
String msgExecutorClass = null;
if (msgArray.length == 3) { if (msgArray.length == 3) {
msgExecuteClass = msgArray[2];
msgExecutorClass = msgArray[2];
} }
return new SessionMessage(host, port, msgExecuteClass);
return new SessionMessage(host, port, msgExecutorClass);
} }
return null; return null;
} catch (Exception e) { } catch (Exception e) {
@@ -89,7 +103,10 @@ public class SessionMessage extends AbstractMessage implements IMessage {
public String toTransfer() { public String toTransfer() {
// 为区别于TransferMessage的JSON格式,该处使用字符串连接处理 // 为区别于TransferMessage的JSON格式,该处使用字符串连接处理
// 格式:localHost|port|class // 格式:localHost|port|class
String transferMsg = this.localHost + "|" + this.listenPort + "|" + this.messageExecute;
return transferMsg;
if (this.messageExecutor == null) {
return this.localHost + "|" + this.listenPort;
} else {
return this.localHost + "|" + this.listenPort + "|" + this.messageExecutor;
}
} }
} }

+ 32
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java View File

@@ -12,6 +12,7 @@ import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;


/** /**
* 底层传输协议
* *
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
@@ -20,14 +21,32 @@ import org.apache.commons.codec.binary.Base64;


public class TransferMessage extends AbstractMessage implements IMessage{ public class TransferMessage extends AbstractMessage implements IMessage{


/**
* sessionId(描述节点信息)
*/
private String sessionId; private String sessionId;


/**
* 本次消息的类型
* 0:请求;
* 1:应答;
*/
private int type; private int type;


/**
* 消息的Key
*/
private String key; private String key;


/**
* 消息载体的内容
* 本内容不可被序列化
*/
private transient byte[] load; private transient byte[] load;


/**
* 消息载体的内容->Base64转换
*/
private String loadBase64; private String loadBase64;


public TransferMessage() { public TransferMessage() {
@@ -40,7 +59,13 @@ public class TransferMessage extends AbstractMessage implements IMessage{
this.load = load; this.load = load;
} }


public static TransferMessage toTransferMessageObj(Object msg) {
/**
* 转换为TransferMessage对象
*
* @param msg
* @return
*/
public static TransferMessage toTransferMessage(Object msg) {
if (msg == null) { if (msg == null) {
return null; return null;
} }
@@ -80,6 +105,12 @@ public class TransferMessage extends AbstractMessage implements IMessage{
return JSON.toJSONString(this); return JSON.toJSONString(this);
} }


/**
* 转换为监听的Key
* 该Key可描述为从远端发送来消息及其内容的唯一性
*
* @return
*/
public String toListenKey() { public String toListenKey() {
// 格式:sessionId:key // 格式:sessionId:key
return sessionId + ":" + key; return sessionId + ":" + key;


+ 55
- 20
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java View File

@@ -8,10 +8,10 @@
*/ */
package com.jd.blockchain.stp.communication.node; package com.jd.blockchain.stp.communication.node;


import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.MessageExecutor;


/** /**
*
* 本地节点
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/16 * @create 2019/4/16
* @since 1.0.0 * @since 1.0.0
@@ -19,34 +19,69 @@ import com.jd.blockchain.stp.communication.MessageExecute;


public class LocalNode extends RemoteNode { public class LocalNode extends RemoteNode {


private Class<?> messageExecute;
/**
* 当前节点消息处理器
* 该消息处理器用于描述远端节点收到当前节点的消息该如何处理
* 通常该消息处理器会以字符串的形式发送至远端节点
*/
private Class<?> messageExecutorClass;


public LocalNode(String hostName, int port) {
super(hostName, port);
}
/**
* 当前节点接收消息默认处理器
* 与messageExecutor不同,该字段描述的是当前节点接收到其他节点信息时的默认处理器
* 该参数硬性要求必须不能为空
*/
private MessageExecutor defaultMessageExecutor;


public LocalNode(String hostName, int port, MessageExecute messageExecute) {
super(hostName, port);
this.messageExecute = messageExecute.getClass();
/**
* 构造器
* @param hostName
* 当前节点Host,该Host必须是一种远端节点可访问的形式
* @param port
* 当前节点监听端口
* @param defaultMessageExecutor
* 当前节点接收到远端消息无法处理时的消息处理器
*
*/
public LocalNode(String hostName, int port, MessageExecutor defaultMessageExecutor) {
this(hostName, port, null, defaultMessageExecutor);
} }


public LocalNode(String hostName, int port, Class<?> messageExecute) {
/**
* 构造器
* @param hostName
* 当前节点Host,该Host必须是一种远端节点可访问的形式
* @param port
* 当前节点监听端口
* @param messageExecutorClass
* 当前节点期望远端节点接收到消息后的处理器
* @param defaultMessageExecutor
* 当前节点接收到远端消息无法处理时的消息处理器
*
*/
public LocalNode(String hostName, int port, Class<?> messageExecutorClass, MessageExecutor defaultMessageExecutor) {
super(hostName, port); super(hostName, port);
this.messageExecute = messageExecute;
this.messageExecutorClass = messageExecutorClass;
this.defaultMessageExecutor = defaultMessageExecutor;
} }


public String messageExecuteClass() {
if (this.messageExecute == null) {
/**
* 返回消息执行器的类对应的字符串
* 该返回值通常用于消息传递
* @return
*/
public String messageExecutorClass() {
if (this.messageExecutorClass == null) {
return null; return null;
} }
return this.messageExecute.getName();
}

public void setMessageExecute(MessageExecute messageExecute) {
this.messageExecute = messageExecute.getClass();
return this.messageExecutorClass.getName();
} }


public void setMessageExecute(Class<?> messageExecute) {
this.messageExecute = messageExecute;
/**
* 返回默认的消息处理器
* @return
*/
public MessageExecutor defaultMessageExecutor() {
return this.defaultMessageExecutor;
} }
} }

+ 11
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java View File

@@ -9,7 +9,7 @@
package com.jd.blockchain.stp.communication.node; package com.jd.blockchain.stp.communication.node;


/** /**
*
* 节点信息
* @author shaozhuguang * @author shaozhuguang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -17,8 +17,14 @@ package com.jd.blockchain.stp.communication.node;


public class RemoteNode { public class RemoteNode {


/**
* 监听端口
*/
private int port; private int port;


/**
* 当前节点域名
*/
private String hostName; private String hostName;


public RemoteNode(String hostName, int port) { public RemoteNode(String hostName, int port) {
@@ -42,6 +48,10 @@ public class RemoteNode {
this.hostName = hostName; this.hostName = hostName;
} }


/**
* 通过hostName+port形式作为判断节点的唯一标识
* @return
*/
@Override @Override
public int hashCode() { public int hashCode() {
return (hostName + ":" + port).hashCode(); return (hostName + ":" + port).hashCode();


+ 1
- 1
source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java View File

@@ -30,7 +30,7 @@ public class SessionMessageTest {
String transMsg = message.toTransfer(); String transMsg = message.toTransfer();
System.out.println(transMsg); System.out.println(transMsg);


SessionMessage sm = SessionMessage.toNodeSessionMessage(transMsg);
SessionMessage sm = SessionMessage.toSessionMessage(transMsg);


assertEquals(transMsg, sm.toTransfer()); assertEquals(transMsg, sm.toTransfer());
} }

+ 2
- 2
source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java View File

@@ -8,7 +8,7 @@
*/ */
package com.jd.blockchain.stp.commucation; package com.jd.blockchain.stp.commucation;


import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;


import java.nio.charset.Charset; import java.nio.charset.Charset;
@@ -20,7 +20,7 @@ import java.nio.charset.Charset;
* @since 1.0.0 * @since 1.0.0
*/ */


public class MyMessageExecutor implements MessageExecute {
public class MyMessageExecutor implements MessageExecutor {


@Override @Override
public byte[] receive(String key, byte[] data, RemoteSession session) { public byte[] receive(String key, byte[] data, RemoteSession session) {


+ 31
- 10
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java View File

@@ -9,7 +9,6 @@
package com.jd.blockchain; package com.jd.blockchain;


import com.jd.blockchain.stp.commucation.MyMessageExecutor; import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier; import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener; import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
@@ -27,6 +26,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;


import static org.junit.Assert.assertNull;

/** /**
* *
* @author shaozhuguang * @author shaozhuguang
@@ -57,7 +58,7 @@ public class StpTest {
listenStart(); listenStart();
System.out.println("---------- listenComplete -----------"); System.out.println("---------- listenComplete -----------");
System.out.println("---------- ConnectionStart ----------"); System.out.println("---------- ConnectionStart ----------");
connectAllOthers();
connectOneOther();
System.out.println("---------- ConnectionComplete ----------"); System.out.println("---------- ConnectionComplete ----------");
} }


@@ -68,12 +69,17 @@ public class StpTest {
final int port = listenPorts[i], index = i; final int port = listenPorts[i], index = i;
threadPool.execute(() -> { threadPool.execute(() -> {
// 创建本地节点 // 创建本地节点
final LocalNode localNode = new LocalNode(remoteHost, port, MyMessageExecutor.class);
// 启动当前节点
RemoteSessionManager sessionManager = new RemoteSessionManager(localNode);
sessionManagers[index] = sessionManager;
countDownLatch.countDown();
System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString());
final LocalNode localNode = new LocalNode(remoteHost, port, new MyMessageExecutor());
try {
// 启动当前节点
RemoteSessionManager sessionManager = new RemoteSessionManager(localNode);
sessionManagers[index] = sessionManager;
System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}); });
} }


@@ -99,7 +105,7 @@ public class StpTest {
} }
} }


remoteSessions = starter.newSessions(remoteNodes, new MyMessageExecutor());
remoteSessions = starter.newSessions(remoteNodes);
} }


private void connectOneOther() { private void connectOneOther() {
@@ -116,7 +122,22 @@ public class StpTest {
} }
} }


remoteSessions = starter.newSessions(remoteNodes, new MyMessageExecutor());
remoteSessions = starter.newSessions(remoteNodes);
}

private void connectOneErrorNode() {
// 所有节点完成之后,需要启动
// 启动一个节点
RemoteSessionManager starter = sessionManagers[0];

// 当前节点需要连接到其他3个节点
RemoteNode[] remoteNodes = new RemoteNode[1];

remoteNodes[0] = new RemoteNode(remoteHost, 10001);

remoteSessions = starter.newSessions(remoteNodes);

assertNull(remoteSessions);
} }






Loading…
Cancel
Save