From 0f5e585a2a497575bee1a25652206b71719fe321 Mon Sep 17 00:00:00 2001 From: shaozhuguang Date: Tue, 30 Jun 2020 15:20:53 +0800 Subject: [PATCH] modify exception of write repeat --- .../bftsmart/service/BftsmartNodeServer.java | 177 +++++++++++++----- .../main/resources/config/init/rocksdb.config | 4 +- .../peer/consensus/LedgerStateManager.java | 17 +- .../peer/web/ManagementController.java | 34 +++- source/pom.xml | 2 +- .../rocksdb/RocksDBConnectionFactory.java | 2 +- 6 files changed, 175 insertions(+), 61 deletions(-) diff --git a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java index 701e6457..eff81047 100644 --- a/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java +++ b/source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java @@ -6,6 +6,8 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import bftsmart.consensus.app.BatchAppResultImpl; import bftsmart.reconfiguration.views.View; @@ -39,13 +41,10 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer private static Logger LOGGER = LoggerFactory.getLogger(BftsmartNodeServer.class); -// private static final String DEFAULT_BINDING_HOST = "0.0.0.0"; + private final Lock cmdHandleLock = new ReentrantLock(); private List stateHandles = new CopyOnWriteArrayList<>(); - // TODO 暂不处理队列溢出问题 - private ExecutorService notifyReplyExecutors = Executors.newSingleThreadExecutor(); - private volatile Status status = Status.STOPPED; private final Object mutex = new Object(); @@ -64,6 +63,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer private volatile BftsmartConsensusSettings setting; + private volatile InnerStateHolder stateHolder; + private TOMConfiguration tomConfig; private TOMConfiguration outerTomConfig; @@ -88,6 +89,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer this.realmName = serverSettings.getRealmName(); //used later this.stateMachineReplicate = stateMachineReplicate; + this.stateHolder = new InnerStateHolder(this.stateMachineReplicate.getLatestStateID(realmName)); this.messageHandle = messageHandler; createConfig(); serverId = findServerId(); @@ -116,11 +118,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer } protected void createConfig() { - setting = ((BftsmartServerSettings) serverSettings).getConsensusSettings(); - List configList = new ArrayList<>(); - NodeSettings[] nodeSettingsArray = setting.getNodes(); for (NodeSettings nodeSettings : nodeSettingsArray) { BftsmartNodeSettings node = (BftsmartNodeSettings)nodeSettings; @@ -129,10 +128,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer //create HostsConfig instance based on consensus realm nodes hostsConfig = new HostsConfig(configList.toArray(new HostsConfig.Config[configList.size()])); - systemConfig = PropertiesUtils.createProperties(setting.getSystemConfigs()); - - return; } protected void initConfig(int id, Properties systemsConfig, HostsConfig hostConfig) { @@ -212,6 +208,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer return status == Status.RUNNING; } + @Override public byte[] appExecuteUnordered(byte[] bytes, MessageContext messageContext) { return messageHandle.processUnordered(bytes).get(); } @@ -221,54 +218,51 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer * Only block, no reply, used by state transfer when peer start * */ - private void block(List manageConsensusCmds) { - + private void block(byte[][] manageConsensusCmds) { String batchId = messageHandle.beginBatch(realmName); try { int msgId = 0; for (byte[] txContent : manageConsensusCmds) { - AsyncFuture asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId); + messageHandle.processOrdered(msgId++, txContent, realmName, batchId); } messageHandle.completeBatch(realmName, batchId); messageHandle.commitBatch(realmName, batchId); } catch (Exception e) { - // todo 需要处理应答码 404 LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e); messageHandle.rollbackBatch(realmName, batchId, TransactionState.CONSENSUS_ERROR.CODE); } - } /** - * * Local peer has cid diff with remote peer, used by state transfer when peer start + * 每个传入的commands都是一个确定的batch,不会出现跨batch或多batch的情况 * */ private byte[][] appExecuteDiffBatch(byte[][] commands, MessageContext[] msgCtxs) { - - int manageConsensusId = msgCtxs[0].getConsensusId(); - List manageConsensusCmds = new ArrayList<>(); - - int index = 0; - for (MessageContext msgCtx : msgCtxs) { - if (msgCtx.getConsensusId() == manageConsensusId) { - manageConsensusCmds.add(commands[index]); - } else { - // 达到结块标准,需要进行结块并应答 - block(manageConsensusCmds); - // 重置链表和共识ID - manageConsensusCmds = new ArrayList<>(); - manageConsensusId = msgCtx.getConsensusId(); - manageConsensusCmds.add(commands[index]); + cmdHandleLock.lock(); + try { + int currHandleCid = msgCtxs[0].getConsensusId(); // 同一批次,获取其中之一即可 + long lastCid = stateHolder.lastCid, currentCid = stateHolder.currentCid; + if (currHandleCid <= lastCid) { + // 表示该CID已经执行过,不再处理 + return null; + } else if (currHandleCid == lastCid + 1) { + // 有可能处理正在执行中的状态,需要判断是否新的开始执行 + if (currHandleCid == currentCid) { + // 表示在执行,那么将执行的结果回滚并重新执行 + String batchingID = stateHolder.batchingID; + messageHandle.rollbackBatch(realmName, batchingID, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE); + } } - index++; - } - // 结束时,肯定有最后一个结块请求未处理 - if (!manageConsensusCmds.isEmpty()) { - block(manageConsensusCmds); + // 执行即可 + block(commands); + // 重置上轮ID + stateHolder.setLastCid(currHandleCid); + stateHolder.reset(); + } finally { + cmdHandleLock.unlock(); } return null; - } /** @@ -278,12 +272,11 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer */ @Override public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) { - // Not from consensus outcomes, from state transfer if (!fromConsensus) { + // 表示从状态传输接入 return appExecuteDiffBatch(commands, msgCtxs); } - return null; } @@ -385,7 +378,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer /** * Used by consensus write phase, pre compute new block hash */ - public BatchAppResultImpl preComputeAppHash(byte[][] commands) { + @Override + public BatchAppResultImpl preComputeAppHash(int cid, byte[][] commands) { List> asyncFutureLinkedList = new ArrayList<>(commands.length); List responseLinkedList = new ArrayList<>(); @@ -395,10 +389,23 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer BatchAppResultImpl result = null; String batchId = null; int msgId = 0; - + cmdHandleLock.lock(); try { - + long lastCid = stateHolder.lastCid, currentCid = stateHolder.currentCid; + if (cid <= lastCid) { + // 表示该CID已经执行过,不再处理 + return null; + } else if (cid == lastCid + 1) { + // 需要判断之前二阶段是否执行过 + if (cid == currentCid) { + // 表示二阶段已执行,回滚,重新执行 + String batchingID = stateHolder.batchingID; + messageHandle.rollbackBatch(realmName, batchingID, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE); + } + } + stateHolder.currentCid = cid; batchId = messageHandle.beginBatch(realmName); + stateHolder.batchingID = batchId; genisStateSnapshot = messageHandle.getGenisStateSnapshot(realmName); preStateSnapshot = messageHandle.getStateSnapshot(realmName); @@ -436,6 +443,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer result = new BatchAppResultImpl(responseLinkedList,preStateSnapshot.getSnapshot(), batchId, genisStateSnapshot.getSnapshot()); result.setErrorCode((byte) 1); + } finally { + cmdHandleLock.unlock(); } return result; @@ -452,6 +461,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer return BinaryProtocol.encode(resp, TransactionResponse.class); } + @Override public List updateAppResponses(List asyncResponseLinkedList, byte[] commonHash, boolean isConsistent) { List updatedResponses = new ArrayList<>(); TxResponseMessage resp = null; @@ -474,12 +484,23 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer * Decision has been made at the consensus stage, commit block * */ - public void preComputeAppCommit(String batchId) { + @Override + public void preComputeAppCommit(int cid, String batchId) { + cmdHandleLock.lock(); try { + long lastCid = stateHolder.lastCid; + if (cid <= lastCid) { + // 表示该CID已经执行过,不再处理 + return; + } + stateHolder.setLastCid(cid); + stateHolder.reset(); messageHandle.commitBatch(realmName, batchId); } catch (BlockRollbackException e) { LOGGER.error("Error occurred while pre compute commit --" + e.getMessage(), e); throw e; + } finally { + cmdHandleLock.unlock(); } } @@ -488,12 +509,26 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer * Consensus write phase will terminate, new block hash values are inconsistent, rollback block * */ - public void preComputeAppRollback(String batchId) { - messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE); - LOGGER.debug("Rollback of operations that cause inconsistencies in the ledger"); + @Override + public void preComputeAppRollback(int cid, String batchId) { + cmdHandleLock.lock(); + try { + long lastCid = stateHolder.lastCid; + if (cid <= lastCid) { + // 表示该CID已经执行过,不再处理 + return; + } + stateHolder.setLastCid(cid); + stateHolder.reset(); + messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE); + LOGGER.debug("Rollback of operations that cause inconsistencies in the ledger"); + } finally { + cmdHandleLock.unlock(); + } } //notice + @Override public byte[] getSnapshot() { LOGGER.debug("------- GetSnapshot...[replica.id=" + this.getId() + "]"); @@ -506,6 +541,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer return out.toByteArray(); } + @Override public void installSnapshot(byte[] snapshot) { // System.out.println("Not implement!"); } @@ -528,7 +564,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer try { LOGGER.debug("Start replica...[ID=" + getId() + "]"); - this.replica = new ServiceReplica(tomConfig, this, this); + this.replica = new ServiceReplica(tomConfig, this, this, (int)(this.stateHolder.lastCid - 1)); this.topology = new BftsmartTopology(replica.getReplicaContext().getCurrentView()); initOutTopology(); status = Status.RUNNING; @@ -603,4 +639,51 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer } + private static class InnerStateHolder { + + private long lastCid; + + private long currentCid = -1L; + + private String batchingID = ""; + + public InnerStateHolder(long lastCid) { + this.lastCid = lastCid; + } + + public InnerStateHolder(long lastCid, long currentCid) { + this.lastCid = lastCid; + this.currentCid = currentCid; + } + + public long getLastCid() { + return lastCid; + } + + public void setLastCid(long lastCid) { + this.lastCid = lastCid; + } + + public long getCurrentCid() { + return currentCid; + } + + public void setCurrentCid(long currentCid) { + this.currentCid = currentCid; + } + + public String getBatchingID() { + return batchingID; + } + + public void setBatchingID(String batchingID) { + this.batchingID = batchingID; + } + + public void reset() { + currentCid = -1; + batchingID = ""; + } + } + } diff --git a/source/deployment/deployment-peer/src/main/resources/config/init/rocksdb.config b/source/deployment/deployment-peer/src/main/resources/config/init/rocksdb.config index 210c20a8..3bdaf6de 100644 --- a/source/deployment/deployment-peer/src/main/resources/config/init/rocksdb.config +++ b/source/deployment/deployment-peer/src/main/resources/config/init/rocksdb.config @@ -22,8 +22,8 @@ option.maxWriteBufferNumber=7 #最小合并MemTable的数量 option.minWriteBufferNumberToMerge=2 -#最大同时打开文件数量,-1表示全部打开 -option.maxOpenFiles=-1 +#最大同时打开文件数量,-1表示全部打开,设置为-1需要考虑ulimit的限制 +option.maxOpenFiles=256 #最大后台压缩线程数 option.maxBackgroundCompactions=5 diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/consensus/LedgerStateManager.java b/source/peer/src/main/java/com/jd/blockchain/peer/consensus/LedgerStateManager.java index b88fc49d..f09f5d7f 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/consensus/LedgerStateManager.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/consensus/LedgerStateManager.java @@ -2,18 +2,25 @@ package com.jd.blockchain.peer.consensus; import java.io.InputStream; import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import com.jd.blockchain.consensus.service.StateMachineReplicate; import com.jd.blockchain.consensus.service.StateSnapshot; import org.springframework.stereotype.Component; @Component -public class LedgerStateManager implements StateMachineReplicate{ +public class LedgerStateManager implements StateMachineReplicate { + + private final Map stateSnapshots = new ConcurrentHashMap<>(); @Override public long getLatestStateID(String realmName) { - // TODO Auto-generated method stub - return 0; + StateSnapshot snapshot = stateSnapshots.get(realmName); + if (snapshot == null) { + return -1L; + } + return snapshot.getId(); } @Override @@ -36,8 +43,6 @@ public class LedgerStateManager implements StateMachineReplicate{ @Override public void setupState(String realmName, StateSnapshot snapshot, InputStream state) { - // TODO Auto-generated method stub - + stateSnapshots.put(realmName, snapshot); } - } diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/web/ManagementController.java b/source/peer/src/main/java/com/jd/blockchain/peer/web/ManagementController.java index a018bd85..561a9664 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/web/ManagementController.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/web/ManagementController.java @@ -10,6 +10,7 @@ import bftsmart.reconfiguration.util.TOMConfiguration; import bftsmart.reconfiguration.views.View; import com.jd.blockchain.consensus.bftsmart.BftsmartClientIncomingSettings; import com.jd.blockchain.consensus.bftsmart.BftsmartTopology; +import com.jd.blockchain.consensus.service.*; import com.jd.blockchain.ledger.*; import com.jd.blockchain.utils.net.NetworkAddress; import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils; @@ -34,10 +35,6 @@ import com.jd.blockchain.consensus.action.ActionResponse; import com.jd.blockchain.consensus.bftsmart.BftsmartConsensusSettings; import com.jd.blockchain.consensus.bftsmart.BftsmartNodeSettings; import com.jd.blockchain.consensus.mq.server.MsgQueueMessageDispatcher; -import com.jd.blockchain.consensus.service.MessageHandle; -import com.jd.blockchain.consensus.service.NodeServer; -import com.jd.blockchain.consensus.service.ServerSettings; -import com.jd.blockchain.consensus.service.StateMachineReplicate; import com.jd.blockchain.crypto.HashDigest; import com.jd.blockchain.ledger.LedgerAdminInfo; import com.jd.blockchain.ledger.core.LedgerAdminDataQuery; @@ -250,6 +247,9 @@ public class ManagementController implements LedgerBindingConfigAware, PeerManag } ServerSettings serverSettings = provider.getServerFactory().buildServerSettings(ledgerHash.toBase58(), csSettings, currentNode.getAddress()); + // 注册快照状态 + consensusStateManager.setupState(ledgerHash.toBase58(), new BlockStateSnapshot( + ledgerRepository.retrieveLatestBlockHeight(), ledgerHash), null); NodeServer server = provider.getServerFactory().setupServer(serverSettings, consensusMessageHandler, consensusStateManager); ledgerPeers.put(ledgerHash, server); @@ -281,4 +281,30 @@ public class ManagementController implements LedgerBindingConfigAware, PeerManag peer.stop(); } } + + private final class BlockStateSnapshot implements StateSnapshot { + + private long id; + + private byte[] snapshotBytes; + + public BlockStateSnapshot(long id, byte[] snapshotBytes) { + this.id = id; + this.snapshotBytes = snapshotBytes; + } + + public BlockStateSnapshot(long id, HashDigest hash) { + this(id, hash.toBytes()); + } + + @Override + public long getId() { + return id; + } + + @Override + public byte[] getSnapshot() { + return snapshotBytes; + } + } } diff --git a/source/pom.xml b/source/pom.xml index 0d663735..7462d2fd 100644 --- a/source/pom.xml +++ b/source/pom.xml @@ -38,7 +38,7 @@ - 0.4.3.RELEASE + 0.4.4.RELEASE 1.1.3.RELEASE 1.1.3.RELEASE 2.4 diff --git a/source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBConnectionFactory.java b/source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBConnectionFactory.java index 99c54bfc..3a440127 100644 --- a/source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBConnectionFactory.java +++ b/source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBConnectionFactory.java @@ -111,7 +111,7 @@ public class RocksDBConnectionFactory implements DbConnectionFactory { long optionWriteBufferSize = getLong(dbProperties, "option.writeBufferSize", 256 * SizeUnit.MB); int optionMaxWriteBufferNumber = getInt(dbProperties, "option.maxWriteBufferNumber", 7); int optionMinWriteBufferNumberToMerge = getInt(dbProperties, "option.minWriteBufferNumberToMerge", 2); - int optionMaxOpenFiles = getInt(dbProperties, "option.maxOpenFiles", -1); + int optionMaxOpenFiles = getInt(dbProperties, "option.maxOpenFiles", 256); int optionMaxBackgroundCompactions = getInt(dbProperties, "option.maxBackgroundCompactions", 5); int optionMaxBackgroundFlushes = getInt(dbProperties, "option.maxBackgroundFlushes", 4);