Browse Source

modify gateway can read from many peers

tags/1.1.10
shaozhuguang 5 years ago
parent
commit
8e684eb4bc
10 changed files with 656 additions and 236 deletions
  1. +5
    -0
      source/gateway/pom.xml
  2. +29
    -22
      source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayConfigProperties.java
  3. +17
    -4
      source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayServerBooter.java
  4. +36
    -6
      source/gateway/src/main/java/com/jd/blockchain/gateway/PeerConnector.java
  5. +21
    -2
      source/gateway/src/main/java/com/jd/blockchain/gateway/PeerService.java
  6. +312
    -30
      source/gateway/src/main/java/com/jd/blockchain/gateway/service/PeerConnectionManager.java
  7. +120
    -120
      source/gateway/src/main/java/com/jd/blockchain/gateway/web/BlockBrowserController.java
  8. +91
    -0
      source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayLedgerLoadTimer.java
  9. +0
    -46
      source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayTimeTasks.java
  10. +25
    -6
      source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/PeerServiceProxy.java

+ 5
- 0
source/gateway/pom.xml View File

@@ -77,6 +77,11 @@
<artifactId>commons-io</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>


+ 29
- 22
source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayConfigProperties.java View File

@@ -2,14 +2,14 @@ package com.jd.blockchain.gateway;

import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.*;

import com.jd.blockchain.utils.IllegalDataException;
import com.jd.blockchain.utils.io.FileUtils;
import com.jd.blockchain.utils.net.NetworkAddress;

public class GatewayConfigProperties {

// HTTP协议相关配置项的键的前缀;
public static final String HTTP_PREFIX = "http.";
// 网关的HTTP服务地址;
@@ -21,12 +21,14 @@ public class GatewayConfigProperties {

// 共识相关配置项的键的前缀;
public static final String PEER_PREFIX = "peer.";
// 共识节点的数量
public static final String PEER_SIZE = PEER_PREFIX + "size";
// 共识节点的服务地址;
public static final String PEER_HOST = PEER_PREFIX + "host";
public static final String PEER_HOST_FORMAT = PEER_PREFIX + "%s.host";
// 共识节点的服务端口;
public static final String PEER_PORT = PEER_PREFIX + "port";
public static final String PEER_PORT_FORMAT = PEER_PREFIX + "%s.port";
// 共识节点的服务是否启用安全证书;
public static final String PEER_SECURE = PEER_PREFIX + "secure";
public static final String PEER_SECURE_FORMAT = PEER_PREFIX + "%s.secure";
// 支持共识的Provider列表,以英文逗号分隔
public static final String PEER_PROVIDERS = PEER_PREFIX + "providers";

@@ -52,7 +54,7 @@ public class GatewayConfigProperties {

private ProviderConfig providerConfig = new ProviderConfig();

private NetworkAddress masterPeerAddress;
private Set<NetworkAddress> masterPeerAddresses = new HashSet<>();

private String dataRetrievalUrl;
private String schemaRetrievalUrl;
@@ -63,8 +65,8 @@ public class GatewayConfigProperties {
return http;
}

public NetworkAddress masterPeerAddress() {
return masterPeerAddress;
public Set<NetworkAddress> masterPeerAddresses() {
return masterPeerAddresses;
}

public String dataRetrievalUrl() {
@@ -87,11 +89,11 @@ public class GatewayConfigProperties {
return providerConfig;
}

public void setMasterPeerAddress(NetworkAddress peerAddress) {
public void addMasterPeerAddress(NetworkAddress peerAddress) {
if (peerAddress == null) {
throw new IllegalArgumentException("peerAddress is null!");
}
this.masterPeerAddress = peerAddress;
this.masterPeerAddresses.add(peerAddress);
}

public KeysConfig keys() {
@@ -122,11 +124,16 @@ public class GatewayConfigProperties {
configProps.http.port = getInt(props, HTTP_PORT, true);
configProps.http.contextPath = getProperty(props, HTTP_CONTEXT_PATH, false);

String peerHost = getProperty(props, PEER_HOST, true);
int peerPort = getInt(props, PEER_PORT, true);
boolean peerSecure = getBoolean(props, PEER_SECURE, false);
configProps.masterPeerAddress = new NetworkAddress(peerHost, peerPort, peerSecure);

int peerSize = getInt(props, PEER_SIZE, true);
if (peerSize <= 0) {
throw new IllegalDataException("Peer size is illegal !!!");
}
for (int i = 0; i < peerSize; i++) {
String peerHost = getProperty(props, String.format(PEER_HOST_FORMAT, i), true);
int peerPort = getInt(props, String.format(PEER_PORT_FORMAT, i), true);
boolean peerSecure = getBoolean(props, String.format(PEER_SECURE_FORMAT, i), false);
configProps.addMasterPeerAddress(new NetworkAddress(peerHost, peerPort, peerSecure));
}
String dataRetrievalUrl = getProperty(props, DATA_RETRIEVAL_URL, true);
configProps.dataRetrievalUrl = dataRetrievalUrl;

@@ -211,7 +218,7 @@ public class GatewayConfigProperties {

private HttpConfig() {
}
private HttpConfig(String host, int port, String contextPath) {
this.host = host;
this.port = port;
@@ -221,7 +228,7 @@ public class GatewayConfigProperties {
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
@@ -229,7 +236,7 @@ public class GatewayConfigProperties {
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@@ -237,7 +244,7 @@ public class GatewayConfigProperties {
public String getContextPath() {
return contextPath;
}
public void setContextPath(String contextPath) {
this.contextPath = contextPath;
}
@@ -265,7 +272,7 @@ public class GatewayConfigProperties {
public String getPrivKeyPath() {
return privKeyPath;
}
public String getPrivKeyValue() {
return privKeyValue;
}
@@ -293,7 +300,7 @@ public class GatewayConfigProperties {
public void setPrivKeyPassword(String privKeyPassword) {
this.privKeyPassword = privKeyPassword;
}
}

}

+ 17
- 4
source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayServerBooter.java View File

@@ -5,7 +5,10 @@ import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import com.jd.blockchain.gateway.web.GatewayLedgerLoadTimer;
import com.jd.blockchain.utils.net.NetworkAddress;
import org.apache.commons.io.FileUtils;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
@@ -82,8 +85,6 @@ public class GatewayServerBooter {
ConsoleUtils.info("Starting web server......");
GatewayServerBooter booter = new GatewayServerBooter(configProps,springConfigLocation);
booter.start();

ConsoleUtils.info("Peer[%s] is connected success!", configProps.masterPeerAddress().toString());
} catch (Exception e) {
ConsoleUtils.error("Error!! %s", e.getMessage());
if (debug) {
@@ -131,8 +132,20 @@ public class GatewayServerBooter {
blockBrowserController.setDataRetrievalUrl(config.dataRetrievalUrl());
blockBrowserController.setSchemaRetrievalUrl(config.getSchemaRetrievalUrl());
PeerConnector peerConnector = appCtx.getBean(PeerConnector.class);
peerConnector.connect(config.masterPeerAddress(), defaultKeyPair, config.providerConfig().getProviders());
ConsoleUtils.info("Peer[%s] is connected success!", config.masterPeerAddress().toString());

Set<NetworkAddress> peerAddresses = config.masterPeerAddresses();
StringBuilder peerAddressBuf = new StringBuilder();
for (NetworkAddress peerAddress : peerAddresses) {
peerConnector.connect(peerAddress, defaultKeyPair, config.providerConfig().getProviders());
if (peerAddressBuf.length() > 0) {
peerAddressBuf.append(",");
}
peerAddressBuf.append(peerAddress.toString());
}
// 不管连接是否成功,都需要释放许可
GatewayLedgerLoadTimer loadTimer = appCtx.getBean(GatewayLedgerLoadTimer.class);
loadTimer.release();
ConsoleUtils.info("Peer[%s] is connected success!", peerAddressBuf.toString());
}

public synchronized void close() {


+ 36
- 6
source/gateway/src/main/java/com/jd/blockchain/gateway/PeerConnector.java View File

@@ -4,17 +4,47 @@ import com.jd.blockchain.crypto.AsymmetricKeypair;
import com.jd.blockchain.utils.net.NetworkAddress;

import java.util.List;
import java.util.Set;

public interface PeerConnector {
NetworkAddress getPeerAddress();

/**
* 获取Peer地址列表
*
* @return
*/
Set<NetworkAddress> getPeerAddresses();

/**
* 是否连接成功
*
* @return
*/
boolean isConnected();

/**
* 连接至指定Peer节点
*
* @param peerAddress
* Peer地址
* @param defaultKeyPair
* 连接Peer所需公私钥信息
* @param peerProviders
* 支持的Provider解析列表
*/
void connect(NetworkAddress peerAddress, AsymmetricKeypair defaultKeyPair, List<String> peerProviders);

void reconnect();
/**
* 监控重连,判断是否需要更新账本信息,再进行重连操作
* Peer地址及其他信息见${@link PeerConnector#connect(com.jd.blockchain.utils.net.NetworkAddress, com.jd.blockchain.crypto.AsymmetricKeypair, java.util.List)}
*
*/
void monitorAndReconnect();

/**
* 关闭连接
*
*/
void close();
}

+ 21
- 2
source/gateway/src/main/java/com/jd/blockchain/gateway/PeerService.java View File

@@ -1,12 +1,31 @@
package com.jd.blockchain.gateway;

import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.transaction.BlockchainQueryService;
import com.jd.blockchain.transaction.TransactionService;

public interface PeerService {

/**
* 获取账本数量最多的查询器
*
* @return
*/
BlockchainQueryService getQueryService();

/**
* 获取某个账本中区块高度最高的查询器
*
* @param ledgerHash
* @return
*/
BlockchainQueryService getQueryService(HashDigest ledgerHash);

/**
* 获取交易处理器
*
* @return
*/
TransactionService getTransactionService();
}

+ 312
- 30
source/gateway/src/main/java/com/jd/blockchain/gateway/service/PeerConnectionManager.java View File

@@ -2,6 +2,11 @@ package com.jd.blockchain.gateway.service;

import javax.annotation.PreDestroy;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.sdk.BlockchainService;
import com.jd.blockchain.sdk.service.PeerServiceProxy;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.jd.blockchain.crypto.AsymmetricKeypair;
@@ -12,76 +17,171 @@ import com.jd.blockchain.transaction.BlockchainQueryService;
import com.jd.blockchain.transaction.TransactionService;
import com.jd.blockchain.utils.net.NetworkAddress;

import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Component
public class PeerConnectionManager implements PeerService, PeerConnector {

private volatile PeerBlockchainServiceFactory peerServiceFactory;
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(PeerConnectionManager.class);

private volatile NetworkAddress peerAddress;
/**
* 30秒更新一次最新的情况
*/
private static final long PERIOD_SECONDS = 30L;

private final ScheduledThreadPoolExecutor peerConnectExecutor;

private final Set<HashDigest> localLedgerCache = new HashSet<>();

private final Lock ledgerHashLock = new ReentrantLock();

private Map<NetworkAddress, PeerBlockchainServiceFactory> peerBlockchainServiceFactories = new ConcurrentHashMap<>();

private Map<HashDigest, PeerBlockchainServiceFactory> latestPeerServiceFactories = new ConcurrentHashMap<>(16);

private Set<NetworkAddress> peerAddresses = new HashSet<>();

private volatile PeerServiceFactory mostLedgerPeerServiceFactory;

private volatile PeerBlockchainServiceFactory masterPeerServiceFactory;

private volatile AsymmetricKeypair gateWayKeyPair;

private volatile List<String> peerProviders;

private volatile EventListener eventListener;

public PeerConnectionManager() {
peerConnectExecutor = scheduledThreadPoolExecutor();
executorStart();
}

@Override
public NetworkAddress getPeerAddress() {
return peerAddress;
public Set<NetworkAddress> getPeerAddresses() {
return peerAddresses;
}

@Override
public boolean isConnected() {
return peerServiceFactory != null;
return !peerBlockchainServiceFactories.isEmpty();
}

@Override
public synchronized void connect(NetworkAddress peerAddress, AsymmetricKeypair defaultKeyPair, List<String> peerProviders) {
if (isConnected()) {
if (this.peerAddress.equals(peerAddress)) {
return;
if (peerAddresses.contains(peerAddress)) {
return;
}
// 连接成功的话,更新账本
ledgerHashLock.lock();
try {
addPeerAddress(peerAddress);
setGateWayKeyPair(defaultKeyPair);
setPeerProviders(peerProviders);

PeerBlockchainServiceFactory peerServiceFactory = PeerBlockchainServiceFactory.connect(defaultKeyPair, peerAddress, peerProviders);
if (peerServiceFactory != null) {
LOGGER.error("Connect peer {} success !!!", peerAddress);
// 连接成功
if (masterPeerServiceFactory == null) {
masterPeerServiceFactory = peerServiceFactory;
LOGGER.error("Master remote update to {}", peerAddress);
}
if (mostLedgerPeerServiceFactory == null) {
// 默认设置为第一个连接成功的,后续更新需要等待定时任务处理
mostLedgerPeerServiceFactory = new PeerServiceFactory(peerAddress, peerServiceFactory);
LOGGER.error("Most ledgers remote update to {}", peerAddress);
}
peerBlockchainServiceFactories.put(peerAddress, peerServiceFactory);
updateLedgerCache();
}
throw new IllegalArgumentException(
"This gateway has been connected to a peer, cann't be connected to another peer before closing it!");
} finally {
// 连接成功的话,更新账本
ledgerHashLock.unlock();
}
setPeerAddress(peerAddress);
setGateWayKeyPair(defaultKeyPair);
setPeerProviders(peerProviders);
// TODO: 未实现运行时出错时动态重连;
peerServiceFactory = PeerBlockchainServiceFactory.connect(defaultKeyPair, peerAddress, peerProviders);
}

@Override
public synchronized void reconnect() {
if (!isConnected()) {
throw new IllegalArgumentException(
"This gateway has not connected to a peer, please connect it first!!!");
public void monitorAndReconnect() {
if (getPeerAddresses().isEmpty()) {
throw new IllegalArgumentException("Peer addresses must be init first !!!");
}
/**
* 1、首先判断是否之前连接成功过,若未成功则重连,走auth逻辑
* 2、若成功,则判断对端节点的账本与当前账本是否一致,有新增的情况下重连
*/
ledgerHashLock.lock();
try {
if (isConnected()) {
// 已连接成功,判断账本信息
PeerServiceFactory serviceFactory = mostLedgerPeerServiceFactory;
if (serviceFactory == null) {
// 等待被更新
return;
}
BlockchainQueryService queryService = serviceFactory.serviceFactory.getBlockchainService();
NetworkAddress peerAddress = serviceFactory.peerAddress;

HashDigest[] peerLedgerHashs = queryService.getLedgerHashs();
if (peerLedgerHashs != null && peerLedgerHashs.length > 0) {
boolean haveNewLedger = false;
for (HashDigest hash : peerLedgerHashs) {
if (!localLedgerCache.contains(hash)) {
haveNewLedger = true;
break;
}
}
if (haveNewLedger) {
// 有新账本的情况下重连,并更新本地账本
PeerBlockchainServiceFactory peerServiceFactory = PeerBlockchainServiceFactory.connect(
gateWayKeyPair, peerAddress, peerProviders);
peerBlockchainServiceFactories.put(peerAddress, peerServiceFactory);
localLedgerCache.addAll(Arrays.asList(peerLedgerHashs));
}
}
}
// 未连接成功的情况下不处理,等待定时连接线程来处理
} finally {
ledgerHashLock.unlock();
}
peerServiceFactory = PeerBlockchainServiceFactory.connect(gateWayKeyPair, peerAddress, peerProviders);
}

@Override
public void close() {
PeerBlockchainServiceFactory serviceFactory = this.peerServiceFactory;
if (serviceFactory != null) {
this.peerServiceFactory = null;
this.peerAddress = null;
serviceFactory.close();
for (Map.Entry<NetworkAddress, PeerBlockchainServiceFactory> entry : peerBlockchainServiceFactories.entrySet()) {
PeerBlockchainServiceFactory serviceFactory = entry.getValue();
if (serviceFactory != null) {
serviceFactory.close();
}
}
peerBlockchainServiceFactories.clear();
}

@Override
public BlockchainQueryService getQueryService() {
PeerBlockchainServiceFactory serviceFactory = this.peerServiceFactory;
// 查询选择最新的连接Factory
PeerServiceFactory serviceFactory = this.mostLedgerPeerServiceFactory;
if (serviceFactory == null) {
throw new IllegalStateException("Peer connection was closed!");
}
return serviceFactory.serviceFactory.getBlockchainService();
}

@Override
public BlockchainQueryService getQueryService(HashDigest ledgerHash) {
PeerBlockchainServiceFactory serviceFactory = latestPeerServiceFactories.get(ledgerHash);
if (serviceFactory == null) {
return getQueryService();
}
return serviceFactory.getBlockchainService();
}

@Override
public TransactionService getTransactionService() {
PeerBlockchainServiceFactory serviceFactory = this.peerServiceFactory;
// 交易始终使用第一个连接成功的即可
PeerBlockchainServiceFactory serviceFactory = this.masterPeerServiceFactory;
if (serviceFactory == null) {
throw new IllegalStateException("Peer connection was closed!");
}
@@ -94,8 +194,8 @@ public class PeerConnectionManager implements PeerService, PeerConnector {
close();
}

public void setPeerAddress(NetworkAddress peerAddress) {
this.peerAddress = peerAddress;
public void addPeerAddress(NetworkAddress peerAddress) {
this.peerAddresses.add(peerAddress);
}

public void setGateWayKeyPair(AsymmetricKeypair gateWayKeyPair) {
@@ -105,4 +205,186 @@ public class PeerConnectionManager implements PeerService, PeerConnector {
public void setPeerProviders(List<String> peerProviders) {
this.peerProviders = peerProviders;
}

/**
* 更新本地账本缓存
*/
private void updateLedgerCache() {
if (isConnected()) {
HashDigest[] peerLedgerHashs = getQueryService().getLedgerHashs();
if (peerLedgerHashs != null && peerLedgerHashs.length > 0) {
localLedgerCache.addAll(Arrays.asList(peerLedgerHashs));
}
}
}

/**
* 创建定时线程池
* @return
*/
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("peer-connect-%d").build();
return new ScheduledThreadPoolExecutor(1,
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}

private void executorStart() {
// 定时任务处理线程
peerConnectExecutor.scheduleAtFixedRate(new PeerConnectRunner(), 0, PERIOD_SECONDS, TimeUnit.SECONDS);
}

private class PeerServiceFactory {

private NetworkAddress peerAddress;

private PeerBlockchainServiceFactory serviceFactory;

PeerServiceFactory(NetworkAddress peerAddress, PeerBlockchainServiceFactory serviceFactory) {
this.peerAddress = peerAddress;
this.serviceFactory = serviceFactory;
}
}

private class PeerConnectRunner implements Runnable {

@Override
public void run() {
// 包括几部分工作
// 1、重连没有连接成功的Peer;
// 2、从已经连接成功的Peer节点获取账本数量和最新的区块高度
// 3、根据目前的情况更新缓存
ledgerHashLock.lock();
try {
reconnect();
// 更新账本数量最多的节点连接
HashDigest[] ledgerHashs = updateMostLedgerPeerServiceFactory();
if (ledgerHashs != null) {
LOGGER.info("Most ledgers remote update to {}", mostLedgerPeerServiceFactory.peerAddress);
// 更新每个账本对应获取最高区块的缓存
updateLatestPeerServiceFactories(ledgerHashs);
}
} catch (Exception e) {
LOGGER.error("Peer Connect Task Error !!!", e);
} finally {
ledgerHashLock.unlock();
}
}

/**
* 更新可获取最新区块的连接工厂
*
* @param ledgerHashs
* 账本列表
*/
private void updateLatestPeerServiceFactories(HashDigest[] ledgerHashs) {
Map<HashDigest, PeerBlockchainServiceFactory> blockHeightServiceFactories = new HashMap<>();
for (HashDigest ledgerHash : ledgerHashs) {
long blockHeight = -1L;
PeerBlockchainServiceFactory serviceFactory = latestPeerServiceFactories.get(ledgerHash);
try {
if (serviceFactory != null) {
blockHeight = serviceFactory.getBlockchainService()
.getLedger(ledgerHash).getLatestBlockHeight();
blockHeightServiceFactories.put(ledgerHash, serviceFactory);
}
} catch (Exception e) {
latestPeerServiceFactories.remove(ledgerHash);
serviceFactory = null;
LOGGER.error("Peer get latest block height fail !!!", e);
}

// 查询其他所有节点对应的区块高度的情况
NetworkAddress defaultPeerAddress = null, latestPeerAddress = null;
for (Map.Entry<NetworkAddress, PeerBlockchainServiceFactory> entry : peerBlockchainServiceFactories.entrySet()) {
PeerBlockchainServiceFactory sf = entry.getValue();
if (sf != serviceFactory) {
try {
long latestBlockHeight = sf.getBlockchainService().getLedger(ledgerHash).getLatestBlockHeight();
if (latestBlockHeight > blockHeight) {
latestPeerAddress = entry.getKey();
blockHeightServiceFactories.put(ledgerHash, sf);
}
blockHeight = Math.max(latestBlockHeight, blockHeight);
} catch (Exception e) {
LOGGER.error(String.format("Peer[%s] get ledger[%s]'s latest block height fail !!!",
entry.getKey(), ledgerHash.toBase58()), e);
}
} else {
defaultPeerAddress = entry.getKey();
}
}
LOGGER.info("Ledger[{}]'s master remote update to {}", ledgerHash.toBase58(),
latestPeerAddress == null ? defaultPeerAddress : latestPeerAddress);
}
// 更新结果集
latestPeerServiceFactories.putAll(blockHeightServiceFactories);
}

/**
* 之前未连接成功的Peer节点进行重连操作
*
*/
private void reconnect() {
for (NetworkAddress peerAddress : peerAddresses) {
if (!peerBlockchainServiceFactories.containsKey(peerAddress)) {
// 重连指定节点
try {
PeerBlockchainServiceFactory peerServiceFactory = PeerBlockchainServiceFactory.connect(gateWayKeyPair, peerAddress, peerProviders);
if (peerServiceFactory != null) {
peerBlockchainServiceFactories.put(peerAddress, peerServiceFactory);
}
} catch (Exception e) {
LOGGER.error(String.format("Reconnect %s fail !!!", peerAddress), e);
}
}
}
}

private HashDigest[] updateMostLedgerPeerServiceFactory() {
int ledgerSize = -1;
if (mostLedgerPeerServiceFactory == null) {
return null;
}
HashDigest[] ledgerHashs = null;
BlockchainService blockchainService = mostLedgerPeerServiceFactory.serviceFactory.getBlockchainService();
try {
if (blockchainService instanceof PeerServiceProxy) {
ledgerHashs = ((PeerServiceProxy) blockchainService).getLedgerHashsDirect();
if (ledgerHashs != null) {
ledgerSize = ledgerHashs.length;
}
}
} catch (Exception e) {
// 连接失败的情况下清除该连接
LOGGER.error(String.format("Connect %s fail !!!", mostLedgerPeerServiceFactory.peerAddress), e);
peerBlockchainServiceFactories.remove(mostLedgerPeerServiceFactory.peerAddress);
mostLedgerPeerServiceFactory = null;
blockchainService = null;
}
PeerServiceFactory tempMostLedgerPeerServiceFactory = mostLedgerPeerServiceFactory;

// 遍历,获取对应端的账本数量及最新的区块高度
for (Map.Entry<NetworkAddress, PeerBlockchainServiceFactory> entry : peerBlockchainServiceFactories.entrySet()) {
BlockchainService loopBlockchainService = entry.getValue().getBlockchainService();
if (loopBlockchainService != blockchainService) {
// 处理账本数量
try {
if (loopBlockchainService instanceof PeerServiceProxy) {
ledgerHashs = ((PeerServiceProxy) loopBlockchainService).getLedgerHashsDirect();
if (ledgerHashs.length > ledgerSize) {
tempMostLedgerPeerServiceFactory = new PeerServiceFactory(entry.getKey(),entry.getValue());
}
}
} catch (Exception e) {
LOGGER.error(String.format("%s get ledger hash fail !!!", entry.getKey()), e);
}
}
}
// 更新mostLedgerPeerServiceFactory
mostLedgerPeerServiceFactory = tempMostLedgerPeerServiceFactory;
return ledgerHashs;
}
}
}

+ 120
- 120
source/gateway/src/main/java/com/jd/blockchain/gateway/web/BlockBrowserController.java View File

@@ -71,46 +71,46 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}")
@Override
public LedgerInfo getLedger(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getLedger(ledgerHash);
return peerService.getQueryService(ledgerHash).getLedger(ledgerHash);
}
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/admininfo")
@Override
public LedgerAdminInfo getLedgerAdminInfo(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getLedgerAdminInfo(ledgerHash);
return peerService.getQueryService(ledgerHash).getLedgerAdminInfo(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/participants")
@Override
public ParticipantNode[] getConsensusParticipants(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getConsensusParticipants(ledgerHash);
return peerService.getQueryService(ledgerHash).getConsensusParticipants(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/metadata")
@Override
public LedgerMetadata getLedgerMetadata(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getLedgerMetadata(ledgerHash);
return peerService.getQueryService(ledgerHash).getLedgerMetadata(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/settings")
public LedgerBaseSettings getLedgerInitSettings(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return gatewayQueryService.getLedgerBaseSettings(ledgerHash);
}
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/settings")
public LedgerBaseSettings getLedgerInitSettings(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return gatewayQueryService.getLedgerBaseSettings(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks")
public LedgerBlock[] getBlocks(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash);
LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash);
long maxBlockHeight = ledgerInfo.getLatestBlockHeight();
List<LedgerBlock> ledgerBlocks = new ArrayList<>();
for (long blockHeight = maxBlockHeight; blockHeight > GENESIS_BLOCK_HEIGHT; blockHeight--) {
LedgerBlock ledgerBlock = peerService.getQueryService().getBlock(ledgerHash, blockHeight);
LedgerBlock ledgerBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHeight);
ledgerBlocks.add(0, ledgerBlock);
if (ledgerBlocks.size() == BLOCK_MAX_DISPLAY) {
break;
}
}
// 最后增加创世区块
LedgerBlock genesisBlock = peerService.getQueryService().getBlock(ledgerHash, GENESIS_BLOCK_HEIGHT);
LedgerBlock genesisBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, GENESIS_BLOCK_HEIGHT);
ledgerBlocks.add(0, genesisBlock);
LedgerBlock[] blocks = new LedgerBlock[ledgerBlocks.size()];
ledgerBlocks.toArray(blocks);
@@ -120,181 +120,181 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}")
@Override
public LedgerBlock getBlock(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService().getBlock(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHeight);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}")
@Override
public LedgerBlock getBlock(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService().getBlock(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/txs/count")
@Override
public long getTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService().getTransactionCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHeight);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/txs/count")
@Override
public long getTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService().getTransactionCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/count")
@Override
public long getTransactionTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getTransactionTotalCount(ledgerHash);
return peerService.getQueryService(ledgerHash).getTransactionTotalCount(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/accounts/count")
@Override
public long getDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService().getDataAccountCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHeight);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/accounts/count")
@Override
public long getDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService().getDataAccountCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/count")
@Override
public long getDataAccountTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getDataAccountTotalCount(ledgerHash);
return peerService.getQueryService(ledgerHash).getDataAccountTotalCount(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/users/count")
@Override
public long getUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService().getUserCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHeight);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/users/count")
@Override
public long getUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService().getUserCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users/count")
@Override
public long getUserTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getUserTotalCount(ledgerHash);
return peerService.getQueryService(ledgerHash).getUserTotalCount(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/contracts/count")
@Override
public long getContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService().getContractCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
return peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHeight);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/contracts/count")
@Override
public long getContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService().getContractCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
return peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/count")
@Override
public long getContractTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getContractTotalCount(ledgerHash);
return peerService.getQueryService(ledgerHash).getContractTotalCount(ledgerHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/txs")
@Override
public LedgerTransaction[] getTransactions(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return peerService.getQueryService().getTransactions(ledgerHash, blockHeight, fromIndex, count);
@PathVariable(name = "blockHeight") long blockHeight,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return peerService.getQueryService(ledgerHash).getTransactions(ledgerHash, blockHeight, fromIndex, count);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/txs")
@Override
public LedgerTransaction[] getTransactions(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return peerService.getQueryService().getTransactions(ledgerHash, blockHash, fromIndex, count);
@PathVariable(name = "blockHash") HashDigest blockHash,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return peerService.getQueryService(ledgerHash).getTransactions(ledgerHash, blockHash, fromIndex, count);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/hash/{contentHash}")
@Override
public LedgerTransaction getTransactionByContentHash(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "contentHash") HashDigest contentHash) {
return peerService.getQueryService().getTransactionByContentHash(ledgerHash, contentHash);
@PathVariable(name = "contentHash") HashDigest contentHash) {
return peerService.getQueryService(ledgerHash).getTransactionByContentHash(ledgerHash, contentHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/state/{contentHash}")
@Override
public TransactionState getTransactionStateByContentHash(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "contentHash") HashDigest contentHash) {
return peerService.getQueryService().getTransactionStateByContentHash(ledgerHash, contentHash);
@PathVariable(name = "contentHash") HashDigest contentHash) {
return peerService.getQueryService(ledgerHash).getTransactionStateByContentHash(ledgerHash, contentHash);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users/address/{address}")
@Override
public UserInfo getUser(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "address") String address) {
return peerService.getQueryService().getUser(ledgerHash, address);
@PathVariable(name = "address") String address) {
return peerService.getQueryService(ledgerHash).getUser(ledgerHash, address);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/address/{address}")
@Override
public BlockchainIdentity getDataAccount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "address") String address) {
@PathVariable(name = "address") String address) {

return peerService.getQueryService().getDataAccount(ledgerHash, address);
return peerService.getQueryService(ledgerHash).getDataAccount(ledgerHash, address);
}

@RequestMapping(method = { RequestMethod.GET,
RequestMethod.POST }, path = "ledgers/{ledgerHash}/accounts/{address}/entries")
@Override
public TypedKVEntry[] getDataEntries(@PathVariable("ledgerHash") HashDigest ledgerHash,
@PathVariable("address") String address, @RequestParam("keys") String... keys) {
return peerService.getQueryService().getDataEntries(ledgerHash, address, keys);
@PathVariable("address") String address, @RequestParam("keys") String... keys) {
return peerService.getQueryService(ledgerHash).getDataEntries(ledgerHash, address, keys);
}

@RequestMapping(method = { RequestMethod.GET,
RequestMethod.POST }, path = "ledgers/{ledgerHash}/accounts/{address}/entries-version")
@Override
public TypedKVEntry[] getDataEntries(@PathVariable("ledgerHash") HashDigest ledgerHash,
@PathVariable("address") String address, @RequestBody KVInfoVO kvInfoVO) {
return peerService.getQueryService().getDataEntries(ledgerHash, address, kvInfoVO);
@PathVariable("address") String address, @RequestBody KVInfoVO kvInfoVO) {
return peerService.getQueryService(ledgerHash).getDataEntries(ledgerHash, address, kvInfoVO);
}

@RequestMapping(method = { RequestMethod.GET,
RequestMethod.POST }, path = "ledgers/{ledgerHash}/accounts/address/{address}/entries")
@Override
public TypedKVEntry[] getDataEntries(@PathVariable("ledgerHash") HashDigest ledgerHash,
@PathVariable("address") String address,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return peerService.getQueryService().getDataEntries(ledgerHash, address, fromIndex, count);
@PathVariable("address") String address,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return peerService.getQueryService(ledgerHash).getDataEntries(ledgerHash, address, fromIndex, count);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/address/{address}/entries/count")
@Override
public long getDataEntriesTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "address") String address) {
return peerService.getQueryService().getDataEntriesTotalCount(ledgerHash, address);
@PathVariable(name = "address") String address) {
return peerService.getQueryService(ledgerHash).getDataEntriesTotalCount(ledgerHash, address);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/address/{address}")
public ContractSettings getContractSettings(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "address") String address) {
ContractInfo contractInfo = peerService.getQueryService().getContract(ledgerHash, address);
@PathVariable(name = "address") String address) {
ContractInfo contractInfo = peerService.getQueryService(ledgerHash).getContract(ledgerHash, address);
return contractSettings(contractInfo);
}

@@ -308,30 +308,30 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
return contractSettings;
}

// @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/address/{address}")
// @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/address/{address}")
@Override
public ContractInfo getContract(HashDigest ledgerHash, String address) {
return peerService.getQueryService().getContract(ledgerHash, address);
return peerService.getQueryService(ledgerHash).getContract(ledgerHash, address);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/latest")
@Override
public LedgerBlock getLatestBlock(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
long latestBlockHeight = peerService.getQueryService().getLedger(ledgerHash).getLatestBlockHeight();
return peerService.getQueryService().getBlock(ledgerHash, latestBlockHeight);
long latestBlockHeight = peerService.getQueryService(ledgerHash).getLedger(ledgerHash).getLatestBlockHeight();
return peerService.getQueryService(ledgerHash).getBlock(ledgerHash, latestBlockHeight);
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/txs/additional-count")
@Override
public long getAdditionalTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
@PathVariable(name = "blockHeight") long blockHeight) {
// 获取某个区块的交易总数
long currentBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, blockHeight);
long currentBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHeight);
if (blockHeight == GENESIS_BLOCK_HEIGHT) {
return currentBlockTxCount;
}
long lastBlockHeight = blockHeight - 1;
long lastBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, lastBlockHeight);
long lastBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, lastBlockHeight);
// 当前区块交易数减上个区块交易数
return currentBlockTxCount - lastBlockTxCount;
}
@@ -339,14 +339,14 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/txs/additional-count")
@Override
public long getAdditionalTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash);
long currentBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash);
long currentBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHash);
if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) {
return currentBlockTxCount;
}
HashDigest previousHash = currentBlock.getPreviousHash();
long lastBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, previousHash);
long lastBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, previousHash);
// 当前区块交易数减上个区块交易数
return currentBlockTxCount - lastBlockTxCount;
}
@@ -354,40 +354,40 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/additional-count")
@Override
public long getAdditionalTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash);
LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash);
long maxBlockHeight = ledgerInfo.getLatestBlockHeight();
long totalCount = peerService.getQueryService().getTransactionTotalCount(ledgerHash);
long totalCount = peerService.getQueryService(ledgerHash).getTransactionTotalCount(ledgerHash);
if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块
return totalCount;
}
long lastTotalCount = peerService.getQueryService().getTransactionCount(ledgerHash, maxBlockHeight - 1);
long lastTotalCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, maxBlockHeight - 1);
return totalCount - lastTotalCount;
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/accounts/additional-count")
@Override
public long getAdditionalDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
long currentDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
long currentDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHeight);
if (blockHeight == GENESIS_BLOCK_HEIGHT) {
return currentDaCount;
}
long lastBlockHeight = blockHeight - 1;
long lastDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, lastBlockHeight);
long lastDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, lastBlockHeight);
return currentDaCount - lastDaCount;
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/accounts/additional-count")
@Override
public long getAdditionalDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash);
long currentBlockDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash);
long currentBlockDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHash);
if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) {
return currentBlockDaCount;
}
HashDigest previousHash = currentBlock.getPreviousHash();
long lastBlockDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, previousHash);
long lastBlockDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, previousHash);
// 当前区块数据账户数量减上个区块数据账户数量
return currentBlockDaCount - lastBlockDaCount;
}
@@ -395,40 +395,40 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/additional-count")
@Override
public long getAdditionalDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash);
LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash);
long maxBlockHeight = ledgerInfo.getLatestBlockHeight();
long totalCount = peerService.getQueryService().getDataAccountTotalCount(ledgerHash);
long totalCount = peerService.getQueryService(ledgerHash).getDataAccountTotalCount(ledgerHash);
if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块
return totalCount;
}
long lastTotalCount = peerService.getQueryService().getDataAccountCount(ledgerHash, maxBlockHeight - 1);
long lastTotalCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, maxBlockHeight - 1);
return totalCount - lastTotalCount;
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/users/additional-count")
@Override
public long getAdditionalUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
long currentUserCount = peerService.getQueryService().getUserCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
long currentUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHeight);
if (blockHeight == GENESIS_BLOCK_HEIGHT) {
return currentUserCount;
}
long lastBlockHeight = blockHeight - 1;
long lastUserCount = peerService.getQueryService().getUserCount(ledgerHash, lastBlockHeight);
long lastUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, lastBlockHeight);
return currentUserCount - lastUserCount;
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/users/additional-count")
@Override
public long getAdditionalUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash);
long currentBlockUserCount = peerService.getQueryService().getUserCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash);
long currentBlockUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHash);
if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) {
return currentBlockUserCount;
}
HashDigest previousHash = currentBlock.getPreviousHash();
long lastBlockUserCount = peerService.getQueryService().getUserCount(ledgerHash, previousHash);
long lastBlockUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, previousHash);
// 当前区块用户数量减上个区块用户数量
return currentBlockUserCount - lastBlockUserCount;
}
@@ -436,40 +436,40 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users/additional-count")
@Override
public long getAdditionalUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash);
LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash);
long maxBlockHeight = ledgerInfo.getLatestBlockHeight();
long totalCount = peerService.getQueryService().getUserTotalCount(ledgerHash);
long totalCount = peerService.getQueryService(ledgerHash).getUserTotalCount(ledgerHash);
if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块
return totalCount;
}
long lastTotalCount = peerService.getQueryService().getUserCount(ledgerHash, maxBlockHeight - 1);
long lastTotalCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, maxBlockHeight - 1);
return totalCount - lastTotalCount;
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/contracts/additional-count")
@Override
public long getAdditionalContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHeight") long blockHeight) {
long currentContractCount = peerService.getQueryService().getContractCount(ledgerHash, blockHeight);
@PathVariable(name = "blockHeight") long blockHeight) {
long currentContractCount = peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHeight);
if (blockHeight == GENESIS_BLOCK_HEIGHT) {
return currentContractCount;
}
long lastBlockHeight = blockHeight - 1;
long lastContractCount = peerService.getQueryService().getUserCount(ledgerHash, lastBlockHeight);
long lastContractCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, lastBlockHeight);
return currentContractCount - lastContractCount;
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/contracts/additional-count")
@Override
public long getAdditionalContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash);
long currentBlockContractCount = peerService.getQueryService().getContractCount(ledgerHash, blockHash);
@PathVariable(name = "blockHash") HashDigest blockHash) {
LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash);
long currentBlockContractCount = peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHash);
if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) {
return currentBlockContractCount;
}
HashDigest previousHash = currentBlock.getPreviousHash();
long lastBlockContractCount = peerService.getQueryService().getUserCount(ledgerHash, previousHash);
long lastBlockContractCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, previousHash);
// 当前区块合约数量减上个区块合约数量
return currentBlockContractCount - lastBlockContractCount;
}
@@ -477,13 +477,13 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/additional-count")
@Override
public long getAdditionalContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash);
LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash);
long maxBlockHeight = ledgerInfo.getLatestBlockHeight();
long totalCount = peerService.getQueryService().getContractTotalCount(ledgerHash);
long totalCount = peerService.getQueryService(ledgerHash).getContractTotalCount(ledgerHash);
if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块
return totalCount;
}
long lastTotalCount = peerService.getQueryService().getContractCount(ledgerHash, maxBlockHeight - 1);
long lastTotalCount = peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, maxBlockHeight - 1);
return totalCount - lastTotalCount;
}

@@ -569,14 +569,14 @@ public class BlockBrowserController implements BlockchainExtendQueryService {

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/participants/count")
public int getConsensusParticipantCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) {
return peerService.getQueryService().getConsensusParticipants(ledgerHash).length;
return peerService.getQueryService(ledgerHash).getConsensusParticipants(ledgerHash).length;
}

// @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/participants")
// public ParticipantNode[] getConsensusParticipants(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
// @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
// @RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
//
//
// ParticipantNode participantNode[] = peerService.getQueryService().getConsensusParticipants(ledgerHash);
// int indexAndCount[] = QueryUtil.calFromIndexAndCount(fromIndex, count, participantNode.length);
// ParticipantNode participantNodesNew[] = Arrays.copyOfRange(participantNode, indexAndCount[0],
@@ -586,7 +586,7 @@ public class BlockBrowserController implements BlockchainExtendQueryService {

/**
* get more users by fromIndex and count;
*
*
* @param ledgerHash
* @param fromIndex
* @param count
@@ -595,25 +595,25 @@ public class BlockBrowserController implements BlockchainExtendQueryService {
@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users")
@Override
public BlockchainIdentity[] getUsers(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return revertAccountHeader(peerService.getQueryService().getUsers(ledgerHash, fromIndex, count));
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return revertAccountHeader(peerService.getQueryService(ledgerHash).getUsers(ledgerHash, fromIndex, count));
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts")
@Override
public BlockchainIdentity[] getDataAccounts(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return revertAccountHeader(peerService.getQueryService().getDataAccounts(ledgerHash, fromIndex, count));
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return revertAccountHeader(peerService.getQueryService(ledgerHash).getDataAccounts(ledgerHash, fromIndex, count));
}

@RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts")
@Override
public BlockchainIdentity[] getContractAccounts(@PathVariable(name = "ledgerHash") HashDigest ledgerHash,
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return revertAccountHeader(peerService.getQueryService().getContractAccounts(ledgerHash, fromIndex, count));
@RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex,
@RequestParam(name = "count", required = false, defaultValue = "-1") int count) {
return revertAccountHeader(peerService.getQueryService(ledgerHash).getContractAccounts(ledgerHash, fromIndex, count));
}

/**


+ 91
- 0
source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayLedgerLoadTimer.java View File

@@ -0,0 +1,91 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.gateway.web.GatewayTimeTasks
* Author: shaozhuguang
* Department: 区块链研发部
* Date: 2019/1/16 下午6:17
* Description:
*/
package com.jd.blockchain.gateway.web;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.blockchain.gateway.PeerConnector;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/1/16
* @since 1.0.0
*/
@Component
@EnableScheduling
public class GatewayLedgerLoadTimer {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(GatewayLedgerLoadTimer.class);

private static final ExecutorService LEDGER_LOAD_EXECUTOR = initLedgerLoadExecutor();

private static final Lock LOCK = new ReentrantLock();

@Autowired
private PeerConnector peerConnector;

/**
* 账本加载许可,主要作用两个
* 1、防止启动时加载账本与当前定时器加载冲突
* 2、每次加载完成后释放许可,以便于下次定时任务加载,若不存在许可,则下次定时任务放弃执行
*/
private Semaphore loadSemaphore = new Semaphore(0);

//每1钟执行一次
@Scheduled(cron = "0 */1 * * * * ")
public void ledgerLoad(){
boolean acquire = false;
LOCK.lock();
try {
// 5秒内获取授权
acquire = loadSemaphore.tryAcquire(5, TimeUnit.SECONDS);
if (acquire) {
// 授权成功的情况下,进行单线程重连
LEDGER_LOAD_EXECUTOR.execute(() -> {
peerConnector.monitorAndReconnect();
});
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
} finally {
if (acquire) {
// 授权成功的情况下,释放该许可
release();
}
LOCK.unlock();
}
}

/**
* 释放许可
*/
public void release() {
loadSemaphore.release();
}

private static ThreadPoolExecutor initLedgerLoadExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("gateway-ledger-loader-%d").build();

return new ThreadPoolExecutor(1, 1,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
}

+ 0
- 46
source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayTimeTasks.java View File

@@ -1,46 +0,0 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.gateway.web.GatewayTimeTasks
* Author: shaozhuguang
* Department: 区块链研发部
* Date: 2019/1/16 下午6:17
* Description:
*/
package com.jd.blockchain.gateway.web;

import com.jd.blockchain.consensus.service.NodeServer;
import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.gateway.PeerConnector;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.*;

/**
*
* @author shaozhuguang
* @create 2019/1/16
* @since 1.0.0
*/
@Component
@EnableScheduling
public class GatewayTimeTasks {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(GatewayTimeTasks.class);

@Autowired
private PeerConnector peerConnector;

//每30分钟执行一次
@Scheduled(cron = "0 */8 * * * * ")
public void updateLedger(){
try {
peerConnector.reconnect();
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}

+ 25
- 6
source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/PeerServiceProxy.java View File

@@ -9,15 +9,13 @@ import com.jd.blockchain.sdk.proxy.BlockchainServiceProxy;
import com.jd.blockchain.transaction.BlockchainQueryService;
import com.jd.blockchain.transaction.TransactionService;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 对共识节点的区块链服务代理;
*
*
* @author huanghaiquan
*
*/
@@ -42,8 +40,8 @@ public class PeerServiceProxy extends BlockchainServiceProxy implements Transact
}

public void addLedgerAccessContexts(LedgerAccessContext[] accessAbleLedgers) {
accessLock.lock();
try {
accessLock.lock();
if (this.ledgerAccessContexts == null) {
throw new IllegalArgumentException("LedgerAccessContexts is null, you need init first !!!");
}
@@ -81,7 +79,28 @@ public class PeerServiceProxy extends BlockchainServiceProxy implements Transact
}

/**
* 处理网关的交易转发;
* 直接获取账本信息
* 不通过内部缓存
*
* @return
*/
public HashDigest[] getLedgerHashsDirect() {
Set<HashDigest> ledgerHashs = new HashSet<>();
if (ledgerAccessContexts != null && !ledgerAccessContexts.isEmpty()) {
Collection<LedgerAccessContext> ctxs = ledgerAccessContexts.values();
for (LedgerAccessContext ctx : ctxs) {
HashDigest[] hashs = ctx.getQueryService().getLedgerHashs();
ledgerHashs.addAll(Arrays.asList(hashs));
}
}
if (ledgerHashs.isEmpty()) {
return new HashDigest[0];
}
return ledgerHashs.toArray(new HashDigest[ledgerHashs.size()]);
}

/**
* 处理网关的交易转发;
*/
@Override
public TransactionResponse process(TransactionRequest txRequest) {


Loading…
Cancel
Save