Browse Source

Add stp communication module

tags/1.0.0
shaozhuguang 5 years ago
parent
commit
562d801e3f
41 changed files with 2355 additions and 377 deletions
  1. +6
    -0
      source/pom.xml
  2. +3
    -2
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java
  3. +4
    -4
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java
  4. +8
    -3
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java
  5. +15
    -0
      source/stp/stp-communication/pom.xml
  6. +36
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecute.java
  7. +0
    -21
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java
  8. +31
    -44
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java
  9. +0
    -98
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java
  10. +56
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java
  11. +70
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java
  12. +51
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java
  13. +25
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java
  14. +120
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java
  15. +132
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java
  16. +162
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java
  17. +41
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java
  18. +40
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java
  19. +42
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java
  20. +48
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java
  21. +223
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java
  22. +46
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java
  23. +183
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java
  24. +74
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java
  25. +0
    -27
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java
  26. +0
    -35
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java
  27. +110
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java
  28. +132
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java
  29. +30
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java
  30. +57
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java
  31. +25
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java
  32. +95
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java
  33. +117
    -11
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java
  34. +52
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java
  35. +25
    -1
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java
  36. +37
    -0
      source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java
  37. +0
    -131
      source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java
  38. +1
    -0
      source/test/pom.xml
  39. +36
    -0
      source/test/test-stp-community/pom.xml
  40. +37
    -0
      source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java
  41. +185
    -0
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java

+ 6
- 0
source/pom.xml View File

@@ -299,6 +299,12 @@
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>
</dependencyManagement>



+ 3
- 2
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java View File

@@ -1,7 +1,7 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.RemoteSessionManager;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;

import java.net.InetSocketAddress;
import java.util.Map;
@@ -20,7 +20,8 @@ public class DSProcessManager {

DSTransferProcess startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {

RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listener.getPort());
// RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listener.getPort());
RemoteSessionManager remoteSessionManager = null;
DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, remoteSessionManager, targets, dsWriter, dsReader);

dsTransferProcess.start();


+ 4
- 4
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java View File

@@ -1,8 +1,8 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.RemoteNode;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.RemoteSessionManager;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.RemoteNode;

import java.net.InetSocketAddress;

@@ -69,8 +69,8 @@ public class DSTransferProcess {

for (int i = 0; i < remoteSessions.length; i++) {
DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter);
remoteSessions[i].initHandler(msgHandle);
remoteSessions[i].connect();
remoteSessions[i].initExecute(msgHandle);
remoteSessions[i].init();
}
}



+ 8
- 3
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java View File

@@ -1,6 +1,6 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.MessageHandler;
import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.RemoteSession;

/**
@@ -9,7 +9,7 @@ import com.jd.blockchain.stp.communication.RemoteSession;
* @create 2019/4/11
* @since 1.0.0
*/
public class DataSequenceMsgHandle implements MessageHandler {
public class DataSequenceMsgHandle implements MessageExecute {

DataSequenceReader dsReader;
DataSequenceWriter dsWriter;
@@ -20,8 +20,13 @@ public class DataSequenceMsgHandle implements MessageHandler {
}

@Override
public void receive(byte[] key, byte[] data, RemoteSession session) {
public byte[] receive(String key, byte[] data, RemoteSession session) {
return new byte[0];
}

@Override
public REPLY replyType() {
return REPLY.AUTO;
}

/**


+ 15
- 0
source/stp/stp-communication/pom.xml View File

@@ -26,10 +26,25 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>

+ 36
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageExecute.java View File

@@ -0,0 +1,36 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.MessageExecute
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public interface MessageExecute {

byte[] receive(String key, byte[] data, RemoteSession session);

REPLY replyType();

// 应答方式
enum REPLY {
// 手动应答:Receiver不会自动发送应答请求,需要调用
// session.reply(String key, LoadMessage loadMessage) 或
// asyncReply(String key, LoadMessage loadMessage)
MANUAL,

// 自动应答:Receiver会根据receive方法的应答结果自动调用应答
// 使用者不能重新调用
AUTO,
;
}
}

+ 0
- 21
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java View File

@@ -1,21 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.MessageHandler
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public interface MessageHandler {

void receive(byte[] key, byte[] data, RemoteSession session);
}

+ 31
- 44
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java View File

@@ -8,14 +8,11 @@
*/
package com.jd.blockchain.stp.communication;

import com.jd.blockchain.stp.communication.inner.Receiver;
import com.jd.blockchain.stp.communication.inner.Sender;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.message.LoadMessage;

import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

/**
*
@@ -28,67 +25,57 @@ public class RemoteSession {

private String id;

private RemoteNode remoteNode;
private Connection connection;

private Sender sender;

private Receiver receiver;

private MessageHandler messageHandler;

public void initHandler(MessageHandler messageHandler) {
private MessageExecute messageExecute;

public RemoteSession(String id, Connection connection, MessageExecute messageExecute) {
this.id = id;
this.connection = connection;
this.messageExecute = messageExecute;
}

public void connect() {

public void init() {
connection.initSession(this);
}

public byte[] send(LoadMessage loadMessage) {

return null;
public void initExecute(MessageExecute messageExecute) {
this.messageExecute = messageExecute;
}

public Future<byte[]> asyncSend(LoadMessage loadMessage) {
return null;
public byte[] request(LoadMessage loadMessage) throws Exception {
return this.connection.request(this.id, loadMessage, null).getCallBackData();
}

public Future<byte[]> asyncSend(LoadMessage loadMessage, CountDownLatch countDownLatch) {
return null;
public CallBackDataListener asyncRequest(LoadMessage loadMessage) {
return asyncRequest(loadMessage, null);
}

public byte[] send(byte[] loadMessage) {

return null;
public CallBackDataListener asyncRequest(LoadMessage loadMessage, CallBackBarrier callBackBarrier) {
return this.connection.request(this.id, loadMessage, callBackBarrier);
}

public Future<byte[]> asyncSend(byte[] loadMessage) {

return null;
public void reply(String key, LoadMessage loadMessage) {
this.connection.reply(this.id, key, loadMessage);
}

public Future<byte[]> asyncSend(byte[] loadMessage, CountDownLatch countDownLatch) {

return null;
public void closeAll() {
this.connection.closeAll();
}

public void reply(byte[] key, LoadMessage loadMessage) {
public void closeReceiver() {
this.connection.closeReceiver();
}

public void asyncReply(byte[] key, LoadMessage loadMessage) {
public void closeSender() {
this.connection.closeSender();
}

public void reply(byte[] key, byte[] loadMessage) {
public String sessionId() {
return id;
}

public void asyncReply(byte[] key, byte[] loadMessage) {

}

public void close() {

public MessageExecute messageExecute() {
return this.messageExecute;
}
}

+ 0
- 98
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java View File

@@ -1,98 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.RemoteSessionManager
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午11:22
* Description:
*/
package com.jd.blockchain.stp.communication;

import com.jd.blockchain.stp.communication.inner.Receiver;

import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class RemoteSessionManager {

private static Map<Integer, Receiver> receiverMap = new ConcurrentHashMap<>();

private static final Lock lock = new ReentrantLock();

public RemoteSessionManager(int listenPort) {
if (listenPort <= 0) {
throw new IllegalArgumentException("Illegal port, please check !!!");
}

if (!receiverMap.containsKey(listenPort)) {
try {
lock.lock();
if (!receiverMap.containsKey(listenPort)) {
Receiver receiver = initReceiver(listenPort);
receiverMap.put(listenPort, receiver);
}
} finally {
lock.unlock();
}
}
}

public RemoteSession newSession(RemoteNode remoteNode) {
return newSession(toRemoteId(remoteNode), remoteNode);
}

public RemoteSession newSession(String remoteId, RemoteNode remoteNode) {
return newSession(remoteId, remoteNode, null);
}

public RemoteSession newSession(RemoteNode remoteNode, MessageHandler messageHandler) {
return newSession(toRemoteId(remoteNode), remoteNode, messageHandler);
}

public RemoteSession newSession(String remoteId, RemoteNode remoteNode, MessageHandler messageHandler) {


return null;
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes) {


return null;
}

public RemoteSession[] newSessions(String[] remoteIds, RemoteNode[] remoteNodes) {

return null;
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes, MessageHandler messageHandler) {

return null;
}

public RemoteSession[] newSessions(String[] remoteIds, RemoteNode[] remoteNodes, MessageHandler messageHandler) {

return null;
}

public String toRemoteId(RemoteNode remoteNode) {

return null;
}

private Receiver initReceiver(int port) {

return null;
}
}

+ 56
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackBarrier.java View File

@@ -0,0 +1,56 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.CallBackBarrier
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/12 上午10:22
* Description:
*/
package com.jd.blockchain.stp.communication.callback;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
*
* @author shaozhuguang
* @create 2019/4/12
* @since 1.0.0
*/

public class CallBackBarrier {

private CountDownLatch countDownLatch;

private long maxTryCallMillSeconds = 2000;

public static final CallBackBarrier newCallBackBarrier(int barrierLength) {
return new CallBackBarrier(barrierLength);
}

public static final CallBackBarrier newCallBackBarrier(int barrierLength, long maxTryCallMillSeconds) {
return new CallBackBarrier(barrierLength, maxTryCallMillSeconds);
}

private CallBackBarrier(int barrierLength) {
this.countDownLatch = new CountDownLatch(barrierLength);
}

private CallBackBarrier(int barrierLength, long maxTryCallMillSeconds) {
this.countDownLatch = new CountDownLatch(barrierLength);
this.maxTryCallMillSeconds = maxTryCallMillSeconds;
}

public void release() {
countDownLatch.countDown();
}

public boolean tryCall() throws InterruptedException {
return countDownLatch.await(maxTryCallMillSeconds, TimeUnit.MILLISECONDS);
}

public boolean tryCall(long timeout, TimeUnit unit) throws InterruptedException {
return countDownLatch.await(timeout, unit);
}
}

+ 70
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/callback/CallBackDataListener.java View File

@@ -0,0 +1,70 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.callback.CallBackDataListener
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/15 下午4:40
* Description:
*/
package com.jd.blockchain.stp.communication.callback;

import com.jd.blockchain.stp.communication.node.RemoteNode;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/4/15
* @since 1.0.0
*/

public class CallBackDataListener {

private CompletableFuture<byte[]> future = new CompletableFuture<>();

private RemoteNode remoteNode;

private boolean isFill = false;

private Lock lock = new ReentrantLock();

public CallBackDataListener(RemoteNode remoteNode) {
this.remoteNode = remoteNode;
}

public byte[] getCallBackData() throws InterruptedException, ExecutionException {
return future.get();
}

public byte[] getCallBackData(long time, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(time, timeUnit);
}

public void setCallBackData(byte[] data) {
if (!isFill) {
try {
lock.lock();
if (!isFill) {
future.complete(data);
isFill = true;
}
} finally {
lock.unlock();
}
}
}

public RemoteNode remoteNode() {
return this.remoteNode;
}

public boolean isDone() {
return future.isDone();
}
}

+ 51
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AbstractAsyncExecutor.java View File

@@ -0,0 +1,51 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.AbstractAsyncExecutor
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 上午11:16
* Description:
*/
package com.jd.blockchain.stp.communication.connection;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.*;

/**
*
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public abstract class AbstractAsyncExecutor implements AsyncExecutor{

private static final int QUEUE_CAPACITY = 1024;

protected final Semaphore isStarted = new Semaphore(0, true);

protected boolean isStartSuccess = false;

@Override
public ThreadPoolExecutor initRunThread() {
ThreadFactory timerFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNameFormat()).build();

ThreadPoolExecutor runThread = new ThreadPoolExecutor(1, 1,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY),
timerFactory,
new ThreadPoolExecutor.AbortPolicy());

return runThread;
}

@Override
public boolean waitStarted() throws InterruptedException {
this.isStarted.acquire();
return this.isStartSuccess;
}

public abstract String threadNameFormat();
}

+ 25
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/AsyncExecutor.java View File

@@ -0,0 +1,25 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.AsyncExecutor
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 上午11:14
* Description:
*/
package com.jd.blockchain.stp.communication.connection;

import java.util.concurrent.ThreadPoolExecutor;

/**
*
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public interface AsyncExecutor {

ThreadPoolExecutor initRunThread();

boolean waitStarted() throws InterruptedException;
}

+ 120
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Connection.java View File

@@ -0,0 +1,120 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.Connection
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午5:39
* Description:
*/
package com.jd.blockchain.stp.communication.connection;

import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.message.TransferMessage;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Connection {

private RemoteNode remoteNode;

private Receiver receiver;

private Sender sender;

public Connection(Receiver receiver) {
this.receiver = receiver;
}

public void initSession(RemoteSession remoteSession) {
this.receiver.initRemoteSession(remoteSession.sessionId(), remoteSession);
}

public boolean connect(RemoteNode remoteNode, String messageExecuteClass) throws InterruptedException {
this.remoteNode = remoteNode;
this.sender = new Sender(this.remoteNode, sessionMessage(messageExecuteClass));
this.sender.connect();
return this.sender.waitStarted();
}

public CallBackDataListener request(String sessionId, LoadMessage loadMessage, CallBackBarrier callBackBarrier) {

TransferMessage transferMessage = transferMessage(sessionId, null, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_REQUEST);

// 监听器的Key
String listenKey = transferMessage.toListenKey();

// 创建监听器
ReplyListener replyListener = new ReplyListener(listenKey, this.remoteNode, callBackBarrier);

// 添加监听器至Receiver
this.receiver.addListener(replyListener);

// 发送请求
this.sender.send(transferMessage);

return replyListener.callBackDataListener();
}

public void reply(String sessionId, String key, LoadMessage loadMessage) {
TransferMessage transferMessage = transferMessage(sessionId, key, loadMessage, TransferMessage.MESSAGE_TYPE.TYPE_RESPONSE);

// 通过Sender发送数据
this.sender.send(transferMessage);
}

private String loadKey(LoadMessage loadMessage) {
// 使用Sha256求Hash
byte[] sha256Bytes = DigestUtils.sha256(loadMessage.toBytes());
// 使用base64作为Key
return Base64.encodeBase64String(sha256Bytes);
}

private TransferMessage transferMessage(String sessionId, String key, LoadMessage loadMessage, TransferMessage.MESSAGE_TYPE messageType) {

if (key == null || key.length() == 0) {
key = loadKey(loadMessage);
}

TransferMessage transferMessage = new TransferMessage(
sessionId, messageType.code(), key, loadMessage.toBytes());

return transferMessage;
}

private SessionMessage sessionMessage(String messageExecuteClass) {

LocalNode localNode = this.receiver.localNode();

SessionMessage sessionMessage = new SessionMessage(
localNode.getHostName(), localNode.getPort(), messageExecuteClass);

return sessionMessage;
}

public void closeAll() {
closeReceiver();
closeSender();
}

public void closeReceiver() {
this.receiver.close();
}

public void closeSender() {
this.sender.close();
}
}

+ 132
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Receiver.java View File

@@ -0,0 +1,132 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.inner.Receiver
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication.connection;

import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.connection.handler.HeartBeatReceiverHandler;
import com.jd.blockchain.stp.communication.connection.handler.HeartBeatReceiverTrigger;
import com.jd.blockchain.stp.communication.connection.handler.ReceiverHandler;
import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
import com.jd.blockchain.stp.communication.manager.ConnectionManager;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.LocalNode;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.*;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Receiver extends AbstractAsyncExecutor implements Closeable {

private final EventLoopGroup bossGroup = new NioEventLoopGroup();

private final EventLoopGroup workerGroup = new NioEventLoopGroup();

private LocalNode localNode;

private ReceiverHandler receiverHandler;

public Receiver(LocalNode localNode) {
this.localNode = localNode;
}

public void startListen() {
ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.localAddress(new InetSocketAddress(this.localNode.getPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// .addLast(new LoggingHandler(LogLevel.ERROR))
.addLast(new IdleStateHandler(8, 0, 0, TimeUnit.SECONDS))
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new HeartBeatReceiverTrigger())
.addLast(new HeartBeatReceiverHandler())
.addLast(receiverHandler);
}
});

// 由单独的线程启动
ThreadPoolExecutor runThread = initRunThread();
runThread.execute(() -> {
try {
ChannelFuture f = bootstrap.bind().sync();
super.isStartSuccess = f.isSuccess();
super.isStarted.release();
if (super.isStartSuccess) {
// 启动成功
f.channel().closeFuture().sync();
} else {
// 启动失败
throw new Exception("Receiver start fail :" + f.cause().getMessage() + " !!!");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
close();
}
});
}

@Override
public String threadNameFormat() {
return "receiver-pool-%d";
}

public void initReceiverHandler(ConnectionManager connectionManager, String messageExecuteClass) {
receiverHandler = new ReceiverHandler(connectionManager, messageExecuteClass);
}

public void initRemoteSession(String sessionId, RemoteSession remoteSession) {
receiverHandler.putRemoteSession(sessionId, remoteSession);
}

public void addListener(ReplyListener replyListener) {
receiverHandler.addListener(replyListener);
}

@Override
public void close() {
receiverHandler.close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

public LocalNode localNode() {
return this.localNode;
}
}

+ 162
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/Sender.java View File

@@ -0,0 +1,162 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.inner.Sender
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:58
* Description:
*/
package com.jd.blockchain.stp.communication.connection;

import com.jd.blockchain.stp.communication.connection.handler.*;
import com.jd.blockchain.stp.communication.message.IMessage;
import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.Closeable;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Sender extends AbstractAsyncExecutor implements Closeable {

private final EventLoopGroup loopGroup = new NioEventLoopGroup();

private Bootstrap bootstrap;

private ChannelFuture channelFuture;

private SessionMessage sessionMessage;

private String remoteHost;

private int remotePort;

private WatchDogHandler watchDogHandler;

public Sender(RemoteNode remoteNode, SessionMessage sessionMessage) {
init(remoteNode, sessionMessage);
}

public Sender(String remoteHost, int remotePort, SessionMessage sessionMessage) {
init(remoteHost, remotePort, sessionMessage);
}

public void connect() {
watchDogHandler = new WatchDogHandler(this.remoteHost, this.remotePort, bootstrap);

ChannelHandlers frontChannelHandlers = new ChannelHandlers()
.addHandler(watchDogHandler);

ChannelHandlers afterChannelHandlers = new ChannelHandlers()
.addHandler(new StringDecoder())
.addHandler(new HeartBeatSenderTrigger())
.addHandler(new HeartBeatSenderHandler())
.addHandler(new SenderHandler(this.sessionMessage));

// 初始化watchDogHandler
watchDogHandler.init(frontChannelHandlers.toArray(), afterChannelHandlers.toArray());

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(frontChannelHandlers.toArray())
.addLast(new IdleStateHandler(10, 4, 0, TimeUnit.SECONDS))
.addLast(new LineBasedFrameDecoder(1024))
.addLast(afterChannelHandlers.toArray());
}
});

ThreadPoolExecutor runThread = initRunThread();

runThread.execute(() -> {
try {
// 发起连接请求
channelFuture = bootstrap.connect(this.remoteHost, this.remotePort).sync();

isStartSuccess = channelFuture.isSuccess();
isStarted.release();
if (isStartSuccess) {
// 启动成功
// 设置ChannelFuture对象,以便于发送的连接状态处理
watchDogHandler.initChannelFuture(channelFuture);
// 等待客户端关闭连接
channelFuture.channel().closeFuture().sync();
} else {
// 启动失败
throw new Exception("Sender start fail :" + channelFuture.cause().getMessage() + " !!!");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
close();
}
});
}

private void init(RemoteNode remoteNode, SessionMessage sessionMessage) {
init(remoteNode.getHostName(), remoteNode.getPort(), sessionMessage);
}

private void init(String remoteHost, int remotePort, SessionMessage sessionMessage) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;

this.sessionMessage = sessionMessage;

this.bootstrap = new Bootstrap().group(loopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true);
}

@Override
public String threadNameFormat() {
return "sender-pool-%d";
}

public void send(IMessage message) {
watchDogHandler.channelFuture().channel().writeAndFlush(message.toTransferByteBuf());
}

@Override
public void close() {
// 因为要重连,需要仍然需要使用该LoopGroup,因此不能关闭
// loopGroup.shutdownGracefully();
}

public static class ChannelHandlers {

private List<ChannelHandler> channelHandlers = new ArrayList<>();

public ChannelHandlers addHandler(ChannelHandler channelHandler) {
channelHandlers.add(channelHandler);
return this;
}

public ChannelHandler[] toArray() {
ChannelHandler[] channelHandlerArray = new ChannelHandler[channelHandlers.size()];
return channelHandlers.toArray(channelHandlerArray);
}
}
}

+ 41
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverHandler.java View File

@@ -0,0 +1,41 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.handler.HeartBeatSenderHandler
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/15 上午10:10
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import com.jd.blockchain.stp.communication.message.HeartBeatMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
*
* @author shaozhuguang
* @create 2019/4/15
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class HeartBeatReceiverHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (HeartBeatMessage.isHeartBeat(msg)) {
// 收到的消息是心跳消息,此时需要回复一个心跳消息
HeartBeatMessage.write(ctx);
System.out.println("Receive HeartBeat Request Message -> " + msg.toString());
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出现异常直接关闭连接
ctx.close();
}
}

+ 40
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatReceiverTrigger.java View File

@@ -0,0 +1,40 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.handler.HeartBeatSenderTrigger
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/15 上午10:11
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
*
* @author shaozhuguang
* @create 2019/4/15
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class HeartBeatReceiverTrigger extends ChannelInboundHandlerAdapter {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 服务端只会接收心跳数据后应答,而不会主动应答
if (evt instanceof IdleStateEvent) {
IdleState idleState = ((IdleStateEvent) evt).state();
if (idleState.equals(IdleState.READER_IDLE)) {
// 长时间未收到客户端请求,则关闭连接
System.out.println("Long Time UnReceive HeartBeat Request, Close Connection !!!");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}

+ 42
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderHandler.java View File

@@ -0,0 +1,42 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.handler.HeartBeatSenderHandler
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/15 上午10:10
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import com.jd.blockchain.stp.communication.message.HeartBeatMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
*
* @author shaozhuguang
* @create 2019/4/15
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class HeartBeatSenderHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断收到的消息
if (HeartBeatMessage.isHeartBeat(msg)) {
// 假设收到的消息是字符串,并且是心跳消息,说明由服务端发送了心跳信息
// TODO 此处不需要进行消息反馈,只需要打印日志即可
System.out.println("Receive HeartBeat Response Message -> " + msg.toString());
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出现异常直接关闭连接
ctx.close();
}
}

+ 48
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/HeartBeatSenderTrigger.java View File

@@ -0,0 +1,48 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.handler.HeartBeatSenderTrigger
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/15 上午10:11
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import com.jd.blockchain.stp.communication.message.HeartBeatMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

/**
*
* @author shaozhuguang
* @create 2019/4/15
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class HeartBeatSenderTrigger extends ChannelInboundHandlerAdapter {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

// 心跳事件
if (evt instanceof IdleStateEvent) {
IdleState idleState = ((IdleStateEvent) evt).state();
if (idleState.equals(IdleState.READER_IDLE)) {
// Sender读超时,表示在指定时间内未收到Receiver的应答
// 此时关闭连接,自动调用重连机制,进行重连操作
System.out.println("Long Time UnReceive HeartBeat Response, Close Connection !!!");
ctx.close();
} else if (idleState == IdleState.WRITER_IDLE) {
// Sender写超时,表示很长时间没有发送消息了,需要发送消息至Receiver
System.out.println("Read TimeOut Trigger, Send HeartBeat Request !!!");
HeartBeatMessage.write(ctx);
}
// 还有一种情况是读写超时,该情况暂不处理
} else {
super.userEventTriggered(ctx, evt);
}
}
}

+ 223
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/ReceiverHandler.java View File

@@ -0,0 +1,223 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.handler.ReceiverHandler
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/12 上午11:14
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.connection.listener.ReplyListener;
import com.jd.blockchain.stp.communication.manager.ConnectionManager;
import com.jd.blockchain.stp.communication.message.SessionMessage;
import com.jd.blockchain.stp.communication.message.TransferMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/4/12
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class ReceiverHandler extends ChannelInboundHandlerAdapter implements Closeable {

// 队列的最大容量为256K(防止队列溢出)
private static final int QUEUE_CAPACITY = 256 * 1024;

private final Map<String, RemoteSession> remoteSessions = new ConcurrentHashMap<>();

private final Map<String, ReplyListener> allReplyListeners = new ConcurrentHashMap<>();

private final Lock lock = new ReentrantLock();

private String messageExecuteClass;

private ConnectionManager connectionManager;

private ExecutorService msgExecutePool;

public ReceiverHandler(ConnectionManager connectionManager, String messageExecuteClass) {
this.connectionManager = connectionManager;
this.messageExecuteClass = messageExecuteClass;
init();
}

public void putRemoteSession(String sessionId, RemoteSession remoteSession) {
remoteSessions.put(sessionId, remoteSession);
}

public void addListener(ReplyListener replyListener) {
allReplyListeners.put(replyListener.listenKey(), replyListener);
}

public void removeListener(String key) {
this.allReplyListeners.remove(key);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("Receive Biz Message -> " + msg.toString());
// 有数据接入
// 首先判断数据是否TransferMessage,当前Handler不处理非TransferMessage
TransferMessage tm = TransferMessage.toTransferMessageObj(msg);
if (tm == null) {
// 判断是否是SessionMessage
SessionMessage sm = SessionMessage.toNodeSessionMessage(msg);
if (sm != null) {
executeSessionMessage(sm);
} else {
super.channelRead(ctx, msg);
}
} else {
TransferMessage.MESSAGE_TYPE messageType = TransferMessage.MESSAGE_TYPE.valueOf(tm.getType());
// 对于请求和应答处理方式不同
if (messageType.equals(TransferMessage.MESSAGE_TYPE.TYPE_REQUEST)) {
// 假设是请求消息
executeRequest(tm);
} else if (messageType.equals(TransferMessage.MESSAGE_TYPE.TYPE_RESPONSE)) {
// 假设是应答消息
executeResponse(tm);
} else {
// todo 其他消息只需要打印日志即可


}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

// 防止消息的处理过程阻塞主进程
private void executeRequest(final TransferMessage transferMessage) {
msgExecutePool.execute(() -> {
RemoteSession remoteSession = remoteSessions.get(transferMessage.getSessionId());
if (remoteSession != null) {
MessageExecute messageExecute = remoteSession.messageExecute();
if (messageExecute != null) {
MessageExecute.REPLY replyType = messageExecute.replyType();
if (replyType != null) {
switch (messageExecute.replyType()) {
case MANUAL:
messageExecute.receive(transferMessage.loadKey(), transferMessage.load(), remoteSession);
break;
case AUTO:
String requestKey = transferMessage.loadKey();
byte[] replyMsg = messageExecute.receive(requestKey, transferMessage.load(), remoteSession);
// 应答
remoteSession.reply(requestKey, () -> replyMsg);
break;
default:
break;
}
}
}
}
});
}

private void executeResponse(final TransferMessage transferMessage) {
msgExecutePool.execute(() -> {
// listenKey和msgKey是不一致的
// msgKey是对消息本身设置key,listenKey是对整个消息(包括session信息)
String listenKey = transferMessage.toListenKey();

ReplyListener replyListener = allReplyListeners.get(listenKey);

if (replyListener != null) {
// 填充对应的结果
replyListener.replyData(transferMessage.load());

ReplyListener.MANAGE_TYPE manageType = replyListener.manageType();

if (manageType != null) {
switch (manageType) {
case REMOVE:
// 将对象从Map中移除
removeListener(listenKey);
break;
case HOLD:
default:
// todo 打印日志

break;
}
}
}
});
}

private void executeSessionMessage(SessionMessage sessionMessage) {
// 处理SessionMessage
String sessionId = sessionMessage.sessionId();
if (sessionId != null && !remoteSessions.containsKey(sessionId)) {

try {
lock.lock();
// 生成对应的MessageExecute对象
String messageExecuteClass = sessionMessage.getMessageExecute();
MessageExecute messageExecute = null;
if (messageExecuteClass != null && messageExecuteClass.length() > 0) {
try {
Class<?> clazz = Class.forName(messageExecuteClass);
messageExecute = (MessageExecute) clazz.newInstance();
} catch (Exception e) {
// TODO 打印日志
e.printStackTrace();
}
}

// 必须保证该对象不为空
if (messageExecute != null) {
// 说明尚未和请求来的客户端建立连接,需要建立连接
Connection remoteConnection = this.connectionManager.connect(new RemoteNode(
sessionMessage.getLocalHost(), sessionMessage.getListenPort()),
this.messageExecuteClass);
RemoteSession remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecute);

// Double check !!!
if (!remoteSessions.containsKey(sessionId)) {
remoteSessions.put(sessionId, remoteSession);
}
}
} finally {
lock.unlock();
}
}
}

private void init() {

ThreadFactory msgExecuteThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("msg-execute-pool-%d").build();

//Common Thread Pool
msgExecutePool = new ThreadPoolExecutor(5, 10,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY),
msgExecuteThreadFactory, new ThreadPoolExecutor.AbortPolicy());
}

@Override
public void close() {
msgExecutePool.shutdown();
}
}

+ 46
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/SenderHandler.java View File

@@ -0,0 +1,46 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.handler.SenderHandler
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/16 下午2:00
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import com.jd.blockchain.stp.communication.message.SessionMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Executors;

/**
*
* @author shaozhuguang
* @create 2019/4/16
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class SenderHandler extends ChannelInboundHandlerAdapter {

private SessionMessage sessionMessage;

public SenderHandler(SessionMessage sessionMessage) {
this.sessionMessage = sessionMessage;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送本机信息(包括IP、端口等)至对端
System.out.println("Connection Receiver Success, Send Local Node Information !!!");
ctx.writeAndFlush(sessionMessage.toTransferByteBuf());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

+ 183
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/handler/WatchDogHandler.java View File

@@ -0,0 +1,183 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.SenderWatchDog
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/12 下午4:56
* Description:
*/
package com.jd.blockchain.stp.communication.connection.handler;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.blockchain.stp.communication.message.HeartBeatMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.Closeable;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/4/12
* @since 1.0.0
*/
@ChannelHandler.Sharable
public class WatchDogHandler extends ChannelInboundHandlerAdapter implements Runnable, Closeable {

private final AtomicBoolean currentActive = new AtomicBoolean(false);

private final Lock reconnectLock = new ReentrantLock();

// 默认的最多重连次数
private final int maxReconnectSize = 16;

// 默认重连的时间
private final int defaultReconnectSeconds = 2;

// 标识是否正常工作中,假设不再工作则不再重连
private boolean isWorking = true;

private ScheduledExecutorService reconnectTimer;

private String hostName;

private int port;

private Bootstrap bootstrap;

private ChannelHandler[] frontHandlers;

private ChannelHandler[] afterHandlers;

private ChannelFuture channelFuture;

public WatchDogHandler(String hostName, int port, Bootstrap bootstrap) {
this.hostName = hostName;
this.port = port;
this.bootstrap = bootstrap;
}

public WatchDogHandler(RemoteNode remoteNode, Bootstrap bootstrap) {
this(remoteNode.getHostName(), remoteNode.getPort(), bootstrap);
}

public void init(ChannelHandler[] frontHandlers, ChannelHandler[] afterHandlers) {
this.frontHandlers = frontHandlers;
this.afterHandlers = afterHandlers;
initTimer();
}

public void initChannelFuture(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
}

public ChannelFuture channelFuture() {
try {
reconnectLock.lock();
return this.channelFuture;
} finally {
reconnectLock.unlock();
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 调用该方法表示连接成功
connectSuccess();

// 连接成功后发送心跳消息至服务端
HeartBeatMessage.write(ctx);

ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.err.println("Connection Exception, Close And Reconnect !!!");
// 调用该方法时表示连接关闭了(无论是什么原因)
// 连接关闭的情况下需要重新连接

connectFail();

ctx.close();

for (int i = 0; i < maxReconnectSize; i++) {
reconnectTimer.schedule(this, defaultReconnectSeconds << i, TimeUnit.SECONDS);
}

ctx.fireChannelInactive();
}

@Override
public void run() {
if (isNeedReconnect()) {
// 重连
try {
reconnectLock.lock();
if (isNeedReconnect()) {

bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(frontHandlers)
.addLast(new IdleStateHandler(10, 4, 0, TimeUnit.SECONDS))
.addLast(new LineBasedFrameDecoder(1024))
.addLast(afterHandlers)
;
}
});

channelFuture = bootstrap.connect(hostName, port);

// 增加监听器用于判断本次重连是否成功
channelFuture.addListener((ChannelFutureListener) future -> {
boolean isReconnectSuccess = future.isSuccess();
if (isReconnectSuccess) {
// 连接成功
connectSuccess();
} else {
connectFail();
}
});

}
} finally {
reconnectLock.unlock();
}
}
}

private boolean isNeedReconnect() {
return isWorking && !currentActive.get();
}

private void connectSuccess() {
this.currentActive.set(true);
}

private void connectFail() {
this.currentActive.set(false);
}

@Override
public void close() {
this.isWorking = false;
this.reconnectTimer.shutdown();
}

private void initTimer() {
ThreadFactory timerFactory = new ThreadFactoryBuilder()
.setNameFormat("reconnect-pool-%d").build();

reconnectTimer = new ScheduledThreadPoolExecutor(1, timerFactory, new ThreadPoolExecutor.AbortPolicy());
}
}

+ 74
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/connection/listener/ReplyListener.java View File

@@ -0,0 +1,74 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.connection.listener.ReplyListener
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/12 上午10:36
* Description:
*/
package com.jd.blockchain.stp.communication.connection.listener;

import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.node.RemoteNode;


/**
*
* @author shaozhuguang
* @create 2019/4/12
* @since 1.0.0
*/

public class ReplyListener {

private String listenKey;

private MANAGE_TYPE manageType = MANAGE_TYPE.REMOVE;

private CallBackDataListener callBackDataListener;

private CallBackBarrier callBackBarrier;

public ReplyListener(String listenKey, RemoteNode remoteNode) {
this.listenKey = listenKey;
this.callBackDataListener = new CallBackDataListener(remoteNode);
}

public ReplyListener(String listenKey, RemoteNode remoteNode, CallBackBarrier callBackBarrier) {
this.listenKey = listenKey;
this.callBackDataListener = new CallBackDataListener(remoteNode);
this.callBackBarrier = callBackBarrier;
}

public void setManageType(MANAGE_TYPE manageType) {
this.manageType = manageType;
}

public String listenKey() {
return listenKey;
}

public CallBackDataListener callBackDataListener() {
return this.callBackDataListener;
}

public void replyData(byte[] reply) {
// 设置数据
this.callBackDataListener.setCallBackData(reply);
if (this.callBackBarrier != null) {
// 同步释放对应的栅栏
this.callBackBarrier.release();
}
}

public MANAGE_TYPE manageType() {
return this.manageType;
}

public enum MANAGE_TYPE {
HOLD,
REMOVE,
;
}
}

+ 0
- 27
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java View File

@@ -1,27 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.inner.Receiver
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication.inner;

import com.jd.blockchain.stp.communication.MessageHandler;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Receiver {

private MessageHandler messageHandler;

public void listen() {

}
}

+ 0
- 35
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java View File

@@ -1,35 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.inner.Sender
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:58
* Description:
*/
package com.jd.blockchain.stp.communication.inner;

import java.net.InetAddress;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Sender {

private InetAddress inetAddress;

public void connect() {

}

public void send(String message) {

}

public void send(byte[] message) {

}
}

+ 110
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/ConnectionManager.java View File

@@ -0,0 +1,110 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.ConnectionManager
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午6:11
* Description:
*/
package com.jd.blockchain.stp.communication.manager;

import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.connection.Receiver;
import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class ConnectionManager {

private static final Map<RemoteNode, Connection> connectionMap = new ConcurrentHashMap<>();

private static final Map<Integer, ConnectionManager> connectionManagerMap = new ConcurrentHashMap<>();

private static final Lock managerLock = new ReentrantLock();

private static final Lock connectionLock = new ReentrantLock();

private Receiver receiver;

public static final ConnectionManager newConnectionManager(LocalNode localNode) {
int listenPort = localNode.getPort();
if (!connectionManagerMap.containsKey(listenPort)) {
try {
managerLock.lock();
if (!connectionManagerMap.containsKey(listenPort)) {
ConnectionManager connectionManager = newInstance(localNode);
connectionManagerMap.put(listenPort, connectionManager);
return connectionManager;
}
} finally {
managerLock.unlock();
}
}
return connectionManagerMap.get(listenPort);
}

private static final ConnectionManager newInstance(LocalNode localNode) {
return new ConnectionManager(new Receiver(localNode));
}

public final boolean start(String messageExecuteClass) throws InterruptedException {
receiver.initReceiverHandler(this, messageExecuteClass);
receiver.startListen();
// 判断是否启动完成,启动完成后再返回
return receiver.waitStarted();
}

private ConnectionManager(Receiver receiver) {
this.receiver = receiver;
}

public Connection connect(RemoteNode remoteNode, MessageExecute messageExecute) {
return connect(remoteNode, messageExecute.getClass().toString());
}

public Connection connect(RemoteNode remoteNode, String messageExecuteClass) {
if (!connectionMap.containsKey(remoteNode)) {
try {
connectionLock.lock();
if (!connectionMap.containsKey(remoteNode)) {
Connection connection = init(remoteNode, messageExecuteClass);
connectionMap.put(remoteNode, connection);
}
} finally {
connectionLock.unlock();
}
}
return connectionMap.get(remoteNode);
}

private Connection init(RemoteNode remoteNode, String messageExecuteClass) {

// 初始化Connection
Connection remoteConnection = new Connection(this.receiver);

try {
// 连接远端
boolean isSuccess = remoteConnection.connect(remoteNode, messageExecuteClass);
if (!isSuccess) {
throw new RuntimeException(String.format("RemoteNode {%s} Connect Fail !!!", remoteNode.toString()));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (RuntimeException e) {
throw e;
}
return remoteConnection;
}
}

+ 132
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/manager/RemoteSessionManager.java View File

@@ -0,0 +1,132 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.RemoteSessionManager
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午11:22
* Description:
*/
package com.jd.blockchain.stp.communication.manager;


import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.connection.Connection;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.apache.commons.codec.binary.Hex;


/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class RemoteSessionManager {

private ConnectionManager connectionManager;

private LocalNode localNode;

public RemoteSessionManager(LocalNode localNode) {
this.localNode = localNode;
check();
this.connectionManager = ConnectionManager.newConnectionManager(this.localNode);
try {
boolean isStartedSuccess = start(this.localNode.messageExecuteClass());
if (!isStartedSuccess) {
throw new RuntimeException(String.format("LocalNode {%s} Start Receiver Fail !!!", this.localNode.toString()));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (RuntimeException e) {
throw e;
}

}

private void check() {
// 要求端口范围:1~65535,messageExecuteClass不能为null
int listenPort = this.localNode.getPort();
if (listenPort <= 0 || listenPort > 65535) {
throw new IllegalArgumentException("Illegal Local Listen Port, Please Check !!!");
}

String messageExecuteClass = this.localNode.messageExecuteClass();
if (messageExecuteClass == null) {
throw new IllegalArgumentException("Illegal MessageExecute Class, Please Check !!!");
}
}

private boolean start(String messageExecuteClass) throws InterruptedException {
return this.connectionManager.start(messageExecuteClass);
}

public RemoteSession newSession(RemoteNode remoteNode) {
return newSession(null, remoteNode);
}

public RemoteSession newSession(String sessionId, RemoteNode remoteNode) {
return newSession(sessionId, remoteNode, null);
}

public RemoteSession newSession(RemoteNode remoteNode, MessageExecute messageExecute) {
return newSession(null, remoteNode, messageExecute);
}

public RemoteSession newSession(String sessionId, RemoteNode remoteNode, MessageExecute messageExecute) {
if (sessionId == null) {
sessionId = toSessionId(localNode);
}
Connection remoteConnection = this.connectionManager.connect(remoteNode, localNode.messageExecuteClass());

RemoteSession remoteSession = new RemoteSession(sessionId, remoteConnection, messageExecute);

remoteSession.init();

return remoteSession;
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes) {
return newSessions(null, remoteNodes);
}

public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes) {

return newSessions(sessionIds, remoteNodes, null);
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes, MessageExecute messageExecute) {

return newSessions(null, remoteNodes, messageExecute);
}

public RemoteSession[] newSessions(String[] sessionIds, RemoteNode[] remoteNodes, MessageExecute messageExecute) {
if (remoteNodes == null || remoteNodes.length <= 0) {
throw new IllegalArgumentException("RemoteNodes is empty !!!");
}

if (sessionIds != null) {
if (sessionIds.length != remoteNodes.length) {
throw new IllegalArgumentException("RemoteNodes and sessionIds are different in length !!!");
}
}

RemoteSession[] remoteSessions = new RemoteSession[remoteNodes.length];

for (int i = 0; i < remoteNodes.length; i++) {
if (sessionIds == null) {
remoteSessions[i] = newSession(remoteNodes[i], messageExecute);
} else {
remoteSessions[i] = newSession(sessionIds[i], remoteNodes[i], messageExecute);
}
}
return remoteSessions;
}

public String toSessionId(RemoteNode remoteNode) {
return Hex.encodeHexString(remoteNode.toString().getBytes());
}
}

+ 30
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/AbstractMessage.java View File

@@ -0,0 +1,30 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.message.AbstractMessage
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 下午4:00
* Description:
*/
package com.jd.blockchain.stp.communication.message;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
*
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public abstract class AbstractMessage implements IMessage {

@Override
public ByteBuf toTransferByteBuf() {
byte[] message = (toTransfer() + "\r\n").getBytes();
ByteBuf byteBuf = Unpooled.buffer(message.length);
byteBuf.writeBytes(message);
return byteBuf;
}
}

+ 57
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/HeartBeatMessage.java View File

@@ -0,0 +1,57 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.message.HeartBeatMessage
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/12 下午4:55
* Description:
*/
package com.jd.blockchain.stp.communication.message;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;

/**
* 心跳消息
* @author shaozhuguang
* @create 2019/4/12
* @since 1.0.0
*/

public class HeartBeatMessage implements IMessage {

private static final String HEARTBEAT_STRING = "JDChainHeartBeat";

private static final ByteBuf HEARTBEAT_MESSAGE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(HEARTBEAT_STRING + "\r\n",
CharsetUtil.UTF_8));

public static final void write(ChannelHandlerContext ctx) {
ctx.writeAndFlush(HEARTBEAT_MESSAGE.duplicate());
}

public static final boolean isHeartBeat(Object msg) {
if (msg instanceof String) {
return isHeartBeat((String) msg);
}
return false;
}

public static final boolean isHeartBeat(String msg) {
if (HEARTBEAT_STRING.equals(msg)) {
return true;
}
return false;
}

@Override
public String toTransfer() {
return HEARTBEAT_STRING;
}

@Override
public ByteBuf toTransferByteBuf() {
return HEARTBEAT_MESSAGE;
}
}

+ 25
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/IMessage.java View File

@@ -0,0 +1,25 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.message.IMessage
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/16 下午1:58
* Description:
*/
package com.jd.blockchain.stp.communication.message;

import io.netty.buffer.ByteBuf;

/**
*
* @author shaozhuguang
* @create 2019/4/16
* @since 1.0.0
*/

public interface IMessage {

String toTransfer();

ByteBuf toTransferByteBuf();
}

+ 95
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/SessionMessage.java View File

@@ -0,0 +1,95 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.message.SessionMessage
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/16 上午10:40
* Description:
*/
package com.jd.blockchain.stp.communication.message;

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.codec.binary.Hex;

/**
*
* @author shaozhuguang
* @create 2019/4/16
* @since 1.0.0
*/

public class SessionMessage extends AbstractMessage implements IMessage {

private String localHost;

private int listenPort;

private String messageExecute;

public SessionMessage() {
}

public SessionMessage(String localHost, int listenPort, String messageExecute) {
this.localHost = localHost;
this.listenPort = listenPort;
this.messageExecute = messageExecute;
}

public String getLocalHost() {
return localHost;
}

public void setLocalHost(String localHost) {
this.localHost = localHost;
}

public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}


public int getListenPort() {
return listenPort;
}

public String getMessageExecute() {
return messageExecute;
}

public void setMessageExecute(String messageExecute) {
this.messageExecute = messageExecute;
}

public String sessionId() {
return Hex.encodeHexString((this.localHost + ":" + this.listenPort).getBytes());
}

public static SessionMessage toNodeSessionMessage(Object msg) {
String msgString = msg.toString();
try {
String[] msgArray = msgString.split("\\|");
if (msgArray.length == 2 || msgArray.length == 3) {
String host = msgArray[0];
int port = Integer.parseInt(msgArray[1]);
String msgExecuteClass = null;
if (msgArray.length == 3) {
msgExecuteClass = msgArray[2];
}
return new SessionMessage(host, port, msgExecuteClass);
}
return null;
} catch (Exception e) {
return null;
}
}

@Override
public String toTransfer() {
// 为区别于TransferMessage的JSON格式,该处使用字符串连接处理
// 格式:localHost|port|class
String transferMsg = this.localHost + "|" + this.listenPort + "|" + this.messageExecute;
return transferMsg;
}
}

+ 117
- 11
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java View File

@@ -8,6 +8,9 @@
*/
package com.jd.blockchain.stp.communication.message;

import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.Base64;

/**
*
* @author shaozhuguang
@@ -15,32 +18,135 @@ package com.jd.blockchain.stp.communication.message;
* @since 1.0.0
*/

public class TransferMessage {
public class TransferMessage extends AbstractMessage implements IMessage{

private byte[] key;
private String sessionId;

private byte[] load;
private int type;

public byte[] getKey() {
return key;
private String key;

private transient byte[] load;

private String loadBase64;

public TransferMessage() {
}

public void setKey(byte[] key) {
public TransferMessage(String sessionId, int type, String key, byte[] load) {
this.sessionId = sessionId;
this.type = type;
this.key = key;
this.load = load;
}

public byte[] getLoad() {
public static TransferMessage toTransferMessageObj(Object msg) {
if (msg == null) {
return null;
}
TransferMessage tm;
try {
tm = JSON.parseObject(msg.toString(), TransferMessage.class);
tm.initLoad();
} catch (Exception e) {
return null;
}
return tm;
}

public byte[] load() {
return load;
}

public void setLoad(byte[] load) {
this.load = load;
public void initLoad() {
if (loadBase64 != null && loadBase64.length() > 0) {
load = Base64.decodeBase64(loadBase64);
}
}

public void initLoadBase64() {
if (load != null && load.length > 0) {
loadBase64 = Base64.encodeBase64String(load);
}
}

@Override
public String toTransfer() {
// 使用JSON的方式发送
// 初始化load的base64转换
initLoadBase64();

// 将字符串转换为JSON
return JSON.toJSONString(this);
}

public String toListenKey() {
// 格式:sessionId:key
return sessionId + ":" + key;
}

public String getSessionId() {
return sessionId;
}

public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}

public int getType() {
return type;
}

public void setType(int type) {
this.type = type;
}

public String loadKey() {
return key;
}

public String toBase64() {
public void setKey(String key) {
this.key = key;
}

public String getKey() {
return key;
}

public String getLoadBase64() {
return loadBase64;
}

public void setLoadBase64(String loadBase64) {
this.loadBase64 = loadBase64;
}

public enum MESSAGE_TYPE {

TYPE_REQUEST(0),

TYPE_RESPONSE(1);

private int code;

MESSAGE_TYPE(int code) {
this.code = code;
}

public int code() {
return code;
}

public static MESSAGE_TYPE valueOf(int code) {
switch (code) {
case 0:
return TYPE_REQUEST;
case 1:
return TYPE_RESPONSE;

}
return null;
}

return null;
}
}

+ 52
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/LocalNode.java View File

@@ -0,0 +1,52 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.node.LocalNode
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/16 下午3:12
* Description:
*/
package com.jd.blockchain.stp.communication.node;

import com.jd.blockchain.stp.communication.MessageExecute;

/**
*
* @author shaozhuguang
* @create 2019/4/16
* @since 1.0.0
*/

public class LocalNode extends RemoteNode {

private Class<?> messageExecute;

public LocalNode(String hostName, int port) {
super(hostName, port);
}

public LocalNode(String hostName, int port, MessageExecute messageExecute) {
super(hostName, port);
this.messageExecute = messageExecute.getClass();
}

public LocalNode(String hostName, int port, Class<?> messageExecute) {
super(hostName, port);
this.messageExecute = messageExecute;
}

public String messageExecuteClass() {
if (this.messageExecute == null) {
return null;
}
return this.messageExecute.getName();
}

public void setMessageExecute(MessageExecute messageExecute) {
this.messageExecute = messageExecute.getClass();
}

public void setMessageExecute(Class<?> messageExecute) {
this.messageExecute = messageExecute;
}
}

source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteNode.java → source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/node/RemoteNode.java View File

@@ -6,7 +6,7 @@
* Date: 2019/4/11 下午3:40
* Description:
*/
package com.jd.blockchain.stp.communication;
package com.jd.blockchain.stp.communication.node;

/**
*
@@ -41,4 +41,28 @@ public class RemoteNode {
public void setHostName(String hostName) {
this.hostName = hostName;
}

@Override
public int hashCode() {
return (hostName + ":" + port).hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj instanceof RemoteNode) {
RemoteNode other = (RemoteNode) obj;
if (this.hashCode() == other.hashCode()) {
return true;
}
}
return false;
}

@Override
public String toString() {
return this.hostName + ":" + this.port;
}
}

+ 37
- 0
source/stp/stp-communication/src/test/java/com/jd/blockchain/SessionMessageTest.java View File

@@ -0,0 +1,37 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.SessionMessageTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 下午3:24
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.communication.message.SessionMessage;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
*
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public class SessionMessageTest {


@Test
public void test() {
SessionMessage message = new SessionMessage("127.0.0.1", 9001, "com.jd.blockchain.StpTest.StpMessageExecute");

String transMsg = message.toTransfer();
System.out.println(transMsg);

SessionMessage sm = SessionMessage.toNodeSessionMessage(transMsg);

assertEquals(transMsg, sm.toTransfer());
}
}

+ 0
- 131
source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java View File

@@ -1,131 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午3:31
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.communication.MessageHandler;
import com.jd.blockchain.stp.communication.RemoteNode;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.RemoteSessionManager;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class StpTest {

private int listenPort = 6000;

private int maxWaitTime = 2000;

private RemoteNode[] remoteNodes = new RemoteNode[3];

@Before
public void init() {
for (int i = 0; i < remoteNodes.length; i++) {
remoteNodes[i] = new RemoteNode("127.0.0.1", 6001 + i);
}
}


@Test
public void test() {
// 创建RemoteSessionManager对象
RemoteSessionManager sessionManager = new RemoteSessionManager(listenPort);

// 创建RemoteSession[]对象
RemoteSession[] remoteSessions = sessionManager.newSessions(remoteNodes);

// 设置MessageHandler并连接
for (RemoteSession remoteSession : remoteSessions) {

// 设置MessageHandler
remoteSession.initHandler(new StpMessageHandler());

// session连接
remoteSession.connect();
}

// 生成请求对象
LoadMessage loadMessage = new StpLoadMessage();

// 异步发送处理过程

CountDownLatch countDownLatch = new CountDownLatch(remoteSessions.length);

// 发送请求至remotes
LinkedList<Future<byte[]>> responses = new LinkedList<>();
for (RemoteSession remoteSession : remoteSessions) {
Future<byte[]> response = remoteSession.asyncSend(loadMessage, countDownLatch);
responses.addLast(response);
}

// 超时判断
try {
if (countDownLatch.await(maxWaitTime, TimeUnit.MILLISECONDS)) {
// 汇总异步消息结果
LinkedList<byte[]> receiveResponses = new LinkedList<>();
// 通过迭代器遍历链表
Iterator<Future<byte[]>> iterator = responses.iterator();
while (iterator.hasNext()) {
Future<byte[]> response = iterator.next();
// 判断是否已完成,对于没有完成的直接放弃(因为已经超时)
if (response.isDone()) {
receiveResponses.addLast(response.get());
}
}

//TODO 检查汇总后的应答消息,准备开始新一轮的请求




}
} catch (Exception e) {

}
}


public static class StpMessageHandler implements MessageHandler {

@Override
public void receive(byte[] key, byte[] data, RemoteSession session) {
// 作为Receiver接收到请求消息后需要发送应答
// 生成应答消息
LoadMessage replyLoadMessage = new StpLoadMessage();

// 发送应答消息(注意key必须保持一致)
// 异步发送应答消息可使用asyncReply
session.reply(key, replyLoadMessage);
}
}

public static class StpLoadMessage implements LoadMessage {

@Override
public byte[] toBytes() {
return new byte[0];
}
}
}

+ 1
- 0
source/test/pom.xml View File

@@ -15,6 +15,7 @@
<module>test-consensus-node</module>
<module>test-ledger-core</module>
<module>test-integration</module>
<module>test-stp-community</module>
</modules>
<build>


+ 36
- 0
source/test/test-stp-community/pom.xml View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>test</artifactId>
<groupId>com.jd.blockchain</groupId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>test-stp-community</artifactId>

<name>test-stp-community</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>stp-communication</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

+ 37
- 0
source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java View File

@@ -0,0 +1,37 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.commucation.MyMessageExecutor
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 下午3:38
* Description:
*/
package com.jd.blockchain.stp.commucation;

import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.RemoteSession;

import java.nio.charset.Charset;

/**
*
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public class MyMessageExecutor implements MessageExecute {

@Override
public byte[] receive(String key, byte[] data, RemoteSession session) {
String receiveMsg = new String(data, Charset.defaultCharset());
System.out.printf("receive client {%s} request {%s} \r\n", session.sessionId(), receiveMsg);
String msg = session.sessionId() + " -> received !!!";
return msg.getBytes(Charset.defaultCharset());
}

@Override
public REPLY replyType() {
return REPLY.AUTO;
}
}

+ 185
- 0
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java View File

@@ -0,0 +1,185 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午3:31
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.communication.MessageExecute;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class StpTest {

private int maxWaitTime = 2000;

private final String remoteHost = "127.0.0.1";

private final int localPort = 9001;

private final int[] listenPorts = new int[]{9001, 9002, 9003, 9004};

private final RemoteSessionManager[] sessionManagers = new RemoteSessionManager[listenPorts.length];

private final ExecutorService threadPool = Executors.newFixedThreadPool(6);

private RemoteSession[] remoteSessions;

@Before
public void init() {

System.out.println("---------- listenStart -----------");
listenStart();
System.out.println("---------- listenComplete -----------");
System.out.println("---------- ConnectionStart ----------");
connectAllOthers();
System.out.println("---------- ConnectionComplete ----------");
}

private void listenStart() {
CountDownLatch countDownLatch = new CountDownLatch(listenPorts.length);

for (int i = 0; i < listenPorts.length; i++) {
final int port = listenPorts[i], index = i;
threadPool.execute(() -> {
// 创建本地节点
final LocalNode localNode = new LocalNode(remoteHost, port, MyMessageExecutor.class);
// 启动当前节点
RemoteSessionManager sessionManager = new RemoteSessionManager(localNode);
sessionManagers[index] = sessionManager;
countDownLatch.countDown();
System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString());
});
}

// 等待所有节点启动完成
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}

private void connectAllOthers() {
// 所有节点完成之后,需要启动
// 启动一个节点
RemoteSessionManager starter = sessionManagers[0];

// 当前节点需要连接到其他3个节点
RemoteNode[] remoteNodes = new RemoteNode[listenPorts.length - 1];
int index = 0;
for (int port : listenPorts) {
if (port != localPort) {
remoteNodes[index++] = new RemoteNode(remoteHost, port);
}
}

remoteSessions = starter.newSessions(remoteNodes, new MyMessageExecutor());
}

private void connectOneOther() {
// 所有节点完成之后,需要启动
// 启动一个节点
RemoteSessionManager starter = sessionManagers[0];

// 当前节点需要连接到其他3个节点
RemoteNode[] remoteNodes = new RemoteNode[1];
int index = 0;
for (int port : listenPorts) {
if (port != localPort && index < 1) {
remoteNodes[index++] = new RemoteNode(remoteHost, port);
}
}

remoteSessions = starter.newSessions(remoteNodes, new MyMessageExecutor());
}


@Test
public void test() {

try {
Thread.sleep(3000);

} catch (Exception e) {
e.printStackTrace();
}

// 生成请求对象
LoadMessage loadMessage = new StpLoadMessage(remoteHost + ":" + localPort);

// 异步发送处理过程
CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000);

// 发送请求至remotes
LinkedList<CallBackDataListener> responses = new LinkedList<>();
for (RemoteSession remoteSession : remoteSessions) {
CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier);
responses.addLast(response);
}

// 超时判断
try {
if (callBackBarrier.tryCall()) {

// 说明结果已经全部返回
// 打印出所有的结果
// 通过迭代器遍历链表
Iterator<CallBackDataListener> iterator = responses.iterator();
while (iterator.hasNext()) {
CallBackDataListener response = iterator.next();
// 判断是否已完成,对于没有完成的直接放弃(因为已经超时)
if (response.isDone()) {
System.out.printf("Receive Response {%s} {%s} \r\n",
response.remoteNode().toString(), new String(response.getCallBackData()));
}
}
}
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}


}

public static class StpLoadMessage implements LoadMessage {

private String localInfo;

public StpLoadMessage(String localInfo) {
this.localInfo = localInfo;
}

@Override
public byte[] toBytes() {
String msg = localInfo + " -> Send !!!";
return msg.getBytes(Charset.defaultCharset());
}
}
}

Loading…
Cancel
Save