diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/service/GatewayQueryServiceHandler.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/service/GatewayQueryServiceHandler.java index 71e34a7c..1a6c0267 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/service/GatewayQueryServiceHandler.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/service/GatewayQueryServiceHandler.java @@ -81,7 +81,7 @@ public class GatewayQueryServiceHandler implements GatewayQueryService { ledgerInitSettings.setSeed(initSeed(ledgerMetadata.getSeed())); // 设置共识协议 - ledgerInitSettings.setConsensusProtocol(consensusProtocol(ledgerMetadata.getSetting().getConsensusProvider())); + ledgerInitSettings.setConsensusProtocol(ledgerMetadata.getSetting().getConsensusProvider()); return ledgerInitSettings; } @@ -110,24 +110,6 @@ public class GatewayQueryServiceHandler implements GatewayQueryService { return seed.toString(); } - /** - * 生成共识协议 - * - * @param consensusProvider - * 共识协议提提供者 - * @return - */ - private int consensusProtocol(String consensusProvider) { - - if (consensusProvider.equals(BftsmartConsensusProvider.NAME)) { - return LedgerInitSettings.CONSENSUS_PROTOCOL.BFTSMART.code(); - } else if (consensusProvider.equals(MsgQueueConsensusProvider.NAME)) { - return LedgerInitSettings.CONSENSUS_PROTOCOL.MSGQUEUE.code(); - } - - return LedgerInitSettings.CONSENSUS_PROTOCOL.UNKNOWN.code(); - } - /** * 初始化共识配置 * diff --git a/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/ParticipantNode.java b/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/ParticipantNode.java index c8974d1c..dd2c62fa 100644 --- a/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/ParticipantNode.java +++ b/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/ParticipantNode.java @@ -22,6 +22,7 @@ public interface ParticipantNode {// extends ConsensusNode, ParticipantInfo { * * @return */ + @DataField(order = 0, primitiveType = PrimitiveType.INT32) int getId(); /** diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/web/PeerTimeTasks.java b/source/peer/src/main/java/com/jd/blockchain/peer/web/PeerTimeTasks.java index 8e3d220f..698fb584 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/web/PeerTimeTasks.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/web/PeerTimeTasks.java @@ -15,11 +15,11 @@ import com.jd.blockchain.peer.ConsensusManage; import com.jd.blockchain.peer.LedgerBindingConfigAware; import com.jd.blockchain.peer.PeerServerBooter; import com.jd.blockchain.tools.initializer.LedgerBindingConfig; -import com.jd.blockchain.utils.ArgumentSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.io.ClassPathResource; @@ -38,9 +38,11 @@ import java.util.*; * @since 1.0.0 */ @Component -//@EnableScheduling +@EnableScheduling public class PeerTimeTasks implements ApplicationContextAware { + private static Logger LOGGER = LoggerFactory.getLogger(PeerTimeTasks.class); + private ApplicationContext applicationContext; @Autowired @@ -51,7 +53,9 @@ public class PeerTimeTasks implements ApplicationContextAware { //每1分钟执行一次 @Scheduled(cron = "0 */5 * * * * ") public void updateLedger(){ - System.out.println ("Update Ledger Tasks Start " + new Date()); + + LOGGER.debug("Time Task Update Ledger Tasks Start {}", new Date()); + try { LedgerBindingConfig ledgerBindingConfig = loadLedgerBindingConfig(); @@ -78,7 +82,8 @@ public class PeerTimeTasks implements ApplicationContextAware { Map bindingConfigAwares = applicationContext.getBeansOfType(LedgerBindingConfigAware.class); List nodeServers = new ArrayList<>(); for (HashDigest ledgerHash : newAddHashs) { - System.out.printf("newLedger[%s] \r\n", ledgerHash.toBase58()); + + LOGGER.info("New Ledger [{}] Need To Be Init !!!", ledgerHash.toBase58()); for (LedgerBindingConfigAware aware : bindingConfigAwares.values()) { nodeServers.add(aware.setConfig(ledgerBindingConfig, ledgerHash)); } @@ -89,10 +94,10 @@ public class PeerTimeTasks implements ApplicationContextAware { consensusManage.runRealm(nodeServer); } } else { - System.out.println("All Ledgers is newest!!!"); + LOGGER.debug("All Ledgers is newest!!!"); } } catch (Exception e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } } @@ -104,7 +109,8 @@ public class PeerTimeTasks implements ApplicationContextAware { private LedgerBindingConfig loadLedgerBindingConfig() throws Exception { LedgerBindingConfig ledgerBindingConfig; ledgerBindConfigFile = PeerServerBooter.ledgerBindConfigFile; - System.out.printf("load ledgerBindConfigFile = %s \r\n", ledgerBindConfigFile); + LOGGER.debug("Load LedgerBindConfigFile path = {}", + ledgerBindConfigFile == null ? "Default" : ledgerBindConfigFile); if (ledgerBindConfigFile == null) { ClassPathResource configResource = new ClassPathResource("ledger-binding.conf"); InputStream in = configResource.getInputStream(); diff --git a/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/LedgerInitSettings.java b/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/LedgerInitSettings.java index fa3c2cd0..d4f287ab 100644 --- a/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/LedgerInitSettings.java +++ b/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/LedgerInitSettings.java @@ -34,7 +34,7 @@ public class LedgerInitSettings { /** * 共识协议 */ - private int consensusProtocol; + private String consensusProtocol; /** * 共识配置 @@ -70,11 +70,11 @@ public class LedgerInitSettings { this.cryptoSetting = cryptoSetting; } - public int getConsensusProtocol() { + public String getConsensusProtocol() { return consensusProtocol; } - public void setConsensusProtocol(int consensusProtocol) { + public void setConsensusProtocol(String consensusProtocol) { this.consensusProtocol = consensusProtocol; } @@ -93,21 +93,4 @@ public class LedgerInitSettings { public void setParticipantNodes(ParticipantNode[] participantNodes) { this.participantNodes = participantNodes; } - - public enum CONSENSUS_PROTOCOL { - UNKNOWN(0), - BFTSMART(1), - MSGQUEUE(2), - ; - - private int code; - - CONSENSUS_PROTOCOL(int code) { - this.code = code; - } - - public int code() { - return code; - } - } } diff --git a/source/test/pom.xml b/source/test/pom.xml index ac3b0562..4df214ad 100644 --- a/source/test/pom.xml +++ b/source/test/pom.xml @@ -15,7 +15,6 @@ test-consensus-node test-ledger-core test-integration - test-stp-community diff --git a/source/test/test-stp-community/pom.xml b/source/test/test-stp-community/pom.xml deleted file mode 100644 index 12d031b1..00000000 --- a/source/test/test-stp-community/pom.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - - test - com.jd.blockchain - 0.9.0-SNAPSHOT - - 4.0.0 - - test-stp-community - - test-stp-community - - - UTF-8 - 1.8 - 1.8 - - - - - - com.jd.blockchain - stp-communication - ${project.version} - - - - junit - junit - test - - - diff --git a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java deleted file mode 100644 index 26656a8b..00000000 --- a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright: Copyright 2016-2020 JD.COM All Right Reserved - * FileName: com.jd.blockchain.stp.commucation.MyMessageExecutor - * Author: shaozhuguang - * Department: Y事业部 - * Date: 2019/4/17 下午3:38 - * Description: - */ -package com.jd.blockchain.stp.commucation; - -import com.jd.blockchain.stp.communication.MessageExecutor; -import com.jd.blockchain.stp.communication.RemoteSession; - -import java.nio.charset.Charset; - -/** - * - * @author shaozhuguang - * @create 2019/4/17 - * @since 1.0.0 - */ - -public class MyMessageExecutor implements MessageExecutor { - - @Override - public byte[] receive(String key, byte[] data, RemoteSession session) { - String receiveMsg = new String(data, Charset.defaultCharset()); - System.out.printf("receive client {%s} request {%s} \r\n", session.remoteNode().toString(), receiveMsg); - String msg = session.localId() + " -> received !!!"; - return msg.getBytes(Charset.defaultCharset()); - } - - @Override - public REPLY replyType() { - return REPLY.AUTO; - } -} \ No newline at end of file diff --git a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java b/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java deleted file mode 100644 index e725d77b..00000000 --- a/source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright: Copyright 2016-2020 JD.COM All Right Reserved - * FileName: com.jd.blockchain.StpReceiversBoot - * Author: shaozhuguang - * Department: Y事业部 - * Date: 2019/4/18 下午3:44 - * Description: - */ -package com.jd.blockchain.stp.commucation; - -import com.jd.blockchain.stp.communication.MessageExecutor; -import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; -import com.jd.blockchain.stp.communication.node.LocalNode; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * - * @author shaozhuguang - * @create 2019/4/18 - * @since 1.0.0 - */ - -public class StpReceiversBoot { - - private int[] listenPorts; - - private final String remoteHost = "127.0.0.1"; - - private ExecutorService threadPool; - - public StpReceiversBoot(int... ports) { - listenPorts = ports; - threadPool = Executors.newFixedThreadPool(ports.length + 2); - } - - public RemoteSessionManager[] start(MessageExecutor messageExecutor) { - - final int totalSessionSize = listenPorts.length; - - CountDownLatch countDownLatch = new CountDownLatch(totalSessionSize); - - RemoteSessionManager[] sessionManagers = new RemoteSessionManager[totalSessionSize]; - for (int i = 0; i < totalSessionSize; i++) { - final int port = listenPorts[i], index = i; - threadPool.execute(() -> { - // 创建本地节点 - final LocalNode localNode = new LocalNode(remoteHost, port, messageExecutor); - try { - // 启动当前节点 - RemoteSessionManager sessionManager = new RemoteSessionManager(localNode); - sessionManagers[index] = sessionManager; - System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString()); - } catch (Exception e) { - e.printStackTrace(); - } finally { - countDownLatch.countDown(); - } - }); - } - - // 等待所有节点启动完成 - try { - countDownLatch.await(); - } catch (Exception e) { - e.printStackTrace(); - } - - return sessionManagers; - } - - -} \ No newline at end of file diff --git a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java deleted file mode 100644 index 1cd6e985..00000000 --- a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright: Copyright 2016-2020 JD.COM All Right Reserved - * FileName: com.jd.blockchain.StpReceiversBootTest - * Author: shaozhuguang - * Department: Y事业部 - * Date: 2019/4/18 下午3:53 - * Description: - */ -package com.jd.blockchain; - -import com.jd.blockchain.stp.commucation.MyMessageExecutor; -import com.jd.blockchain.stp.commucation.StpReceiversBoot; -import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; -import org.junit.Test; - -/** - * - * @author shaozhuguang - * @create 2019/4/18 - * @since 1.0.0 - */ - -public class StpReceiversBootTest { - - public static final int[] localPorts = new int[]{9900, 9901}; - - @Test - public void test() { - StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(9900, 9901); - RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor()); - - try { - Thread.sleep(10000); - - // 关闭所有的监听器 - for (RemoteSessionManager sessionManager : sessionManagers) { - sessionManager.connectionManager().closeReceiver(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } -} \ No newline at end of file diff --git a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java deleted file mode 100644 index 29741ae0..00000000 --- a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright: Copyright 2016-2020 JD.COM All Right Reserved - * FileName: com.jd.blockchain.StpSenderTest - * Author: shaozhuguang - * Department: Y事业部 - * Date: 2019/4/18 下午3:56 - * Description: - */ -package com.jd.blockchain; - -import com.jd.blockchain.stp.commucation.MyMessageExecutor; -import com.jd.blockchain.stp.commucation.StpReceiversBoot; -import com.jd.blockchain.stp.communication.RemoteSession; -import com.jd.blockchain.stp.communication.callback.CallBackBarrier; -import com.jd.blockchain.stp.communication.callback.CallBackDataListener; -import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; -import com.jd.blockchain.stp.communication.message.LoadMessage; -import com.jd.blockchain.stp.communication.node.RemoteNode; -import org.junit.Test; - -import java.util.Iterator; -import java.util.LinkedList; - -/** - * - * @author shaozhuguang - * @create 2019/4/18 - * @since 1.0.0 - */ - -public class StpSenderTest { - - // 本地的端口 - private static final int localPort = 9800; - - // 连接的远端端口集合 - private static final int[] remotePorts = StpReceiversBootTest.localPorts; - - // 本地节点信息 - private static final String localHost = "127.0.0.1"; - - @Test - public void test() { - // 首先启动本地节点 - StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(localPort); - RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor()); - - // 本地节点启动完成后 - if (sessionManagers != null && sessionManagers.length > 0) { - RemoteSessionManager localSessionManager = sessionManagers[0]; - - // 连接远端的两个节点 - RemoteNode[] remoteNodes = new RemoteNode[]{ - new RemoteNode(localHost, remotePorts[0]), - new RemoteNode(localHost, remotePorts[1]) - }; - - RemoteSession[] remoteSessions = localSessionManager.newSessions(remoteNodes); - - // 生成请求对象 - LoadMessage loadMessage = new StpTest.StpLoadMessage(localHost + ":" + localPort); - - // 异步发送处理过程 - CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000); - - // 发送请求至remotes - LinkedList responses = new LinkedList<>(); - for (RemoteSession remoteSession : remoteSessions) { - CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier); - responses.addLast(response); - } - - // 超时判断 - try { - if (callBackBarrier.tryCall()) { - - // 说明结果已经全部返回 - // 打印出所有的结果 - // 通过迭代器遍历链表 - Iterator iterator = responses.iterator(); - while (iterator.hasNext()) { - CallBackDataListener response = iterator.next(); - // 判断是否已完成,对于没有完成的直接放弃(因为已经超时) - if (response.isDone()) { - System.out.printf("Receive Response {%s} {%s} \r\n", - response.remoteNode().toString(), new String(response.getCallBackData())); - } - } - } - Thread.sleep(Integer.MAX_VALUE); - } catch (Exception e) { - e.printStackTrace(); - } - } - } -} \ No newline at end of file diff --git a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java b/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java deleted file mode 100644 index 2e66c8e1..00000000 --- a/source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Copyright: Copyright 2016-2020 JD.COM All Right Reserved - * FileName: com.jd.blockchain.StpTest - * Author: shaozhuguang - * Department: Y事业部 - * Date: 2019/4/11 下午3:31 - * Description: - */ -package com.jd.blockchain; - -import com.jd.blockchain.stp.commucation.MyMessageExecutor; -import com.jd.blockchain.stp.communication.RemoteSession; -import com.jd.blockchain.stp.communication.callback.CallBackBarrier; -import com.jd.blockchain.stp.communication.callback.CallBackDataListener; -import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; -import com.jd.blockchain.stp.communication.message.LoadMessage; -import com.jd.blockchain.stp.communication.node.LocalNode; -import com.jd.blockchain.stp.communication.node.RemoteNode; -import org.junit.Before; -import org.junit.Test; - -import java.nio.charset.Charset; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertNull; - -/** - * - * @author shaozhuguang - * @create 2019/4/11 - * @since 1.0.0 - */ - -public class StpTest { - - private int maxWaitTime = 2000; - - private final String remoteHost = "127.0.0.1"; - - private final int localPort = 9001; - - private final int[] listenPorts = new int[]{9001, 9002, 9003, 9004}; - - private final RemoteSessionManager[] sessionManagers = new RemoteSessionManager[listenPorts.length]; - - private final ExecutorService threadPool = Executors.newFixedThreadPool(6); - - private RemoteSession[] remoteSessions; - - @Before - public void init() { - - System.out.println("---------- listenStart -----------"); - listenStart(); - System.out.println("---------- listenComplete -----------"); - System.out.println("---------- ConnectionStart ----------"); - connectOneOther(); - System.out.println("---------- ConnectionComplete ----------"); - } - - private void listenStart() { - CountDownLatch countDownLatch = new CountDownLatch(listenPorts.length); - - for (int i = 0; i < listenPorts.length; i++) { - final int port = listenPorts[i], index = i; - threadPool.execute(() -> { - // 创建本地节点 - final LocalNode localNode = new LocalNode(remoteHost, port, new MyMessageExecutor()); - try { - // 启动当前节点 - RemoteSessionManager sessionManager = new RemoteSessionManager(localNode); - sessionManagers[index] = sessionManager; - System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString()); - } catch (Exception e) { - e.printStackTrace(); - } finally { - countDownLatch.countDown(); - } - }); - } - - // 等待所有节点启动完成 - try { - countDownLatch.await(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private void connectAllOthers() { - // 所有节点完成之后,需要启动 - // 启动一个节点 - RemoteSessionManager starter = sessionManagers[0]; - - // 当前节点需要连接到其他3个节点 - RemoteNode[] remoteNodes = new RemoteNode[listenPorts.length - 1]; - int index = 0; - for (int port : listenPorts) { - if (port != localPort) { - remoteNodes[index++] = new RemoteNode(remoteHost, port); - } - } - - remoteSessions = starter.newSessions(remoteNodes); - } - - private void connectOneOther() { - // 所有节点完成之后,需要启动 - // 启动一个节点 - RemoteSessionManager starter = sessionManagers[0]; - - // 当前节点需要连接到其他3个节点 - RemoteNode[] remoteNodes = new RemoteNode[1]; - int index = 0; - for (int port : listenPorts) { - if (port != localPort && index < 1) { - remoteNodes[index++] = new RemoteNode(remoteHost, port); - } - } - - remoteSessions = starter.newSessions(remoteNodes); - } - - private void connectOneErrorNode() { - // 所有节点完成之后,需要启动 - // 启动一个节点 - RemoteSessionManager starter = sessionManagers[0]; - - // 当前节点需要连接到其他3个节点 - RemoteNode[] remoteNodes = new RemoteNode[1]; - - remoteNodes[0] = new RemoteNode(remoteHost, 10001); - - remoteSessions = starter.newSessions(remoteNodes); - - assertNull(remoteSessions); - } - - - @Test - public void test() { - - try { - Thread.sleep(3000); - - } catch (Exception e) { - e.printStackTrace(); - } - - // 生成请求对象 - LoadMessage loadMessage = new StpLoadMessage(remoteHost + ":" + localPort); - - // 异步发送处理过程 - CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000); - - // 发送请求至remotes - LinkedList responses = new LinkedList<>(); - for (RemoteSession remoteSession : remoteSessions) { - CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier); - responses.addLast(response); - } - - // 超时判断 - try { - if (callBackBarrier.tryCall()) { - - // 说明结果已经全部返回 - // 打印出所有的结果 - // 通过迭代器遍历链表 - Iterator iterator = responses.iterator(); - while (iterator.hasNext()) { - CallBackDataListener response = iterator.next(); - // 判断是否已完成,对于没有完成的直接放弃(因为已经超时) - if (response.isDone()) { - System.out.printf("Receive Response {%s} {%s} \r\n", - response.remoteNode().toString(), new String(response.getCallBackData())); - } - } - } - Thread.sleep(Integer.MAX_VALUE); - } catch (Exception e) { - e.printStackTrace(); - } - - - } - - public static class StpLoadMessage implements LoadMessage { - - private String localInfo; - - public StpLoadMessage(String localInfo) { - this.localInfo = localInfo; - } - - @Override - public byte[] toBytes() { - String msg = localInfo + " -> Send !!!"; - return msg.getBytes(Charset.defaultCharset()); - } - } -} \ No newline at end of file