Browse Source

solve ledger inconsistency problem

tags/1.1.0
zhangshuang 5 years ago
parent
commit
f60c63e20b
8 changed files with 394 additions and 62 deletions
  1. +207
    -59
      source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java
  2. +2
    -2
      source/contract/contract-samples/pom.xml
  3. +11
    -0
      source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContract.java
  4. +43
    -0
      source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContractImpl.java
  5. +7
    -0
      source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java
  6. +3
    -0
      source/peer/src/main/java/com/jd/blockchain/peer/consensus/ConsensusMessageDispatcher.java
  7. +120
    -0
      source/sdk/sdk-samples/src/main/java/com/jd/blockchain/sdk/samples/SDK_Contract_Random_Demo.java
  8. +1
    -1
      source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/JedisConnection.java

+ 207
- 59
source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java View File

@@ -5,7 +5,13 @@ import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import bftsmart.consensus.app.BlockResultImpl;
import bftsmart.tom.*;
import com.jd.blockchain.binaryproto.BinaryProtocol;
import com.jd.blockchain.consensus.service.*;
import com.jd.blockchain.ledger.*;
import com.jd.blockchain.transaction.TxResponseMessage;
import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,18 +21,11 @@ import com.jd.blockchain.consensus.bftsmart.BftsmartConsensusProvider;
import com.jd.blockchain.consensus.bftsmart.BftsmartConsensusSettings;
import com.jd.blockchain.consensus.bftsmart.BftsmartNodeSettings;
import com.jd.blockchain.consensus.bftsmart.BftsmartTopology;
import com.jd.blockchain.consensus.service.MessageHandle;
import com.jd.blockchain.consensus.service.NodeServer;
import com.jd.blockchain.consensus.service.ServerSettings;
import com.jd.blockchain.consensus.service.StateHandle;
import com.jd.blockchain.consensus.service.StateMachineReplicate;
import com.jd.blockchain.ledger.TransactionState;
import com.jd.blockchain.utils.PropertiesUtils;
import com.jd.blockchain.utils.concurrent.AsyncFuture;
import com.jd.blockchain.utils.io.BytesUtils;
import bftsmart.reconfiguration.util.HostsConfig;
import bftsmart.reconfiguration.util.TOMConfiguration;
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.server.defaultservices.DefaultRecoverable;

public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer {
@@ -187,94 +186,243 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
return messageHandle.processUnordered(bytes).get();
}

@Override
public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) {
return appExecuteBatch(commands, msgCtxs, fromConsensus, null);
/**
*
* Only block, no reply, used by state transfer when peer start
*
*/
private void block(List<byte[]> manageConsensusCmds) {

String batchId = messageHandle.beginBatch(realmName);
try {
int msgId = 0;
for (byte[] txContent : manageConsensusCmds) {
AsyncFuture<byte[]> asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId);
}
messageHandle.completeBatch(realmName, batchId);
messageHandle.commitBatch(realmName, batchId);
} catch (Exception e) {
// todo 需要处理应答码 404
LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e);
messageHandle.rollbackBatch(realmName, batchId, TransactionState.CONSENSUS_ERROR.CODE);
}

}

@Override
public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus, List<ReplyContextMessage> replyList) {
/**
*
* Local peer has cid diff with remote peer, used by state transfer when peer start
*
*/
private byte[][] appExecuteDiffBatch(byte[][] commands, MessageContext[] msgCtxs) {

if (replyList == null || replyList.size() == 0) {
throw new IllegalArgumentException();
}
// todo 此部分需要重新改造
/**
* 默认BFTSmart接口提供的commands是一个或多个共识结果的顺序集合
* 根据共识的规定,目前的做法是将其根据msgCtxs的内容进行分组,每组都作为一个结块标识来处理
* 从msgCtxs可以获取对应commands的分组情况
*/
int manageConsensusId = msgCtxs[0].getConsensusId();
List<byte[]> manageConsensusCmds = new ArrayList<>();
List<ReplyContextMessage> manageReplyMsgs = new ArrayList<>();

int index = 0;
for (MessageContext msgCtx : msgCtxs) {
if (msgCtx.getConsensusId() == manageConsensusId) {
manageConsensusCmds.add(commands[index]);
manageReplyMsgs.add(replyList.get(index));
} else {
// 达到结块标准,需要进行结块并应答
blockAndReply(manageConsensusCmds, manageReplyMsgs);
block(manageConsensusCmds);
// 重置链表和共识ID
manageConsensusCmds = new ArrayList<>();
manageReplyMsgs = new ArrayList<>();
manageConsensusId = msgCtx.getConsensusId();
manageConsensusCmds.add(commands[index]);
manageReplyMsgs.add(replyList.get(index));
}
index++;
}
// 结束时,肯定有最后一个结块请求未处理
if (!manageConsensusCmds.isEmpty()) {
blockAndReply(manageConsensusCmds, manageReplyMsgs);
block(manageConsensusCmds);
}
return null;

}

/**
*
* Invoked by state transfer when peer start
*
*/
@Override
public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) {

// Not from consensus outcomes, from state transfer
if (!fromConsensus) {
return appExecuteDiffBatch(commands, msgCtxs);
}

return null;
}

/**
*
* From consensus outcomes, do nothing now
* The operation of executing the batch was moved to the consensus stage 2 and 3, in order to guaranteed ledger consistency
*/
@Override
public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus, List<ReplyContextMessage> replyList) {

// if (replyList == null || replyList.size() == 0) {
// throw new IllegalArgumentException();
// }
// // todo 此部分需要重新改造
// /**
// * 默认BFTSmart接口提供的commands是一个或多个共识结果的顺序集合
// * 根据共识的规定,目前的做法是将其根据msgCtxs的内容进行分组,每组都作为一个结块标识来处理
// * 从msgCtxs可以获取对应commands的分组情况
// */
// int manageConsensusId = msgCtxs[0].getConsensusId();
// List<byte[]> manageConsensusCmds = new ArrayList<>();
// List<ReplyContextMessage> manageReplyMsgs = new ArrayList<>();
//
// int index = 0;
// for (MessageContext msgCtx : msgCtxs) {
// if (msgCtx.getConsensusId() == manageConsensusId) {
// manageConsensusCmds.add(commands[index]);
// manageReplyMsgs.add(replyList.get(index));
// } else {
// // 达到结块标准,需要进行结块并应答
// blockAndReply(manageConsensusCmds, manageReplyMsgs);
// // 重置链表和共识ID
// manageConsensusCmds = new ArrayList<>();
// manageReplyMsgs = new ArrayList<>();
// manageConsensusId = msgCtx.getConsensusId();
// manageConsensusCmds.add(commands[index]);
// manageReplyMsgs.add(replyList.get(index));
// }
// index++;
// }
// // 结束时,肯定有最后一个结块请求未处理
// if (!manageConsensusCmds.isEmpty()) {
// blockAndReply(manageConsensusCmds, manageReplyMsgs);
// }
return null;
}

/**
*
* Block and reply are moved to consensus completion stage
*
*/
private void blockAndReply(List<byte[]> manageConsensusCmds, List<ReplyContextMessage> replyList) {
// consensusBatchId = messageHandle.beginBatch(realmName);
// List<AsyncFuture<byte[]>> asyncFutureLinkedList = new ArrayList<>(manageConsensusCmds.size());
// try {
// int msgId = 0;
// for (byte[] txContent : manageConsensusCmds) {
// AsyncFuture<byte[]> asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, consensusBatchId);
// asyncFutureLinkedList.add(asyncFuture);
// }
// messageHandle.completeBatch(realmName, consensusBatchId);
// messageHandle.commitBatch(realmName, consensusBatchId);
// } catch (Exception e) {
// // todo 需要处理应答码 404
// LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e);
// messageHandle.rollbackBatch(realmName, consensusBatchId, TransactionState.CONSENSUS_ERROR.CODE);
// }
//
// // 通知线程单独处理应答
// notifyReplyExecutors.execute(() -> {
// // 应答对应的结果
// int replyIndex = 0;
// for(ReplyContextMessage msg : replyList) {
// msg.setReply(asyncFutureLinkedList.get(replyIndex).get());
// TOMMessage request = msg.getTomMessage();
// ReplyContext replyContext = msg.getReplyContext();
// request.reply = new TOMMessage(replyContext.getId(), request.getSession(), request.getSequence(),
// request.getOperationId(), msg.getReply(), replyContext.getCurrentViewId(),
// request.getReqType());
//
// if (replyContext.getNumRepliers() > 0) {
// bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to "
// + request.getSender() + " with sequence number " + request.getSequence()
// + " and operation ID " + request.getOperationId() + " via ReplyManager");
// replyContext.getRepMan().send(request);
// } else {
// bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to "
// + request.getSender() + " with sequence number " + request.getSequence()
// + " and operation ID " + request.getOperationId());
// replyContext.getReplier().manageReply(request, msg.getMessageContext());
// }
// replyIndex++;
// }
// });
}

/**
*
* Used by consensus write phase, pre compute new block hash
*
*/
public BlockResultImpl preComputeBlockHash(byte[][] commands) {
String batchId = messageHandle.beginBatch(realmName);
List<AsyncFuture<byte[]>> asyncFutureLinkedList = new ArrayList<>(manageConsensusCmds.size());
List<AsyncFuture<byte[]>> asyncFutureLinkedList = new ArrayList<>(commands.length);
List<byte[]> responseLinkedList = new ArrayList<>();
try {
int msgId = 0;
for (byte[] txContent : manageConsensusCmds) {
for (byte[] txContent : commands) {
AsyncFuture<byte[]> asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId);
asyncFutureLinkedList.add(asyncFuture);
}
messageHandle.completeBatch(realmName, batchId);
messageHandle.commitBatch(realmName, batchId);
StateSnapshot stateSnapshot = messageHandle.completeBatch(realmName, batchId);
byte[] blockHashBytes = stateSnapshot.getSnapshot();

for (int i = 0; i< asyncFutureLinkedList.size(); i++) {
responseLinkedList.add(asyncFutureLinkedList.get(i).get());
}


return new BlockResultImpl(responseLinkedList, blockHashBytes, batchId);

} catch (Exception e) {
// todo 需要处理应答码 404
LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e);
messageHandle.rollbackBatch(realmName, batchId, TransactionState.CONSENSUS_ERROR.CODE);
LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e);
messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK.CODE);
}

// 通知线程单独处理应答
notifyReplyExecutors.execute(() -> {
// 应答对应的结果
int replyIndex = 0;
for(ReplyContextMessage msg : replyList) {
msg.setReply(asyncFutureLinkedList.get(replyIndex).get());
TOMMessage request = msg.getTomMessage();
ReplyContext replyContext = msg.getReplyContext();
request.reply = new TOMMessage(replyContext.getId(), request.getSession(), request.getSequence(),
request.getOperationId(), msg.getReply(), replyContext.getCurrentViewId(),
request.getReqType());

if (replyContext.getNumRepliers() > 0) {
bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to "
+ request.getSender() + " with sequence number " + request.getSequence()
+ " and operation ID " + request.getOperationId() + " via ReplyManager");
replyContext.getRepMan().send(request);
} else {
bftsmart.tom.util.Logger.println("(ServiceReplica.receiveMessages) sending reply to "
+ request.getSender() + " with sequence number " + request.getSequence()
+ " and operation ID " + request.getOperationId());
replyContext.getReplier().manageReply(request, msg.getMessageContext());
}
replyIndex++;
}
});
return null;
}

/**
*
* Consensus write phase will terminate, new block hash values are inconsistent, update batch messages execute state
*
*/
public List<byte[]> updateBatchResponses(List<byte[]> asyncResponseLinkedList) {
List<byte[]> updatedResponses = new ArrayList<>();

for(int i = 0; i < asyncResponseLinkedList.size(); i++) {
TransactionResponse txResponse = BinaryProtocol.decode(asyncResponseLinkedList.get(i));
TxResponseMessage resp = new TxResponseMessage(txResponse.getContentHash());
resp.setExecutionState(TransactionState.IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK);
updatedResponses.add(BinaryProtocol.encode(resp, TransactionResponse.class));
}

return updatedResponses;
}

/**
*
* Decision has been made at the consensus stage, commit block
*
*/
public void preComputeBlockCommit(String batchId) {

messageHandle.commitBatch(realmName, batchId);

}

/**
*
* Consensus write phase will terminate, new block hash values are inconsistent, rollback block
*
*/
public void preComputeBlockRollback(String batchId) {
messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK.CODE);
LOGGER.debug("Rollback of operations that cause inconsistencies in the ledger");
}

//notice


+ 2
- 2
source/contract/contract-samples/pom.xml View File

@@ -34,11 +34,11 @@
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<finalName>transfer</finalName>
<finalName>random</finalName>
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>com.jd.blockchain.contract.TransferContractImpl</mainClass>
<mainClass>com.jd.blockchain.contract.RandomContractImpl</mainClass>
</manifest>
</archive>
<descriptorRefs>


+ 11
- 0
source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContract.java View File

@@ -0,0 +1,11 @@
package com.jd.blockchain.contract;

@Contract
public interface RandomContract {

@ContractEvent(name = "random-put")
void put(String address, String key, String value);

@ContractEvent(name = "random-putAndGet")
String putAndGet(String address, String key, String value);
}

+ 43
- 0
source/contract/contract-samples/src/main/java/com/jd/blockchain/contract/RandomContractImpl.java View File

@@ -0,0 +1,43 @@
package com.jd.blockchain.contract;

import com.jd.blockchain.crypto.HashDigest;

import java.util.Random;

public class RandomContractImpl implements RandomContract, EventProcessingAware {

private static final Random RANDOM_TIME = new Random();

private ContractEventContext eventContext;

private HashDigest ledgerHash;

@Override
public void beforeEvent(ContractEventContext eventContext) {
this.eventContext = eventContext;
this.ledgerHash = eventContext.getCurrentLedgerHash();
}

@Override
public void postEvent(ContractEventContext eventContext, Exception error) {

}

@Override
public void put(String address, String key, String value) {

String saveVal = value + "-" + RANDOM_TIME.nextInt(1024);

eventContext.getLedger().dataAccount(address).setText(key, saveVal, -1L);
}

@Override
public String putAndGet(String address, String key, String value) {

String saveVal = value + "-" + RANDOM_TIME.nextInt(1024);

eventContext.getLedger().dataAccount(address).setText(key, saveVal, -1L);

return address;
}
}

+ 7
- 0
source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java View File

@@ -73,6 +73,13 @@ public enum TransactionState {
*/
IGNORED_BY_BLOCK_FULL_ROLLBACK((byte) 0x44),
/**
*
* 共识阶段加入新区块哈希预计算功能, 如果来自其他Peer的新区块哈希值不一致,本批次整体回滚
*
*/
IGNORED_BY_CONSENSUS_PHASE_PRECOMPUTE_ROLLBACK((byte) 0x45),
/**
* 系统错误;
*/


+ 3
- 0
source/peer/src/main/java/com/jd/blockchain/peer/consensus/ConsensusMessageDispatcher.java View File

@@ -245,6 +245,9 @@ public class ConsensusMessageDispatcher implements MessageHandle {
realmLock.lock();
try {
batchResultHandle.cancel(TransactionState.valueOf((byte)reasonCode));
currBatchId = null;
txResponseMap = null;
txBatchProcess = null;
} finally {
realmLock.unlock();
}


+ 120
- 0
source/sdk/sdk-samples/src/main/java/com/jd/blockchain/sdk/samples/SDK_Contract_Random_Demo.java View File

@@ -0,0 +1,120 @@
package com.jd.blockchain.sdk.samples;

import com.jd.blockchain.contract.RandomContract;
import com.jd.blockchain.contract.TransferContract;
import com.jd.blockchain.ledger.*;
import com.jd.blockchain.transaction.GenericValueHolder;
import com.jd.blockchain.transaction.LongValueHolder;
import com.jd.blockchain.utils.Bytes;

import java.util.Random;

import static com.jd.blockchain.sdk.samples.SDKDemo_Constant.readChainCodes;
import static com.jd.blockchain.transaction.ContractReturnValue.decode;

public class SDK_Contract_Random_Demo extends SDK_Base_Demo {

public static void main(String[] args) throws Exception {
new SDK_Contract_Random_Demo().executeContract();
}

public void executeContract() throws Exception {

// 发布jar包
// 定义交易模板
TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);

// 将jar包转换为二进制数据
byte[] contractCode = readChainCodes("random.jar");

// 生成一个合约账号
BlockchainKeypair contractDeployKey = BlockchainKeyGenerator.getInstance().generate();

// 生成发布合约操作
txTpl.contracts().deploy(contractDeployKey.getIdentity(), contractCode);

// 生成预发布交易;
PreparedTransaction ptx = txTpl.prepare();

// 对交易进行签名
ptx.sign(adminKey);

// 提交并等待共识返回;
TransactionResponse txResp = ptx.commit();

// 获取合约地址
Bytes contractAddress = contractDeployKey.getAddress();

// 打印交易返回信息
System.out.printf("Tx[%s] -> BlockHeight = %s, BlockHash = %s, isSuccess = %s, ExecutionState = %s \r\n",
txResp.getContentHash().toBase58(), txResp.getBlockHeight(), txResp.getBlockHash().toBase58(),
txResp.isSuccess(), txResp.getExecutionState());

// 打印合约地址
System.out.printf("ContractAddress = %s \r\n", contractAddress.toBase58());

String result = create("LdeNzfhZd2qiBRk3YrEX6GZgiVRZJaf3MKJAY", "zhangshuang", "jingdong", contractAddress);


Thread.sleep(5000);
System.out.println(result);
}

private String readAll(String address, String account, Bytes contractAddress) {
TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);
// 使用合约创建
TransferContract transferContract = txTpl.contract(contractAddress, TransferContract.class);
GenericValueHolder<String> result = decode(transferContract.readAll(address, account));
commit(txTpl);
return result.get();
}

private long readByContract(String address, String account, Bytes contractAddress) {
TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);
// 使用合约创建
TransferContract transferContract = txTpl.contract(contractAddress, TransferContract.class);
LongValueHolder result = decode(transferContract.read(address, account));
commit(txTpl);
return result.get();
}

private long readByKvOperation(String address, String account) {
KVDataEntry[] kvDataEntries = blockchainService.getDataEntries(ledgerHash, address, account);
if (kvDataEntries == null || kvDataEntries.length == 0) {
throw new IllegalStateException(String.format("Ledger %s Service inner Error !!!", ledgerHash.toBase58()));
}
KVDataEntry kvDataEntry = kvDataEntries[0];
if (kvDataEntry.getVersion() == -1) {
return 0L;
}
return (long) (kvDataEntry.getValue());
}

private String transfer(String address, String from, String to, long money, Bytes contractAddress) {
TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);
// 使用合约创建
TransferContract transferContract = txTpl.contract(contractAddress, TransferContract.class);
GenericValueHolder<String> result = decode(transferContract.transfer(address, from, to, money));
commit(txTpl);
return result.get();
}

private BlockchainKeypair createDataAccount() {
// 首先注册一个数据账户
BlockchainKeypair newDataAccount = BlockchainKeyGenerator.getInstance().generate();

TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);
txTpl.dataAccounts().register(newDataAccount.getIdentity());
commit(txTpl);
return newDataAccount;
}

private String create(String address, String account, String value, Bytes contractAddress) {
TransactionTemplate txTpl = blockchainService.newTransaction(ledgerHash);
// 使用合约创建
RandomContract randomContract = txTpl.contract(contractAddress, RandomContract.class);
GenericValueHolder<String> result = decode(randomContract.putAndGet(address, account, value));
commit(txTpl);
return result.get();
}
}

+ 1
- 1
source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/JedisConnection.java View File

@@ -18,7 +18,7 @@ public class JedisConnection implements DbConnection {

@Override
public void close() {
jedisPool.close();
// jedisPool.close();
}

@Override


Loading…
Cancel
Save