Browse Source

modify exception of write repeat

tags/1.1.11
shaozhuguang 5 years ago
parent
commit
0f5e585a2a
6 changed files with 175 additions and 61 deletions
  1. +130
    -47
      source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java
  2. +2
    -2
      source/deployment/deployment-peer/src/main/resources/config/init/rocksdb.config
  3. +11
    -6
      source/peer/src/main/java/com/jd/blockchain/peer/consensus/LedgerStateManager.java
  4. +30
    -4
      source/peer/src/main/java/com/jd/blockchain/peer/web/ManagementController.java
  5. +1
    -1
      source/pom.xml
  6. +1
    -1
      source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBConnectionFactory.java

+ 130
- 47
source/consensus/consensus-bftsmart/src/main/java/com/jd/blockchain/consensus/bftsmart/service/BftsmartNodeServer.java View File

@@ -6,6 +6,8 @@ import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import bftsmart.consensus.app.BatchAppResultImpl;
import bftsmart.reconfiguration.views.View;
@@ -39,13 +41,10 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer

private static Logger LOGGER = LoggerFactory.getLogger(BftsmartNodeServer.class);

// private static final String DEFAULT_BINDING_HOST = "0.0.0.0";
private final Lock cmdHandleLock = new ReentrantLock();

private List<StateHandle> stateHandles = new CopyOnWriteArrayList<>();

// TODO 暂不处理队列溢出问题
private ExecutorService notifyReplyExecutors = Executors.newSingleThreadExecutor();

private volatile Status status = Status.STOPPED;

private final Object mutex = new Object();
@@ -64,6 +63,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer

private volatile BftsmartConsensusSettings setting;

private volatile InnerStateHolder stateHolder;

private TOMConfiguration tomConfig;

private TOMConfiguration outerTomConfig;
@@ -88,6 +89,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
this.realmName = serverSettings.getRealmName();
//used later
this.stateMachineReplicate = stateMachineReplicate;
this.stateHolder = new InnerStateHolder(this.stateMachineReplicate.getLatestStateID(realmName));
this.messageHandle = messageHandler;
createConfig();
serverId = findServerId();
@@ -116,11 +118,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
}

protected void createConfig() {

setting = ((BftsmartServerSettings) serverSettings).getConsensusSettings();

List<HostsConfig.Config> configList = new ArrayList<>();

NodeSettings[] nodeSettingsArray = setting.getNodes();
for (NodeSettings nodeSettings : nodeSettingsArray) {
BftsmartNodeSettings node = (BftsmartNodeSettings)nodeSettings;
@@ -129,10 +128,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer

//create HostsConfig instance based on consensus realm nodes
hostsConfig = new HostsConfig(configList.toArray(new HostsConfig.Config[configList.size()]));

systemConfig = PropertiesUtils.createProperties(setting.getSystemConfigs());

return;
}

protected void initConfig(int id, Properties systemsConfig, HostsConfig hostConfig) {
@@ -212,6 +208,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
return status == Status.RUNNING;
}

/**
* Handles an unordered request by handing the raw bytes to the message handler.
*
* @param bytes          request payload, passed unchanged to {@code messageHandle.processUnordered}
* @param messageContext not read by this method
* @return whatever {@code get()} yields on the handler's result
*/
@Override
public byte[] appExecuteUnordered(byte[] bytes, MessageContext messageContext) {
return messageHandle.processUnordered(bytes).get();
}
@@ -221,54 +218,51 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
* Only block, no reply, used by state transfer when peer start
*
*/
private void block(List<byte[]> manageConsensusCmds) {

private void block(byte[][] manageConsensusCmds) {
// NOTE(review): this listing is a commit diff with removed and added lines interleaved. The
// List<byte[]> signature above and the "AsyncFuture<byte[]> asyncFuture = ..." statement below
// look like the removed variants - confirm against the checked-in file.
//
// Opens a batch for realmName, hands every command to the message handler, then completes and
// commits that batch. Nothing is returned to the caller.
String batchId = messageHandle.beginBatch(realmName);
try {
int msgId = 0;
for (byte[] txContent : manageConsensusCmds) {
AsyncFuture<byte[]> asyncFuture = messageHandle.processOrdered(msgId++, txContent, realmName, batchId);
messageHandle.processOrdered(msgId++, txContent, realmName, batchId);
}
messageHandle.completeBatch(realmName, batchId);
messageHandle.commitBatch(realmName, batchId);
} catch (Exception e) {
// Any failure is logged and the batch is rolled back with CONSENSUS_ERROR. The exception is
// not rethrown, so the caller cannot tell that the batch failed.
// TODO: response code 404 still needs to be handled
LOGGER.error("Error occurred while processing ordered messages! --" + e.getMessage(), e);
messageHandle.rollbackBatch(realmName, batchId, TransactionState.CONSENSUS_ERROR.CODE);
}

}

/**
*
* Local peer has cid diff with remote peer, used by state transfer when peer start
* Each incoming commands array is exactly one batch; it never spans batches or contains several of them.
*
* NOTE(review): this listing is a commit diff with removed and added lines interleaved. The
* manageConsensusCmds loop looks like the removed implementation and the cmdHandleLock-guarded
* section like the added one - confirm against the checked-in file.
*
* @param commands serialized commands of the batch
* @param msgCtxs  message contexts; element 0 is read unconditionally, so the array is assumed non-empty
* @return {@code null} on every return statement shown here
*/
private byte[][] appExecuteDiffBatch(byte[][] commands, MessageContext[] msgCtxs) {

int manageConsensusId = msgCtxs[0].getConsensusId();
List<byte[]> manageConsensusCmds = new ArrayList<>();

int index = 0;
for (MessageContext msgCtx : msgCtxs) {
if (msgCtx.getConsensusId() == manageConsensusId) {
manageConsensusCmds.add(commands[index]);
} else {
// Block boundary reached: close the block and reply
block(manageConsensusCmds);
// Reset the list and the consensus ID
manageConsensusCmds = new ArrayList<>();
manageConsensusId = msgCtx.getConsensusId();
manageConsensusCmds.add(commands[index]);
cmdHandleLock.lock();
try {
int currHandleCid = msgCtxs[0].getConsensusId(); // all entries belong to one batch, so any of them will do
long lastCid = stateHolder.lastCid, currentCid = stateHolder.currentCid;
if (currHandleCid <= lastCid) {
// This CID has already been executed; do not process it again
return null;
} else if (currHandleCid == lastCid + 1) {
// This CID may currently be in progress; check whether this is a fresh start
if (currHandleCid == currentCid) {
// It is in progress, so roll back what was executed and run it again
String batchingID = stateHolder.batchingID;
messageHandle.rollbackBatch(realmName, batchingID, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE);
}
}
index++;
}
// At the end there is always one last block request still pending
if (!manageConsensusCmds.isEmpty()) {
block(manageConsensusCmds);
// Just execute the batch
block(commands);
// Update the last-round ID
// NOTE(review): block() logs and rolls back on failure without rethrowing, so lastCid is
// advanced here even when the batch was not committed - confirm that is intended.
stateHolder.setLastCid(currHandleCid);
stateHolder.reset();
} finally {
cmdHandleLock.unlock();
}
return null;

}

/**
@@ -278,12 +272,11 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
*/
@Override
public byte[][] appExecuteBatch(byte[][] commands, MessageContext[] msgCtxs, boolean fromConsensus) {
    // Batches that come out of consensus are not executed by this method; it has nothing to return for them.
    if (fromConsensus) {
        return null;
    }
    // Everything else arrives through state transfer and is replayed locally.
    return appExecuteDiffBatch(commands, msgCtxs);
}

@@ -385,7 +378,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
/**
* Used by consensus write phase, pre compute new block hash
*/
public BatchAppResultImpl preComputeAppHash(byte[][] commands) {
@Override
public BatchAppResultImpl preComputeAppHash(int cid, byte[][] commands) {

List<AsyncFuture<byte[]>> asyncFutureLinkedList = new ArrayList<>(commands.length);
List<byte[]> responseLinkedList = new ArrayList<>();
@@ -395,10 +389,23 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
BatchAppResultImpl result = null;
String batchId = null;
int msgId = 0;
cmdHandleLock.lock();
try {

long lastCid = stateHolder.lastCid, currentCid = stateHolder.currentCid;
if (cid <= lastCid) {
// 表示该CID已经执行过,不再处理
return null;
} else if (cid == lastCid + 1) {
// 需要判断之前二阶段是否执行过
if (cid == currentCid) {
// 表示二阶段已执行,回滚,重新执行
String batchingID = stateHolder.batchingID;
messageHandle.rollbackBatch(realmName, batchingID, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE);
}
}
stateHolder.currentCid = cid;
batchId = messageHandle.beginBatch(realmName);
stateHolder.batchingID = batchId;
genisStateSnapshot = messageHandle.getGenisStateSnapshot(realmName);
preStateSnapshot = messageHandle.getStateSnapshot(realmName);

@@ -436,6 +443,8 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer

result = new BatchAppResultImpl(responseLinkedList,preStateSnapshot.getSnapshot(), batchId, genisStateSnapshot.getSnapshot());
result.setErrorCode((byte) 1);
} finally {
cmdHandleLock.unlock();
}

return result;
@@ -452,6 +461,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
return BinaryProtocol.encode(resp, TransactionResponse.class);
}

@Override
public List<byte[]> updateAppResponses(List<byte[]> asyncResponseLinkedList, byte[] commonHash, boolean isConsistent) {
List<byte[]> updatedResponses = new ArrayList<>();
TxResponseMessage resp = null;
@@ -474,12 +484,23 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
* Decision has been made at the consensus stage, commit block
*
*/
public void preComputeAppCommit(String batchId) {
@Override
public void preComputeAppCommit(int cid, String batchId) {
// NOTE(review): this listing is a commit diff; the single-argument signature above looks like
// the removed line and the (int cid, String batchId) overload like its replacement - confirm.
//
// Commits the batch identified by batchId while holding cmdHandleLock. A cid that is not
// greater than the recorded lastCid is ignored.
cmdHandleLock.lock();
try {
long lastCid = stateHolder.lastCid;
if (cid <= lastCid) {
// This CID has already been executed; do not process it again
return;
}
// NOTE(review): lastCid is advanced and the in-progress state cleared before commitBatch runs.
// If commitBatch throws, a later preComputeAppRollback for the same cid would hit the
// "cid <= lastCid" guard and return without rolling back - confirm the ordering is intended.
stateHolder.setLastCid(cid);
stateHolder.reset();
messageHandle.commitBatch(realmName, batchId);
} catch (BlockRollbackException e) {
// Logged here and rethrown unchanged to the caller.
LOGGER.error("Error occurred while pre compute commit --" + e.getMessage(), e);
throw e;
} finally {
cmdHandleLock.unlock();
}
}

@@ -488,12 +509,26 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
* Consensus write phase will terminate, new block hash values are inconsistent, rollback block
*
*/
// NOTE(review): this listing is a commit diff; the next three lines (single-argument signature,
// unguarded rollbackBatch call and debug log) look like the removed implementation and the
// @Override method after them like its replacement - confirm against the checked-in file.
public void preComputeAppRollback(String batchId) {
messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE);
LOGGER.debug("Rollback of operations that cause inconsistencies in the ledger");
@Override
public void preComputeAppRollback(int cid, String batchId) {
// Rolls back the batch identified by batchId while holding cmdHandleLock. A cid that is not
// greater than the recorded lastCid is ignored.
cmdHandleLock.lock();
try {
long lastCid = stateHolder.lastCid;
if (cid <= lastCid) {
// This CID has already been executed; do not process it again
return;
}
// The cid is recorded as handled and the in-progress state cleared before the rollback call.
stateHolder.setLastCid(cid);
stateHolder.reset();
messageHandle.rollbackBatch(realmName, batchId, TransactionState.IGNORED_BY_BLOCK_FULL_ROLLBACK.CODE);
LOGGER.debug("Rollback of operations that cause inconsistencies in the ledger");
} finally {
cmdHandleLock.unlock();
}
}

//notice
@Override
public byte[] getSnapshot() {
LOGGER.debug("------- GetSnapshot...[replica.id=" + this.getId() + "]");

@@ -506,6 +541,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer
return out.toByteArray();
}

/**
* Does nothing: the body is empty and the supplied snapshot is ignored.
*
* @param snapshot not read by this method
*/
@Override
public void installSnapshot(byte[] snapshot) {
// Not implemented.
}
@@ -528,7 +564,7 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer

try {
LOGGER.debug("Start replica...[ID=" + getId() + "]");
this.replica = new ServiceReplica(tomConfig, this, this);
this.replica = new ServiceReplica(tomConfig, this, this, (int)(this.stateHolder.lastCid - 1));
this.topology = new BftsmartTopology(replica.getReplicaContext().getCurrentView());
initOutTopology();
status = Status.RUNNING;
@@ -603,4 +639,51 @@ public class BftsmartNodeServer extends DefaultRecoverable implements NodeServer

}

/**
 * Mutable holder for the consensus-batch bookkeeping of the enclosing server.
 *
 * <p>It tracks three values: the last consensus id recorded as handled, the consensus id
 * currently being processed ({@code -1} when none), and the id of the batch opened for it
 * (empty string when none). The class does no synchronization of its own, and the enclosing
 * class also reads and writes the fields directly.
 */
private static class InnerStateHolder {

    /** Last consensus id recorded as handled. */
    private long lastCid;

    /** Consensus id currently in progress; {@code -1} means none. */
    private long currentCid;

    /** Id of the batch opened for {@link #currentCid}; empty means none. */
    private String batchingID;

    /**
     * Creates a holder with no consensus id in progress.
     *
     * @param lastCid last consensus id already handled
     */
    public InnerStateHolder(long lastCid) {
        this(lastCid, -1L);
    }

    /**
     * Creates a holder with an explicit in-progress consensus id and an empty batch id.
     *
     * @param lastCid    last consensus id already handled
     * @param currentCid consensus id considered in progress
     */
    public InnerStateHolder(long lastCid, long currentCid) {
        this.lastCid = lastCid;
        this.currentCid = currentCid;
        this.batchingID = "";
    }

    /** @return the last consensus id recorded as handled */
    public long getLastCid() {
        return this.lastCid;
    }

    /** @param lastCid new value for the last handled consensus id */
    public void setLastCid(long lastCid) {
        this.lastCid = lastCid;
    }

    /** @return the consensus id in progress, or {@code -1} when none */
    public long getCurrentCid() {
        return this.currentCid;
    }

    /** @param currentCid consensus id now in progress */
    public void setCurrentCid(long currentCid) {
        this.currentCid = currentCid;
    }

    /** @return the id of the batch in progress, or an empty string when none */
    public String getBatchingID() {
        return this.batchingID;
    }

    /** @param batchingID id of the batch now in progress */
    public void setBatchingID(String batchingID) {
        this.batchingID = batchingID;
    }

    /** Clears the in-progress consensus id and batch id; {@code lastCid} is left untouched. */
    public void reset() {
        this.currentCid = -1L;
        this.batchingID = "";
    }
}

}

+ 2
- 2
source/deployment/deployment-peer/src/main/resources/config/init/rocksdb.config View File

@@ -22,8 +22,8 @@ option.maxWriteBufferNumber=7
#最小合并MemTable的数量
option.minWriteBufferNumberToMerge=2

#最大同时打开文件数量,-1表示全部打开
option.maxOpenFiles=-1
#最大同时打开文件数量,-1表示全部打开,设置为-1需要考虑ulimit的限制
option.maxOpenFiles=256

#最大后台压缩线程数
option.maxBackgroundCompactions=5


+ 11
- 6
source/peer/src/main/java/com/jd/blockchain/peer/consensus/LedgerStateManager.java View File

@@ -2,18 +2,25 @@ package com.jd.blockchain.peer.consensus;

import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.jd.blockchain.consensus.service.StateMachineReplicate;
import com.jd.blockchain.consensus.service.StateSnapshot;
import org.springframework.stereotype.Component;

@Component
public class LedgerStateManager implements StateMachineReplicate{
public class LedgerStateManager implements StateMachineReplicate {

private final Map<String, StateSnapshot> stateSnapshots = new ConcurrentHashMap<>();

/**
* Looks up the snapshot registered for a realm in {@code stateSnapshots} and reports its id.
*
* NOTE(review): this listing is a commit diff; the {@code return 0;} line looks like the removed
* stub and the lookup after it like the added implementation - confirm against the checked-in file.
*
* @param realmName key under which the snapshot was stored
* @return the snapshot's {@code getId()} value, or {@code -1} when no snapshot is stored for the realm
*/
@Override
public long getLatestStateID(String realmName) {
// Leftover stub line from the previous version (see note above).
return 0;
StateSnapshot snapshot = stateSnapshots.get(realmName);
if (snapshot == null) {
return -1L;
}
return snapshot.getId();
}

@Override
@@ -36,8 +43,6 @@ public class LedgerStateManager implements StateMachineReplicate{

/**
* Stores the snapshot for a realm, replacing any snapshot stored earlier under the same name.
*
* @param realmName key under which the snapshot is stored
* @param snapshot  snapshot to remember for that realm
* @param state     not read by this method
*/
@Override
public void setupState(String realmName, StateSnapshot snapshot, InputStream state) {
// stateSnapshots is a ConcurrentHashMap, so a null realmName or snapshot raises NullPointerException.
stateSnapshots.put(realmName, snapshot);
}

}

+ 30
- 4
source/peer/src/main/java/com/jd/blockchain/peer/web/ManagementController.java View File

@@ -10,6 +10,7 @@ import bftsmart.reconfiguration.util.TOMConfiguration;
import bftsmart.reconfiguration.views.View;
import com.jd.blockchain.consensus.bftsmart.BftsmartClientIncomingSettings;
import com.jd.blockchain.consensus.bftsmart.BftsmartTopology;
import com.jd.blockchain.consensus.service.*;
import com.jd.blockchain.ledger.*;
import com.jd.blockchain.utils.net.NetworkAddress;
import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;
@@ -34,10 +35,6 @@ import com.jd.blockchain.consensus.action.ActionResponse;
import com.jd.blockchain.consensus.bftsmart.BftsmartConsensusSettings;
import com.jd.blockchain.consensus.bftsmart.BftsmartNodeSettings;
import com.jd.blockchain.consensus.mq.server.MsgQueueMessageDispatcher;
import com.jd.blockchain.consensus.service.MessageHandle;
import com.jd.blockchain.consensus.service.NodeServer;
import com.jd.blockchain.consensus.service.ServerSettings;
import com.jd.blockchain.consensus.service.StateMachineReplicate;
import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.ledger.LedgerAdminInfo;
import com.jd.blockchain.ledger.core.LedgerAdminDataQuery;
@@ -250,6 +247,9 @@ public class ManagementController implements LedgerBindingConfigAware, PeerManag
}
ServerSettings serverSettings = provider.getServerFactory().buildServerSettings(ledgerHash.toBase58(), csSettings, currentNode.getAddress());

// 注册快照状态
consensusStateManager.setupState(ledgerHash.toBase58(), new BlockStateSnapshot(
ledgerRepository.retrieveLatestBlockHeight(), ledgerHash), null);
NodeServer server = provider.getServerFactory().setupServer(serverSettings, consensusMessageHandler,
consensusStateManager);
ledgerPeers.put(ledgerHash, server);
@@ -281,4 +281,30 @@ public class ManagementController implements LedgerBindingConfigAware, PeerManag
peer.stop();
}
}

private final class BlockStateSnapshot implements StateSnapshot {

private long id;

private byte[] snapshotBytes;

public BlockStateSnapshot(long id, byte[] snapshotBytes) {
this.id = id;
this.snapshotBytes = snapshotBytes;
}

public BlockStateSnapshot(long id, HashDigest hash) {
this(id, hash.toBytes());
}

@Override
public long getId() {
return id;
}

@Override
public byte[] getSnapshot() {
return snapshotBytes;
}
}
}

+ 1
- 1
source/pom.xml View File

@@ -38,7 +38,7 @@
</modules>

<properties>
<bft-smart.version>0.4.3.RELEASE</bft-smart.version>
<bft-smart.version>0.4.4.RELEASE</bft-smart.version>
<data-explorer.version>1.1.3.RELEASE</data-explorer.version>
<manager-explorer.version>1.1.3.RELEASE</manager-explorer.version>
<commons-io.version>2.4</commons-io.version>


+ 1
- 1
source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBConnectionFactory.java View File

@@ -111,7 +111,7 @@ public class RocksDBConnectionFactory implements DbConnectionFactory {
long optionWriteBufferSize = getLong(dbProperties, "option.writeBufferSize", 256 * SizeUnit.MB);
int optionMaxWriteBufferNumber = getInt(dbProperties, "option.maxWriteBufferNumber", 7);
int optionMinWriteBufferNumberToMerge = getInt(dbProperties, "option.minWriteBufferNumberToMerge", 2);
int optionMaxOpenFiles = getInt(dbProperties, "option.maxOpenFiles", -1);
int optionMaxOpenFiles = getInt(dbProperties, "option.maxOpenFiles", 256);
int optionMaxBackgroundCompactions = getInt(dbProperties, "option.maxBackgroundCompactions", 5);
int optionMaxBackgroundFlushes = getInt(dbProperties, "option.maxBackgroundFlushes", 4);



Loading…
Cancel
Save