Browse Source

Modify the information returned by LedgerSetting, And Adjust Log Printing to LOGGER !

tags/1.0.0
shaozhuguang 6 years ago
parent
commit
376753b8d2
11 changed files with 19 additions and 541 deletions
  1. +1
    -19
      source/gateway/src/main/java/com/jd/blockchain/gateway/service/GatewayQueryServiceHandler.java
  2. +1
    -0
      source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/ParticipantNode.java
  3. +14
    -8
      source/peer/src/main/java/com/jd/blockchain/peer/web/PeerTimeTasks.java
  4. +3
    -20
      source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/LedgerInitSettings.java
  5. +0
    -1
      source/test/pom.xml
  6. +0
    -36
      source/test/test-stp-community/pom.xml
  7. +0
    -37
      source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java
  8. +0
    -75
      source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java
  9. +0
    -43
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java
  10. +0
    -96
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java
  11. +0
    -206
      source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java

+ 1
- 19
source/gateway/src/main/java/com/jd/blockchain/gateway/service/GatewayQueryServiceHandler.java View File

@@ -81,7 +81,7 @@ public class GatewayQueryServiceHandler implements GatewayQueryService {
ledgerInitSettings.setSeed(initSeed(ledgerMetadata.getSeed()));

// 设置共识协议
ledgerInitSettings.setConsensusProtocol(consensusProtocol(ledgerMetadata.getSetting().getConsensusProvider()));
ledgerInitSettings.setConsensusProtocol(ledgerMetadata.getSetting().getConsensusProvider());

return ledgerInitSettings;
}
@@ -110,24 +110,6 @@ public class GatewayQueryServiceHandler implements GatewayQueryService {
return seed.toString();
}

/**
* 生成共识协议
*
* @param consensusProvider
* 共识协议提提供者
* @return
*/
private int consensusProtocol(String consensusProvider) {

if (consensusProvider.equals(BftsmartConsensusProvider.NAME)) {
return LedgerInitSettings.CONSENSUS_PROTOCOL.BFTSMART.code();
} else if (consensusProvider.equals(MsgQueueConsensusProvider.NAME)) {
return LedgerInitSettings.CONSENSUS_PROTOCOL.MSGQUEUE.code();
}

return LedgerInitSettings.CONSENSUS_PROTOCOL.UNKNOWN.code();
}

/**
* 初始化共识配置
*


+ 1
- 0
source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/ParticipantNode.java View File

@@ -22,6 +22,7 @@ public interface ParticipantNode {// extends ConsensusNode, ParticipantInfo {
*
* @return
*/
@DataField(order = 0, primitiveType = PrimitiveType.INT32)
int getId();

/**


+ 14
- 8
source/peer/src/main/java/com/jd/blockchain/peer/web/PeerTimeTasks.java View File

@@ -15,11 +15,11 @@ import com.jd.blockchain.peer.ConsensusManage;
import com.jd.blockchain.peer.LedgerBindingConfigAware;
import com.jd.blockchain.peer.PeerServerBooter;
import com.jd.blockchain.tools.initializer.LedgerBindingConfig;
import com.jd.blockchain.utils.ArgumentSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.io.ClassPathResource;
@@ -38,9 +38,11 @@ import java.util.*;
* @since 1.0.0
*/
@Component
//@EnableScheduling
@EnableScheduling
public class PeerTimeTasks implements ApplicationContextAware {

private static Logger LOGGER = LoggerFactory.getLogger(PeerTimeTasks.class);

private ApplicationContext applicationContext;

@Autowired
@@ -51,7 +53,9 @@ public class PeerTimeTasks implements ApplicationContextAware {
//每1分钟执行一次
@Scheduled(cron = "0 */5 * * * * ")
public void updateLedger(){
System.out.println ("Update Ledger Tasks Start " + new Date());

LOGGER.debug("Time Task Update Ledger Tasks Start {}", new Date());

try {
LedgerBindingConfig ledgerBindingConfig = loadLedgerBindingConfig();

@@ -78,7 +82,8 @@ public class PeerTimeTasks implements ApplicationContextAware {
Map<String, LedgerBindingConfigAware> bindingConfigAwares = applicationContext.getBeansOfType(LedgerBindingConfigAware.class);
List<NodeServer> nodeServers = new ArrayList<>();
for (HashDigest ledgerHash : newAddHashs) {
System.out.printf("newLedger[%s] \r\n", ledgerHash.toBase58());

LOGGER.info("New Ledger [{}] Need To Be Init !!!", ledgerHash.toBase58());
for (LedgerBindingConfigAware aware : bindingConfigAwares.values()) {
nodeServers.add(aware.setConfig(ledgerBindingConfig, ledgerHash));
}
@@ -89,10 +94,10 @@ public class PeerTimeTasks implements ApplicationContextAware {
consensusManage.runRealm(nodeServer);
}
} else {
System.out.println("All Ledgers is newest!!!");
LOGGER.debug("All Ledgers is newest!!!");
}
} catch (Exception e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
}

@@ -104,7 +109,8 @@ public class PeerTimeTasks implements ApplicationContextAware {
private LedgerBindingConfig loadLedgerBindingConfig() throws Exception {
LedgerBindingConfig ledgerBindingConfig;
ledgerBindConfigFile = PeerServerBooter.ledgerBindConfigFile;
System.out.printf("load ledgerBindConfigFile = %s \r\n", ledgerBindConfigFile);
LOGGER.debug("Load LedgerBindConfigFile path = {}",
ledgerBindConfigFile == null ? "Default" : ledgerBindConfigFile);
if (ledgerBindConfigFile == null) {
ClassPathResource configResource = new ClassPathResource("ledger-binding.conf");
InputStream in = configResource.getInputStream();


+ 3
- 20
source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/LedgerInitSettings.java View File

@@ -34,7 +34,7 @@ public class LedgerInitSettings {
/**
* 共识协议
*/
private int consensusProtocol;
private String consensusProtocol;

/**
* 共识配置
@@ -70,11 +70,11 @@ public class LedgerInitSettings {
this.cryptoSetting = cryptoSetting;
}

public int getConsensusProtocol() {
public String getConsensusProtocol() {
return consensusProtocol;
}

public void setConsensusProtocol(int consensusProtocol) {
public void setConsensusProtocol(String consensusProtocol) {
this.consensusProtocol = consensusProtocol;
}

@@ -93,21 +93,4 @@ public class LedgerInitSettings {
public void setParticipantNodes(ParticipantNode[] participantNodes) {
this.participantNodes = participantNodes;
}

public enum CONSENSUS_PROTOCOL {
UNKNOWN(0),
BFTSMART(1),
MSGQUEUE(2),
;

private int code;

CONSENSUS_PROTOCOL(int code) {
this.code = code;
}

public int code() {
return code;
}
}
}

+ 0
- 1
source/test/pom.xml View File

@@ -15,7 +15,6 @@
<module>test-consensus-node</module>
<module>test-ledger-core</module>
<module>test-integration</module>
<module>test-stp-community</module>
</modules>
<build>


+ 0
- 36
source/test/test-stp-community/pom.xml View File

@@ -1,36 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>test</artifactId>
<groupId>com.jd.blockchain</groupId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>test-stp-community</artifactId>

<name>test-stp-community</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>stp-communication</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

+ 0
- 37
source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/MyMessageExecutor.java View File

@@ -1,37 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.commucation.MyMessageExecutor
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/17 下午3:38
* Description:
*/
package com.jd.blockchain.stp.commucation;

import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession;

import java.nio.charset.Charset;

/**
*
* @author shaozhuguang
* @create 2019/4/17
* @since 1.0.0
*/

public class MyMessageExecutor implements MessageExecutor {

@Override
public byte[] receive(String key, byte[] data, RemoteSession session) {
String receiveMsg = new String(data, Charset.defaultCharset());
System.out.printf("receive client {%s} request {%s} \r\n", session.remoteNode().toString(), receiveMsg);
String msg = session.localId() + " -> received !!!";
return msg.getBytes(Charset.defaultCharset());
}

@Override
public REPLY replyType() {
return REPLY.AUTO;
}
}

+ 0
- 75
source/test/test-stp-community/src/main/java/com/jd/blockchain/stp/commucation/StpReceiversBoot.java View File

@@ -1,75 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpReceiversBoot
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/18 下午3:44
* Description:
*/
package com.jd.blockchain.stp.commucation;

import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.LocalNode;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
*
* @author shaozhuguang
* @create 2019/4/18
* @since 1.0.0
*/

public class StpReceiversBoot {

private int[] listenPorts;

private final String remoteHost = "127.0.0.1";

private ExecutorService threadPool;

public StpReceiversBoot(int... ports) {
listenPorts = ports;
threadPool = Executors.newFixedThreadPool(ports.length + 2);
}

public RemoteSessionManager[] start(MessageExecutor messageExecutor) {

final int totalSessionSize = listenPorts.length;

CountDownLatch countDownLatch = new CountDownLatch(totalSessionSize);

RemoteSessionManager[] sessionManagers = new RemoteSessionManager[totalSessionSize];
for (int i = 0; i < totalSessionSize; i++) {
final int port = listenPorts[i], index = i;
threadPool.execute(() -> {
// 创建本地节点
final LocalNode localNode = new LocalNode(remoteHost, port, messageExecutor);
try {
// 启动当前节点
RemoteSessionManager sessionManager = new RemoteSessionManager(localNode);
sessionManagers[index] = sessionManager;
System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}

// 等待所有节点启动完成
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}

return sessionManagers;
}


}

+ 0
- 43
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpReceiversBootTest.java View File

@@ -1,43 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpReceiversBootTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/18 下午3:53
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.commucation.StpReceiversBoot;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import org.junit.Test;

/**
*
* @author shaozhuguang
* @create 2019/4/18
* @since 1.0.0
*/

public class StpReceiversBootTest {

public static final int[] localPorts = new int[]{9900, 9901};

@Test
public void test() {
StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(9900, 9901);
RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor());

try {
Thread.sleep(10000);

// 关闭所有的监听器
for (RemoteSessionManager sessionManager : sessionManagers) {
sessionManager.connectionManager().closeReceiver();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

+ 0
- 96
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpSenderTest.java View File

@@ -1,96 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpSenderTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/18 下午3:56
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.commucation.StpReceiversBoot;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.junit.Test;

import java.util.Iterator;
import java.util.LinkedList;

/**
*
* @author shaozhuguang
* @create 2019/4/18
* @since 1.0.0
*/

public class StpSenderTest {

// 本地的端口
private static final int localPort = 9800;

// 连接的远端端口集合
private static final int[] remotePorts = StpReceiversBootTest.localPorts;

// 本地节点信息
private static final String localHost = "127.0.0.1";

@Test
public void test() {
// 首先启动本地节点
StpReceiversBoot stpReceiversBoot = new StpReceiversBoot(localPort);
RemoteSessionManager[] sessionManagers = stpReceiversBoot.start(new MyMessageExecutor());

// 本地节点启动完成后
if (sessionManagers != null && sessionManagers.length > 0) {
RemoteSessionManager localSessionManager = sessionManagers[0];

// 连接远端的两个节点
RemoteNode[] remoteNodes = new RemoteNode[]{
new RemoteNode(localHost, remotePorts[0]),
new RemoteNode(localHost, remotePorts[1])
};

RemoteSession[] remoteSessions = localSessionManager.newSessions(remoteNodes);

// 生成请求对象
LoadMessage loadMessage = new StpTest.StpLoadMessage(localHost + ":" + localPort);

// 异步发送处理过程
CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000);

// 发送请求至remotes
LinkedList<CallBackDataListener> responses = new LinkedList<>();
for (RemoteSession remoteSession : remoteSessions) {
CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier);
responses.addLast(response);
}

// 超时判断
try {
if (callBackBarrier.tryCall()) {

// 说明结果已经全部返回
// 打印出所有的结果
// 通过迭代器遍历链表
Iterator<CallBackDataListener> iterator = responses.iterator();
while (iterator.hasNext()) {
CallBackDataListener response = iterator.next();
// 判断是否已完成,对于没有完成的直接放弃(因为已经超时)
if (response.isDone()) {
System.out.printf("Receive Response {%s} {%s} \r\n",
response.remoteNode().toString(), new String(response.getCallBackData()));
}
}
}
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

+ 0
- 206
source/test/test-stp-community/src/test/java/com/jd/blockchain/StpTest.java View File

@@ -1,206 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午3:31
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.commucation.MyMessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.junit.Assert.assertNull;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class StpTest {

private int maxWaitTime = 2000;

private final String remoteHost = "127.0.0.1";

private final int localPort = 9001;

private final int[] listenPorts = new int[]{9001, 9002, 9003, 9004};

private final RemoteSessionManager[] sessionManagers = new RemoteSessionManager[listenPorts.length];

private final ExecutorService threadPool = Executors.newFixedThreadPool(6);

private RemoteSession[] remoteSessions;

@Before
public void init() {

System.out.println("---------- listenStart -----------");
listenStart();
System.out.println("---------- listenComplete -----------");
System.out.println("---------- ConnectionStart ----------");
connectOneOther();
System.out.println("---------- ConnectionComplete ----------");
}

private void listenStart() {
CountDownLatch countDownLatch = new CountDownLatch(listenPorts.length);

for (int i = 0; i < listenPorts.length; i++) {
final int port = listenPorts[i], index = i;
threadPool.execute(() -> {
// 创建本地节点
final LocalNode localNode = new LocalNode(remoteHost, port, new MyMessageExecutor());
try {
// 启动当前节点
RemoteSessionManager sessionManager = new RemoteSessionManager(localNode);
sessionManagers[index] = sessionManager;
System.out.printf("Current Node {%s} start success !!! \r\n", localNode.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}

// 等待所有节点启动完成
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}

private void connectAllOthers() {
// 所有节点完成之后,需要启动
// 启动一个节点
RemoteSessionManager starter = sessionManagers[0];

// 当前节点需要连接到其他3个节点
RemoteNode[] remoteNodes = new RemoteNode[listenPorts.length - 1];
int index = 0;
for (int port : listenPorts) {
if (port != localPort) {
remoteNodes[index++] = new RemoteNode(remoteHost, port);
}
}

remoteSessions = starter.newSessions(remoteNodes);
}

private void connectOneOther() {
// 所有节点完成之后,需要启动
// 启动一个节点
RemoteSessionManager starter = sessionManagers[0];

// 当前节点需要连接到其他3个节点
RemoteNode[] remoteNodes = new RemoteNode[1];
int index = 0;
for (int port : listenPorts) {
if (port != localPort && index < 1) {
remoteNodes[index++] = new RemoteNode(remoteHost, port);
}
}

remoteSessions = starter.newSessions(remoteNodes);
}

private void connectOneErrorNode() {
// 所有节点完成之后,需要启动
// 启动一个节点
RemoteSessionManager starter = sessionManagers[0];

// 当前节点需要连接到其他3个节点
RemoteNode[] remoteNodes = new RemoteNode[1];

remoteNodes[0] = new RemoteNode(remoteHost, 10001);

remoteSessions = starter.newSessions(remoteNodes);

assertNull(remoteSessions);
}


@Test
public void test() {

try {
Thread.sleep(3000);

} catch (Exception e) {
e.printStackTrace();
}

// 生成请求对象
LoadMessage loadMessage = new StpLoadMessage(remoteHost + ":" + localPort);

// 异步发送处理过程
CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, 10000);

// 发送请求至remotes
LinkedList<CallBackDataListener> responses = new LinkedList<>();
for (RemoteSession remoteSession : remoteSessions) {
CallBackDataListener response = remoteSession.asyncRequest(loadMessage, callBackBarrier);
responses.addLast(response);
}

// 超时判断
try {
if (callBackBarrier.tryCall()) {

// 说明结果已经全部返回
// 打印出所有的结果
// 通过迭代器遍历链表
Iterator<CallBackDataListener> iterator = responses.iterator();
while (iterator.hasNext()) {
CallBackDataListener response = iterator.next();
// 判断是否已完成,对于没有完成的直接放弃(因为已经超时)
if (response.isDone()) {
System.out.printf("Receive Response {%s} {%s} \r\n",
response.remoteNode().toString(), new String(response.getCallBackData()));
}
}
}
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}


}

public static class StpLoadMessage implements LoadMessage {

private String localInfo;

public StpLoadMessage(String localInfo) {
this.localInfo = localInfo;
}

@Override
public byte[] toBytes() {
String msg = localInfo + " -> Send !!!";
return msg.getBytes(Charset.defaultCharset());
}
}
}

Loading…
Cancel
Save