From c65896df102547baf426f15f84f8d947fe93301b Mon Sep 17 00:00:00 2001 From: shaozhuguang Date: Tue, 23 Apr 2019 18:10:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9STP=E5=BA=95=E5=B1=82?= =?UTF-8?q?=E9=80=9A=E4=BF=A1=E5=A4=84=E7=90=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../stp/communication/RemoteSession.java | 70 ++++++++++++++----- .../communication/connection/Connection.java | 8 ++- .../communication/connection/Receiver.java | 2 +- .../stp/communication/connection/Sender.java | 59 +++++++--------- .../connection/handler/ReceiverHandler.java | 34 +++++++-- .../connection/handler/SenderHandler.java | 28 +++++++- .../manager/RemoteSessionManager.java | 56 ++++----------- .../stp/commucation/MyMessageExecutor.java | 4 +- 8 files changed, 156 insertions(+), 105 deletions(-) diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java index f72a071c..0ce9fceb 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java @@ -12,11 +12,14 @@ import com.jd.blockchain.stp.communication.callback.CallBackBarrier; import com.jd.blockchain.stp.communication.callback.CallBackDataListener; import com.jd.blockchain.stp.communication.connection.Connection; import com.jd.blockchain.stp.communication.message.LoadMessage; +import com.jd.blockchain.stp.communication.node.RemoteNode; +import org.apache.commons.codec.binary.Hex; import java.util.concurrent.TimeUnit; /** + * 远端Session * * @author shaozhuguang * @create 2019/4/11 @@ -26,9 +29,14 @@ import java.util.concurrent.TimeUnit; public class RemoteSession { /** - * 远端节点ID + * 本地节点ID */ - private String id; + private String localId; + + /** + * 远端节点 + */ + private RemoteNode remoteNode; /** * 远端连接 @@ -41,31 +49,31 @@ public class RemoteSession { */ private MessageExecutor messageExecutor; - /** * 构造器 - * @param id - * 远端节点ID + * @param localId + * 本地节点ID * @param connection * 对应连接 */ - public RemoteSession(String id, Connection connection) { - this(id, connection, null); + public RemoteSession(String localId, Connection connection) { + this(localId, connection, null); } /** * 构造器 - * @param id - * 远端ID + * @param localId + * 本地节点ID * @param connection * 对应连接 * @param messageExecutor * 对应远端消息处理器 */ - public RemoteSession(String id, Connection connection, MessageExecutor messageExecutor) { - this.id = id; + public RemoteSession(String localId, Connection connection, MessageExecutor messageExecutor) { + this.localId = localId; this.connection = connection; this.messageExecutor = messageExecutor; + this.remoteNode = connection.remoteNode(); } public void init() { @@ -87,7 +95,7 @@ public class RemoteSession { * @throws Exception */ public byte[] request(LoadMessage loadMessage) throws Exception { - return this.connection.request(this.id, loadMessage, null).getCallBackData(); + return this.connection.request(this.localId, loadMessage, null).getCallBackData(); } /** @@ -105,7 +113,7 @@ public class RemoteSession { * @throws Exception */ public byte[] request(LoadMessage loadMessage, long time, TimeUnit timeUnit) throws Exception { - return this.connection.request(this.id, loadMessage, null).getCallBackData(time, timeUnit); + return this.connection.request(this.localId, loadMessage, null).getCallBackData(time, timeUnit); } /** @@ -133,7 +141,7 @@ public class RemoteSession { * 应答,需要调用者从Listener中获取结果 */ public CallBackDataListener asyncRequest(LoadMessage loadMessage, CallBackBarrier callBackBarrier) { - return this.connection.request(this.id, loadMessage, callBackBarrier); + return this.connection.request(this.localId, loadMessage, callBackBarrier); } /** @@ -145,7 +153,7 @@ public class RemoteSession { * 需要应答的负载消息 */ public void reply(String key, LoadMessage loadMessage) { - this.connection.reply(this.id, key, loadMessage); + this.connection.reply(this.localId, key, loadMessage); } public void closeAll() { @@ -160,11 +168,39 @@ public class RemoteSession { this.connection.closeSender(); } - public String sessionId() { - return id; + /** + * 返回本地节点ID + * + * @return + */ + public String localId() { + return localId; + } + + /** + * 返回远端对应的SessionID + * + * @return + */ + public String remoteSessionId() { + return Hex.encodeHexString(remoteNode.toString().getBytes()); } + /** + * 返回远端对应执行器 + * + * @return + */ public MessageExecutor messageExecutor() { return this.messageExecutor; } + + /** + * 返回对应远端节点 + * + * @return + */ + public RemoteNode remoteNode() { + return remoteNode; + } } \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java index 4c676ca4..5d534686 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java @@ -64,7 +64,7 @@ public class Connection { * @param remoteSession */ public void initSession(RemoteSession remoteSession) { - this.receiver.initRemoteSession(remoteSession.sessionId(), remoteSession); + this.receiver.initRemoteSession(remoteSession.remoteSessionId(), remoteSession); } /** @@ -80,7 +80,7 @@ public class Connection { */ public CallBackLauncher connect(RemoteNode remoteNode, String messageExecutorClass) throws InterruptedException { this.remoteNode = remoteNode; - this.sender = new Sender(this.remoteNode, sessionMessage(messageExecutorClass)); + this.sender = new Sender(this.receiver.localNode(), this.remoteNode, sessionMessage(messageExecutorClass)); this.sender.connect(); return this.sender.waitBooted(); } @@ -206,6 +206,10 @@ public class Connection { closeSender(); } + public RemoteNode remoteNode() { + return remoteNode; + } + public void closeReceiver() { this.receiver.close(); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java index 50eab985..fdbb0df6 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java @@ -125,7 +125,7 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable { * 当前节点的消息处理Class */ public void initReceiverHandler(ConnectionManager connectionManager, String messageExecutorClass) { - receiverHandler = new ReceiverHandler(connectionManager, messageExecutorClass, this.localNode.defaultMessageExecutor()); + receiverHandler = new ReceiverHandler(connectionManager, messageExecutorClass, this.localNode); } /** diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java index dcef4009..7c17f18f 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java @@ -11,6 +11,7 @@ package com.jd.blockchain.stp.communication.connection; import com.jd.blockchain.stp.communication.connection.handler.*; import com.jd.blockchain.stp.communication.message.IMessage; import com.jd.blockchain.stp.communication.message.SessionMessage; +import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.RemoteNode; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; @@ -47,34 +48,38 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { */ private SessionMessage sessionMessage; - /** - * 远端HOST - */ - private String remoteHost; + private LocalNode localNode; - /** - * 远端端口 - */ - private int remotePort; + private RemoteNode remoteNode; + +// /** +// * 远端HOST +// */ +// private String remoteHost; +// +// /** +// * 远端端口 +// */ +// private int remotePort; /** * 监听Handler(重连Handler) */ private WatchDogHandler watchDogHandler; - public Sender(RemoteNode remoteNode, SessionMessage sessionMessage) { - init(remoteNode, sessionMessage); + public Sender(LocalNode localNode, RemoteNode remoteNode, SessionMessage sessionMessage) { + init(localNode, remoteNode, sessionMessage); } - public Sender(String remoteHost, int remotePort, SessionMessage sessionMessage) { - init(remoteHost, remotePort, sessionMessage); - } +// public Sender(String remoteHost, int remotePort, SessionMessage sessionMessage) { +// init(remoteHost, remotePort, sessionMessage); +// } /** * 连接 */ public void connect() { - watchDogHandler = new WatchDogHandler(this.remoteHost, this.remotePort, bootstrap); + watchDogHandler = new WatchDogHandler(this.remoteNode.getHostName(), this.remoteNode.getPort(), bootstrap); ChannelHandlers frontChannelHandlers = new ChannelHandlers() .addHandler(watchDogHandler); @@ -83,7 +88,7 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { .addHandler(new StringDecoder()) .addHandler(new HeartBeatSenderTrigger()) .addHandler(new HeartBeatSenderHandler()) - .addHandler(new SenderHandler(this.sessionMessage)); + .addHandler(new SenderHandler(this.localNode, this.remoteNode, this.sessionMessage)); // 初始化watchDogHandler watchDogHandler.init(frontChannelHandlers.toArray(), afterChannelHandlers.toArray()); @@ -105,7 +110,7 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { runThread.execute(() -> { try { // 发起连接请求 - channelFuture = bootstrap.connect(this.remoteHost, this.remotePort).sync(); + channelFuture = bootstrap.connect(this.remoteNode.getHostName(), this.remoteNode.getPort()).sync(); boolean isStartSuccess = channelFuture.isSuccess(); if (isStartSuccess) { // 启动成功 @@ -130,28 +135,16 @@ public class Sender extends AbstractAsyncExecutor implements Closeable { /** * 初始化相关配置 * + * @param localNode + * 本地节点 * @param remoteNode * 远端节点 * @param sessionMessage * 本地节点连接到远端节点后发送的SessionMessage */ - private void init(RemoteNode remoteNode, SessionMessage sessionMessage) { - init(remoteNode.getHostName(), remoteNode.getPort(), sessionMessage); - } - - /** - * 初始化相关配置 - * - * @param remoteHost - * 远端HOST - * @param remotePort - * 远端端口 - * @param sessionMessage - * 本地节点连接到远端节点后发送的SessionMessage - */ - private void init(String remoteHost, int remotePort, SessionMessage sessionMessage) { - this.remoteHost = remoteHost; - this.remotePort = remotePort; + private void init(LocalNode localNode, RemoteNode remoteNode, SessionMessage sessionMessage) { + this.localNode = localNode; + this.remoteNode = remoteNode; this.sessionMessage = sessionMessage; diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java index 8a13db4e..3459a804 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java @@ -16,10 +16,12 @@ import com.jd.blockchain.stp.communication.connection.listener.ReplyListener; import com.jd.blockchain.stp.communication.manager.ConnectionManager; import com.jd.blockchain.stp.communication.message.SessionMessage; import com.jd.blockchain.stp.communication.message.TransferMessage; +import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.RemoteNode; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.commons.codec.binary.Hex; import java.io.Closeable; import java.util.Map; @@ -86,11 +88,17 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo */ private MessageExecutor defaultMessageExecutor; + /** + * 本地节点 + */ + private LocalNode localNode; + public ReceiverHandler(ConnectionManager connectionManager, String localMsgExecutorClass, - MessageExecutor defaultMessageExecutor) { + LocalNode localNode) { this.connectionManager = connectionManager; this.localMsgExecutorClass = localMsgExecutorClass; - this.defaultMessageExecutor = defaultMessageExecutor; + this.defaultMessageExecutor = localNode.defaultMessageExecutor(); + this.localNode = localNode; initMsgExecutorPool(); } @@ -109,7 +117,7 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - System.out.println("Receive Biz Message -> " + msg.toString()); + System.out.printf("%s Receive Biz Message -> %s \r\n", this.localNode.toString(), msg.toString()); // 有数据接入 // 首先判断数据是否TransferMessage,当前Handler不处理非TransferMessage TransferMessage tm = TransferMessage.toTransferMessage(msg); @@ -239,7 +247,7 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo // 假设连接失败的话,返回的Connection对象为null,此时不放入Map,等后续再处理 if (remoteConnection != null) { - remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecutor); + remoteSession = new RemoteSession(this.localId(), remoteConnection, messageExecutor); // Double check !!! if (!remoteSessions.containsKey(sessionId)) { @@ -312,6 +320,24 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo msgExecuteThreadFactory, new ThreadPoolExecutor.AbortPolicy()); } + /** + * 返回本地节点 + * + * @return + */ + public LocalNode localNode() { + return localNode; + } + + /** + * 返回本地节点ID + * + * @return + */ + private String localId() { + return Hex.encodeHexString(localNode.toString().getBytes()); + } + @Override public void close() { msgExecutorPool.shutdown(); diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java index b3d194cb..ed763434 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java @@ -9,6 +9,8 @@ package com.jd.blockchain.stp.communication.connection.handler; import com.jd.blockchain.stp.communication.message.SessionMessage; +import com.jd.blockchain.stp.communication.node.LocalNode; +import com.jd.blockchain.stp.communication.node.RemoteNode; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -23,16 +25,38 @@ import io.netty.channel.ChannelInboundHandlerAdapter; @ChannelHandler.Sharable public class SenderHandler extends ChannelInboundHandlerAdapter { + /** + * 本地session信息 + */ private SessionMessage sessionMessage; - public SenderHandler(SessionMessage sessionMessage) { + /** + * 本地节点 + */ + private LocalNode localNode; + + /** + * 远端节点 + */ + private RemoteNode remoteNode; + + public SenderHandler(LocalNode localNode, RemoteNode remoteNode, SessionMessage sessionMessage) { + this.localNode = localNode; + this.remoteNode = remoteNode; this.sessionMessage = sessionMessage; } + /** + * 连接远端节点成功时触发 + * + * @param ctx + * @throws Exception + */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + // 发送本机信息(包括IP、端口等)至对端 - System.out.println("Connection Receiver Success, Send Local Node Information !!!"); + System.out.printf("%s Connect %s Success, Send Local Node Information !!! \r\n", this.localNode, this.remoteNode); ctx.writeAndFlush(sessionMessage.toTransferByteBuf()); } diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java index 8025252a..1086fb33 100644 --- a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java @@ -60,6 +60,11 @@ public class RemoteSessionManager { */ private LocalNode localNode; + /** + * 本地节点ID + */ + private String localId; + /** * 构造器 * @param localNode @@ -67,6 +72,7 @@ public class RemoteSessionManager { */ public RemoteSessionManager(LocalNode localNode) { this.localNode = localNode; + this.localId = localId(); // 校验本地节点的配置,防止异常 check(); this.connectionManager = ConnectionManager.newConnectionManager(this.localNode); @@ -83,24 +89,13 @@ public class RemoteSessionManager { } } - /** - * 生成新的Session - * @param remoteNode - * @return - */ - public RemoteSession newSession(RemoteNode remoteNode) { - return newSession(null, remoteNode); - } - /** * RemoteSession对象生成器 - * @param sessionId - * RemoteSession的Key * @param remoteNode * 远端节点信息 * @return */ - public RemoteSession newSession(String sessionId, RemoteNode remoteNode) { + public RemoteSession newSession(RemoteNode remoteNode) { RemoteSession remoteSession = nodeRemoteSessionMap.get(remoteNode); @@ -112,16 +107,13 @@ public class RemoteSessionManager { // Double Check !!! if (!nodeRemoteSessionMap.containsKey(remoteNode)) { - if (sessionId == null) { - sessionId = sessionId(localNode); - } Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecutorClass()); if (remoteConnection == null) { return null; } - remoteSession = new RemoteSession(sessionId, remoteConnection); + remoteSession = new RemoteSession(localId, remoteConnection); remoteSession.init(); @@ -133,26 +125,14 @@ public class RemoteSessionManager { lock.unlock(); } } - return remoteSession; + return null; } public RemoteSession[] newSessions(RemoteNode[] remoteNodes) { - - return newSessions(null, remoteNodes); - } - - public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes) { - checkSessions(sessionIds, remoteNodes); - List remoteSessionList = new ArrayList<>(); for (int i = 0; i < remoteNodes.length; i++) { - RemoteSession remoteSession; - if (sessionIds == null) { - remoteSession = newSession(remoteNodes[i]); - } else { - remoteSession = newSession(sessionIds[i], remoteNodes[i]); - } + RemoteSession remoteSession = newSession(remoteNodes[i]); if (remoteSession != null) { remoteSessionList.add(remoteSession); } @@ -194,19 +174,7 @@ public class RemoteSessionManager { return this.connectionManager.start(this.localNode.messageExecutorClass()); } - private void checkSessions(String[] sessionIds, RemoteNode[] remoteNodes) { - if (remoteNodes == null || remoteNodes.length <= 0) { - throw new IllegalArgumentException("RemoteNodes is empty !!!"); - } - - if (sessionIds != null) { - if (sessionIds.length != remoteNodes.length) { - throw new IllegalArgumentException("RemoteNodes and sessionIds are different in length !!!"); - } - } - } - - private String sessionId(RemoteNode remoteNode) { - return Hex.encodeHexString(remoteNode.toString().getBytes()); + private String localId() { + return Hex.encodeHexString(localNode.toString().getBytes()); } } \ No newline at end of file diff --git a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java index e1223981..26656a8b 100644 --- a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java +++ b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java @@ -25,8 +25,8 @@ public class MyMessageExecutor implements MessageExecutor { @Override public byte[] receive(String key, byte[] data, RemoteSession session) { String receiveMsg = new String(data, Charset.defaultCharset()); - System.out.printf("receive client {%s} request {%s} \r\n", session.sessionId(), receiveMsg); - String msg = session.sessionId() + " -> received !!!"; + System.out.printf("receive client {%s} request {%s} \r\n", session.remoteNode().toString(), receiveMsg); + String msg = session.localId() + " -> received !!!"; return msg.getBytes(Charset.defaultCharset()); }