From 1b8923f8516f25819206f0df9a4293e2dc4b85b4 Mon Sep 17 00:00:00 2001 From: zhaoguangwei Date: Mon, 2 Mar 2020 15:14:02 +0800 Subject: [PATCH] =?UTF-8?q?1)0.0.0.0;=202)gateway's=20pool;=203)bftsmart's?= =?UTF-8?q?=20optimization=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/BftsmartMessageService.java | 4 +- .../bftsmart/service/BftsmartNodeServer.java | 18 +++--- .../gateway/web/TxProcessingController.java | 7 +++ .../ledger/core/LedgerQueryService.java | 6 +- .../core/TransactionBatchProcessor.java | 12 ++-- .../peer/web/LedgerQueryController.java | 4 +- .../sdk/service/NodeSigningAppender.java | 63 +++++++++++++++++-- .../sdk/test/SDK_GateWay_User_Test_.java | 2 +- 8 files changed, 92 insertions(+), 24 deletions(-) diff --git a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/client/BftsmartMessageService.java b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/client/BftsmartMessageService.java index 5ecf6596..9cc20c8b 100644 --- a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/client/BftsmartMessageService.java +++ b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/client/BftsmartMessageService.java @@ -43,7 +43,9 @@ public class BftsmartMessageService implements MessageService { throw new RuntimeException(e); } finally { - asyncPeerProxyPool.returnObject(asynchServiceProxy); + if (asynchServiceProxy != null) { + asyncPeerProxyPool.returnObject(asynchServiceProxy); + } } return asyncFuture; diff --git a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java index 61a6a619..ef941d03 100644 --- a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java +++ b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java @@ -33,7 +33,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer private static Logger LOGGER = LoggerFactory.getLogger(BftsmartNodeServer.class); - private static final String DEFAULT_BINDING_HOST = "0.0.0.0"; +// private static final String DEFAULT_BINDING_HOST = "0.0.0.0"; private List stateHandles = new CopyOnWriteArrayList<>(); @@ -59,7 +59,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer private TOMConfiguration tomConfig; - private TOMConfiguration outerTomConfig; +// private TOMConfiguration outerTomConfig; private HostsConfig hostsConfig; private Properties systemConfig; @@ -129,12 +129,12 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer } protected void initConfig(int id, Properties systemsConfig, HostsConfig hostConfig) { - byte[] serialHostConf = BinarySerializeUtils.serialize(hostConfig); - Properties sysConfClone = (Properties)systemsConfig.clone(); - int port = hostConfig.getPort(id); - hostConfig.add(id, DEFAULT_BINDING_HOST, port); +// byte[] serialHostConf = BinarySerializeUtils.serialize(hostConfig); +// Properties sysConfClone = (Properties)systemsConfig.clone(); +// int port = hostConfig.getPort(id); +// hostConfig.add(id, DEFAULT_BINDING_HOST, port); this.tomConfig = new TOMConfiguration(id, systemsConfig, hostConfig); - this.outerTomConfig = new TOMConfiguration(id, sysConfClone, BinarySerializeUtils.deserialize(serialHostConf)); +// this.outerTomConfig = new TOMConfiguration(id, systemsConfig, hostConfig); } @Override @@ -153,7 +153,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer } public TOMConfiguration getTomConfig() { - return outerTomConfig; + return tomConfig; } public int getId() { @@ -165,7 +165,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer throw new IllegalArgumentException("ReplicaID is negative!"); } this.tomConfig.setProcessId(id); - this.outerTomConfig.setProcessId(id); +// this.outerTomConfig.setProcessId(id); } public BftsmartConsensusSettings getConsensusSetting() { diff --git a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/TxProcessingController.java b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/TxProcessingController.java index 8f949c69..41739cc7 100644 --- a/source/gateway/src/main/java/com/jd/blockchain/gateway/web/TxProcessingController.java +++ b/source/gateway/src/main/java/com/jd/blockchain/gateway/web/TxProcessingController.java @@ -3,6 +3,8 @@ package com.jd.blockchain.gateway.web; import com.jd.blockchain.crypto.*; import com.jd.blockchain.gateway.service.GatewayInterceptService; import com.jd.blockchain.transaction.SignatureUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -27,6 +29,8 @@ import com.jd.blockchain.web.converters.BinaryMessageConverter; @RestController public class TxProcessingController implements TransactionService { + private static Logger LOGGER = LoggerFactory.getLogger(TxProcessingController.class); + @Autowired private PeerService peerService; @@ -59,11 +63,14 @@ public class TxProcessingController implements TransactionService { throw new IllegalStateException("Not implemented!"); } else { // 验证签名; + StringBuilder signer = new StringBuilder(txRequest.getHash().toString()).append("->"); for (DigitalSignature sign : partiSigns) { + signer.append(AddressEncoding.generateAddress(sign.getPubKey()).toBase58()).append(","); if (!SignatureUtils.verifySignature(txRequest.getTransactionContent(), sign.getDigest(), sign.getPubKey())) { throw new BusinessException("The validation of participant signatures fail!"); } } + LOGGER.debug(signer.toString()); } // 注:转发前自动附加网关的签名并转发请求至共识节点;异步的处理方式 diff --git a/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerQueryService.java b/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerQueryService.java index c377a56a..5e54e3e9 100644 --- a/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerQueryService.java +++ b/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerQueryService.java @@ -370,9 +370,11 @@ public class LedgerQueryService implements BlockchainQueryService { DataAccountQuery dataAccountSet = ledger.getDataAccountSet(block); DataAccount dataAccount = dataAccountSet.getAccount(Bytes.fromBase58(address)); -// int pages[] = QueryUtil.calFromIndexAndCount(fromIndex, count, (int) dataAccount.getDataset().getDataCount()); + int pages[] = QueryUtil.calFromIndexAndCount(fromIndex, count, (int) dataAccount.getDataset().getDataCount()); // return dataAccount.getDataset().getDataEntry(key, version).getDataEntries(pages[0], pages[1]); - + fromIndex = pages[0]; + count = pages[1]; + DataIterator iterator = dataAccount.getDataset().iterator(); iterator.skip(fromIndex); DataEntry[] dataEntries = iterator.next(count); diff --git a/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/TransactionBatchProcessor.java b/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/TransactionBatchProcessor.java index e2f42050..0e777a75 100644 --- a/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/TransactionBatchProcessor.java +++ b/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/TransactionBatchProcessor.java @@ -1,11 +1,9 @@ package com.jd.blockchain.ledger.core; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import java.util.*; import com.jd.blockchain.ledger.*; +import com.jd.blockchain.utils.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,6 +115,12 @@ public class TransactionBatchProcessor implements TransactionBatchProcess { TransactionRequestExtension reqExt = new TransactionRequestExtensionImpl(request); // 初始化交易的用户安全策略; +// Set endPointAddresses = reqExt.getEndpointAddresses(); +// int index = 0; +// for (Bytes address : endPointAddresses) { +// System.out.printf("EndPoint Sign Address %s[%s] -> %s \r\n", request.getHash(), index++, address.toBase58()); +//// LOGGER.debug("EndPoint Sign Address {}[{}] -> {}", request.getHash(), index++, address.toBase58()); +// } SecurityPolicy securityPolicy = securityManager.createSecurityPolicy(reqExt.getEndpointAddresses(), reqExt.getNodeAddresses()); SecurityContext.setContextUsersPolicy(securityPolicy); diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/web/LedgerQueryController.java b/source/peer/src/main/java/com/jd/blockchain/peer/web/LedgerQueryController.java index 41ce1458..38c7da00 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/web/LedgerQueryController.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/web/LedgerQueryController.java @@ -449,8 +449,10 @@ public class LedgerQueryController implements BlockchainQueryService { DataAccountQuery dataAccountSet = ledger.getDataAccountSet(block); DataAccount dataAccount = dataAccountSet.getAccount(Bytes.fromBase58(address)); -// int pages[] = QueryUtil.calFromIndexAndCount(fromIndex, count, (int) dataAccount.getDataset().getDataCount()); + int pages[] = QueryUtil.calFromIndexAndCount(fromIndex, count, (int) dataAccount.getDataset().getDataCount()); // return dataAccount.getDataEntries(pages[0], pages[1]); + fromIndex = pages[0]; + count = pages[1]; DataIterator iterator = dataAccount.getDataset().iterator(); iterator.skip(fromIndex); diff --git a/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/NodeSigningAppender.java b/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/NodeSigningAppender.java index 0014c28a..5c0c6f06 100644 --- a/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/NodeSigningAppender.java +++ b/source/sdk/sdk-base/src/main/java/com/jd/blockchain/sdk/service/NodeSigningAppender.java @@ -8,14 +8,13 @@ import com.jd.blockchain.crypto.AsymmetricKeypair; import com.jd.blockchain.crypto.Crypto; import com.jd.blockchain.crypto.HashDigest; import com.jd.blockchain.crypto.HashFunction; -import com.jd.blockchain.ledger.DigitalSignature; -import com.jd.blockchain.ledger.NodeRequest; -import com.jd.blockchain.ledger.TransactionRequest; -import com.jd.blockchain.ledger.TransactionResponse; +import com.jd.blockchain.ledger.*; import com.jd.blockchain.transaction.SignatureUtils; import com.jd.blockchain.transaction.TransactionService; import com.jd.blockchain.transaction.TxRequestMessage; import com.jd.blockchain.utils.concurrent.AsyncFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link NodeSigningAppender} 以装饰者模式实现,为交易请求附加上节点签名; @@ -25,6 +24,8 @@ import com.jd.blockchain.utils.concurrent.AsyncFuture; */ public class NodeSigningAppender implements TransactionService { + private static Logger LOGGER = LoggerFactory.getLogger(NodeSigningAppender.class); + static { DataContractRegistry.register(NodeRequest.class); } @@ -78,8 +79,58 @@ public class NodeSigningAppender implements TransactionService { HashDigest txHash = hashFunc.hash(nodeRequestBytes); txMessage.setHash(txHash); - AsyncFuture result = messageService.sendOrdered(BinaryProtocol.encode(txMessage, TransactionRequest.class)); + try { + AsyncFuture asyncFuture = messageService.sendOrdered(BinaryProtocol.encode(txMessage, TransactionRequest.class)); + byte[] result = asyncFuture.get(); + if (result == null) { + LOGGER.error("Gateway receive [{}]'s result is null!", txRequest.getHash()); + return new ErrorTransactionResponse(txRequest.getTransactionContent().getHash()); + } + return BinaryProtocol.decode(result); + } catch (IllegalStateException e) { + e.printStackTrace(); + LOGGER.error("Gateway send tx [{}] error {} !", txRequest.getHash(), e); + return new ErrorTransactionResponse(txRequest.getTransactionContent().getHash()); + } + } + + + private static class ErrorTransactionResponse implements TransactionResponse { + + HashDigest contentHash; + + public ErrorTransactionResponse(HashDigest contentHash) { + this.contentHash = contentHash; + } + + @Override + public HashDigest getContentHash() { + return contentHash; + } + + @Override + public TransactionState getExecutionState() { + return TransactionState.TIMEOUT; + } + + @Override + public HashDigest getBlockHash() { + return null; + } + + @Override + public long getBlockHeight() { + return -1L; + } + + @Override + public boolean isSuccess() { + return false; + } - return BinaryProtocol.decode(result.get()); + @Override + public OperationResult[] getOperationResults() { + return null; + } } } diff --git a/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_User_Test_.java b/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_User_Test_.java index 65cce89b..d69b0f97 100644 --- a/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_User_Test_.java +++ b/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_User_Test_.java @@ -82,7 +82,7 @@ public class SDK_GateWay_User_Test_ { CLIENT_CERT = new BlockchainKeypair(SDK_GateWay_KeyPair_Para.pubKey0, SDK_GateWay_KeyPair_Para.privkey0); GATEWAY_IPADDR = "127.0.0.1"; - GATEWAY_PORT = 8081; + GATEWAY_PORT = 10300; SECURE = false; GatewayServiceFactory serviceFactory = GatewayServiceFactory.connect(GATEWAY_IPADDR, GATEWAY_PORT, SECURE, CLIENT_CERT);