Browse Source

solve transaction and block rollback exception!

tags/1.1.2^2^2
zhangshuang 5 years ago
parent
commit
85ac5d2336
6 changed files with 291 additions and 17 deletions
  1. +32
    -16
      source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java
  2. +5
    -1
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerTransactionalEditor.java
  3. +114
    -0
      source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_Tx_RollBack_Test_.java
  4. +5
    -0
      source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java
  5. +30
    -0
      source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBase.java
  6. +105
    -0
      source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBlockFullRollback.java

+ 32
- 16
source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java View File

@@ -402,16 +402,15 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
boolean isOK = true; boolean isOK = true;
TransactionState transactionState = TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK; TransactionState transactionState = TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK;


for (int i = 0; i < commands.length; i++) {
byte[] txContent = commands[i];
try {
try {
for (int i = 0; i < commands.length; i++) {
byte[] txContent = commands[i];
AsyncFuture<byte[]> asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId); AsyncFuture<byte[]> asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId);
asyncFutureLinkedList.add(asyncFuture); asyncFutureLinkedList.add(asyncFuture);
} catch (BlockRollbackException e) {
}
} catch (BlockRollbackException e) {
LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e);
isOK = false; isOK = false;
break;
}
} }


if (isOK) { if (isOK) {
@@ -448,22 +447,21 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
} }
} }


// Block full rollback responses, generated in pre compute phase, due to tx fail
public byte[] createAppResponse(byte[] command, TransactionState transactionState) { public byte[] createAppResponse(byte[] command, TransactionState transactionState) {
TransactionRequest txRequest = BinaryProtocol.decode(command); TransactionRequest txRequest = BinaryProtocol.decode(command);


TxResponseMessage resp = new TxResponseMessage(txRequest.getTransactionContent().getHash()); TxResponseMessage resp = new TxResponseMessage(txRequest.getTransactionContent().getHash());
// resp.setExecutionState(TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK);
resp.setExecutionState(transactionState); resp.setExecutionState(transactionState);


return BinaryProtocol.encode(resp, TransactionResponse.class); return BinaryProtocol.encode(resp, TransactionResponse.class);
} }


/**
*
* Consensus write phase will terminate, new block hash values are inconsistent, update batch messages execute state
*
*/
public List<byte[]> updateAppResponses(List<byte[]> asyncResponseLinkedList) {


//Pre compute block hash values are inconsistent, update batch messages to new state
public List<byte[]> preCompInconsistentAppResps(List<byte[]> asyncResponseLinkedList) {
List<byte[]> updatedResponses = new ArrayList<>(); List<byte[]> updatedResponses = new ArrayList<>();


for(int i = 0; i < asyncResponseLinkedList.size(); i++) { for(int i = 0; i < asyncResponseLinkedList.size(); i++) {
@@ -476,15 +474,33 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
return updatedResponses; return updatedResponses;
} }



//Consensus accept phase will terminate, pre compute commit exception occurs, update batch messages execute state to block full rollback
public List<byte[]> blockRollbackAppResps(List<byte[]> asyncResponseLinkedList) {
List<byte[]> updatedResponses = new ArrayList<>();

for(int i = 0; i < asyncResponseLinkedList.size(); i++) {
TransactionResponse txResponse = BinaryProtocol.decode(asyncResponseLinkedList.get(i));
TxResponseMessage resp = new TxResponseMessage(txResponse.getContentHash());
resp.setExecutionState(TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK);
updatedResponses.add(BinaryProtocol.encode(resp, TransactionResponse.class));
}

return updatedResponses;
}

/** /**
* *
* Decision has been made at the consensus stage, commit block * Decision has been made at the consensus stage, commit block
* *
*/ */
public void preComputeAppCommit(String batchId) { public void preComputeAppCommit(String batchId) {

messageHandle.commitBatch(realmName, batchId);

try {
messageHandle.commitBatch(realmName, batchId);
} catch (BlockRollbackException e) {
LOGGER.error("Error occurred while pre compute commit --" + e.getMessage(), e);
throw e;
}
} }


/** /**


+ 5
- 1
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerTransactionalEditor.java View File

@@ -355,7 +355,11 @@ public class LedgerTransactionalEditor implements LedgerEditor {
throw new IllegalStateException("The current block is not ready yet!"); throw new IllegalStateException("The current block is not ready yet!");
} }


baseStorage.flush();
try {
baseStorage.flush();
} catch (Exception e) {
throw new BlockRollbackException(e.getMessage(), e);
}


committed = true; committed = true;
} }


+ 114
- 0
source/sdk/sdk-samples/src/test/java/test/com/jd/blockchain/sdk/test/SDK_GateWay_Tx_RollBack_Test_.java View File

@@ -0,0 +1,114 @@
package test.com.jd.blockchain.sdk.test;

import com.jd.blockchain.binaryproto.DataContractRegistry;
import com.jd.blockchain.crypto.*;
import com.jd.blockchain.ledger.*;
import com.jd.blockchain.sdk.BlockchainService;
import com.jd.blockchain.sdk.client.GatewayServiceFactory;
import com.jd.blockchain.transaction.TxResponseMessage;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

//Transaction rollback test example
public class SDK_GateWay_Tx_RollBack_Test_ {

private PrivKey privKey;
private PubKey pubKey;

private BlockchainKeypair CLIENT_CERT = null;

private String GATEWAY_IPADDR = null;

private int GATEWAY_PORT;

private boolean SECURE;

private BlockchainService service;

private BlockchainKeypair user;

private BlockchainKeypair dataAccount;

@Before
public void init() {

privKey = SDK_GateWay_KeyPair_Para.privkey1;
pubKey = SDK_GateWay_KeyPair_Para.pubKey1;

CLIENT_CERT = new BlockchainKeypair(SDK_GateWay_KeyPair_Para.pubKey0, SDK_GateWay_KeyPair_Para.privkey0);
GATEWAY_IPADDR = "127.0.0.1";
GATEWAY_PORT = 11000;
SECURE = false;
GatewayServiceFactory serviceFactory = GatewayServiceFactory.connect(GATEWAY_IPADDR, GATEWAY_PORT, SECURE,
CLIENT_CERT);
service = serviceFactory.getBlockchainService();

DataContractRegistry.register(TransactionContent.class);
DataContractRegistry.register(TransactionContentBody.class);
DataContractRegistry.register(TransactionRequest.class);
DataContractRegistry.register(NodeRequest.class);
DataContractRegistry.register(EndpointRequest.class);
DataContractRegistry.register(TransactionResponse.class);

user = BlockchainKeyGenerator.getInstance().generate();

dataAccount = BlockchainKeyGenerator.getInstance().generate();

}

@Test
public void failedTxRollback_Test() {

HashDigest[] ledgerHashs = service.getLedgerHashs();

//Construct the first transaction
TransactionTemplate txTemp = service.newTransaction(ledgerHashs[0]);

AsymmetricKeypair keyPair = new BlockchainKeypair(pubKey, privKey);

//Register user account
txTemp.users().register(user.getIdentity());

//Register data account
txTemp.dataAccounts().register(dataAccount.getIdentity());

String dataKey = "jd_code";
String dataVal = "www.jd.com";

// Construct error kv version
txTemp.dataAccount(dataAccount.getAddress()).setText(dataKey, dataVal, 1);

PreparedTransaction prepTx = txTemp.prepare();

prepTx.sign(keyPair);

//Commit transaction
TransactionResponse transactionResponse = prepTx.commit();

//The first transaction will rollback, due to version error
assertEquals(transactionResponse.getExecutionState().CODE, TransactionState.DATA_VERSION_CONFLICT.CODE);

//Construct the second transaction
TransactionTemplate txTemp1 = service.newTransaction(ledgerHashs[0]);

txTemp1.users().register(user.getIdentity());

txTemp1.dataAccounts().register(dataAccount.getIdentity());

txTemp1.dataAccount(dataAccount.getAddress()).setText(dataKey, dataVal, -1);

PreparedTransaction prepTx1 = txTemp1.prepare();

prepTx1.sign(keyPair);

TransactionResponse transactionResponse1 = prepTx1.commit();

//The second transaction success
assertTrue(transactionResponse1.isSuccess());

}

}

+ 5
- 0
source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java View File

@@ -12,6 +12,8 @@ import com.jd.blockchain.utils.io.BytesMap;


public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage, KVStorageService, BytesMap<Bytes> { public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage, KVStorageService, BytesMap<Bytes> {


public static boolean isBlockFullRollbackTest = false;

private ExistancePolicyKVStorageMap exStorage = new ExistancePolicyKVStorageMap(); private ExistancePolicyKVStorageMap exStorage = new ExistancePolicyKVStorageMap();
private VersioningKVStorageMap verStorage = new VersioningKVStorageMap(); private VersioningKVStorageMap verStorage = new VersioningKVStorageMap();


@@ -32,6 +34,9 @@ public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage,
@Override @Override
public long set(Bytes key, byte[] value, long version) { public long set(Bytes key, byte[] value, long version) {
if (isBlockFullRollbackTest) {
return -1;
}
return verStorage.set(key, value, version); return verStorage.set(key, value, version);
} }




+ 30
- 0
source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBase.java View File

@@ -103,6 +103,36 @@ public class IntegrationBase {
return keyPairResponse; return keyPairResponse;
} }


public static KeyPairResponse testSDK_BlockFullRollBack(AsymmetricKeypair adminKey, HashDigest ledgerHash,
BlockchainService blockchainService) {

BlockchainKeypair user = BlockchainKeyGenerator.getInstance().generate();

TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);

//Register user account
txTpl.users().register(user.getIdentity());

PreparedTransaction prepTx = txTpl.prepare();

HashDigest transactionHash = prepTx.getHash();

prepTx.sign(adminKey);

//Commit transaction
TransactionResponse transactionResponse = prepTx.commit();

//The whole block will rollback, due to storage error
assertEquals(transactionResponse.getExecutionState().CODE, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE);

KeyPairResponse keyPairResponse = new KeyPairResponse();
keyPairResponse.keyPair = user;
keyPairResponse.txResp = transactionResponse;
keyPairResponse.txHash = transactionHash;
return keyPairResponse;
}


public static KeyPairResponse testSDK_RegisterDataAccount(AsymmetricKeypair adminKey, HashDigest ledgerHash, public static KeyPairResponse testSDK_RegisterDataAccount(AsymmetricKeypair adminKey, HashDigest ledgerHash,
BlockchainService blockchainService) { BlockchainService blockchainService) {
// 注册数据账户,并验证最终写入; // 注册数据账户,并验证最终写入;


+ 105
- 0
source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationBlockFullRollback.java View File

@@ -0,0 +1,105 @@
package test.com.jd.blockchain.intgr;

import com.jd.blockchain.crypto.*;
import com.jd.blockchain.gateway.GatewayConfigProperties;
import com.jd.blockchain.ledger.core.LedgerQuery;
import com.jd.blockchain.sdk.BlockchainService;
import com.jd.blockchain.sdk.client.GatewayServiceFactory;
import com.jd.blockchain.storage.service.DbConnectionFactory;
import com.jd.blockchain.storage.service.utils.MemoryKVStorage;
import com.jd.blockchain.tools.initializer.LedgerBindingConfig;
import com.jd.blockchain.utils.concurrent.ThreadInvoker;
import org.junit.Test;
import test.com.jd.blockchain.intgr.initializer.LedgerInitializeTest;
import test.com.jd.blockchain.intgr.initializer.LedgerInitializeWeb4Nodes;

import static test.com.jd.blockchain.intgr.IntegrationBase.*;

public class IntegrationBlockFullRollback {

private static final String DB_TYPE_MEM = "mem";

@Test
public void test4Memory() {
test(LedgerInitConsensusConfig.bftsmartProvider, DB_TYPE_MEM, LedgerInitConsensusConfig.memConnectionStrings);
}

public void test(String[] providers, String dbType, String[] dbConnections) {

// 内存账本初始化
HashDigest ledgerHash = initLedger(dbConnections);

// 启动Peer节点
PeerTestRunner[] peerNodes = peerNodeStart(ledgerHash, dbType);

DbConnectionFactory dbConnectionFactory0 = peerNodes[0].getDBConnectionFactory();
DbConnectionFactory dbConnectionFactory1 = peerNodes[1].getDBConnectionFactory();
DbConnectionFactory dbConnectionFactory2 = peerNodes[2].getDBConnectionFactory();
DbConnectionFactory dbConnectionFactory3 = peerNodes[3].getDBConnectionFactory();

String encodedBase58Pwd = KeyGenUtils.encodePasswordAsBase58(LedgerInitializeTest.PASSWORD);

GatewayConfigProperties.KeyPairConfig gwkey0 = new GatewayConfigProperties.KeyPairConfig();
gwkey0.setPubKeyValue(IntegrationBase.PUB_KEYS[0]);
gwkey0.setPrivKeyValue(IntegrationBase.PRIV_KEYS[0]);
gwkey0.setPrivKeyPassword(encodedBase58Pwd);
GatewayTestRunner gateway = new GatewayTestRunner("127.0.0.1", 11000, gwkey0,
peerNodes[0].getServiceAddress(), providers,null);

ThreadInvoker.AsyncCallback<Object> gwStarting = gateway.start();

gwStarting.waitReturn();

// 执行测试用例之前,校验每个节点的一致性;
LedgerQuery[] ledgers = buildLedgers(new LedgerBindingConfig[]{
peerNodes[0].getLedgerBindingConfig(),
peerNodes[1].getLedgerBindingConfig(),
peerNodes[2].getLedgerBindingConfig(),
peerNodes[3].getLedgerBindingConfig(),
},
new DbConnectionFactory[]{
dbConnectionFactory0,
dbConnectionFactory1,
dbConnectionFactory2,
dbConnectionFactory3});

IntegrationBase.testConsistencyAmongNodes(ledgers);

LedgerQuery ledgerRepository = ledgers[0];

GatewayServiceFactory gwsrvFact = GatewayServiceFactory.connect(gateway.getServiceAddress());

PrivKey privkey0 = KeyGenUtils.decodePrivKeyWithRawPassword(IntegrationBase.PRIV_KEYS[0], IntegrationBase.PASSWORD);

PubKey pubKey0 = KeyGenUtils.decodePubKey(IntegrationBase.PUB_KEYS[0]);

AsymmetricKeypair adminKey = new AsymmetricKeypair(pubKey0, privkey0);

BlockchainService blockchainService = gwsrvFact.getBlockchainService();

//Block commit will be fail, due to storage error
MemoryKVStorage.isBlockFullRollbackTest = true;

IntegrationBase.KeyPairResponse userResponse = IntegrationBase.testSDK_BlockFullRollBack(adminKey, ledgerHash, blockchainService);

//Block commit will be normal
MemoryKVStorage.isBlockFullRollbackTest = false;

IntegrationBase.KeyPairResponse userResponse1 = IntegrationBase.testSDK_RegisterUser(adminKey, ledgerHash, blockchainService);

try {
System.out.println("----------------- Init Completed -----------------");
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}

IntegrationBase.testConsistencyAmongNodes(ledgers);
}
private HashDigest initLedger(String[] dbConnections) {
LedgerInitializeWeb4Nodes ledgerInit = new LedgerInitializeWeb4Nodes();
HashDigest ledgerHash = ledgerInit.testInitWith4Nodes(LedgerInitConsensusConfig.bftsmartConfig, dbConnections);
System.out.printf("LedgerHash = %s \r\n", ledgerHash.toBase58());
return ledgerHash;
}
}

Loading…
Cancel
Save