Browse Source

修改STP底层通信处理代码

tags/1.0.0
shaozhuguang 6 years ago
parent
commit
c65896df10
8 changed files with 156 additions and 105 deletions
  1. +53
    -17
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java
  2. +6
    -2
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java
  3. +1
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java
  4. +26
    -33
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java
  5. +30
    -4
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java
  6. +26
    -2
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java
  7. +12
    -44
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java
  8. +2
    -2
      source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java

+ 53
- 17
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java View File

@@ -12,11 +12,14 @@ import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.apache.commons.codec.binary.Hex;

import java.util.concurrent.TimeUnit;


/**
* 远端Session
*
* @author shaozhuguang
* @create 2019/4/11
@@ -26,9 +29,14 @@ import java.util.concurrent.TimeUnit;
public class RemoteSession {

/**
* 远端节点ID
* 本地节点ID
*/
private String id;
private String localId;

/**
* 远端节点
*/
private RemoteNode remoteNode;

/**
* 远端连接
@@ -41,31 +49,31 @@ public class RemoteSession {
*/
private MessageExecutor messageExecutor;


/**
* 构造器
* @param id
* 远端节点ID
* @param localId
* 本地节点ID
* @param connection
* 对应连接
*/
public RemoteSession(String id, Connection connection) {
this(id, connection, null);
public RemoteSession(String localId, Connection connection) {
this(localId, connection, null);
}

/**
* 构造器
* @param id
* 远端ID
* @param localId
* 本地节点ID
* @param connection
* 对应连接
* @param messageExecutor
* 对应远端消息处理器
*/
public RemoteSession(String id, Connection connection, MessageExecutor messageExecutor) {
this.id = id;
public RemoteSession(String localId, Connection connection, MessageExecutor messageExecutor) {
this.localId = localId;
this.connection = connection;
this.messageExecutor = messageExecutor;
this.remoteNode = connection.remoteNode();
}

public void init() {
@@ -87,7 +95,7 @@ public class RemoteSession {
* @throws Exception
*/
public byte[] request(LoadMessage loadMessage) throws Exception {
return this.connection.request(this.id, loadMessage, null).getCallBackData();
return this.connection.request(this.localId, loadMessage, null).getCallBackData();
}

/**
@@ -105,7 +113,7 @@ public class RemoteSession {
* @throws Exception
*/
public byte[] request(LoadMessage loadMessage, long time, TimeUnit timeUnit) throws Exception {
return this.connection.request(this.id, loadMessage, null).getCallBackData(time, timeUnit);
return this.connection.request(this.localId, loadMessage, null).getCallBackData(time, timeUnit);
}

/**
@@ -133,7 +141,7 @@ public class RemoteSession {
* 应答,需要调用者从Listener中获取结果
*/
public CallBackDataListener asyncRequest(LoadMessage loadMessage, CallBackBarrier callBackBarrier) {
return this.connection.request(this.id, loadMessage, callBackBarrier);
return this.connection.request(this.localId, loadMessage, callBackBarrier);
}

/**
@@ -145,7 +153,7 @@ public class RemoteSession {
* 需要应答的负载消息
*/
public void reply(String key, LoadMessage loadMessage) {
this.connection.reply(this.id, key, loadMessage);
this.connection.reply(this.localId, key, loadMessage);
}

public void closeAll() {
@@ -160,11 +168,39 @@ public class RemoteSession {
this.connection.closeSender();
}

public String sessionId() {
return id;
/**
* 返回本地节点ID
*
* @return
*/
public String localId() {
return localId;
}

/**
* 返回远端对应的SessionID
*
* @return
*/
public String remoteSessionId() {
return Hex.encodeHexString(remoteNode.toString().getBytes());
}

/**
* 返回远端对应执行器
*
* @return
*/
public MessageExecutor messageExecutor() {
return this.messageExecutor;
}

/**
* 返回对应远端节点
*
* @return
*/
public RemoteNode remoteNode() {
return remoteNode;
}
}

+ 6
- 2
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java View File

@@ -64,7 +64,7 @@ public class Connection {
* @param remoteSession
*/
public void initSession(RemoteSession remoteSession) {
this.receiver.initRemoteSession(remoteSession.sessionId(), remoteSession);
this.receiver.initRemoteSession(remoteSession.remoteSessionId(), remoteSession);
}

/**
@@ -80,7 +80,7 @@ public class Connection {
*/
public CallBackLauncher connect(RemoteNode remoteNode, String messageExecutorClass) throws InterruptedException {
this.remoteNode = remoteNode;
this.sender = new Sender(this.remoteNode, sessionMessage(messageExecutorClass));
this.sender = new Sender(this.receiver.localNode(), this.remoteNode, sessionMessage(messageExecutorClass));
this.sender.connect();
return this.sender.waitBooted();
}
@@ -206,6 +206,10 @@ public class Connection {
closeSender();
}

public RemoteNode remoteNode() {
return remoteNode;
}

public void closeReceiver() {
this.receiver.close();
}


+ 1
- 1
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java View File

@@ -125,7 +125,7 @@ public class Receiver extends AbstractAsyncExecutor implements Closeable {
* 当前节点的消息处理Class
*/
public void initReceiverHandler(ConnectionManager connectionManager, String messageExecutorClass) {
receiverHandler = new ReceiverHandler(connectionManager, messageExecutorClass, this.localNode.defaultMessageExecutor());
receiverHandler = new ReceiverHandler(connectionManager, messageExecutorClass, this.localNode);
}

/**


+ 26
- 33
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java View File

@@ -11,6 +11,7 @@ package com.jd.blockchain.stp.communication.connection;
import com.jd.blockchain.stp.communication.connection.handler.*;
import com.jd.blockchain.stp.communication.message.IMessage;
import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
@@ -47,34 +48,38 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
*/
private SessionMessage sessionMessage;

/**
* 远端HOST
*/
private String remoteHost;
private LocalNode localNode;

/**
* 远端端口
*/
private int remotePort;
private RemoteNode remoteNode;

// /**
// * 远端HOST
// */
// private String remoteHost;
//
// /**
// * 远端端口
// */
// private int remotePort;

/**
* 监听Handler(重连Handler)
*/
private WatchDogHandler watchDogHandler;

public Sender(RemoteNode remoteNode, SessionMessage sessionMessage) {
init(remoteNode, sessionMessage);
public Sender(LocalNode localNode, RemoteNode remoteNode, SessionMessage sessionMessage) {
init(localNode, remoteNode, sessionMessage);
}

public Sender(String remoteHost, int remotePort, SessionMessage sessionMessage) {
init(remoteHost, remotePort, sessionMessage);
}
// public Sender(String remoteHost, int remotePort, SessionMessage sessionMessage) {
// init(remoteHost, remotePort, sessionMessage);
// }

/**
* 连接
*/
public void connect() {
watchDogHandler = new WatchDogHandler(this.remoteHost, this.remotePort, bootstrap);
watchDogHandler = new WatchDogHandler(this.remoteNode.getHostName(), this.remoteNode.getPort(), bootstrap);

ChannelHandlers frontChannelHandlers = new ChannelHandlers()
.addHandler(watchDogHandler);
@@ -83,7 +88,7 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
.addHandler(new StringDecoder())
.addHandler(new HeartBeatSenderTrigger())
.addHandler(new HeartBeatSenderHandler())
.addHandler(new SenderHandler(this.sessionMessage));
.addHandler(new SenderHandler(this.localNode, this.remoteNode, this.sessionMessage));

// 初始化watchDogHandler
watchDogHandler.init(frontChannelHandlers.toArray(), afterChannelHandlers.toArray());
@@ -105,7 +110,7 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
runThread.execute(() -> {
try {
// 发起连接请求
channelFuture = bootstrap.connect(this.remoteHost, this.remotePort).sync();
channelFuture = bootstrap.connect(this.remoteNode.getHostName(), this.remoteNode.getPort()).sync();
boolean isStartSuccess = channelFuture.isSuccess();
if (isStartSuccess) {
// 启动成功
@@ -130,28 +135,16 @@ public class Sender extends AbstractAsyncExecutor implements Closeable {
/**
* 初始化相关配置
*
* @param localNode
* 本地节点
* @param remoteNode
* 远端节点
* @param sessionMessage
* 本地节点连接到远端节点后发送的SessionMessage
*/
private void init(RemoteNode remoteNode, SessionMessage sessionMessage) {
init(remoteNode.getHostName(), remoteNode.getPort(), sessionMessage);
}

/**
* 初始化相关配置
*
* @param remoteHost
* 远端HOST
* @param remotePort
* 远端端口
* @param sessionMessage
* 本地节点连接到远端节点后发送的SessionMessage
*/
private void init(String remoteHost, int remotePort, SessionMessage sessionMessage) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
private void init(LocalNode localNode, RemoteNode remoteNode, SessionMessage sessionMessage) {
this.localNode = localNode;
this.remoteNode = remoteNode;

this.sessionMessage = sessionMessage;



+ 30
- 4
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java View File

@@ -16,10 +16,12 @@ import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
import com.jd.blockchain.stp.communication.manager.ConnectionManager;
import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.message.TransferMessage;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.codec.binary.Hex;

import java.io.Closeable;
import java.util.Map;
@@ -86,11 +88,17 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
*/
private MessageExecutor defaultMessageExecutor;

/**
* 本地节点
*/
private LocalNode localNode;

public ReceiverHandler(ConnectionManager connectionManager, String localMsgExecutorClass,
MessageExecutor defaultMessageExecutor) {
LocalNode localNode) {
this.connectionManager = connectionManager;
this.localMsgExecutorClass = localMsgExecutorClass;
this.defaultMessageExecutor = defaultMessageExecutor;
this.defaultMessageExecutor = localNode.defaultMessageExecutor();
this.localNode = localNode;
initMsgExecutorPool();
}

@@ -109,7 +117,7 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("Receive Biz Message -> " + msg.toString());
System.out.printf("%s Receive Biz Message -> %s \r\n", this.localNode.toString(), msg.toString());
// 有数据接入
// 首先判断数据是否TransferMessage,当前Handler不处理非TransferMessage
TransferMessage tm = TransferMessage.toTransferMessage(msg);
@@ -239,7 +247,7 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
// 假设连接失败的话,返回的Connection对象为null,此时不放入Map,等后续再处理
if (remoteConnection != null) {

remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecutor);
remoteSession = new RemoteSession(this.localId(), remoteConnection, messageExecutor);

// Double check !!!
if (!remoteSessions.containsKey(sessionId)) {
@@ -312,6 +320,24 @@ public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Clo
msgExecuteThreadFactory, new ThreadPoolExecutor.AbortPolicy());
}

/**
* 返回本地节点
*
* @return
*/
public LocalNode localNode() {
return localNode;
}

/**
* 返回本地节点ID
*
* @return
*/
private String localId() {
return Hex.encodeHexString(localNode.toString().getBytes());
}

@Override
public void close() {
msgExecutorPool.shutdown();


+ 26
- 2
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java View File

@@ -9,6 +9,8 @@
package com.jd.blockchain.stp.communication.connection.handler;

import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -23,16 +25,38 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
@ChannelHandler.Sharable
public class SenderHandler extends ChannelInboundHandlerAdapter {

/**
* 本地session信息
*/
private SessionMessage sessionMessage;

public SenderHandler(SessionMessage sessionMessage) {
/**
* 本地节点
*/
private LocalNode localNode;

/**
* 远端节点
*/
private RemoteNode remoteNode;

public SenderHandler(LocalNode localNode, RemoteNode remoteNode, SessionMessage sessionMessage) {
this.localNode = localNode;
this.remoteNode = remoteNode;
this.sessionMessage = sessionMessage;
}

/**
* 连接远端节点成功时触发
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

// 发送本机信息(包括IP、端口等)至对端
System.out.println("Connection Receiver Success, Send Local Node Information !!!");
System.out.printf("%s Connect %s Success, Send Local Node Information !!! \r\n", this.localNode, this.remoteNode);
ctx.writeAndFlush(sessionMessage.toTransferByteBuf());
}



+ 12
- 44
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java View File

@@ -60,6 +60,11 @@ public class RemoteSessionManager {
*/
private LocalNode localNode;

/**
* 本地节点ID
*/
private String localId;

/**
* 构造器
* @param localNode
@@ -67,6 +72,7 @@ public class RemoteSessionManager {
*/
public RemoteSessionManager(LocalNode localNode) {
this.localNode = localNode;
this.localId = localId();
// 校验本地节点的配置,防止异常
check();
this.connectionManager = ConnectionManager.newConnectionManager(this.localNode);
@@ -83,24 +89,13 @@ public class RemoteSessionManager {
}
}

/**
* 生成新的Session
* @param remoteNode
* @return
*/
public RemoteSession newSession(RemoteNode remoteNode) {
return newSession(null, remoteNode);
}

/**
* RemoteSession对象生成器
* @param sessionId
* RemoteSession的Key
* @param remoteNode
* 远端节点信息
* @return
*/
public RemoteSession newSession(String sessionId, RemoteNode remoteNode) {
public RemoteSession newSession(RemoteNode remoteNode) {

RemoteSession remoteSession = nodeRemoteSessionMap.get(remoteNode);

@@ -112,16 +107,13 @@ public class RemoteSessionManager {

// Double Check !!!
if (!nodeRemoteSessionMap.containsKey(remoteNode)) {
if (sessionId == null) {
sessionId = sessionId(localNode);
}
Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecutorClass());

if (remoteConnection == null) {
return null;
}

remoteSession = new RemoteSession(sessionId, remoteConnection);
remoteSession = new RemoteSession(localId, remoteConnection);

remoteSession.init();

@@ -133,26 +125,14 @@ public class RemoteSessionManager {
lock.unlock();
}
}
return remoteSession;
return null;
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes) {

return newSessions(null, remoteNodes);
}

public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes) {
checkSessions(sessionIds, remoteNodes);

List<RemoteSession> remoteSessionList = new ArrayList<>();

for (int i = 0; i < remoteNodes.length; i++) {
RemoteSession remoteSession;
if (sessionIds == null) {
remoteSession = newSession(remoteNodes[i]);
} else {
remoteSession = newSession(sessionIds[i], remoteNodes[i]);
}
RemoteSession remoteSession = newSession(remoteNodes[i]);
if (remoteSession != null) {
remoteSessionList.add(remoteSession);
}
@@ -194,19 +174,7 @@ public class RemoteSessionManager {
return this.connectionManager.start(this.localNode.messageExecutorClass());
}

private void checkSessions(String[] sessionIds, RemoteNode[] remoteNodes) {
if (remoteNodes == null || remoteNodes.length <= 0) {
throw new IllegalArgumentException("RemoteNodes is empty !!!");
}

if (sessionIds != null) {
if (sessionIds.length != remoteNodes.length) {
throw new IllegalArgumentException("RemoteNodes and sessionIds are different in length !!!");
}
}
}

private String sessionId(RemoteNode remoteNode) {
return Hex.encodeHexString(remoteNode.toString().getBytes());
private String localId() {
return Hex.encodeHexString(localNode.toString().getBytes());
}
}

+ 2
- 2
source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java View File

@@ -25,8 +25,8 @@ public class MyMessageExecutor implements MessageExecutor {
@Override
public byte[] receive(String key, byte[] data, RemoteSession session) {
String receiveMsg = new String(data, Charset.defaultCharset());
System.out.printf("receive client {%s} request {%s} \r\n", session.sessionId(), receiveMsg);
String msg = session.sessionId() + " -> received !!!";
System.out.printf("receive client {%s} request {%s} \r\n", session.remoteNode().toString(), receiveMsg);
String msg = session.localId() + " -> received !!!";
return msg.getBytes(Charset.defaultCharset());
}



Loading…
Cancel
Save