diff --git a/source/gateway/pom.xml b/source/gateway/pom.xml index 2317e3f4..d181be35 100644 --- a/source/gateway/pom.xml +++ b/source/gateway/pom.xml @@ -77,6 +77,11 @@ commons-io + + com.google.guava + guava + + org.springframework.boot spring-boot-starter-web diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayConfigProperties.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayConfigProperties.java index 5a836a41..7ae2bdb4 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayConfigProperties.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayConfigProperties.java @@ -2,14 +2,14 @@ package com.jd.blockchain.gateway; import java.io.File; import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; +import java.util.*; +import com.jd.blockchain.utils.IllegalDataException; import com.jd.blockchain.utils.io.FileUtils; import com.jd.blockchain.utils.net.NetworkAddress; public class GatewayConfigProperties { + // HTTP协议相关配置项的键的前缀; public static final String HTTP_PREFIX = "http."; // 网关的HTTP服务地址; @@ -21,12 +21,14 @@ public class GatewayConfigProperties { // 共识相关配置项的键的前缀; public static final String PEER_PREFIX = "peer."; + // 共识节点的数量 + public static final String PEER_SIZE = PEER_PREFIX + "size"; // 共识节点的服务地址; - public static final String PEER_HOST = PEER_PREFIX + "host"; + public static final String PEER_HOST_FORMAT = PEER_PREFIX + "%s.host"; // 共识节点的服务端口; - public static final String PEER_PORT = PEER_PREFIX + "port"; + public static final String PEER_PORT_FORMAT = PEER_PREFIX + "%s.port"; // 共识节点的服务是否启用安全证书; - public static final String PEER_SECURE = PEER_PREFIX + "secure"; + public static final String PEER_SECURE_FORMAT = PEER_PREFIX + "%s.secure"; // 支持共识的Provider列表,以英文逗号分隔 public static final String PEER_PROVIDERS = PEER_PREFIX + "providers"; @@ -52,7 +54,7 @@ public class GatewayConfigProperties { private ProviderConfig providerConfig = new ProviderConfig(); - private NetworkAddress masterPeerAddress; + private Set masterPeerAddresses = new HashSet<>(); private String dataRetrievalUrl; private String schemaRetrievalUrl; @@ -63,8 +65,8 @@ public class GatewayConfigProperties { return http; } - public NetworkAddress masterPeerAddress() { - return masterPeerAddress; + public Set masterPeerAddresses() { + return masterPeerAddresses; } public String dataRetrievalUrl() { @@ -87,11 +89,11 @@ public class GatewayConfigProperties { return providerConfig; } - public void setMasterPeerAddress(NetworkAddress peerAddress) { + public void addMasterPeerAddress(NetworkAddress peerAddress) { if (peerAddress == null) { throw new IllegalArgumentException("peerAddress is null!"); } - this.masterPeerAddress = peerAddress; + this.masterPeerAddresses.add(peerAddress); } public KeysConfig keys() { @@ -122,11 +124,16 @@ public class GatewayConfigProperties { configProps.http.port = getInt(props, HTTP_PORT, true); configProps.http.contextPath = getProperty(props, HTTP_CONTEXT_PATH, false); - String peerHost = getProperty(props, PEER_HOST, true); - int peerPort = getInt(props, PEER_PORT, true); - boolean peerSecure = getBoolean(props, PEER_SECURE, false); - configProps.masterPeerAddress = new NetworkAddress(peerHost, peerPort, peerSecure); - + int peerSize = getInt(props, PEER_SIZE, true); + if (peerSize <= 0) { + throw new IllegalDataException("Peer size is illegal !!!"); + } + for (int i = 0; i < peerSize; i++) { + String peerHost = getProperty(props, String.format(PEER_HOST_FORMAT, i), true); + int peerPort = getInt(props, String.format(PEER_PORT_FORMAT, i), true); + boolean peerSecure = getBoolean(props, String.format(PEER_SECURE_FORMAT, i), false); + configProps.addMasterPeerAddress(new NetworkAddress(peerHost, peerPort, peerSecure)); + } String dataRetrievalUrl = getProperty(props, DATA_RETRIEVAL_URL, true); configProps.dataRetrievalUrl = dataRetrievalUrl; @@ -211,7 +218,7 @@ public class GatewayConfigProperties { private HttpConfig() { } - + private HttpConfig(String host, int port, String contextPath) { this.host = host; this.port = port; @@ -221,7 +228,7 @@ public class GatewayConfigProperties { public String getHost() { return host; } - + public void setHost(String host) { this.host = host; } @@ -229,7 +236,7 @@ public class GatewayConfigProperties { public int getPort() { return port; } - + public void setPort(int port) { this.port = port; } @@ -237,7 +244,7 @@ public class GatewayConfigProperties { public String getContextPath() { return contextPath; } - + public void setContextPath(String contextPath) { this.contextPath = contextPath; } @@ -265,7 +272,7 @@ public class GatewayConfigProperties { public String getPrivKeyPath() { return privKeyPath; } - + public String getPrivKeyValue() { return privKeyValue; } @@ -293,7 +300,7 @@ public class GatewayConfigProperties { public void setPrivKeyPassword(String privKeyPassword) { this.privKeyPassword = privKeyPassword; } - + } } diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayServerBooter.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayServerBooter.java index f743b4ac..e8954dab 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayServerBooter.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/GatewayServerBooter.java @@ -5,7 +5,10 @@ import java.io.InputStream; import java.net.URL; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import com.jd.blockchain.gateway.web.GatewayLedgerLoadTimer; +import com.jd.blockchain.utils.net.NetworkAddress; import org.apache.commons.io.FileUtils; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; @@ -82,8 +85,6 @@ public class GatewayServerBooter { ConsoleUtils.info("Starting web server......"); GatewayServerBooter booter = new GatewayServerBooter(configProps,springConfigLocation); booter.start(); - - ConsoleUtils.info("Peer[%s] is connected success!", configProps.masterPeerAddress().toString()); } catch (Exception e) { ConsoleUtils.error("Error!! %s", e.getMessage()); if (debug) { @@ -131,8 +132,20 @@ public class GatewayServerBooter { blockBrowserController.setDataRetrievalUrl(config.dataRetrievalUrl()); blockBrowserController.setSchemaRetrievalUrl(config.getSchemaRetrievalUrl()); PeerConnector peerConnector = appCtx.getBean(PeerConnector.class); - peerConnector.connect(config.masterPeerAddress(), defaultKeyPair, config.providerConfig().getProviders()); - ConsoleUtils.info("Peer[%s] is connected success!", config.masterPeerAddress().toString()); + + Set peerAddresses = config.masterPeerAddresses(); + StringBuilder peerAddressBuf = new StringBuilder(); + for (NetworkAddress peerAddress : peerAddresses) { + peerConnector.connect(peerAddress, defaultKeyPair, config.providerConfig().getProviders()); + if (peerAddressBuf.length() > 0) { + peerAddressBuf.append(","); + } + peerAddressBuf.append(peerAddress.toString()); + } + // 不管连接是否成功,都需要释放许可 + GatewayLedgerLoadTimer loadTimer = appCtx.getBean(GatewayLedgerLoadTimer.class); + loadTimer.release(); + ConsoleUtils.info("Peer[%s] is connected success!", peerAddressBuf.toString()); } public synchronized void close() { diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerConnector.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerConnector.java index fafeff1e..a8a5f5ad 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerConnector.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerConnector.java @@ -4,17 +4,47 @@ import com.jd.blockchain.crypto.AsymmetricKeypair; import com.jd.blockchain.utils.net.NetworkAddress; import java.util.List; +import java.util.Set; public interface PeerConnector { - - NetworkAddress getPeerAddress(); - + + /** + * 获取Peer地址列表 + * + * @return + */ + Set getPeerAddresses(); + + /** + * 是否连接成功 + * + * @return + */ boolean isConnected(); - + + /** + * 连接至指定Peer节点 + * + * @param peerAddress + * Peer地址 + * @param defaultKeyPair + * 连接Peer所需公私钥信息 + * @param peerProviders + * 支持的Provider解析列表 + */ void connect(NetworkAddress peerAddress, AsymmetricKeypair defaultKeyPair, List peerProviders); - void reconnect(); - + /** + * 监控重连,判断是否需要更新账本信息,再进行重连操作 + * Peer地址及其他信息见${@link PeerConnector#connect(com.jd.blockchain.utils.net.NetworkAddress, com.jd.blockchain.crypto.AsymmetricKeypair, java.util.List)} + * + */ + void monitorAndReconnect(); + + /** + * 关闭连接 + * + */ void close(); } diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerService.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerService.java index b3fd6065..76f8c645 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerService.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/PeerService.java @@ -1,12 +1,31 @@ package com.jd.blockchain.gateway; +import com.jd.blockchain.crypto.HashDigest; import com.jd.blockchain.transaction.BlockchainQueryService; import com.jd.blockchain.transaction.TransactionService; public interface PeerService { - + + /** + * 获取账本数量最多的查询器 + * + * @return + */ BlockchainQueryService getQueryService(); - + + /** + * 获取某个账本中区块高度最高的查询器 + * + * @param ledgerHash + * @return + */ + BlockchainQueryService getQueryService(HashDigest ledgerHash); + + /** + * 获取交易处理器 + * + * @return + */ TransactionService getTransactionService(); } diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/service/PeerConnectionManager.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/service/PeerConnectionManager.java index ba17036b..83fcee05 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/service/PeerConnectionManager.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/service/PeerConnectionManager.java @@ -2,6 +2,11 @@ package com.jd.blockchain.gateway.service; import javax.annotation.PreDestroy; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.jd.blockchain.crypto.HashDigest; +import com.jd.blockchain.sdk.BlockchainService; +import com.jd.blockchain.sdk.service.PeerServiceProxy; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.jd.blockchain.crypto.AsymmetricKeypair; @@ -12,76 +17,171 @@ import com.jd.blockchain.transaction.BlockchainQueryService; import com.jd.blockchain.transaction.TransactionService; import com.jd.blockchain.utils.net.NetworkAddress; -import java.util.List; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Component public class PeerConnectionManager implements PeerService, PeerConnector { - private volatile PeerBlockchainServiceFactory peerServiceFactory; + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(PeerConnectionManager.class); - private volatile NetworkAddress peerAddress; + /** + * 30秒更新一次最新的情况 + */ + private static final long PERIOD_SECONDS = 30L; + + private final ScheduledThreadPoolExecutor peerConnectExecutor; + + private final Set localLedgerCache = new HashSet<>(); + + private final Lock ledgerHashLock = new ReentrantLock(); + + private Map peerBlockchainServiceFactories = new ConcurrentHashMap<>(); + + private Map latestPeerServiceFactories = new ConcurrentHashMap<>(16); + + private Set peerAddresses = new HashSet<>(); + + private volatile PeerServiceFactory mostLedgerPeerServiceFactory; + + private volatile PeerBlockchainServiceFactory masterPeerServiceFactory; private volatile AsymmetricKeypair gateWayKeyPair; private volatile List peerProviders; + private volatile EventListener eventListener; + + public PeerConnectionManager() { + peerConnectExecutor = scheduledThreadPoolExecutor(); + executorStart(); + } + @Override - public NetworkAddress getPeerAddress() { - return peerAddress; + public Set getPeerAddresses() { + return peerAddresses; } @Override public boolean isConnected() { - return peerServiceFactory != null; + return !peerBlockchainServiceFactories.isEmpty(); } @Override public synchronized void connect(NetworkAddress peerAddress, AsymmetricKeypair defaultKeyPair, List peerProviders) { - if (isConnected()) { - if (this.peerAddress.equals(peerAddress)) { - return; + if (peerAddresses.contains(peerAddress)) { + return; + } + // 连接成功的话,更新账本 + ledgerHashLock.lock(); + try { + addPeerAddress(peerAddress); + setGateWayKeyPair(defaultKeyPair); + setPeerProviders(peerProviders); + + PeerBlockchainServiceFactory peerServiceFactory = PeerBlockchainServiceFactory.connect(defaultKeyPair, peerAddress, peerProviders); + if (peerServiceFactory != null) { + LOGGER.error("Connect peer {} success !!!", peerAddress); + // 连接成功 + if (masterPeerServiceFactory == null) { + masterPeerServiceFactory = peerServiceFactory; + LOGGER.error("Master remote update to {}", peerAddress); + } + if (mostLedgerPeerServiceFactory == null) { + // 默认设置为第一个连接成功的,后续更新需要等待定时任务处理 + mostLedgerPeerServiceFactory = new PeerServiceFactory(peerAddress, peerServiceFactory); + LOGGER.error("Most ledgers remote update to {}", peerAddress); + } + peerBlockchainServiceFactories.put(peerAddress, peerServiceFactory); + updateLedgerCache(); } - throw new IllegalArgumentException( - "This gateway has been connected to a peer, cann't be connected to another peer before closing it!"); + } finally { + // 连接成功的话,更新账本 + ledgerHashLock.unlock(); } - setPeerAddress(peerAddress); - setGateWayKeyPair(defaultKeyPair); - setPeerProviders(peerProviders); - // TODO: 未实现运行时出错时动态重连; - peerServiceFactory = PeerBlockchainServiceFactory.connect(defaultKeyPair, peerAddress, peerProviders); } @Override - public synchronized void reconnect() { - if (!isConnected()) { - throw new IllegalArgumentException( - "This gateway has not connected to a peer, please connect it first!!!"); + public void monitorAndReconnect() { + if (getPeerAddresses().isEmpty()) { + throw new IllegalArgumentException("Peer addresses must be init first !!!"); + } + /** + * 1、首先判断是否之前连接成功过,若未成功则重连,走auth逻辑 + * 2、若成功,则判断对端节点的账本与当前账本是否一致,有新增的情况下重连 + */ + ledgerHashLock.lock(); + try { + if (isConnected()) { + // 已连接成功,判断账本信息 + PeerServiceFactory serviceFactory = mostLedgerPeerServiceFactory; + if (serviceFactory == null) { + // 等待被更新 + return; + } + BlockchainQueryService queryService = serviceFactory.serviceFactory.getBlockchainService(); + NetworkAddress peerAddress = serviceFactory.peerAddress; + + HashDigest[] peerLedgerHashs = queryService.getLedgerHashs(); + if (peerLedgerHashs != null && peerLedgerHashs.length > 0) { + boolean haveNewLedger = false; + for (HashDigest hash : peerLedgerHashs) { + if (!localLedgerCache.contains(hash)) { + haveNewLedger = true; + break; + } + } + if (haveNewLedger) { + // 有新账本的情况下重连,并更新本地账本 + PeerBlockchainServiceFactory peerServiceFactory = PeerBlockchainServiceFactory.connect( + gateWayKeyPair, peerAddress, peerProviders); + peerBlockchainServiceFactories.put(peerAddress, peerServiceFactory); + localLedgerCache.addAll(Arrays.asList(peerLedgerHashs)); + } + } + } + // 未连接成功的情况下不处理,等待定时连接线程来处理 + } finally { + ledgerHashLock.unlock(); } - peerServiceFactory = PeerBlockchainServiceFactory.connect(gateWayKeyPair, peerAddress, peerProviders); } @Override public void close() { - PeerBlockchainServiceFactory serviceFactory = this.peerServiceFactory; - if (serviceFactory != null) { - this.peerServiceFactory = null; - this.peerAddress = null; - serviceFactory.close(); + for (Map.Entry entry : peerBlockchainServiceFactories.entrySet()) { + PeerBlockchainServiceFactory serviceFactory = entry.getValue(); + if (serviceFactory != null) { + serviceFactory.close(); + } } + peerBlockchainServiceFactories.clear(); } @Override public BlockchainQueryService getQueryService() { - PeerBlockchainServiceFactory serviceFactory = this.peerServiceFactory; + // 查询选择最新的连接Factory + PeerServiceFactory serviceFactory = this.mostLedgerPeerServiceFactory; if (serviceFactory == null) { throw new IllegalStateException("Peer connection was closed!"); } + return serviceFactory.serviceFactory.getBlockchainService(); + } + + @Override + public BlockchainQueryService getQueryService(HashDigest ledgerHash) { + PeerBlockchainServiceFactory serviceFactory = latestPeerServiceFactories.get(ledgerHash); + if (serviceFactory == null) { + return getQueryService(); + } return serviceFactory.getBlockchainService(); } @Override public TransactionService getTransactionService() { - PeerBlockchainServiceFactory serviceFactory = this.peerServiceFactory; + // 交易始终使用第一个连接成功的即可 + PeerBlockchainServiceFactory serviceFactory = this.masterPeerServiceFactory; if (serviceFactory == null) { throw new IllegalStateException("Peer connection was closed!"); } @@ -94,8 +194,8 @@ public class PeerConnectionManager implements PeerService, PeerConnector { close(); } - public void setPeerAddress(NetworkAddress peerAddress) { - this.peerAddress = peerAddress; + public void addPeerAddress(NetworkAddress peerAddress) { + this.peerAddresses.add(peerAddress); } public void setGateWayKeyPair(AsymmetricKeypair gateWayKeyPair) { @@ -105,4 +205,186 @@ public class PeerConnectionManager implements PeerService, PeerConnector { public void setPeerProviders(List peerProviders) { this.peerProviders = peerProviders; } + + /** + * 更新本地账本缓存 + */ + private void updateLedgerCache() { + if (isConnected()) { + HashDigest[] peerLedgerHashs = getQueryService().getLedgerHashs(); + if (peerLedgerHashs != null && peerLedgerHashs.length > 0) { + localLedgerCache.addAll(Arrays.asList(peerLedgerHashs)); + } + } + } + + /** + * 创建定时线程池 + * @return + */ + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor() { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("peer-connect-%d").build(); + return new ScheduledThreadPoolExecutor(1, + threadFactory, + new ThreadPoolExecutor.AbortPolicy()); + } + + private void executorStart() { + // 定时任务处理线程 + peerConnectExecutor.scheduleAtFixedRate(new PeerConnectRunner(), 0, PERIOD_SECONDS, TimeUnit.SECONDS); + } + + private class PeerServiceFactory { + + private NetworkAddress peerAddress; + + private PeerBlockchainServiceFactory serviceFactory; + + PeerServiceFactory(NetworkAddress peerAddress, PeerBlockchainServiceFactory serviceFactory) { + this.peerAddress = peerAddress; + this.serviceFactory = serviceFactory; + } + } + + private class PeerConnectRunner implements Runnable { + + @Override + public void run() { + // 包括几部分工作 + // 1、重连没有连接成功的Peer; + // 2、从已经连接成功的Peer节点获取账本数量和最新的区块高度 + // 3、根据目前的情况更新缓存 + ledgerHashLock.lock(); + try { + reconnect(); + // 更新账本数量最多的节点连接 + HashDigest[] ledgerHashs = updateMostLedgerPeerServiceFactory(); + if (ledgerHashs != null) { + LOGGER.info("Most ledgers remote update to {}", mostLedgerPeerServiceFactory.peerAddress); + // 更新每个账本对应获取最高区块的缓存 + updateLatestPeerServiceFactories(ledgerHashs); + } + } catch (Exception e) { + LOGGER.error("Peer Connect Task Error !!!", e); + } finally { + ledgerHashLock.unlock(); + } + } + + /** + * 更新可获取最新区块的连接工厂 + * + * @param ledgerHashs + * 账本列表 + */ + private void updateLatestPeerServiceFactories(HashDigest[] ledgerHashs) { + Map blockHeightServiceFactories = new HashMap<>(); + for (HashDigest ledgerHash : ledgerHashs) { + long blockHeight = -1L; + PeerBlockchainServiceFactory serviceFactory = latestPeerServiceFactories.get(ledgerHash); + try { + if (serviceFactory != null) { + blockHeight = serviceFactory.getBlockchainService() + .getLedger(ledgerHash).getLatestBlockHeight(); + blockHeightServiceFactories.put(ledgerHash, serviceFactory); + } + } catch (Exception e) { + latestPeerServiceFactories.remove(ledgerHash); + serviceFactory = null; + LOGGER.error("Peer get latest block height fail !!!", e); + } + + // 查询其他所有节点对应的区块高度的情况 + NetworkAddress defaultPeerAddress = null, latestPeerAddress = null; + for (Map.Entry entry : peerBlockchainServiceFactories.entrySet()) { + PeerBlockchainServiceFactory sf = entry.getValue(); + if (sf != serviceFactory) { + try { + long latestBlockHeight = sf.getBlockchainService().getLedger(ledgerHash).getLatestBlockHeight(); + if (latestBlockHeight > blockHeight) { + latestPeerAddress = entry.getKey(); + blockHeightServiceFactories.put(ledgerHash, sf); + } + blockHeight = Math.max(latestBlockHeight, blockHeight); + } catch (Exception e) { + LOGGER.error(String.format("Peer[%s] get ledger[%s]'s latest block height fail !!!", + entry.getKey(), ledgerHash.toBase58()), e); + } + } else { + defaultPeerAddress = entry.getKey(); + } + } + LOGGER.info("Ledger[{}]'s master remote update to {}", ledgerHash.toBase58(), + latestPeerAddress == null ? defaultPeerAddress : latestPeerAddress); + } + // 更新结果集 + latestPeerServiceFactories.putAll(blockHeightServiceFactories); + } + + /** + * 之前未连接成功的Peer节点进行重连操作 + * + */ + private void reconnect() { + for (NetworkAddress peerAddress : peerAddresses) { + if (!peerBlockchainServiceFactories.containsKey(peerAddress)) { + // 重连指定节点 + try { + PeerBlockchainServiceFactory peerServiceFactory = PeerBlockchainServiceFactory.connect(gateWayKeyPair, peerAddress, peerProviders); + if (peerServiceFactory != null) { + peerBlockchainServiceFactories.put(peerAddress, peerServiceFactory); + } + } catch (Exception e) { + LOGGER.error(String.format("Reconnect %s fail !!!", peerAddress), e); + } + } + } + } + + private HashDigest[] updateMostLedgerPeerServiceFactory() { + int ledgerSize = -1; + if (mostLedgerPeerServiceFactory == null) { + return null; + } + HashDigest[] ledgerHashs = null; + BlockchainService blockchainService = mostLedgerPeerServiceFactory.serviceFactory.getBlockchainService(); + try { + if (blockchainService instanceof PeerServiceProxy) { + ledgerHashs = ((PeerServiceProxy) blockchainService).getLedgerHashsDirect(); + if (ledgerHashs != null) { + ledgerSize = ledgerHashs.length; + } + } + } catch (Exception e) { + // 连接失败的情况下清除该连接 + LOGGER.error(String.format("Connect %s fail !!!", mostLedgerPeerServiceFactory.peerAddress), e); + peerBlockchainServiceFactories.remove(mostLedgerPeerServiceFactory.peerAddress); + mostLedgerPeerServiceFactory = null; + blockchainService = null; + } + PeerServiceFactory tempMostLedgerPeerServiceFactory = mostLedgerPeerServiceFactory; + + // 遍历,获取对应端的账本数量及最新的区块高度 + for (Map.Entry entry : peerBlockchainServiceFactories.entrySet()) { + BlockchainService loopBlockchainService = entry.getValue().getBlockchainService(); + if (loopBlockchainService != blockchainService) { + // 处理账本数量 + try { + if (loopBlockchainService instanceof PeerServiceProxy) { + ledgerHashs = ((PeerServiceProxy) loopBlockchainService).getLedgerHashsDirect(); + if (ledgerHashs.length > ledgerSize) { + tempMostLedgerPeerServiceFactory = new PeerServiceFactory(entry.getKey(),entry.getValue()); + } + } + } catch (Exception e) { + LOGGER.error(String.format("%s get ledger hash fail !!!", entry.getKey()), e); + } + } + } + // 更新mostLedgerPeerServiceFactory + mostLedgerPeerServiceFactory = tempMostLedgerPeerServiceFactory; + return ledgerHashs; + } + } } diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/BlockBrowserController.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/BlockBrowserController.java index 97780f62..44e03df3 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/BlockBrowserController.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/BlockBrowserController.java @@ -71,46 +71,46 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}") @Override public LedgerInfo getLedger(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getLedger(ledgerHash); + return peerService.getQueryService(ledgerHash).getLedger(ledgerHash); } - + @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/admininfo") @Override public LedgerAdminInfo getLedgerAdminInfo(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getLedgerAdminInfo(ledgerHash); + return peerService.getQueryService(ledgerHash).getLedgerAdminInfo(ledgerHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/participants") @Override public ParticipantNode[] getConsensusParticipants(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getConsensusParticipants(ledgerHash); + return peerService.getQueryService(ledgerHash).getConsensusParticipants(ledgerHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/metadata") @Override public LedgerMetadata getLedgerMetadata(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getLedgerMetadata(ledgerHash); + return peerService.getQueryService(ledgerHash).getLedgerMetadata(ledgerHash); } - @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/settings") - public LedgerBaseSettings getLedgerInitSettings(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return gatewayQueryService.getLedgerBaseSettings(ledgerHash); - } + @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/settings") + public LedgerBaseSettings getLedgerInitSettings(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { + return gatewayQueryService.getLedgerBaseSettings(ledgerHash); + } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks") public LedgerBlock[] getBlocks(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash); + LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash); long maxBlockHeight = ledgerInfo.getLatestBlockHeight(); List ledgerBlocks = new ArrayList<>(); for (long blockHeight = maxBlockHeight; blockHeight > GENESIS_BLOCK_HEIGHT; blockHeight--) { - LedgerBlock ledgerBlock = peerService.getQueryService().getBlock(ledgerHash, blockHeight); + LedgerBlock ledgerBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHeight); ledgerBlocks.add(0, ledgerBlock); if (ledgerBlocks.size() == BLOCK_MAX_DISPLAY) { break; } } // 最后增加创世区块 - LedgerBlock genesisBlock = peerService.getQueryService().getBlock(ledgerHash, GENESIS_BLOCK_HEIGHT); + LedgerBlock genesisBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, GENESIS_BLOCK_HEIGHT); ledgerBlocks.add(0, genesisBlock); LedgerBlock[] blocks = new LedgerBlock[ledgerBlocks.size()]; ledgerBlocks.toArray(blocks); @@ -120,181 +120,181 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}") @Override public LedgerBlock getBlock(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - return peerService.getQueryService().getBlock(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + return peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHeight); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}") @Override public LedgerBlock getBlock(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - return peerService.getQueryService().getBlock(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + return peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/txs/count") @Override public long getTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - return peerService.getQueryService().getTransactionCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + return peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHeight); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/txs/count") @Override public long getTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - return peerService.getQueryService().getTransactionCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + return peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/count") @Override public long getTransactionTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getTransactionTotalCount(ledgerHash); + return peerService.getQueryService(ledgerHash).getTransactionTotalCount(ledgerHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/accounts/count") @Override public long getDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - return peerService.getQueryService().getDataAccountCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + return peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHeight); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/accounts/count") @Override public long getDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - return peerService.getQueryService().getDataAccountCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + return peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/count") @Override public long getDataAccountTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getDataAccountTotalCount(ledgerHash); + return peerService.getQueryService(ledgerHash).getDataAccountTotalCount(ledgerHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/users/count") @Override public long getUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - return peerService.getQueryService().getUserCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + return peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHeight); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/users/count") @Override public long getUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - return peerService.getQueryService().getUserCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + return peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users/count") @Override public long getUserTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getUserTotalCount(ledgerHash); + return peerService.getQueryService(ledgerHash).getUserTotalCount(ledgerHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/contracts/count") @Override public long getContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - return peerService.getQueryService().getContractCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + return peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHeight); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/contracts/count") @Override public long getContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - return peerService.getQueryService().getContractCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + return peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/count") @Override public long getContractTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getContractTotalCount(ledgerHash); + return peerService.getQueryService(ledgerHash).getContractTotalCount(ledgerHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/txs") @Override public LedgerTransaction[] getTransactions(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight, - @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, - @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { - return peerService.getQueryService().getTransactions(ledgerHash, blockHeight, fromIndex, count); + @PathVariable(name = "blockHeight") long blockHeight, + @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, + @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { + return peerService.getQueryService(ledgerHash).getTransactions(ledgerHash, blockHeight, fromIndex, count); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/txs") @Override public LedgerTransaction[] getTransactions(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash, - @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, - @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { - return peerService.getQueryService().getTransactions(ledgerHash, blockHash, fromIndex, count); + @PathVariable(name = "blockHash") HashDigest blockHash, + @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, + @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { + return peerService.getQueryService(ledgerHash).getTransactions(ledgerHash, blockHash, fromIndex, count); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/hash/{contentHash}") @Override public LedgerTransaction getTransactionByContentHash(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "contentHash") HashDigest contentHash) { - return peerService.getQueryService().getTransactionByContentHash(ledgerHash, contentHash); + @PathVariable(name = "contentHash") HashDigest contentHash) { + return peerService.getQueryService(ledgerHash).getTransactionByContentHash(ledgerHash, contentHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/state/{contentHash}") @Override public TransactionState getTransactionStateByContentHash(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "contentHash") HashDigest contentHash) { - return peerService.getQueryService().getTransactionStateByContentHash(ledgerHash, contentHash); + @PathVariable(name = "contentHash") HashDigest contentHash) { + return peerService.getQueryService(ledgerHash).getTransactionStateByContentHash(ledgerHash, contentHash); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users/address/{address}") @Override public UserInfo getUser(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "address") String address) { - return peerService.getQueryService().getUser(ledgerHash, address); + @PathVariable(name = "address") String address) { + return peerService.getQueryService(ledgerHash).getUser(ledgerHash, address); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/address/{address}") @Override public BlockchainIdentity getDataAccount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "address") String address) { + @PathVariable(name = "address") String address) { - return peerService.getQueryService().getDataAccount(ledgerHash, address); + return peerService.getQueryService(ledgerHash).getDataAccount(ledgerHash, address); } @RequestMapping(method = { RequestMethod.GET, RequestMethod.POST }, path = "ledgers/{ledgerHash}/accounts/{address}/entries") @Override public TypedKVEntry[] getDataEntries(@PathVariable("ledgerHash") HashDigest ledgerHash, - @PathVariable("address") String address, @RequestParam("keys") String... keys) { - return peerService.getQueryService().getDataEntries(ledgerHash, address, keys); + @PathVariable("address") String address, @RequestParam("keys") String... keys) { + return peerService.getQueryService(ledgerHash).getDataEntries(ledgerHash, address, keys); } @RequestMapping(method = { RequestMethod.GET, RequestMethod.POST }, path = "ledgers/{ledgerHash}/accounts/{address}/entries-version") @Override public TypedKVEntry[] getDataEntries(@PathVariable("ledgerHash") HashDigest ledgerHash, - @PathVariable("address") String address, @RequestBody KVInfoVO kvInfoVO) { - return peerService.getQueryService().getDataEntries(ledgerHash, address, kvInfoVO); + @PathVariable("address") String address, @RequestBody KVInfoVO kvInfoVO) { + return peerService.getQueryService(ledgerHash).getDataEntries(ledgerHash, address, kvInfoVO); } @RequestMapping(method = { RequestMethod.GET, RequestMethod.POST }, path = "ledgers/{ledgerHash}/accounts/address/{address}/entries") @Override public TypedKVEntry[] getDataEntries(@PathVariable("ledgerHash") HashDigest ledgerHash, - @PathVariable("address") String address, - @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, - @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { - return peerService.getQueryService().getDataEntries(ledgerHash, address, fromIndex, count); + @PathVariable("address") String address, + @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, + @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { + return peerService.getQueryService(ledgerHash).getDataEntries(ledgerHash, address, fromIndex, count); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/address/{address}/entries/count") @Override public long getDataEntriesTotalCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "address") String address) { - return peerService.getQueryService().getDataEntriesTotalCount(ledgerHash, address); + @PathVariable(name = "address") String address) { + return peerService.getQueryService(ledgerHash).getDataEntriesTotalCount(ledgerHash, address); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/address/{address}") public ContractSettings getContractSettings(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "address") String address) { - ContractInfo contractInfo = peerService.getQueryService().getContract(ledgerHash, address); + @PathVariable(name = "address") String address) { + ContractInfo contractInfo = peerService.getQueryService(ledgerHash).getContract(ledgerHash, address); return contractSettings(contractInfo); } @@ -308,30 +308,30 @@ public class BlockBrowserController implements BlockchainExtendQueryService { return contractSettings; } -// @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/address/{address}") + // @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/address/{address}") @Override public ContractInfo getContract(HashDigest ledgerHash, String address) { - return peerService.getQueryService().getContract(ledgerHash, address); + return peerService.getQueryService(ledgerHash).getContract(ledgerHash, address); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/latest") @Override public LedgerBlock getLatestBlock(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - long latestBlockHeight = peerService.getQueryService().getLedger(ledgerHash).getLatestBlockHeight(); - return peerService.getQueryService().getBlock(ledgerHash, latestBlockHeight); + long latestBlockHeight = peerService.getQueryService(ledgerHash).getLedger(ledgerHash).getLatestBlockHeight(); + return peerService.getQueryService(ledgerHash).getBlock(ledgerHash, latestBlockHeight); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/txs/additional-count") @Override public long getAdditionalTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { + @PathVariable(name = "blockHeight") long blockHeight) { // 获取某个区块的交易总数 - long currentBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, blockHeight); + long currentBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHeight); if (blockHeight == GENESIS_BLOCK_HEIGHT) { return currentBlockTxCount; } long lastBlockHeight = blockHeight - 1; - long lastBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, lastBlockHeight); + long lastBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, lastBlockHeight); // 当前区块交易数减上个区块交易数 return currentBlockTxCount - lastBlockTxCount; } @@ -339,14 +339,14 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/txs/additional-count") @Override public long getAdditionalTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash); - long currentBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash); + long currentBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, blockHash); if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) { return currentBlockTxCount; } HashDigest previousHash = currentBlock.getPreviousHash(); - long lastBlockTxCount = peerService.getQueryService().getTransactionCount(ledgerHash, previousHash); + long lastBlockTxCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, previousHash); // 当前区块交易数减上个区块交易数 return currentBlockTxCount - lastBlockTxCount; } @@ -354,40 +354,40 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/txs/additional-count") @Override public long getAdditionalTransactionCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash); + LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash); long maxBlockHeight = ledgerInfo.getLatestBlockHeight(); - long totalCount = peerService.getQueryService().getTransactionTotalCount(ledgerHash); + long totalCount = peerService.getQueryService(ledgerHash).getTransactionTotalCount(ledgerHash); if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块 return totalCount; } - long lastTotalCount = peerService.getQueryService().getTransactionCount(ledgerHash, maxBlockHeight - 1); + long lastTotalCount = peerService.getQueryService(ledgerHash).getTransactionCount(ledgerHash, maxBlockHeight - 1); return totalCount - lastTotalCount; } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/accounts/additional-count") @Override public long getAdditionalDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - long currentDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + long currentDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHeight); if (blockHeight == GENESIS_BLOCK_HEIGHT) { return currentDaCount; } long lastBlockHeight = blockHeight - 1; - long lastDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, lastBlockHeight); + long lastDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, lastBlockHeight); return currentDaCount - lastDaCount; } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/accounts/additional-count") @Override public long getAdditionalDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash); - long currentBlockDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash); + long currentBlockDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, blockHash); if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) { return currentBlockDaCount; } HashDigest previousHash = currentBlock.getPreviousHash(); - long lastBlockDaCount = peerService.getQueryService().getDataAccountCount(ledgerHash, previousHash); + long lastBlockDaCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, previousHash); // 当前区块数据账户数量减上个区块数据账户数量 return currentBlockDaCount - lastBlockDaCount; } @@ -395,40 +395,40 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts/additional-count") @Override public long getAdditionalDataAccountCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash); + LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash); long maxBlockHeight = ledgerInfo.getLatestBlockHeight(); - long totalCount = peerService.getQueryService().getDataAccountTotalCount(ledgerHash); + long totalCount = peerService.getQueryService(ledgerHash).getDataAccountTotalCount(ledgerHash); if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块 return totalCount; } - long lastTotalCount = peerService.getQueryService().getDataAccountCount(ledgerHash, maxBlockHeight - 1); + long lastTotalCount = peerService.getQueryService(ledgerHash).getDataAccountCount(ledgerHash, maxBlockHeight - 1); return totalCount - lastTotalCount; } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/users/additional-count") @Override public long getAdditionalUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - long currentUserCount = peerService.getQueryService().getUserCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + long currentUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHeight); if (blockHeight == GENESIS_BLOCK_HEIGHT) { return currentUserCount; } long lastBlockHeight = blockHeight - 1; - long lastUserCount = peerService.getQueryService().getUserCount(ledgerHash, lastBlockHeight); + long lastUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, lastBlockHeight); return currentUserCount - lastUserCount; } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/users/additional-count") @Override public long getAdditionalUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash); - long currentBlockUserCount = peerService.getQueryService().getUserCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash); + long currentBlockUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, blockHash); if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) { return currentBlockUserCount; } HashDigest previousHash = currentBlock.getPreviousHash(); - long lastBlockUserCount = peerService.getQueryService().getUserCount(ledgerHash, previousHash); + long lastBlockUserCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, previousHash); // 当前区块用户数量减上个区块用户数量 return currentBlockUserCount - lastBlockUserCount; } @@ -436,40 +436,40 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users/additional-count") @Override public long getAdditionalUserCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash); + LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash); long maxBlockHeight = ledgerInfo.getLatestBlockHeight(); - long totalCount = peerService.getQueryService().getUserTotalCount(ledgerHash); + long totalCount = peerService.getQueryService(ledgerHash).getUserTotalCount(ledgerHash); if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块 return totalCount; } - long lastTotalCount = peerService.getQueryService().getUserCount(ledgerHash, maxBlockHeight - 1); + long lastTotalCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, maxBlockHeight - 1); return totalCount - lastTotalCount; } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/height/{blockHeight}/contracts/additional-count") @Override public long getAdditionalContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHeight") long blockHeight) { - long currentContractCount = peerService.getQueryService().getContractCount(ledgerHash, blockHeight); + @PathVariable(name = "blockHeight") long blockHeight) { + long currentContractCount = peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHeight); if (blockHeight == GENESIS_BLOCK_HEIGHT) { return currentContractCount; } long lastBlockHeight = blockHeight - 1; - long lastContractCount = peerService.getQueryService().getUserCount(ledgerHash, lastBlockHeight); + long lastContractCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, lastBlockHeight); return currentContractCount - lastContractCount; } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/blocks/hash/{blockHash}/contracts/additional-count") @Override public long getAdditionalContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @PathVariable(name = "blockHash") HashDigest blockHash) { - LedgerBlock currentBlock = peerService.getQueryService().getBlock(ledgerHash, blockHash); - long currentBlockContractCount = peerService.getQueryService().getContractCount(ledgerHash, blockHash); + @PathVariable(name = "blockHash") HashDigest blockHash) { + LedgerBlock currentBlock = peerService.getQueryService(ledgerHash).getBlock(ledgerHash, blockHash); + long currentBlockContractCount = peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, blockHash); if (currentBlock.getHeight() == GENESIS_BLOCK_HEIGHT) { return currentBlockContractCount; } HashDigest previousHash = currentBlock.getPreviousHash(); - long lastBlockContractCount = peerService.getQueryService().getUserCount(ledgerHash, previousHash); + long lastBlockContractCount = peerService.getQueryService(ledgerHash).getUserCount(ledgerHash, previousHash); // 当前区块合约数量减上个区块合约数量 return currentBlockContractCount - lastBlockContractCount; } @@ -477,13 +477,13 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts/additional-count") @Override public long getAdditionalContractCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - LedgerInfo ledgerInfo = peerService.getQueryService().getLedger(ledgerHash); + LedgerInfo ledgerInfo = peerService.getQueryService(ledgerHash).getLedger(ledgerHash); long maxBlockHeight = ledgerInfo.getLatestBlockHeight(); - long totalCount = peerService.getQueryService().getContractTotalCount(ledgerHash); + long totalCount = peerService.getQueryService(ledgerHash).getContractTotalCount(ledgerHash); if (maxBlockHeight == GENESIS_BLOCK_HEIGHT) { // 只有一个创世区块 return totalCount; } - long lastTotalCount = peerService.getQueryService().getContractCount(ledgerHash, maxBlockHeight - 1); + long lastTotalCount = peerService.getQueryService(ledgerHash).getContractCount(ledgerHash, maxBlockHeight - 1); return totalCount - lastTotalCount; } @@ -569,14 +569,14 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/participants/count") public int getConsensusParticipantCount(@PathVariable(name = "ledgerHash") HashDigest ledgerHash) { - return peerService.getQueryService().getConsensusParticipants(ledgerHash).length; + return peerService.getQueryService(ledgerHash).getConsensusParticipants(ledgerHash).length; } // @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/participants") // public ParticipantNode[] getConsensusParticipants(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, // @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, // @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { -// +// // ParticipantNode participantNode[] = peerService.getQueryService().getConsensusParticipants(ledgerHash); // int indexAndCount[] = QueryUtil.calFromIndexAndCount(fromIndex, count, participantNode.length); // ParticipantNode participantNodesNew[] = Arrays.copyOfRange(participantNode, indexAndCount[0], @@ -586,7 +586,7 @@ public class BlockBrowserController implements BlockchainExtendQueryService { /** * get more users by fromIndex and count; - * + * * @param ledgerHash * @param fromIndex * @param count @@ -595,25 +595,25 @@ public class BlockBrowserController implements BlockchainExtendQueryService { @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/users") @Override public BlockchainIdentity[] getUsers(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, - @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { - return revertAccountHeader(peerService.getQueryService().getUsers(ledgerHash, fromIndex, count)); + @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, + @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { + return revertAccountHeader(peerService.getQueryService(ledgerHash).getUsers(ledgerHash, fromIndex, count)); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/accounts") @Override public BlockchainIdentity[] getDataAccounts(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, - @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { - return revertAccountHeader(peerService.getQueryService().getDataAccounts(ledgerHash, fromIndex, count)); + @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, + @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { + return revertAccountHeader(peerService.getQueryService(ledgerHash).getDataAccounts(ledgerHash, fromIndex, count)); } @RequestMapping(method = RequestMethod.GET, path = "ledgers/{ledgerHash}/contracts") @Override public BlockchainIdentity[] getContractAccounts(@PathVariable(name = "ledgerHash") HashDigest ledgerHash, - @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, - @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { - return revertAccountHeader(peerService.getQueryService().getContractAccounts(ledgerHash, fromIndex, count)); + @RequestParam(name = "fromIndex", required = false, defaultValue = "0") int fromIndex, + @RequestParam(name = "count", required = false, defaultValue = "-1") int count) { + return revertAccountHeader(peerService.getQueryService(ledgerHash).getContractAccounts(ledgerHash, fromIndex, count)); } /** diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayLedgerLoadTimer.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayLedgerLoadTimer.java new file mode 100644 index 00000000..f0d42e41 --- /dev/null +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayLedgerLoadTimer.java @@ -0,0 +1,91 @@ +/** + * Copyright: Copyright 2016-2020 JD.COM All Right Reserved + * FileName: com.jd.blockchain.gateway.web.GatewayTimeTasks + * Author: shaozhuguang + * Department: 区块链研发部 + * Date: 2019/1/16 下午6:17 + * Description: + */ +package com.jd.blockchain.gateway.web; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.jd.blockchain.gateway.PeerConnector; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * + * @author shaozhuguang + * @create 2019/1/16 + * @since 1.0.0 + */ +@Component +@EnableScheduling +public class GatewayLedgerLoadTimer { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(GatewayLedgerLoadTimer.class); + + private static final ExecutorService LEDGER_LOAD_EXECUTOR = initLedgerLoadExecutor(); + + private static final Lock LOCK = new ReentrantLock(); + + @Autowired + private PeerConnector peerConnector; + + /** + * 账本加载许可,主要作用两个 + * 1、防止启动时加载账本与当前定时器加载冲突 + * 2、每次加载完成后释放许可,以便于下次定时任务加载,若不存在许可,则下次定时任务放弃执行 + */ + private Semaphore loadSemaphore = new Semaphore(0); + + //每1钟执行一次 + @Scheduled(cron = "0 */1 * * * * ") + public void ledgerLoad(){ + boolean acquire = false; + LOCK.lock(); + try { + // 5秒内获取授权 + acquire = loadSemaphore.tryAcquire(5, TimeUnit.SECONDS); + if (acquire) { + // 授权成功的情况下,进行单线程重连 + LEDGER_LOAD_EXECUTOR.execute(() -> { + peerConnector.monitorAndReconnect(); + }); + } + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } finally { + if (acquire) { + // 授权成功的情况下,释放该许可 + release(); + } + LOCK.unlock(); + } + } + + /** + * 释放许可 + */ + public void release() { + loadSemaphore.release(); + } + + private static ThreadPoolExecutor initLedgerLoadExecutor() { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("gateway-ledger-loader-%d").build(); + + return new ThreadPoolExecutor(1, 1, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), + threadFactory, + new ThreadPoolExecutor.AbortPolicy()); + } +} \ No newline at end of file diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayTimeTasks.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayTimeTasks.java deleted file mode 100644 index 9e6c249d..00000000 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/GatewayTimeTasks.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright: Copyright 2016-2020 JD.COM All Right Reserved - * FileName: com.jd.blockchain.gateway.web.GatewayTimeTasks - * Author: shaozhuguang - * Department: 区块链研发部 - * Date: 2019/1/16 下午6:17 - * Description: - */ -package com.jd.blockchain.gateway.web; - -import com.jd.blockchain.consensus.service.NodeServer; -import com.jd.blockchain.crypto.HashDigest; -import com.jd.blockchain.gateway.PeerConnector; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.*; - -/** - * - * @author shaozhuguang - * @create 2019/1/16 - * @since 1.0.0 - */ -@Component -@EnableScheduling -public class GatewayTimeTasks { - - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(GatewayTimeTasks.class); - - @Autowired - private PeerConnector peerConnector; - - //每30分钟执行一次 - @Scheduled(cron = "0 */8 * * * * ") - public void updateLedger(){ - try { - peerConnector.reconnect(); - } catch (Exception e) { - LOGGER.error(e.getMessage()); - } - } -} \ No newline at end of file diff --git a/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/PeerServiceProxy.java b/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/PeerServiceProxy.java index 535b45f5..7ac4ecc1 100644 --- a/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/PeerServiceProxy.java +++ b/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/PeerServiceProxy.java @@ -9,15 +9,13 @@ import com.jd.blockchain.sdk.proxy.BlockchainServiceProxy; import com.jd.blockchain.transaction.BlockchainQueryService; import com.jd.blockchain.transaction.TransactionService; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 对共识节点的区块链服务代理; - * + * * @author huanghaiquan * */ @@ -42,8 +40,8 @@ public class PeerServiceProxy extends BlockchainServiceProxy implements Transact } public void addLedgerAccessContexts(LedgerAccessContext[] accessAbleLedgers) { + accessLock.lock(); try { - accessLock.lock(); if (this.ledgerAccessContexts == null) { throw new IllegalArgumentException("LedgerAccessContexts is null, you need init first !!!"); } @@ -81,7 +79,28 @@ public class PeerServiceProxy extends BlockchainServiceProxy implements Transact } /** - * 处理网关的交易转发; + * 直接获取账本信息 + * 不通过内部缓存 + * + * @return + */ + public HashDigest[] getLedgerHashsDirect() { + Set ledgerHashs = new HashSet<>(); + if (ledgerAccessContexts != null && !ledgerAccessContexts.isEmpty()) { + Collection ctxs = ledgerAccessContexts.values(); + for (LedgerAccessContext ctx : ctxs) { + HashDigest[] hashs = ctx.getQueryService().getLedgerHashs(); + ledgerHashs.addAll(Arrays.asList(hashs)); + } + } + if (ledgerHashs.isEmpty()) { + return new HashDigest[0]; + } + return ledgerHashs.toArray(new HashDigest[ledgerHashs.size()]); + } + + /** + * 处理网关的交易转发; */ @Override public TransactionResponse process(TransactionRequest txRequest) {