From a4ae5ab97be2373bac10a05f721a5b5ec3223c90 Mon Sep 17 00:00:00 2001 From: shaozhuguang Date: Thu, 18 Apr 2019 15:41:17 +0800 Subject: [PATCH] Add Explanatory Notes for SFP-Communication Module. --- .../statetransfer/DSTransferProcess.java | 2 +- .../statetransfer/DataSequenceMsgHandle.java | 4 +- ...ssageExecute.java => MessageExecutor.java} | 6 +- .../stp/communication/RemoteSession.java | 103 +++++++- .../callback/CallBackBarrier.java | 22 +- .../callback/CallBackDataListener.java | 45 +++- .../callback/CallBackLauncher.java | 79 +++++++ .../connection/AbstractAsyncExecutor.java | 36 ++- .../connection/AsyncExecutor.java | 15 +- .../communication/connection/Connection.java | 101 +++++++- .../communication/connection/Receiver.java | 57 +++-- .../stp/communication/connection/Sender.java | 34 ++- .../handler/HeartBeatReceiverHandler.java | 4 +- .../handler/HeartBeatReceiverTrigger.java | 4 +- .../handler/HeartBeatSenderHandler.java | 2 +- .../handler/HeartBeatSenderTrigger.java | 6 +- .../connection/handler/ReceiverHandler.java | 220 +++++++++++++----- .../connection/handler/SenderHandler.java | 5 +- .../connection/handler/WatchDogHandler.java | 64 ++++- .../connection/listener/ReplyListener.java | 19 +- .../manager/ConnectionManager.java | 94 ++++++-- .../manager/RemoteSessionManager.java | 173 ++++++++++---- .../message/AbstractMessage.java | 2 +- .../message/HeartBeatMessage.java | 27 ++- .../stp/communication/message/IMessage.java | 10 +- .../communication/message/LoadMessage.java | 7 +- .../communication/message/SessionMessage.java | 51 ++-- .../message/TransferMessage.java | 33 ++- .../stp/communication/node/LocalNode.java | 75 ++++-- .../stp/communication/node/RemoteNode.java | 12 +- .../com/jd/blockchain/SessionMessageTest.java | 2 +- .../stp/commucation/MyMessageExecutor.java | 4 +- .../test/java/com/jd/blockchain/StpTest.java | 41 +++- 33 files changed, 1100 insertions(+), 259 deletions(-) rename source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/{MessageExecute.java => MessageExecutor.java} (87%) create mode 100644 source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackLauncher.java diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java index 1fba56ff..6d5b699f 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java @@ -69,7 +69,7 @@ public class DSTransferProcess { for (int i = 0; i < remoteSessions.length; i++) { DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter); - remoteSessions[i].initExecute(msgHandle); + remoteSessions[i].initExecutor(msgHandle); remoteSessions[i].init(); } } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java index 77fa0417..d396ff3e 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java @@ -1,6 +1,6 @@ package com.jd.blockchain.statetransfer; -import com.jd.blockchain.stp.communication.MessageExecute; +import com.jd.blockchain.stp.communication.MessageExecutor; import com.jd.blockchain.stp.communication.RemoteSession; /** @@ -9,7 +9,7 @@ import com.jd.blockchain.stp.communication.RemoteSession; * @create 2019/4/11 * @since 1.0.0 */ -public class DataSequenceMsgHandle implements MessageExecute { +public class DataSequenceMsgHandle implements MessageExecutor { DataSequenceReader dsReader; DataSequenceWriter dsWriter; diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecute.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecutor.java similarity index 87% rename from source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecute.java rename to source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecutor.java index 7640d122..ca85b0c8 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecute.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecutor.java @@ -9,13 +9,15 @@ package com.jd.blockchain.stp.communication; /** - * + * 消息执行器 + * 该执行器由其他应用实现 * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 + * @date 2019-04-18 15:29 */ -public interface MessageExecute { +public interface MessageExecutor { byte[] receive(String key, byte[] data, RemoteSession session); diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java index b6ad6794..cc4dd167 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java @@ -13,6 +13,8 @@ import com.jd.blockchain.stp.communication.callback.CallBackDataListener; import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.message.LoadMessage; +import java.util.concurrent.TimeUnit; + /** * @@ -23,38 +25,125 @@ import com.jd.blockchain.stp.communication.message.LoadMessage; public class RemoteSession { + /** + * 远端节点ID + */ private String id; + /** + * 远端连接 + */ private Connection connection; - private MessageExecute messageExecute; + /** + * 对应远端节点消息的处理器 + * 该处理器若为NULL,则使用当前节点默认处理器 + */ + private MessageExecutor messageExecutor; + + + /** + * 构造器 + * @param id + * 远端节点ID + * @param connection + * 对应连接 + */ + public RemoteSession(String id, Connection connection) { + this(id, connection, null); + } - public RemoteSession(String id, Connection connection, MessageExecute messageExecute) { + /** + * 构造器 + * @param id + * 远端ID + * @param connection + * 对应连接 + * @param messageExecutor + * 对应远端消息处理器 + */ + public RemoteSession(String id, Connection connection, MessageExecutor messageExecutor) { this.id = id; this.connection = connection; - this.messageExecute = messageExecute; + this.messageExecutor = messageExecutor; } public void init() { connection.initSession(this); } - public void initExecute(MessageExecute messageExecute) { - this.messageExecute = messageExecute; + public void initExecutor(MessageExecutor messageExecutor) { + this.messageExecutor = messageExecutor; } + /** + * 同步请求 + * 该请求会阻塞原线程 + * + * @param loadMessage + * 要请求的负载消息 + * @return + * 应答,直到有消息应答或出现异常 + * @throws Exception + */ public byte[] request(LoadMessage loadMessage) throws Exception { return this.connection.request(this.id, loadMessage, null).getCallBackData(); } + /** + * 同步请求 + * 该请求会阻塞原线程 + * + * @param loadMessage + * 要请求的负载消息 + * @param time + * 请求的最长等待时间 + * @param timeUnit + * 请求的最长等待单位 + * @return + * 应答,直到有消息或时间截止或出现异常 + * @throws Exception + */ + public byte[] request(LoadMessage loadMessage, long time, TimeUnit timeUnit) throws Exception { + return this.connection.request(this.id, loadMessage, null).getCallBackData(time, timeUnit); + } + + /** + * 异步请求 + * 不会阻塞调用线程 + * + * @param loadMessage + * 要发送的负载消息 + * @return + * 应答,需要调用者从Listener中获取结果 + */ public CallBackDataListener asyncRequest(LoadMessage loadMessage) { return asyncRequest(loadMessage, null); } + /** + * 异步请求 + * 不会阻塞调用线程 + * + * @param loadMessage + * 要请求的负载消息 + * @param callBackBarrier + * 回调栅栏(用于多个请求时进行统一阻拦) + * @return + * 应答,需要调用者从Listener中获取结果 + */ public CallBackDataListener asyncRequest(LoadMessage loadMessage, CallBackBarrier callBackBarrier) { return this.connection.request(this.id, loadMessage, callBackBarrier); } + /** + * 应答 + * + * @param key + * 请求消息的Key + * @param loadMessage + * 需要应答的负载消息 + */ public void reply(String key, LoadMessage loadMessage) { this.connection.reply(this.id, key, loadMessage); } @@ -75,7 +164,7 @@ public class RemoteSession { return id; } - public MessageExecute messageExecute() { - return this.messageExecute; + public MessageExecutor messageExecutor() { + return this.messageExecutor; } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java index 433533a0..c9580e10 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java @@ -8,12 +8,12 @@ */ package com.jd.blockchain.stp.communication.callback; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** - * + * 回调栅栏 + * 用于对批量请求的应答回调处理 * @author shaozhuguang * @create 2019/4/12 * @since 1.0.0 @@ -23,12 +23,30 @@ public class CallBackBarrier { private CountDownLatch countDownLatch; + /** + * 默认最大尝试调用时间(单位:毫秒) + */ private long maxTryCallMillSeconds = 2000; + + /** + * 静态构造器 + * @param barrierLength + * 请求的远端数量 + * @return + */ public static final CallBackBarrier newCallBackBarrier(int barrierLength) { return new CallBackBarrier(barrierLength); } + /** + * 静态构造器 + * @param barrierLength + * 请求的远端数量 + * @param maxTryCallMillSeconds + * 最大尝试的时间,单位:毫秒 + * @return + */ public static final CallBackBarrier newCallBackBarrier(int barrierLength, long maxTryCallMillSeconds) { return new CallBackBarrier(barrierLength, maxTryCallMillSeconds); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java index 252765f9..e9aa4417 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java @@ -18,7 +18,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * + * 数据回调监听器 * @author shaozhuguang * @create 2019/4/15 * @since 1.0.0 @@ -26,30 +26,69 @@ import java.util.concurrent.locks.ReentrantLock; public class CallBackDataListener { + /** + * Future + */ private CompletableFuture future = new CompletableFuture<>(); + /** + * 远端节点 + */ private RemoteNode remoteNode; private boolean isFill = false; private Lock lock = new ReentrantLock(); + + /** + * 构造器 + * @param remoteNode + * 远端节点信息 + */ public CallBackDataListener(RemoteNode remoteNode) { this.remoteNode = remoteNode; } + /** + * 获取返回的数据 + * 调用该方法会阻塞当前线程,直到有数据返回或出现异常 + * @return + * 应答结果 + * @throws InterruptedException + * @throws ExecutionException + */ public byte[] getCallBackData() throws InterruptedException, ExecutionException { return future.get(); } + /** + * 指定时间内获取返回的数据 + * 调用该方法会阻塞当前线程,直到时间到达或有数据返回或出现异常 + * @param time + * 超时时间 + * @param timeUnit + * 超时单位 + * @return + * 应答结果,若指定时间内没有数据,则返回null + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ public byte[] getCallBackData(long time, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { return future.get(time, timeUnit); } + /** + * 设置返回的数据 + * @param data + */ public void setCallBackData(byte[] data) { + // 防止数据多次设置 if (!isFill) { try { lock.lock(); + // Double Check if (!isFill) { future.complete(data); isFill = true; @@ -64,6 +103,10 @@ public class CallBackDataListener { return this.remoteNode; } + /** + * 判断是否异步操作完成 + * @return + */ public boolean isDone() { return future.isDone(); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackLauncher.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackLauncher.java new file mode 100644 index 00000000..bef91db4 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackLauncher.java @@ -0,0 +1,79 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.callback.CallBackLauncher + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/17 下午6:27 + * Description: + */ +package com.jd.blockchain.stp.communication.callback; + +import java.util.concurrent.Semaphore; + +/** + * 启动器回调 + * @author shaozhuguang + * @create 2019/4/17 + * @since 1.0.0 + */ + +public class CallBackLauncher { + + /** + * 是否启动成功 + */ + private boolean isBootSuccess = false; + + /** + * 信号量 + */ + private Semaphore isBooted = new Semaphore(0, true); + + /** + * 异常 + */ + private Exception exception; + + /** + * 标识当前启动成功 + */ + public void bootSuccess() { + isBootSuccess = true; + release(); + } + + /** + * 标识当前启动失败 + * @param e + * 导致失败的异常信息 + */ + public void bootFail(Exception e) { + this.exception = e; + isBootSuccess = false; + release(); + } + + /** + * 等待启动完成 + * 调用该方法会阻塞当前线程,知道启动完成或发生异常 + * @return + * 当前对象 + * @throws InterruptedException + */ + public CallBackLauncher waitingBooted() throws InterruptedException { + this.isBooted.acquire(); + return this; + } + + public boolean isBootSuccess() { + return isBootSuccess; + } + + public Exception exception() { + return exception; + } + + private void release() { + this.isBooted.release(); + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java index b1c07df7..273fcbc0 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java @@ -9,11 +9,12 @@ package com.jd.blockchain.stp.communication.connection; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.jd.blockchain.stp.communication.callback.CallBackLauncher; import java.util.concurrent.*; /** - * + * 抽象异步执行器 * @author shaozhuguang * @create 2019/4/17 * @since 1.0.0 @@ -21,19 +22,27 @@ import java.util.concurrent.*; public abstract class AbstractAsyncExecutor implements AsyncExecutor{ + /** + * 线程池可处理队列的容量 + */ private static final int QUEUE_CAPACITY = 1024; - protected final Semaphore isStarted = new Semaphore(0, true); - - protected boolean isStartSuccess = false; + /** + * 回调执行器 + */ + protected final CallBackLauncher callBackLauncher = new CallBackLauncher(); + /** + * 默认提供的初始化活跃线程调度器 + * @return + */ @Override public ThreadPoolExecutor initRunThread() { ThreadFactory timerFactory = new ThreadFactoryBuilder() .setNameFormat(threadNameFormat()).build(); ThreadPoolExecutor runThread = new ThreadPoolExecutor(1, 1, - 0, TimeUnit.MILLISECONDS, + 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_CAPACITY), timerFactory, new ThreadPoolExecutor.AbortPolicy()); @@ -41,11 +50,22 @@ public abstract class AbstractAsyncExecutor implements AsyncExecutor{ return runThread; } + /** + * 启动完成后回调 + * 该调用会阻塞当前线程,直到启动完成,无论是成功或失败 + * @return + * 回调执行器 + * 成功或失败会在回调执行器中有所体现 + * @throws InterruptedException + */ @Override - public boolean waitStarted() throws InterruptedException { - this.isStarted.acquire(); - return this.isStartSuccess; + public CallBackLauncher waitBooted() throws InterruptedException { + return callBackLauncher.waitingBooted(); } + /** + * 线程池中的线程命名格式 + * @return + */ public abstract String threadNameFormat(); } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java index 170b1ea5..22c0c28a 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java @@ -8,10 +8,12 @@ */ package com.jd.blockchain.stp.communication.connection; +import com.jd.blockchain.stp.communication.callback.CallBackLauncher; + import java.util.concurrent.ThreadPoolExecutor; /** - * + * 异步执行器接口 * @author shaozhuguang * @create 2019/4/17 * @since 1.0.0 @@ -19,7 +21,16 @@ import java.util.concurrent.ThreadPoolExecutor; public interface AsyncExecutor { + /** + * 初始化运行线程 + * @return + */ ThreadPoolExecutor initRunThread(); - boolean waitStarted() throws InterruptedException; + /** + * 启动完成后返回调度执行器 + * @return + * @throws InterruptedException + */ + CallBackLauncher waitBooted() throws InterruptedException; } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java index 1cb673f7..7ceedf77 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java @@ -11,6 +11,7 @@ package com.jd.blockchain.stp.communication.connection; import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.callback.CallBackBarrier; import com.jd.blockchain.stp.communication.callback.CallBackDataListener; +import com.jd.blockchain.stp.communication.callback.CallBackLauncher; import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; import com.jd.blockchain.stp.communication.message.LoadMessage; import com.jd.blockchain.stp.communication.message.SessionMessage; @@ -21,35 +22,85 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.digest.DigestUtils; /** - * + * 统一连接对象 + * 该对象中有两个对象Receiver和Sender + * Receiver为复用对象(每个端口监听产生的Receiver只有一个) * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 + * @date 2019-04-18 14:49 */ - public class Connection { + /** + * 远端节点 + */ private RemoteNode remoteNode; + /** + * 接收器 + */ private Receiver receiver; + /** + * 发送器 + */ private Sender sender; + /** + * 构造器 + * + * @param receiver + */ public Connection(Receiver receiver) { this.receiver = receiver; } + /** + * 初始化RemoteSession + * + * @param remoteSession + */ public void initSession(RemoteSession remoteSession) { this.receiver.initRemoteSession(remoteSession.sessionId(), remoteSession); } - public boolean connect(RemoteNode remoteNode, String messageExecuteClass) throws InterruptedException { + /** + * 连接远端 + * + * @param remoteNode + * 远端节点 + * @param messageExecutorClass + * 希望远端节点处理本地节点消息时的消息处理器 + * @return + * 回调执行器 + * @throws InterruptedException + */ + public CallBackLauncher connect(RemoteNode remoteNode, String messageExecutorClass) throws InterruptedException { this.remoteNode = remoteNode; - this.sender = new Sender(this.remoteNode, sessionMessage(messageExecuteClass)); + this.sender = new Sender(this.remoteNode, sessionMessage(messageExecutorClass)); this.sender.connect(); - return this.sender.waitStarted(); + return this.sender.waitBooted(); } + /** + * 发送请求 + * + * 处理过程简述如下: + * 1、生成底层消息(TransferMessage),其中消息类型为请求,用于描述本次发送的消息是用于请求应答; + * 2、根据消息的唯一Key,生成listenKey,并生成应答监听器 + * 3、将应答监听器添加到Receiver中(Receiver中是以Map存储) + * 4、调用Sender发送消息至对端节点 + * 5、返回应答监听器的回调数据监听对象 + * + * @param sessionId + * 当前SessionId + * @param loadMessage + * 载体消息 + * @param callBackBarrier + * 回调栅栏 + * @return + */ public CallBackDataListener request(String sessionId, LoadMessage loadMessage, CallBackBarrier callBackBarrier) { TransferMessage transferMessage = transferMessage(sessionId, null, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_REQUEST); @@ -69,6 +120,16 @@ public class Connection { return replyListener.callBackDataListener(); } + /** + * 发送应答 + * + * @param sessionId + * 当前SessionID + * @param key + * 请求消息的Key,用于描述对应的请求 + * @param loadMessage + * 应答的载体消息 + */ public void reply(String sessionId, String key, LoadMessage loadMessage) { TransferMessage transferMessage = transferMessage(sessionId, key, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_RESPONSE); @@ -76,6 +137,12 @@ public class Connection { this.sender.send(transferMessage); } + /** + * 生成载体消息的Key + * + * @param loadMessage + * @return + */ private String loadKey(LoadMessage loadMessage) { // 使用Sha256求Hash byte[] sha256Bytes = DigestUtils.sha256(loadMessage.toBytes()); @@ -83,6 +150,19 @@ public class Connection { return Base64.encodeBase64String(sha256Bytes); } + /** + * 生成TransferMessage + * + * @param sessionId + * 节点ID + * @param key + * 消息Key + * @param loadMessage + * 载体消息 + * @param messageType + * 消息类型 + * @return + */ private TransferMessage transferMessage(String sessionId, String key, LoadMessage loadMessage, TransferMessage.MESSAGE_TYPE messageType) { if (key == null || key.length() == 0) { @@ -95,12 +175,19 @@ public class Connection { return transferMessage; } - private SessionMessage sessionMessage(String messageExecuteClass) { + /** + * 生成SessionMessage + * + * @param messageExecutorClass + * + * @return + */ + private SessionMessage sessionMessage(String messageExecutorClass) { LocalNode localNode = this.receiver.localNode(); SessionMessage sessionMessage = new SessionMessage( - localNode.getHostName(), localNode.getPort(), messageExecuteClass); + localNode.getHostName(), localNode.getPort(), messageExecutorClass); return sessionMessage; } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java index 3adee11d..c9dae7fa 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java @@ -14,7 +14,6 @@ import com.jd.blockchain.stp.communication.connection.handler.HeartBeatReceiverT import com.jd.blockchain.stp.communication.connection.handler.ReceiverHandler; import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; import com.jd.blockchain.stp.communication.manager.ConnectionManager; -import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; import com.jd.blockchain.stp.communication.node.LocalNode; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; @@ -26,19 +25,14 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.io.Closeable; -import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Map; import java.util.concurrent.*; /** - * + * 接收器 * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 @@ -46,18 +40,33 @@ import java.util.concurrent.*; public class Receiver extends AbstractAsyncExecutor implements Closeable { + /** + * Netty中的BOSS线程 + */ private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + /** + * Netty中的Worker线程 + */ private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + /** + * 本地节点 + */ private LocalNode localNode; + /** + * 消息接收Handler + */ private ReceiverHandler receiverHandler; public Receiver(LocalNode localNode) { this.localNode = localNode; } + /** + * 启动监听 + */ public void startListen() { ServerBootstrap bootstrap = new ServerBootstrap(); @@ -80,14 +89,14 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable { } }); - // 由单独的线程启动 + // 由单独的线程启动,防止外部调用线程阻塞 ThreadPoolExecutor runThread = initRunThread(); runThread.execute(() -> { try { ChannelFuture f = bootstrap.bind().sync(); - super.isStartSuccess = f.isSuccess(); - super.isStarted.release(); - if (super.isStartSuccess) { + boolean isStartSuccess = f.isSuccess(); + if (isStartSuccess) { + super.callBackLauncher.bootSuccess(); // 启动成功 f.channel().closeFuture().sync(); } else { @@ -95,7 +104,7 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable { throw new Exception("Receiver start fail :" + f.cause().getMessage() + " !!!"); } } catch (Exception e) { - throw new RuntimeException(e); + super.callBackLauncher.bootFail(e); } finally { close(); } @@ -107,14 +116,34 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable { return "receiver-pool-%d"; } - public void initReceiverHandler(ConnectionManager connectionManager, String messageExecuteClass) { - receiverHandler = new ReceiverHandler(connectionManager, messageExecuteClass); + /** + * 初始化ReceiverHandler + * + * @param connectionManager + * 连接管理器 + * @param messageExecutorClass + * 当前节点的消息处理Class + */ + public void initReceiverHandler(ConnectionManager connectionManager, String messageExecutorClass) { + receiverHandler = new ReceiverHandler(connectionManager, messageExecutorClass, this.localNode.defaultMessageExecutor()); } + /** + * 初始化远端Session + * + * @param sessionId + * + * @param remoteSession + */ public void initRemoteSession(String sessionId, RemoteSession remoteSession) { receiverHandler.putRemoteSession(sessionId, remoteSession); } + /** + * 添加监听器 + * + * @param replyListener + */ public void addListener(ReplyListener replyListener) { receiverHandler.addListener(replyListener); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java index 6e9dde46..cbc57467 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java @@ -13,30 +13,27 @@ import com.jd.blockchain.stp.communication.message.IMessage; import com.jd.blockchain.stp.communication.message.SessionMessage; import com.jd.blockchain.stp.communication.node.RemoteNode; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.io.Closeable; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** + * 发送器 * * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 + * @date 2019-04-18 15:08 */ - public class Sender extends AbstractAsyncExecutor implements Closeable { private final EventLoopGroup loopGroup = new NioEventLoopGroup(); @@ -45,12 +42,24 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { private ChannelFuture channelFuture; + /** + * 当前节点的SessionMessage + */ private SessionMessage sessionMessage; + /** + * 远端HOST + */ private String remoteHost; + /** + * 远端端口 + */ private int remotePort; + /** + * 监听Handler(重连Handler) + */ private WatchDogHandler watchDogHandler; public Sender(RemoteNode remoteNode, SessionMessage sessionMessage) { @@ -61,6 +70,9 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { init(remoteHost, remotePort, sessionMessage); } + /** + * 连接 + */ public void connect() { watchDogHandler = new WatchDogHandler(this.remoteHost, this.remotePort, bootstrap); @@ -93,13 +105,13 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { try { // 发起连接请求 channelFuture = bootstrap.connect(this.remoteHost, this.remotePort).sync(); - - isStartSuccess = channelFuture.isSuccess(); - isStarted.release(); + boolean isStartSuccess = channelFuture.isSuccess(); if (isStartSuccess) { // 启动成功 // 设置ChannelFuture对象,以便于发送的连接状态处理 watchDogHandler.initChannelFuture(channelFuture); + // 释放等待 + super.callBackLauncher.bootSuccess(); // 等待客户端关闭连接 channelFuture.channel().closeFuture().sync(); } else { @@ -107,7 +119,7 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { throw new Exception("Sender start fail :" + channelFuture.cause().getMessage() + " !!!"); } } catch (Exception e) { - throw new RuntimeException(e); + super.callBackLauncher.bootFail(e); } finally { close(); } @@ -135,6 +147,10 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { return "sender-pool-%d"; } + /** + * 发送消息 + * @param message + */ public void send(IMessage message) { watchDogHandler.channelFuture().channel().writeAndFlush(message.toTransferByteBuf()); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java index a67920e9..47d65176 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java @@ -14,7 +14,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** - * + * 心跳接收Handler * @author shaozhuguang * @create 2019/4/15 * @since 1.0.0 @@ -24,11 +24,13 @@ public class HeartBeatReceiverHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // 判断当前收到的信息是否为心跳信息 if (HeartBeatMessage.isHeartBeat(msg)) { // 收到的消息是心跳消息,此时需要回复一个心跳消息 HeartBeatMessage.write(ctx); System.out.println("Receive HeartBeat Request Message -> " + msg.toString()); } else { + // 非心跳信息的情况下交由其他Handler继续处理 super.channelRead(ctx, msg); } } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java index 575895be..588b6de3 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java @@ -15,7 +15,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** - * + * 心跳接收触发器 * @author shaozhuguang * @create 2019/4/15 * @since 1.0.0 @@ -28,12 +28,14 @@ public class HeartBeatReceiverTrigger extends ChannelInboundHandlerAdapter { // 服务端只会接收心跳数据后应答,而不会主动应答 if (evt instanceof IdleStateEvent) { IdleState idleState = ((IdleStateEvent) evt).state(); + // 读请求超时表示很久没有收到客户端请求 if (idleState.equals(IdleState.READER_IDLE)) { // 长时间未收到客户端请求,则关闭连接 System.out.println("Long Time UnReceive HeartBeat Request, Close Connection !!!"); ctx.close(); } } else { + // 非空闲状态事件,由其他Handler处理 super.userEventTriggered(ctx, evt); } } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java index da471003..a477c42c 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java @@ -14,7 +14,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** - * + * 心跳发送Handler * @author shaozhuguang * @create 2019/4/15 * @since 1.0.0 diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java index 496dc348..fc059d54 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java @@ -16,7 +16,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** - * + * 心跳发送触发器 * @author shaozhuguang * @create 2019/4/15 * @since 1.0.0 @@ -27,7 +27,7 @@ public class HeartBeatSenderTrigger extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - // 心跳事件 + // 心跳事件(状态空闲事件) if (evt instanceof IdleStateEvent) { IdleState idleState = ((IdleStateEvent) evt).state(); if (idleState.equals(IdleState.READER_IDLE)) { @@ -40,7 +40,7 @@ public class HeartBeatSenderTrigger extends ChannelInboundHandlerAdapter { System.out.println("Read TimeOut Trigger, Send HeartBeat Request !!!"); HeartBeatMessage.write(ctx); } - // 还有一种情况是读写超时,该情况暂不处理 + // TODO 还有一种情况是读写超时,该情况暂不处理 } else { super.userEventTriggered(ctx, evt); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java index 179ff0da..1bcb5ef3 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java @@ -9,7 +9,7 @@ package com.jd.blockchain.stp.communication.connection.handler; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.jd.blockchain.stp.communication.MessageExecute; +import com.jd.blockchain.stp.communication.MessageExecutor; import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; @@ -28,7 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * + * 接收者消息处理Handler * @author shaozhuguang * @create 2019/4/12 * @since 1.0.0 @@ -36,25 +36,62 @@ import java.util.concurrent.locks.ReentrantLock; @ChannelHandler.Sharable public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Closeable { - // 队列的最大容量为256K(防止队列溢出) + /** + * 队列的最大容量设置,默认为256K(防止队列溢出) + */ private static final int QUEUE_CAPACITY = 256 * 1024; + /** + * 远端RemoteSession信息集合 + * Key为SessionId + * Sender发送的消息中会携带SessionId + * ReceiverHandler会根据不同的SessionId采用不同的MessageExecutor处理策略 + */ private final Map remoteSessions = new ConcurrentHashMap<>(); + /** + * 监听器集合 + * 对应Sender在发送请求之前会设置ReplyListener + * Key为每个请求消息的Hash,用于描述消息的唯一性 + * 应答一方会在应答中加入对应的key,用于消息的映射 + */ private final Map allReplyListeners = new ConcurrentHashMap<>(); - private final Lock lock = new ReentrantLock(); - - private String messageExecuteClass; - + /** + * session控制锁 + * 用于防止对统一RemoteSession对象进行重复设置 + */ + private final Lock sessionLock = new ReentrantLock(); + + /** + * 当前节点(本地节点)的消息处理器对应Class + * 该信息用于发送至其他节点,向其他节点通知遇到本节点请求时该如何处理 + */ + private String localMsgExecutorClass; + + /** + * 连接控制器,用于与远端节点连接 + */ private ConnectionManager connectionManager; - private ExecutorService msgExecutePool; + /** + * 消息处理执行线程池 + * 防止执行内容过长,导致阻塞 + */ + private ExecutorService msgExecutorPool; - public ReceiverHandler(ConnectionManager connectionManager, String messageExecuteClass) { + /** + * 默认消息处理器 + * 当对应session获取到的RemoteSession中没有获取到指定MessageExecutor时,短时间内由其进行处理 + */ + private MessageExecutor defaultMessageExecutor; + + public ReceiverHandler(ConnectionManager connectionManager, String localMsgExecutorClass, + MessageExecutor defaultMessageExecutor) { this.connectionManager = connectionManager; - this.messageExecuteClass = messageExecuteClass; - init(); + this.localMsgExecutorClass = localMsgExecutorClass; + this.defaultMessageExecutor = defaultMessageExecutor; + initMsgExecutorPool(); } public void putRemoteSession(String sessionId, RemoteSession remoteSession) { @@ -75,10 +112,10 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo System.out.println("Receive Biz Message -> " + msg.toString()); // 有数据接入 // 首先判断数据是否TransferMessage,当前Handler不处理非TransferMessage - TransferMessage tm = TransferMessage.toTransferMessageObj(msg); + TransferMessage tm = TransferMessage.toTransferMessage(msg); if (tm == null) { // 判断是否是SessionMessage - SessionMessage sm = SessionMessage.toNodeSessionMessage(msg); + SessionMessage sm = SessionMessage.toSessionMessage(msg); if (sm != null) { executeSessionMessage(sm); } else { @@ -106,36 +143,48 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo ctx.close(); } - // 防止消息的处理过程阻塞主进程 + /** + * 处理请求消息 + * + * @param transferMessage + * 接收到的请求消息 + */ private void executeRequest(final TransferMessage transferMessage) { - msgExecutePool.execute(() -> { + msgExecutorPool.execute(() -> { RemoteSession remoteSession = remoteSessions.get(transferMessage.getSessionId()); if (remoteSession != null) { - MessageExecute messageExecute = remoteSession.messageExecute(); - if (messageExecute != null) { - MessageExecute.REPLY replyType = messageExecute.replyType(); - if (replyType != null) { - switch (messageExecute.replyType()) { - case MANUAL: - messageExecute.receive(transferMessage.loadKey(), transferMessage.load(), remoteSession); - break; - case AUTO: - String requestKey = transferMessage.loadKey(); - byte[] replyMsg = messageExecute.receive(requestKey, transferMessage.load(), remoteSession); - // 应答 - remoteSession.reply(requestKey, () -> replyMsg); - break; - default: - break; - } + MessageExecutor messageExecutor = remoteSession.messageExecutor(); + if (messageExecutor == null) { + // 采用默认处理器进行处理 + messageExecutor = defaultMessageExecutor; + } + MessageExecutor.REPLY replyType = messageExecutor.replyType(); + if (replyType != null) { + switch (replyType) { + case MANUAL: + messageExecutor.receive(transferMessage.loadKey(), transferMessage.load(), remoteSession); + break; + case AUTO: + String requestKey = transferMessage.loadKey(); + byte[] replyMsg = messageExecutor.receive(requestKey, transferMessage.load(), remoteSession); + // 应答 + remoteSession.reply(requestKey, () -> replyMsg); + break; + default: + break; } } } }); } + /** + * 处理应答消息 + * @param transferMessage + * 接收到的应答消息 + */ private void executeResponse(final TransferMessage transferMessage) { - msgExecutePool.execute(() -> { + msgExecutorPool.execute(() -> { // listenKey和msgKey是不一致的 // msgKey是对消息本身设置key,listenKey是对整个消息(包括session信息) String listenKey = transferMessage.toListenKey(); @@ -165,52 +214,99 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo }); } + /** + * 处理SessionMessage + * @param sessionMessage + * 描述Session的消息对象 + */ private void executeSessionMessage(SessionMessage sessionMessage) { // 处理SessionMessage String sessionId = sessionMessage.sessionId(); - if (sessionId != null && !remoteSessions.containsKey(sessionId)) { - - try { - lock.lock(); - // 生成对应的MessageExecute对象 - String messageExecuteClass = sessionMessage.getMessageExecute(); - MessageExecute messageExecute = null; - if (messageExecuteClass != null && messageExecuteClass.length() > 0) { - try { - Class clazz = Class.forName(messageExecuteClass); - messageExecute = (MessageExecute) clazz.newInstance(); - } catch (Exception e) { - // TODO 打印日志 - e.printStackTrace(); - } - } + if (sessionId != null) { + // 对于含有的RemoteSession的Map,需要判断其MessageExecutor是否为NULL + RemoteSession remoteSession = remoteSessions.get(sessionId); + if (remoteSession == null) { + try { + sessionLock.lock(); + // 生成对应的MessageExecute对象 + String meClass = sessionMessage.getMessageExecutor(); + MessageExecutor messageExecutor = initMessageExecutor(meClass); - // 必须保证该对象不为空 - if (messageExecute != null) { // 说明尚未和请求来的客户端建立连接,需要建立连接 Connection remoteConnection = this.connectionManager.connect(new RemoteNode( sessionMessage.getLocalHost(), sessionMessage.getListenPort()), - this.messageExecuteClass); - RemoteSession remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecute); + this.localMsgExecutorClass); + // 假设连接失败的话,返回的Connection对象为null,此时不放入Map,等后续再处理 + if (remoteConnection != null) { + + remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecutor); - // Double check !!! - if (!remoteSessions.containsKey(sessionId)) { - remoteSessions.put(sessionId, remoteSession); + // Double check !!! + if (!remoteSessions.containsKey(sessionId)) { + remoteSessions.put(sessionId, remoteSession); + } } + } finally { + sessionLock.unlock(); } - } finally { - lock.unlock(); + } else { + // 需要判断MessageExecutor + MessageExecutor me = remoteSession.messageExecutor(); + if (me == null) { + try { + sessionLock.lock(); + // Double Check !!! + if (remoteSession.messageExecutor() == null) { + // 表明上次存储的MessageExecutor未创建成功,本次进行更新 + String meClass = sessionMessage.getMessageExecutor(); + MessageExecutor messageExecutor = initMessageExecutor(meClass); + + // 防止NULL将其他的进行覆盖 + if (messageExecutor != null) { + remoteSession.initExecutor(messageExecutor); + } + } + } finally { + sessionLock.unlock(); + } + } + } + } + } + + /** + * 初始化消息执行器 + * 根据消息执行器的Class字符串生成对应的消息处理对象 + * @param messageExecutorClass + * 消息执行器的Class字符串 + * @return + * 对应的消息处理对象,产生任何异常都返回NULL + */ + private MessageExecutor initMessageExecutor(String messageExecutorClass) { + // 生成对应的MessageExecute对象 + MessageExecutor messageExecutor = null; + if (messageExecutorClass != null && messageExecutorClass.length() > 0) { + try { + Class clazz = Class.forName(messageExecutorClass); + messageExecutor = (MessageExecutor) clazz.newInstance(); + } catch (Exception e) { + e.printStackTrace(); + return null; } } + return messageExecutor; } - private void init() { + /** + * 初始化消息处理线程池 + */ + private void initMsgExecutorPool() { ThreadFactory msgExecuteThreadFactory = new ThreadFactoryBuilder() - .setNameFormat("msg-execute-pool-%d").build(); + .setNameFormat("msg-executor-pool-%d").build(); //Common Thread Pool - msgExecutePool = new ThreadPoolExecutor(5, 10, + msgExecutorPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_CAPACITY), msgExecuteThreadFactory, new ThreadPoolExecutor.AbortPolicy()); @@ -218,6 +314,6 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo @Override public void close() { - msgExecutePool.shutdown(); + msgExecutorPool.shutdown(); } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java index d1590318..ac86ea54 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java @@ -9,16 +9,13 @@ package com.jd.blockchain.stp.communication.connection.handler; import com.jd.blockchain.stp.communication.message.SessionMessage; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import java.util.concurrent.Executors; /** - * + * Sender对应Handler * @author shaozhuguang * @create 2019/4/16 * @since 1.0.0 diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java index dac7b823..ca41fc9a 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java @@ -23,7 +23,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * + * 连接监听器 * @author shaozhuguang * @create 2019/4/12 * @since 1.0.0 @@ -31,8 +31,15 @@ import java.util.concurrent.locks.ReentrantLock; @ChannelHandler.Sharable public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Runnable, Closeable { + /** + * 当前连接活跃状态 + */ private final AtomicBoolean currentActive = new AtomicBoolean(false); + /** + * 重连的控制锁 + * 防止重连过程中重复多次调用 + */ private final Lock reconnectLock = new ReentrantLock(); // 默认的最多重连次数 @@ -44,30 +51,70 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run // 标识是否正常工作中,假设不再工作则不再重连 private boolean isWorking = true; + /** + * 重连调度器 + */ private ScheduledExecutorService reconnectTimer; + /** + * 远端的IP(域名)信息 + */ private String hostName; + /** + * 远端的端口 + */ private int port; private Bootstrap bootstrap; + /** + * 第一组Handler数组 + */ private ChannelHandler[] frontHandlers; + /** + * 后一组Handler数组 + */ private ChannelHandler[] afterHandlers; + /** + * 用于重连时对象重置 + */ private ChannelFuture channelFuture; + /** + * 构造器 + * @param hostName + * 远端Host + * @param port + * 远端端口 + * @param bootstrap + * Netty工作启动器 + */ public WatchDogHandler(String hostName, int port, Bootstrap bootstrap) { this.hostName = hostName; this.port = port; this.bootstrap = bootstrap; } + /** + * 构造器 + * @param remoteNode + * 远端节点 + * @param bootstrap + * Netty工作启动器 + */ public WatchDogHandler(RemoteNode remoteNode, Bootstrap bootstrap) { this(remoteNode.getHostName(), remoteNode.getPort(), bootstrap); } + /** + * 配置重连需要的Handler + * 主要是为了对象的复用,同时有些Handler无法复用,对于每次连接请求必须要new新的对象 + * @param frontHandlers + * @param afterHandlers + */ public void init(ChannelHandler[] frontHandlers, ChannelHandler[] afterHandlers) { this.frontHandlers = frontHandlers; this.afterHandlers = afterHandlers; @@ -87,6 +134,12 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run } } + /** + * 连接成功调用 + * 该连接成功表示完全连接成功,对于TCP而言就是三次握手成功 + * @param ctx + * @throws Exception + */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 调用该方法表示连接成功 @@ -98,6 +151,12 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run ctx.fireChannelActive(); } + /** + * 连接失败时调用 + * 此处是触发重连的入口 + * @param ctx + * @throws Exception + */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { @@ -174,6 +233,9 @@ public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Run this.reconnectTimer.shutdown(); } + /** + * 设置调度器 + */ private void initTimer() { ThreadFactory timerFactory = new ThreadFactoryBuilder() .setNameFormat("reconnect-pool-%d").build(); diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java index 842e6e8e..47d11883 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java @@ -14,7 +14,7 @@ import com.jd.blockchain.stp.communication.node.RemoteNode; /** - * + * 应答监听器 * @author shaozhuguang * @create 2019/4/12 * @since 1.0.0 @@ -22,17 +22,30 @@ import com.jd.blockchain.stp.communication.node.RemoteNode; public class ReplyListener { + /** + * 监听的Key,通常用于描述唯一的请求 + */ private String listenKey; + /** + * 消息处理类型 + * REMOVE:表示处理完该对象之后从缓存中清除 + * HOLD:表示处理完该对象之后仍在缓存中保存 + */ private MANAGE_TYPE manageType = MANAGE_TYPE.REMOVE; + /** + * 数据回调监听器 + */ private CallBackDataListener callBackDataListener; + /** + * 回调栅栏 + */ private CallBackBarrier callBackBarrier; public ReplyListener(String listenKey, RemoteNode remoteNode) { - this.listenKey = listenKey; - this.callBackDataListener = new CallBackDataListener(remoteNode); + this(listenKey, remoteNode, null); } public ReplyListener(String listenKey, RemoteNode remoteNode, CallBackBarrier callBackBarrier) { diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java index e4aef871..3f322e97 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java @@ -8,7 +8,8 @@ */ package com.jd.blockchain.stp.communication.manager; -import com.jd.blockchain.stp.communication.MessageExecute; +import com.jd.blockchain.stp.communication.MessageExecutor; +import com.jd.blockchain.stp.communication.callback.CallBackLauncher; import com.jd.blockchain.stp.communication.connection.Receiver; import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.node.LocalNode; @@ -20,24 +21,50 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** - * + * 连接管理器 * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 + * @date 2019-04-18 15:11 */ public class ConnectionManager { + /** + * Connection对应Map + * RemoteNode唯一性:IP(HOST)+PORT + */ private static final Map connectionMap = new ConcurrentHashMap<>(); + /** + * 连接管理器对应MAP + * 以监听端口(int)作为Key,进行唯一性约束 + */ private static final Map connectionManagerMap = new ConcurrentHashMap<>(); + /** + * connectionManagerMap控制锁 + */ private static final Lock managerLock = new ReentrantLock(); + /** + * connectionMap控制锁 + */ private static final Lock connectionLock = new ReentrantLock(); + /** + * 当前ConnectionManager对应的Receiver + */ private Receiver receiver; + /** + * 静态ConnectionManager构造器 + * + * @param localNode + * 本地节点 + * @return + * 优先返回Map中的对象 + */ public static final ConnectionManager newConnectionManager(LocalNode localNode) { int listenPort = localNode.getPort(); if (!connectionManagerMap.containsKey(listenPort)) { @@ -55,32 +82,61 @@ public class ConnectionManager { return connectionManagerMap.get(listenPort); } + /** + * 内部调用的静态构造器 + * + * @param localNode + * 本地节点 + * @return + */ private static final ConnectionManager newInstance(LocalNode localNode) { return new ConnectionManager(new Receiver(localNode)); } - public final boolean start(String messageExecuteClass) throws InterruptedException { - receiver.initReceiverHandler(this, messageExecuteClass); + /** + * 启动 + * 该启动是启动Receiver,返回启动的状态 + * + * @param messageExecutorClass + * 当前节点希望其他节点收到该节点信息时的处理Handler + * @return + * 回调执行器 + * @throws InterruptedException + */ + public final CallBackLauncher start(String messageExecutorClass) throws InterruptedException { + receiver.initReceiverHandler(this, messageExecutorClass); receiver.startListen(); // 判断是否启动完成,启动完成后再返回 - return receiver.waitStarted(); + return receiver.waitBooted(); } private ConnectionManager(Receiver receiver) { this.receiver = receiver; } - public Connection connect(RemoteNode remoteNode, MessageExecute messageExecute) { - return connect(remoteNode, messageExecute.getClass().toString()); - } - - public Connection connect(RemoteNode remoteNode, String messageExecuteClass) { + /** + * 连接远端节点 + * + * @param remoteNode + * 远端节点信息 + * @param messageExecutorClass + * 希望远端节点接收到本节点消息时的处理Handler + * @return + */ + public Connection connect(RemoteNode remoteNode, String messageExecutorClass) { if (!connectionMap.containsKey(remoteNode)) { try { connectionLock.lock(); if (!connectionMap.containsKey(remoteNode)) { - Connection connection = init(remoteNode, messageExecuteClass); - connectionMap.put(remoteNode, connection); + Connection connection = init(remoteNode, messageExecutorClass); + if (connection != null) { + // 保证都是连接成功的 + connectionMap.put(remoteNode, connection); + return connection; + } else { + // 连接失败返回null + return null; + } } } finally { connectionLock.unlock(); @@ -89,22 +145,24 @@ public class ConnectionManager { return connectionMap.get(remoteNode); } - private Connection init(RemoteNode remoteNode, String messageExecuteClass) { + private Connection init(RemoteNode remoteNode, String messageExecutorClass) { // 初始化Connection Connection remoteConnection = new Connection(this.receiver); try { - // 连接远端 - boolean isSuccess = remoteConnection.connect(remoteNode, messageExecuteClass); - if (!isSuccess) { - throw new RuntimeException(String.format("RemoteNode {%s} Connect Fail !!!", remoteNode.toString())); + // 连接远端,需要发送当前节点处理的MessageExecuteClass + CallBackLauncher callBackLauncher = remoteConnection.connect(remoteNode, messageExecutorClass); + if (!callBackLauncher.isBootSuccess()) { + // TODO 打印错误日志 + callBackLauncher.exception().printStackTrace(); + return null; } + return remoteConnection; } catch (InterruptedException e) { throw new RuntimeException(e); } catch (RuntimeException e) { throw e; } - return remoteConnection; } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java index bdcfc085..3403d2a0 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java @@ -9,16 +9,24 @@ package com.jd.blockchain.stp.communication.manager; -import com.jd.blockchain.stp.communication.MessageExecute; +import com.jd.blockchain.stp.communication.MessageExecutor; import com.jd.blockchain.stp.communication.RemoteSession; +import com.jd.blockchain.stp.communication.callback.CallBackLauncher; import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.RemoteNode; import org.apache.commons.codec.binary.Hex; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** - * + * 远端Session管理器 * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 @@ -26,84 +34,158 @@ import org.apache.commons.codec.binary.Hex; public class RemoteSessionManager { + /** + * 可监听的最大端口 + */ + private static final int MAX_PORT = 65535; + + /** + * 节点Session的集合信息 + */ + private Map nodeRemoteSessionMap = new ConcurrentHashMap<>(); + + /** + * nodeRemoteSessionMap的控制锁 + */ + private Lock lock = new ReentrantLock(); + + /** + * 连接管理器 + * 用于管理底层的通信连接 + */ private ConnectionManager connectionManager; + /** + * 本地节点信息 + */ private LocalNode localNode; + /** + * 构造器 + * @param localNode + * 本地节点信息 + */ public RemoteSessionManager(LocalNode localNode) { this.localNode = localNode; + // 校验本地节点的配置,防止异常 check(); this.connectionManager = ConnectionManager.newConnectionManager(this.localNode); try { - boolean isStartedSuccess = start(this.localNode.messageExecuteClass()); - if (!isStartedSuccess) { - throw new RuntimeException(String.format("LocalNode {%s} Start Receiver Fail !!!", this.localNode.toString())); + CallBackLauncher callBackLauncher = start(); + if (!callBackLauncher.isBootSuccess()) { + // 启动当前端口连接必须要成功,否则则退出,交由应用程序处理 + throw new RuntimeException(callBackLauncher.exception()); } } catch (InterruptedException e) { throw new RuntimeException(e); } catch (RuntimeException e) { throw e; } - - } - - private void check() { - // 要求端口范围:1~65535,messageExecuteClass不能为null - int listenPort = this.localNode.getPort(); - if (listenPort <= 0 || listenPort > 65535) { - throw new IllegalArgumentException("Illegal Local Listen Port, Please Check !!!"); - } - - String messageExecuteClass = this.localNode.messageExecuteClass(); - if (messageExecuteClass == null) { - throw new IllegalArgumentException("Illegal MessageExecute Class, Please Check !!!"); - } - } - - private boolean start(String messageExecuteClass) throws InterruptedException { - return this.connectionManager.start(messageExecuteClass); } + /** + * 生成新的Session + * @param remoteNode + * @return + */ public RemoteSession newSession(RemoteNode remoteNode) { return newSession(null, remoteNode); } + /** + * RemoteSession对象生成器 + * @param sessionId + * RemoteSession的Key + * @param remoteNode + * 远端节点信息 + * @return + */ public RemoteSession newSession(String sessionId, RemoteNode remoteNode) { - return newSession(sessionId, remoteNode, null); - } - public RemoteSession newSession(RemoteNode remoteNode, MessageExecute messageExecute) { - return newSession(null, remoteNode, messageExecute); - } + RemoteSession remoteSession = nodeRemoteSessionMap.get(remoteNode); - public RemoteSession newSession(String sessionId, RemoteNode remoteNode, MessageExecute messageExecute) { - if (sessionId == null) { - sessionId = toSessionId(localNode); - } - Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecuteClass()); + if (remoteSession != null) { + return remoteSession; + } else { + try { + lock.lock(); + + // Double Check !!! + if (!nodeRemoteSessionMap.containsKey(remoteNode)) { + if (sessionId == null) { + sessionId = sessionId(localNode); + } + Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecutorClass()); + + if (remoteConnection == null) { + return null; + } - RemoteSession remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecute); + remoteSession = new RemoteSession(sessionId, remoteConnection); - remoteSession.init(); + remoteSession.init(); + nodeRemoteSessionMap.put(remoteNode, remoteSession); + + return remoteSession; + } + } finally { + lock.unlock(); + } + } return remoteSession; } public RemoteSession[] newSessions(RemoteNode[] remoteNodes) { + return newSessions(null, remoteNodes); } public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes) { + checkSessions(sessionIds, remoteNodes); + + List remoteSessionList = new ArrayList<>(); + + for (int i = 0; i < remoteNodes.length; i++) { + RemoteSession remoteSession; + if (sessionIds == null) { + remoteSession = newSession(remoteNodes[i]); + } else { + remoteSession = newSession(sessionIds[i], remoteNodes[i]); + } + if (remoteSession != null) { + remoteSessionList.add(remoteSession); + } + } - return newSessions(sessionIds, remoteNodes, null); + if (remoteSessionList.isEmpty()) { + return null; + } + + RemoteSession[] remoteSessions = new RemoteSession[remoteSessionList.size()]; + + return remoteSessionList.toArray(remoteSessions); } - public RemoteSession[] newSessions(RemoteNode[] remoteNodes, MessageExecute messageExecute) { + private void check() { + // 要求端口范围:1~65535,messageExecuteClass不能为null + int listenPort = this.localNode.getPort(); + if (listenPort <= 0 || listenPort > MAX_PORT) { + throw new IllegalArgumentException("Illegal Local Listen Port, Please Check !!!"); + } + + // 默认处理器必须包含,可不包含本机需要对端知晓的处理器 + MessageExecutor defaultMessageExecutor = this.localNode.defaultMessageExecutor(); + if (defaultMessageExecutor == null) { + throw new IllegalArgumentException("Illegal Default MessageExecutor, Please Check !!!"); + } + } - return newSessions(null, remoteNodes, messageExecute); + private CallBackLauncher start() throws InterruptedException { + return this.connectionManager.start(this.localNode.messageExecutorClass()); } - public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes, MessageExecute messageExecute) { + private void checkSessions(String[] sessionIds, RemoteNode[] remoteNodes) { if (remoteNodes == null || remoteNodes.length <= 0) { throw new IllegalArgumentException("RemoteNodes is empty !!!"); } @@ -113,20 +195,9 @@ public class RemoteSessionManager { throw new IllegalArgumentException("RemoteNodes and sessionIds are different in length !!!"); } } - - RemoteSession[] remoteSessions = new RemoteSession[remoteNodes.length]; - - for (int i = 0; i < remoteNodes.length; i++) { - if (sessionIds == null) { - remoteSessions[i] = newSession(remoteNodes[i], messageExecute); - } else { - remoteSessions[i] = newSession(sessionIds[i], remoteNodes[i], messageExecute); - } - } - return remoteSessions; } - public String toSessionId(RemoteNode remoteNode) { + private String sessionId(RemoteNode remoteNode) { return Hex.encodeHexString(remoteNode.toString().getBytes()); } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java index b872fbfb..8cd0a3e9 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java @@ -12,7 +12,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; /** - * + * 抽象消息 * @author shaozhuguang * @create 2019/4/17 * @since 1.0.0 diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java index 6395d030..a0f5974c 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java @@ -22,22 +22,41 @@ import io.netty.util.CharsetUtil; public class HeartBeatMessage implements IMessage { + /** + * 统一的心跳信息字符串 + */ private static final String HEARTBEAT_STRING = "JDChainHeartBeat"; + /** + * 统一的心跳消息字符串对一个的ByteBuf + */ private static final ByteBuf HEARTBEAT_MESSAGE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(HEARTBEAT_STRING + "\r\n", CharsetUtil.UTF_8)); + /** + * 将心跳消息写入Ctx + * @param ctx + */ public static final void write(ChannelHandlerContext ctx) { ctx.writeAndFlush(HEARTBEAT_MESSAGE.duplicate()); } + /** + * 判断接收的消息是否为心跳消息 + * + * @param msg + * @return + */ public static final boolean isHeartBeat(Object msg) { - if (msg instanceof String) { - return isHeartBeat((String) msg); - } - return false; + return isHeartBeat(msg.toString()); } + /** + * 判断接收的消息是否为心跳消息 + * + * @param msg + * @return + */ public static final boolean isHeartBeat(String msg) { if (HEARTBEAT_STRING.equals(msg)) { return true; diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java index 0e3ed5df..6cb0a0a4 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java @@ -11,7 +11,7 @@ package com.jd.blockchain.stp.communication.message; import io.netty.buffer.ByteBuf; /** - * + * 消息接口 * @author shaozhuguang * @create 2019/4/16 * @since 1.0.0 @@ -19,7 +19,15 @@ import io.netty.buffer.ByteBuf; public interface IMessage { + /** + * 消息转换为字符串 + * @return + */ String toTransfer(); + /** + * 消息转换为ByteBuf + * @return + */ ByteBuf toTransferByteBuf(); } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java index acdedee2..795fa8c1 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java @@ -9,7 +9,8 @@ package com.jd.blockchain.stp.communication.message; /** - * + * 负载消息 + * 该接口用于应用实现 * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 @@ -17,5 +18,9 @@ package com.jd.blockchain.stp.communication.message; public interface LoadMessage { + /** + * 将负载消息转换为字节数组 + * @return + */ byte[] toBytes(); } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java index 10a121a3..c685b77c 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java @@ -8,13 +8,11 @@ */ package com.jd.blockchain.stp.communication.message; -import com.alibaba.fastjson.JSON; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.apache.commons.codec.binary.Hex; /** - * + * Session消息 + * 该消息用于发送至远端节点,告诉远端节点本地的信息 * @author shaozhuguang * @create 2019/4/16 * @since 1.0.0 @@ -22,19 +20,28 @@ import org.apache.commons.codec.binary.Hex; public class SessionMessage extends AbstractMessage implements IMessage { + /** + * 本地节点HOST + */ private String localHost; + /** + * 本地节点监听端口 + */ private int listenPort; - private String messageExecute; + /** + * 远端接收到本地节点信息时处理的Class + */ + private String messageExecutor; public SessionMessage() { } - public SessionMessage(String localHost, int listenPort, String messageExecute) { + public SessionMessage(String localHost, int listenPort, String messageExecutor) { this.localHost = localHost; this.listenPort = listenPort; - this.messageExecute = messageExecute; + this.messageExecutor = messageExecutor; } public String getLocalHost() { @@ -54,30 +61,37 @@ public class SessionMessage extends AbstractMessage implements IMessage { return listenPort; } - public String getMessageExecute() { - return messageExecute; + public String getMessageExecutor() { + return messageExecutor; } - public void setMessageExecute(String messageExecute) { - this.messageExecute = messageExecute; + public void setMessageExecutor(String messageExecutor) { + this.messageExecutor = messageExecutor; } public String sessionId() { return Hex.encodeHexString((this.localHost + ":" + this.listenPort).getBytes()); } - public static SessionMessage toNodeSessionMessage(Object msg) { + /** + * 将对象(或者说接收到的消息)转换为SessionMessage + * @param msg + * 接收到的消息对象 + * @return + * 可正确解析则返回,否则返回NULL + */ + public static SessionMessage toSessionMessage(Object msg) { String msgString = msg.toString(); try { String[] msgArray = msgString.split("\\|"); if (msgArray.length == 2 || msgArray.length == 3) { String host = msgArray[0]; int port = Integer.parseInt(msgArray[1]); - String msgExecuteClass = null; + String msgExecutorClass = null; if (msgArray.length == 3) { - msgExecuteClass = msgArray[2]; + msgExecutorClass = msgArray[2]; } - return new SessionMessage(host, port, msgExecuteClass); + return new SessionMessage(host, port, msgExecutorClass); } return null; } catch (Exception e) { @@ -89,7 +103,10 @@ public class SessionMessage extends AbstractMessage implements IMessage { public String toTransfer() { // 为区别于TransferMessage的JSON格式,该处使用字符串连接处理 // 格式:localHost|port|class - String transferMsg = this.localHost + "|" + this.listenPort + "|" + this.messageExecute; - return transferMsg; + if (this.messageExecutor == null) { + return this.localHost + "|" + this.listenPort; + } else { + return this.localHost + "|" + this.listenPort + "|" + this.messageExecutor; + } } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java index 7f0659fa..8bc9a448 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java @@ -12,6 +12,7 @@ import com.alibaba.fastjson.JSON; import org.apache.commons.codec.binary.Base64; /** + * 底层传输协议 * * @author shaozhuguang * @create 2019/4/11 @@ -20,14 +21,32 @@ import org.apache.commons.codec.binary.Base64; public class TransferMessage extends AbstractMessage implements IMessage{ + /** + * sessionId(描述节点信息) + */ private String sessionId; + /** + * 本次消息的类型 + * 0:请求; + * 1:应答; + */ private int type; + /** + * 消息的Key + */ private String key; + /** + * 消息载体的内容 + * 本内容不可被序列化 + */ private transient byte[] load; + /** + * 消息载体的内容->Base64转换 + */ private String loadBase64; public TransferMessage() { @@ -40,7 +59,13 @@ public class TransferMessage extends AbstractMessage implements IMessage{ this.load = load; } - public static TransferMessage toTransferMessageObj(Object msg) { + /** + * 转换为TransferMessage对象 + * + * @param msg + * @return + */ + public static TransferMessage toTransferMessage(Object msg) { if (msg == null) { return null; } @@ -80,6 +105,12 @@ public class TransferMessage extends AbstractMessage implements IMessage{ return JSON.toJSONString(this); } + /** + * 转换为监听的Key + * 该Key可描述为从远端发送来消息及其内容的唯一性 + * + * @return + */ public String toListenKey() { // 格式:sessionId:key return sessionId + ":" + key; diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java index f58bd034..64c203bd 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java @@ -8,10 +8,10 @@ */ package com.jd.blockchain.stp.communication.node; -import com.jd.blockchain.stp.communication.MessageExecute; +import com.jd.blockchain.stp.communication.MessageExecutor; /** - * + * 本地节点 * @author shaozhuguang * @create 2019/4/16 * @since 1.0.0 @@ -19,34 +19,69 @@ import com.jd.blockchain.stp.communication.MessageExecute; public class LocalNode extends RemoteNode { - private Class messageExecute; + /** + * 当前节点消息处理器 + * 该消息处理器用于描述远端节点收到当前节点的消息该如何处理 + * 通常该消息处理器会以字符串的形式发送至远端节点 + */ + private Class messageExecutorClass; - public LocalNode(String hostName, int port) { - super(hostName, port); - } + /** + * 当前节点接收消息默认处理器 + * 与messageExecutor不同,该字段描述的是当前节点接收到其他节点信息时的默认处理器 + * 该参数硬性要求必须不能为空 + */ + private MessageExecutor defaultMessageExecutor; - public LocalNode(String hostName, int port, MessageExecute messageExecute) { - super(hostName, port); - this.messageExecute = messageExecute.getClass(); + /** + * 构造器 + * @param hostName + * 当前节点Host,该Host必须是一种远端节点可访问的形式 + * @param port + * 当前节点监听端口 + * @param defaultMessageExecutor + * 当前节点接收到远端消息无法处理时的消息处理器 + * + */ + public LocalNode(String hostName, int port, MessageExecutor defaultMessageExecutor) { + this(hostName, port, null, defaultMessageExecutor); } - public LocalNode(String hostName, int port, Class messageExecute) { + /** + * 构造器 + * @param hostName + * 当前节点Host,该Host必须是一种远端节点可访问的形式 + * @param port + * 当前节点监听端口 + * @param messageExecutorClass + * 当前节点期望远端节点接收到消息后的处理器 + * @param defaultMessageExecutor + * 当前节点接收到远端消息无法处理时的消息处理器 + * + */ + public LocalNode(String hostName, int port, Class messageExecutorClass, MessageExecutor defaultMessageExecutor) { super(hostName, port); - this.messageExecute = messageExecute; + this.messageExecutorClass = messageExecutorClass; + this.defaultMessageExecutor = defaultMessageExecutor; } - public String messageExecuteClass() { - if (this.messageExecute == null) { + /** + * 返回消息执行器的类对应的字符串 + * 该返回值通常用于消息传递 + * @return + */ + public String messageExecutorClass() { + if (this.messageExecutorClass == null) { return null; } - return this.messageExecute.getName(); - } - - public void setMessageExecute(MessageExecute messageExecute) { - this.messageExecute = messageExecute.getClass(); + return this.messageExecutorClass.getName(); } - public void setMessageExecute(Class messageExecute) { - this.messageExecute = messageExecute; + /** + * 返回默认的消息处理器 + * @return + */ + public MessageExecutor defaultMessageExecutor() { + return this.defaultMessageExecutor; } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java index dceb9115..7e2004c8 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java @@ -9,7 +9,7 @@ package com.jd.blockchain.stp.communication.node; /** - * + * 节点信息 * @author shaozhuguang * @create 2019/4/11 * @since 1.0.0 @@ -17,8 +17,14 @@ package com.jd.blockchain.stp.communication.node; public class RemoteNode { + /** + * 监听端口 + */ private int port; + /** + * 当前节点域名 + */ private String hostName; public RemoteNode(String hostName, int port) { @@ -42,6 +48,10 @@ public class RemoteNode { this.hostName = hostName; } + /** + * 通过hostName+port形式作为判断节点的唯一标识 + * @return + */ @Override public int hashCode() { return (hostName + ":" + port).hashCode(); diff --git a/source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java b/source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java index 4a9211b6..b3cee4ab 100644 --- a/source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java +++ b/source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java @@ -30,7 +30,7 @@ public class SessionMessageTest { String transMsg = message.toTransfer(); System.out.println(transMsg); - SessionMessage sm = SessionMessage.toNodeSessionMessage(transMsg); + SessionMessage sm = SessionMessage.toSessionMessage(transMsg); assertEquals(transMsg, sm.toTransfer()); } diff --git a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java index aa8f6014..e1223981 100644 --- a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java +++ b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java @@ -8,7 +8,7 @@ */ package com.jd.blockchain.stp.commucation; -import com.jd.blockchain.stp.communication.MessageExecute; +import com.jd.blockchain.stp.communication.MessageExecutor; import com.jd.blockchain.stp.communication.RemoteSession; import java.nio.charset.Charset; @@ -20,7 +20,7 @@ import java.nio.charset.Charset; * @since 1.0.0 */ -public class MyMessageExecutor implements MessageExecute { +public class MyMessageExecutor implements MessageExecutor { @Override public byte[] receive(String key, byte[] data, RemoteSession session) { diff --git a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java index 422fb9a4..2e66c8e1 100644 --- a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java +++ b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java @@ -9,7 +9,6 @@ package com.jd.blockchain; import com.jd.blockchain.stp.commucation.MyMessageExecutor; -import com.jd.blockchain.stp.communication.MessageExecute; import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.callback.CallBackBarrier; import com.jd.blockchain.stp.communication.callback.CallBackDataListener; @@ -27,6 +26,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.junit.Assert.assertNull; + /** * * @author shaozhuguang @@ -57,7 +58,7 @@ public class StpTest { listenStart(); System.out.println("---------- listenComplete -----------"); System.out.println("---------- ConnectionStart ----------"); - connectAllOthers(); + connectOneOther(); System.out.println("---------- ConnectionComplete ----------"); } @@ -68,12 +69,17 @@ public class StpTest { final int port = listenPorts[i], index = i; threadPool.execute(() -> { // 创建本地节点 - final LocalNode localNode = new LocalNode(remoteHost, port, MyMessageExecutor.class); - // 启动当前节点 - RemoteSessionManager sessionManager = new RemoteSessionManager(localNode); - sessionManagers[index] = sessionManager; - countDownLatch.countDown(); - System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString()); + final LocalNode localNode = new LocalNode(remoteHost, port, new MyMessageExecutor()); + try { + // 启动当前节点 + RemoteSessionManager sessionManager = new RemoteSessionManager(localNode); + sessionManagers[index] = sessionManager; + System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + countDownLatch.countDown(); + } }); } @@ -99,7 +105,7 @@ public class StpTest { } } - remoteSessions = starter.newSessions(remoteNodes, new MyMessageExecutor()); + remoteSessions = starter.newSessions(remoteNodes); } private void connectOneOther() { @@ -116,7 +122,22 @@ public class StpTest { } } - remoteSessions = starter.newSessions(remoteNodes, new MyMessageExecutor()); + remoteSessions = starter.newSessions(remoteNodes); + } + + private void connectOneErrorNode() { + // 所有节点完成之后,需要启动 + // 启动一个节点 + RemoteSessionManager starter = sessionManagers[0]; + + // 当前节点需要连接到其他3个节点 + RemoteNode[] remoteNodes = new RemoteNode[1]; + + remoteNodes[0] = new RemoteNode(remoteHost, 10001); + + remoteSessions = starter.newSessions(remoteNodes); + + assertNull(remoteSessions); }