diff --git a/source/pom.xml b/source/pom.xml index 2e510d96..cee45061 100644 --- a/source/pom.xml +++ b/source/pom.xml @@ -36,6 +36,7 @@ tools test deployment + stp diff --git a/source/stp/pom.xml b/source/stp/pom.xml new file mode 100644 index 00000000..fd7fc35e --- /dev/null +++ b/source/stp/pom.xml @@ -0,0 +1,21 @@ + + + + + jdchain-root + com.jd.blockchain + 0.9.0-SNAPSHOT + + 4.0.0 + + stp + pom + + stp-communication + + + + UTF-8 + + diff --git a/source/stp/stp-communication/pom.xml b/source/stp/stp-communication/pom.xml new file mode 100644 index 00000000..fcf21bc1 --- /dev/null +++ b/source/stp/stp-communication/pom.xml @@ -0,0 +1,35 @@ + + + + + stp + com.jd.blockchain + 0.9.0-SNAPSHOT + + 4.0.0 + + stp-communication + + stp-communication + + + UTF-8 + 1.8 + 1.8 + + + + + junit + junit + test + + + + io.netty + netty-all + 4.1.29.Final + + + diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java new file mode 100644 index 00000000..cae8a853 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java @@ -0,0 +1,21 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.MessageHandler + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午10:59 + * Description: + */ +package com.jd.blockchain.stp.communication; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public interface MessageHandler { + + void receive(byte[] key, byte[] data, RemoteSession session); +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteNode.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteNode.java new file mode 100644 index 00000000..18fcafd2 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteNode.java @@ -0,0 +1,44 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.RemoteNode + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 下午3:40 + * Description: + */ +package com.jd.blockchain.stp.communication; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class RemoteNode { + + private int port; + + private String hostName; + + public RemoteNode(String hostName, int port) { + this.port = port; + this.hostName = hostName; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java new file mode 100644 index 00000000..4a047e7f --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java @@ -0,0 +1,94 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.RemoteSession + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午11:15 + * Description: + */ +package com.jd.blockchain.stp.communication; + +import com.jd.blockchain.stp.communication.inner.Receiver; +import com.jd.blockchain.stp.communication.inner.Sender; +import com.jd.blockchain.stp.communication.message.LoadMessage; + +import java.net.InetAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class RemoteSession { + + private String id; + + private RemoteNode remoteNode; + + private Sender sender; + + private Receiver receiver; + + private MessageHandler messageHandler; + + public void initHandler(MessageHandler messageHandler) { + + } + + public void connect() { + + } + + public byte[] send(LoadMessage loadMessage) { + + return null; + } + + public Future asyncSend(LoadMessage loadMessage) { + return null; + } + + public Future asyncSend(LoadMessage loadMessage, CountDownLatch countDownLatch) { + return null; + } + + public byte[] send(byte[] loadMessage) { + + return null; + } + + public Future asyncSend(byte[] loadMessage) { + + return null; + } + + public Future asyncSend(byte[] loadMessage, CountDownLatch countDownLatch) { + + return null; + } + + public void reply(byte[] key, LoadMessage loadMessage) { + + } + + public void asyncReply(byte[] key, LoadMessage loadMessage) { + + } + + public void reply(byte[] key, byte[] loadMessage) { + + } + + public void asyncReply(byte[] key, byte[] loadMessage) { + + } + + public void close() { + + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java new file mode 100644 index 00000000..2e87cf16 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java @@ -0,0 +1,98 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.RemoteSessionManager + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午11:22 + * Description: + */ +package com.jd.blockchain.stp.communication; + +import com.jd.blockchain.stp.communication.inner.Receiver; + +import java.net.InetAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class RemoteSessionManager { + + private static Map receiverMap = new ConcurrentHashMap<>(); + + private static final Lock lock = new ReentrantLock(); + + public RemoteSessionManager(int listenPort) { + if (listenPort <= 0) { + throw new IllegalArgumentException("Illegal port, please check !!!"); + } + + if (!receiverMap.containsKey(listenPort)) { + try { + lock.lock(); + if (!receiverMap.containsKey(listenPort)) { + Receiver receiver = initReceiver(listenPort); + receiverMap.put(listenPort, receiver); + } + } finally { + lock.unlock(); + } + } + } + + public RemoteSession newSession(RemoteNode remoteNode) { + return newSession(toRemoteId(remoteNode), remoteNode); + } + + public RemoteSession newSession(String remoteId, RemoteNode remoteNode) { + return newSession(remoteId, remoteNode, null); + } + + public RemoteSession newSession(RemoteNode remoteNode, MessageHandler messageHandler) { + return newSession(toRemoteId(remoteNode), remoteNode, messageHandler); + } + + public RemoteSession newSession(String remoteId, RemoteNode remoteNode, MessageHandler messageHandler) { + + + return null; + } + + public RemoteSession[] newSessions(RemoteNode[] remoteNodes) { + + + return null; + } + + public RemoteSession[] newSessions(String[] remoteIds, RemoteNode[] remoteNodes) { + + return null; + } + + public RemoteSession[] newSessions(RemoteNode[] remoteNodes, MessageHandler messageHandler) { + + return null; + } + + public RemoteSession[] newSessions(String[] remoteIds, RemoteNode[] remoteNodes, MessageHandler messageHandler) { + + return null; + } + + public String toRemoteId(RemoteNode remoteNode) { + + return null; + } + + private Receiver initReceiver(int port) { + + return null; + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java new file mode 100644 index 00000000..698c52b8 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java @@ -0,0 +1,27 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.inner.Receiver + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午10:59 + * Description: + */ +package com.jd.blockchain.stp.communication.inner; + +import com.jd.blockchain.stp.communication.MessageHandler; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class Receiver { + + private MessageHandler messageHandler; + + public void listen() { + + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java new file mode 100644 index 00000000..8072c675 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java @@ -0,0 +1,35 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.inner.Sender + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午10:58 + * Description: + */ +package com.jd.blockchain.stp.communication.inner; + +import java.net.InetAddress; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class Sender { + + private InetAddress inetAddress; + + public void connect() { + + } + + public void send(String message) { + + } + + public void send(byte[] message) { + + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java new file mode 100644 index 00000000..acdedee2 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java @@ -0,0 +1,21 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.message.LoadMessage + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午10:59 + * Description: + */ +package com.jd.blockchain.stp.communication.message; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public interface LoadMessage { + + byte[] toBytes(); +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java new file mode 100644 index 00000000..eac2d2a2 --- /dev/null +++ b/source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java @@ -0,0 +1,46 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.stp.communication.message.TransferMessage + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 上午11:00 + * Description: + */ +package com.jd.blockchain.stp.communication.message; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class TransferMessage { + + private byte[] key; + + private byte[] load; + + public byte[] getKey() { + return key; + } + + public void setKey(byte[] key) { + this.key = key; + } + + public byte[] getLoad() { + return load; + } + + public void setLoad(byte[] load) { + this.load = load; + } + + public String toBase64() { + + + + return null; + } +} \ No newline at end of file diff --git a/source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java b/source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java new file mode 100644 index 00000000..04c7f1e4 --- /dev/null +++ b/source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java @@ -0,0 +1,131 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.StpTest + * Author: shaozhuguang + * Department: Y事业部 + * Date: 2019/4/11 下午3:31 + * Description: + */ +package com.jd.blockchain; + +import com.jd.blockchain.stp.communication.MessageHandler; +import com.jd.blockchain.stp.communication.RemoteNode; +import com.jd.blockchain.stp.communication.RemoteSession; +import com.jd.blockchain.stp.communication.RemoteSessionManager; +import com.jd.blockchain.stp.communication.message.LoadMessage; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * + * @author shaozhuguang + * @create 2019/4/11 + * @since 1.0.0 + */ + +public class StpTest { + + private int listenPort = 6000; + + private int maxWaitTime = 2000; + + private RemoteNode[] remoteNodes = new RemoteNode[3]; + + @Before + public void init() { + for (int i = 0; i < remoteNodes.length; i++) { + remoteNodes[i] = new RemoteNode("127.0.0.1", 6001 + i); + } + } + + + @Test + public void test() { + // 创建RemoteSessionManager对象 + RemoteSessionManager sessionManager = new RemoteSessionManager(listenPort); + + // 创建RemoteSession[]对象 + RemoteSession[] remoteSessions = sessionManager.newSessions(remoteNodes); + + // 设置MessageHandler并连接 + for (RemoteSession remoteSession : remoteSessions) { + + // 设置MessageHandler + remoteSession.initHandler(new StpMessageHandler()); + + // session连接 + remoteSession.connect(); + } + + // 生成请求对象 + LoadMessage loadMessage = new StpLoadMessage(); + + // 异步发送处理过程 + + CountDownLatch countDownLatch = new CountDownLatch(remoteSessions.length); + + // 发送请求至remotes + LinkedList> responses = new LinkedList<>(); + for (RemoteSession remoteSession : remoteSessions) { + Future response = remoteSession.asyncSend(loadMessage, countDownLatch); + responses.addLast(response); + } + + // 超时判断 + try { + if (countDownLatch.await(maxWaitTime, TimeUnit.MILLISECONDS)) { + // 汇总异步消息结果 + LinkedList receiveResponses = new LinkedList<>(); + // 通过迭代器遍历链表 + Iterator> iterator = responses.iterator(); + while (iterator.hasNext()) { + Future response = iterator.next(); + // 判断是否已完成,对于没有完成的直接放弃(因为已经超时) + if (response.isDone()) { + receiveResponses.addLast(response.get()); + } + } + + //TODO 检查汇总后的应答消息,准备开始新一轮的请求 + + + + + } + } catch (Exception e) { + + } + } + + + public static class StpMessageHandler implements MessageHandler { + + @Override + public void receive(byte[] key, byte[] data, RemoteSession session) { + // 作为Receiver接收到请求消息后需要发送应答 + // 生成应答消息 + LoadMessage replyLoadMessage = new StpLoadMessage(); + + // 发送应答消息(注意key必须保持一致) + // 异步发送应答消息可使用asyncReply + session.reply(key, replyLoadMessage); + } + } + + public static class StpLoadMessage implements LoadMessage { + + @Override + public byte[] toBytes() { + return new byte[0]; + } + } +} \ No newline at end of file