diff --git a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java index e3eddc68..bbecdb5c 100644 --- a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java +++ b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java @@ -402,16 +402,15 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer boolean isOK = true; TransactionState transactionState = TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK; - for (int i = 0; i < commands.length; i++) { - byte[] txContent = commands[i]; - try { + try { + for (int i = 0; i < commands.length; i++) { + byte[] txContent = commands[i]; AsyncFuture asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId); asyncFutureLinkedList.add(asyncFuture); - } catch (BlockRollbackException e) { + } + } catch (BlockRollbackException e) { LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); isOK = false; - break; - } } if (isOK) { @@ -448,22 +447,21 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer } } + // Block full rollback responses, generated in pre compute phase, due to tx fail public byte[] createAppResponse(byte[] command, TransactionState transactionState) { TransactionRequest txRequest = BinaryProtocol.decode(command); TxResponseMessage resp = new TxResponseMessage(txRequest.getTransactionContent().getHash()); -// resp.setExecutionState(TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK); + resp.setExecutionState(transactionState); return BinaryProtocol.encode(resp, TransactionResponse.class); } - /** - * - * Consensus write phase will terminate, new block hash values are inconsistent, update batch messages execute state - * - */ - public List updateAppResponses(List asyncResponseLinkedList) { + + + //Pre compute block hash values are inconsistent, update batch messages to new state + public List preCompInconsistentAppResps(List asyncResponseLinkedList) { List updatedResponses = new ArrayList<>(); for(int i = 0; i < asyncResponseLinkedList.size(); i++) { @@ -476,15 +474,33 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer return updatedResponses; } + + //Consensus accept phase will terminate, pre compute commit exception occurs, update batch messages execute state to block full rollback + public List blockRollbackAppResps(List asyncResponseLinkedList) { + List updatedResponses = new ArrayList<>(); + + for(int i = 0; i < asyncResponseLinkedList.size(); i++) { + TransactionResponse txResponse = BinaryProtocol.decode(asyncResponseLinkedList.get(i)); + TxResponseMessage resp = new TxResponseMessage(txResponse.getContentHash()); + resp.setExecutionState(TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK); + updatedResponses.add(BinaryProtocol.encode(resp, TransactionResponse.class)); + } + + return updatedResponses; + } + /** * * Decision has been made at the consensus stage, commit block * */ public void preComputeAppCommit(String batchId) { - - messageHandle.commitBatch(realmName, batchId); - + try { + messageHandle.commitBatch(realmName, batchId); + } catch (BlockRollbackException e) { + LOGGER.error("Error occurred while pre compute commit --" + e.getMessage(), e); + throw e; + } } /** diff --git a/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerTransactionalEditor.java b/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerTransactionalEditor.java index ea7bb7ed..15954e4f 100644 --- a/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerTransactionalEditor.java +++ b/source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerTransactionalEditor.java @@ -355,7 +355,11 @@ public class LedgerTransactionalEditor implements LedgerEditor { throw new IllegalStateException("The current block is not ready yet!"); } - baseStorage.flush(); + try { + baseStorage.flush(); + } catch (Exception e) { + throw new BlockRollbackException(e.getMessage(), e); + } committed = true; } diff --git a/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_Tx_RollBack_Test_.java b/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_Tx_RollBack_Test_.java new file mode 100644 index 00000000..4c8c233b --- /dev/null +++ b/source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_Tx_RollBack_Test_.java @@ -0,0 +1,114 @@ +package test.com.jd.blockchain.sdk.test; + +import com.jd.blockchain.binaryproto.DataContractRegistry; +import com.jd.blockchain.crypto.*; +import com.jd.blockchain.ledger.*; +import com.jd.blockchain.sdk.BlockchainService; +import com.jd.blockchain.sdk.client.GatewayServiceFactory; +import com.jd.blockchain.transaction.TxResponseMessage; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +//Transaction rollback test example +public class SDK_GateWay_Tx_RollBack_Test_ { + + private PrivKey privKey; + private PubKey pubKey; + + private BlockchainKeypair CLIENT_CERT = null; + + private String GATEWAY_IPADDR = null; + + private int GATEWAY_PORT; + + private boolean SECURE; + + private BlockchainService service; + + private BlockchainKeypair user; + + private BlockchainKeypair dataAccount; + + @Before + public void init() { + + privKey = SDK_GateWay_KeyPair_Para.privkey1; + pubKey = SDK_GateWay_KeyPair_Para.pubKey1; + + CLIENT_CERT = new BlockchainKeypair(SDK_GateWay_KeyPair_Para.pubKey0, SDK_GateWay_KeyPair_Para.privkey0); + GATEWAY_IPADDR = "127.0.0.1"; + GATEWAY_PORT = 11000; + SECURE = false; + GatewayServiceFactory serviceFactory = GatewayServiceFactory.connect(GATEWAY_IPADDR, GATEWAY_PORT, SECURE, + CLIENT_CERT); + service = serviceFactory.getBlockchainService(); + + DataContractRegistry.register(TransactionContent.class); + DataContractRegistry.register(TransactionContentBody.class); + DataContractRegistry.register(TransactionRequest.class); + DataContractRegistry.register(NodeRequest.class); + DataContractRegistry.register(EndpointRequest.class); + DataContractRegistry.register(TransactionResponse.class); + + user = BlockchainKeyGenerator.getInstance().generate(); + + dataAccount = BlockchainKeyGenerator.getInstance().generate(); + + } + + @Test + public void failedTxRollback_Test() { + + HashDigest[] ledgerHashs = service.getLedgerHashs(); + + //Construct the first transaction + TransactionTemplate txTemp = service.newTransaction(ledgerHashs[0]); + + AsymmetricKeypair keyPair = new BlockchainKeypair(pubKey, privKey); + + //Register user account + txTemp.users().register(user.getIdentity()); + + //Register data account + txTemp.dataAccounts().register(dataAccount.getIdentity()); + + String dataKey = "jd_code"; + String dataVal = "www.jd.com"; + + // Construct error kv version + txTemp.dataAccount(dataAccount.getAddress()).setText(dataKey, dataVal, 1); + + PreparedTransaction prepTx = txTemp.prepare(); + + prepTx.sign(keyPair); + + //Commit transaction + TransactionResponse transactionResponse = prepTx.commit(); + + //The first transaction will rollback, due to version error + assertEquals(transactionResponse.getExecutionState().CODE, TransactionState.DATA_VERSION_CONFLICT.CODE); + + //Construct the second transaction + TransactionTemplate txTemp1 = service.newTransaction(ledgerHashs[0]); + + txTemp1.users().register(user.getIdentity()); + + txTemp1.dataAccounts().register(dataAccount.getIdentity()); + + txTemp1.dataAccount(dataAccount.getAddress()).setText(dataKey, dataVal, -1); + + PreparedTransaction prepTx1 = txTemp1.prepare(); + + prepTx1.sign(keyPair); + + TransactionResponse transactionResponse1 = prepTx1.commit(); + + //The second transaction success + assertTrue(transactionResponse1.isSuccess()); + + } + +} diff --git a/source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java b/source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java index 0c2192a3..1cb202fa 100644 --- a/source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java +++ b/source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java @@ -12,6 +12,8 @@ import com.jd.blockchain.utils.io.BytesMap; public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage, KVStorageService, BytesMap { + public static boolean isBlockFullRollbackTest = false; + private ExistancePolicyKVStorageMap exStorage = new ExistancePolicyKVStorageMap(); private VersioningKVStorageMap verStorage = new VersioningKVStorageMap(); @@ -32,6 +34,9 @@ public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage, @Override public long set(Bytes key, byte[] value, long version) { + if (isBlockFullRollbackTest) { + return -1; + } return verStorage.set(key, value, version); } diff --git a/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBase.java b/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBase.java index 9c3b3320..19429524 100644 --- a/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBase.java +++ b/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBase.java @@ -103,6 +103,36 @@ public class IntegrationBase { return keyPairResponse; } + public static KeyPairResponse testSDK_BlockFullRollBack(AsymmetricKeypair adminKey, HashDigest ledgerHash, + BlockchainService blockchainService) { + + BlockchainKeypair user = BlockchainKeyGenerator.getInstance().generate(); + + TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash); + + //Register user account + txTpl.users().register(user.getIdentity()); + + PreparedTransaction prepTx = txTpl.prepare(); + + HashDigest transactionHash = prepTx.getHash(); + + prepTx.sign(adminKey); + + //Commit transaction + TransactionResponse transactionResponse = prepTx.commit(); + + //The whole block will rollback, due to storage error + assertEquals(transactionResponse.getExecutionState().CODE, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE); + + KeyPairResponse keyPairResponse = new KeyPairResponse(); + keyPairResponse.keyPair = user; + keyPairResponse.txResp = transactionResponse; + keyPairResponse.txHash = transactionHash; + return keyPairResponse; + } + + public static KeyPairResponse testSDK_RegisterDataAccount(AsymmetricKeypair adminKey, HashDigest ledgerHash, BlockchainService blockchainService) { // 注册数据账户,并验证最终写入; diff --git a/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBlockFullRollback.java b/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBlockFullRollback.java new file mode 100644 index 00000000..b3a68c77 --- /dev/null +++ b/source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBlockFullRollback.java @@ -0,0 +1,105 @@ +package test.com.jd.blockchain.intgr; + +import com.jd.blockchain.crypto.*; +import com.jd.blockchain.gateway.GatewayConfigProperties; +import com.jd.blockchain.ledger.core.LedgerQuery; +import com.jd.blockchain.sdk.BlockchainService; +import com.jd.blockchain.sdk.client.GatewayServiceFactory; +import com.jd.blockchain.storage.service.DbConnectionFactory; +import com.jd.blockchain.storage.service.utils.MemoryKVStorage; +import com.jd.blockchain.tools.initializer.LedgerBindingConfig; +import com.jd.blockchain.utils.concurrent.ThreadInvoker; +import org.junit.Test; +import test.com.jd.blockchain.intgr.initializer.LedgerInitializeTest; +import test.com.jd.blockchain.intgr.initializer.LedgerInitializeWeb4Nodes; + +import static test.com.jd.blockchain.intgr.IntegrationBase.*; + +public class IntegrationBlockFullRollback { + + private static final String DB_TYPE_MEM = "mem"; + + @Test + public void test4Memory() { + test(LedgerInitConsensusConfig.bftsmartProvider, DB_TYPE_MEM, LedgerInitConsensusConfig.memConnectionStrings); + } + + public void test(String[] providers, String dbType, String[] dbConnections) { + + // 内存账本初始化 + HashDigest ledgerHash = initLedger(dbConnections); + + // 启动Peer节点 + PeerTestRunner[] peerNodes = peerNodeStart(ledgerHash, dbType); + + DbConnectionFactory dbConnectionFactory0 = peerNodes[0].getDBConnectionFactory(); + DbConnectionFactory dbConnectionFactory1 = peerNodes[1].getDBConnectionFactory(); + DbConnectionFactory dbConnectionFactory2 = peerNodes[2].getDBConnectionFactory(); + DbConnectionFactory dbConnectionFactory3 = peerNodes[3].getDBConnectionFactory(); + + String encodedBase58Pwd = KeyGenUtils.encodePasswordAsBase58(LedgerInitializeTest.PASSWORD); + + GatewayConfigProperties.KeyPairConfig gwkey0 = new GatewayConfigProperties.KeyPairConfig(); + gwkey0.setPubKeyValue(IntegrationBase.PUB_KEYS[0]); + gwkey0.setPrivKeyValue(IntegrationBase.PRIV_KEYS[0]); + gwkey0.setPrivKeyPassword(encodedBase58Pwd); + GatewayTestRunner gateway = new GatewayTestRunner("127.0.0.1", 11000, gwkey0, + peerNodes[0].getServiceAddress(), providers,null); + + ThreadInvoker.AsyncCallback gwStarting = gateway.start(); + + gwStarting.waitReturn(); + + // 执行测试用例之前,校验每个节点的一致性; + LedgerQuery[] ledgers = buildLedgers(new LedgerBindingConfig[]{ + peerNodes[0].getLedgerBindingConfig(), + peerNodes[1].getLedgerBindingConfig(), + peerNodes[2].getLedgerBindingConfig(), + peerNodes[3].getLedgerBindingConfig(), + }, + new DbConnectionFactory[]{ + dbConnectionFactory0, + dbConnectionFactory1, + dbConnectionFactory2, + dbConnectionFactory3}); + + IntegrationBase.testConsistencyAmongNodes(ledgers); + + LedgerQuery ledgerRepository = ledgers[0]; + + GatewayServiceFactory gwsrvFact = GatewayServiceFactory.connect(gateway.getServiceAddress()); + + PrivKey privkey0 = KeyGenUtils.decodePrivKeyWithRawPassword(IntegrationBase.PRIV_KEYS[0], IntegrationBase.PASSWORD); + + PubKey pubKey0 = KeyGenUtils.decodePubKey(IntegrationBase.PUB_KEYS[0]); + + AsymmetricKeypair adminKey = new AsymmetricKeypair(pubKey0, privkey0); + + BlockchainService blockchainService = gwsrvFact.getBlockchainService(); + + //Block commit will be fail, due to storage error + MemoryKVStorage.isBlockFullRollbackTest = true; + + IntegrationBase.KeyPairResponse userResponse = IntegrationBase.testSDK_BlockFullRollBack(adminKey, ledgerHash, blockchainService); + + //Block commit will be normal + MemoryKVStorage.isBlockFullRollbackTest = false; + + IntegrationBase.KeyPairResponse userResponse1 = IntegrationBase.testSDK_RegisterUser(adminKey, ledgerHash, blockchainService); + + try { + System.out.println("----------------- Init Completed -----------------"); + Thread.sleep(Integer.MAX_VALUE); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + IntegrationBase.testConsistencyAmongNodes(ledgers); + } + private HashDigest initLedger(String[] dbConnections) { + LedgerInitializeWeb4Nodes ledgerInit = new LedgerInitializeWeb4Nodes(); + HashDigest ledgerHash = ledgerInit.testInitWith4Nodes(LedgerInitConsensusConfig.bftsmartConfig, dbConnections); + System.out.printf("LedgerHash = %s \r\n", ledgerHash.toBase58()); + return ledgerHash; + } +}