diff --git a/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java b/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java index d19bcff2..f78df593 100644 --- a/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java +++ b/source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java @@ -29,6 +29,11 @@ public enum TransactionState { */ LEDGER_ERROR((byte) 2), + /** + * 数据序列更新错误; + */ + DATA_SEQUENCE_UPDATE_ERROR((byte) 3), + /** * 系统错误; */ diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java index cd2677ac..f41bfe9e 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java @@ -1,8 +1,19 @@ package com.jd.blockchain.peer.statetransfer; +import com.jd.blockchain.crypto.hash.HashDigest; +import com.jd.blockchain.ledger.LedgerBlock; +import com.jd.blockchain.ledger.core.LedgerManage; +import com.jd.blockchain.ledger.core.LedgerRepository; +import com.jd.blockchain.ledger.core.TransactionSet; import com.jd.blockchain.statetransfer.DataSequenceElement; import com.jd.blockchain.statetransfer.DataSequenceInfo; -import com.jd.blockchain.statetransfer.DataSequenceReader; +import com.jd.blockchain.statetransfer.callback.DataSequenceReader; +import com.jd.blockchain.storage.service.DbConnection; +import com.jd.blockchain.storage.service.DbConnectionFactory; +import com.jd.blockchain.tools.initializer.LedgerBindingConfig; +import com.jd.blockchain.utils.codec.Base58Utils; +import com.jd.blockchain.utils.codec.HexUtils; +import org.springframework.beans.factory.annotation.Autowired; /** *数据序列差异的提供者需要使用的回调接口实现类 @@ -13,13 +24,74 @@ import com.jd.blockchain.statetransfer.DataSequenceReader; */ public class DataSequenceReaderImpl implements DataSequenceReader { + private LedgerManage ledgerManager; + + private DbConnectionFactory connFactory; + + private LedgerBindingConfig config; + + public DataSequenceReaderImpl(LedgerBindingConfig config, LedgerManage ledgerManager, DbConnectionFactory connFactory) { + this.config = config; + this.ledgerManager = ledgerManager; + this.connFactory = connFactory; + } + + + /** + * + * + */ @Override public DataSequenceInfo getDSInfo(String id) { - return null; + + byte[] hashBytes = Base58Utils.decode(id); + + HashDigest ledgerHash = new HashDigest(hashBytes); + + LedgerBindingConfig.BindingConfig bindingConfig = config.getLedger(ledgerHash); + DbConnection dbConnNew = connFactory.connect(bindingConfig.getDbConnection().getUri(), + bindingConfig.getDbConnection().getPassword()); + LedgerRepository ledgerRepository = ledgerManager.register(ledgerHash, dbConnNew.getStorageService()); + + return new DataSequenceInfo(id, ledgerRepository.getLatestBlockHeight()); } + /** + * + * + */ @Override - public DataSequenceElement[] getDSContent(String id, long from, long to) { - return null; + public DataSequenceElement[] getDSDiffContent(String id, long from, long to) { + + DataSequenceElement[] dataSequenceElements = new DataSequenceElement[(int)(to - from + 1)]; + for (long i = from; i < to + 1; i++) { + dataSequenceElements[(int)(i - from)] = getDSDiffContent(id, i); + } + + return dataSequenceElements; + } + + /** + * + * + */ + @Override + public DataSequenceElement getDSDiffContent(String id, long height) { + + byte[] hashBytes = Base58Utils.decode(id); + + HashDigest ledgerHash = new HashDigest(hashBytes); + + LedgerBindingConfig.BindingConfig bindingConfig = config.getLedger(ledgerHash); + DbConnection dbConnNew = connFactory.connect(bindingConfig.getDbConnection().getUri(), + bindingConfig.getDbConnection().getPassword()); + LedgerRepository ledgerRepository = ledgerManager.register(ledgerHash, dbConnNew.getStorageService()); + + LedgerBlock ledgerBlock = ledgerRepository.getBlock(height); + TransactionSet transactionSet = ledgerRepository.getTransactionSet(ledgerBlock); + //todo + + + return null; } } diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java index e2c48601..589c7d88 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java @@ -1,9 +1,14 @@ package com.jd.blockchain.peer.statetransfer; +import com.jd.blockchain.consensus.service.MessageHandle; +import com.jd.blockchain.ledger.TransactionState; import com.jd.blockchain.statetransfer.DataSequenceElement; import com.jd.blockchain.statetransfer.DataSequenceInfo; -import com.jd.blockchain.statetransfer.DataSequenceReader; -import com.jd.blockchain.statetransfer.DataSequenceWriter; +import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; +import com.jd.blockchain.statetransfer.comparator.DataSequenceComparator; + +import java.util.ArrayList; +import java.util.Collections; /** *数据序列差异的请求者需要使用的回调接口实现类 @@ -14,9 +19,146 @@ import com.jd.blockchain.statetransfer.DataSequenceWriter; */ public class DataSequenceWriterImpl implements DataSequenceWriter { + private long currHeight; + private ArrayList deceidedElements = new ArrayList(); + + private MessageHandle batchMessageHandle; + + + public DataSequenceWriterImpl(MessageHandle batchMessageHandle) { + this.batchMessageHandle = batchMessageHandle; + } + + /** + * check height to data sequence diff elements + * + */ + private int checkElementsHeight(long currHeight, ArrayList dsUpdateElements) { + boolean lossMiddleElements = false; + + // lose first element + if (currHeight + 1 < dsUpdateElements.get(0).getHeight()){ + System.out.println("Diff response loss first element error!"); + return DataSequenceErrorType.DATA_SEQUENCE_LOSS_FIRST_ELEMENT.CODE; + } + else { + for (int i = 0; i < dsUpdateElements.size(); i++) { + if (dsUpdateElements.get(i).getHeight() == currHeight + 1 + i) { + deceidedElements.add(dsUpdateElements.get(i)); + } + // lose middle elements + else { + lossMiddleElements = true; + break; + } + } + + if (lossMiddleElements) { + System.out.println("Diff response loss middle elements error!"); + return DataSequenceErrorType.DATA_SEQUENCE_LOSS_MIDDLE_ELEMENT.CODE; + } + + System.out.println("Diff response elements height normal!"); + return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE; + } + + } + + /** + * + * + */ + private void exeUpdate(String realmName) { + for (int i = 0; i < deceidedElements.size(); i++) { + byte[][] element = deceidedElements.get(i).getData(); + + String batchId = batchMessageHandle.beginBatch(realmName); + try { + int msgId = 0; + for (byte[] txContent : element) { + batchMessageHandle.processOrdered(msgId++, txContent, realmName, batchId); + } + batchMessageHandle.completeBatch(realmName, batchId); + batchMessageHandle.commitBatch(realmName, batchId); + } catch (Exception e) { + // todo 需要处理应答码 404 + batchMessageHandle.rollbackBatch(realmName, batchId, TransactionState.DATA_SEQUENCE_UPDATE_ERROR.CODE); + } + } + + } + /** + * + * + */ @Override - public void updateDSInfo(String id, DataSequenceElement[] diffContents) { + public int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents) { + int result = 0; + try { + ArrayList dsUpdateElements = new ArrayList(); + //remove unexpected elements + for (int i = 0 ; i < diffContents.length; i++) { + if (diffContents[i].getId().equals(id.getId())) { + dsUpdateElements.add(diffContents[i]); + } + } + + // sort elements by height + Collections.sort(dsUpdateElements, new DataSequenceComparator()); + + currHeight = id.getHeight(); + + // check element's height + result = checkElementsHeight(currHeight, dsUpdateElements); + + // cann't exe update + if (result == DataSequenceErrorType.DATA_SEQUENCE_LOSS_FIRST_ELEMENT.CODE) { + return result; + } + // exe elements update + else { + exeUpdate(id.getId()); + return result; + } + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + + return result; + } + + @Override + public int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents) { + return 0; } + + + /** + * data sequence transfer error type + * + */ + public enum DataSequenceErrorType { + DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1), + DATA_SEQUENCE_LOSS_MIDDLE_ELEMENT((byte) 0x2), + DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL((byte) 0x3), + ; + public final int CODE; + + private DataSequenceErrorType(byte code) { + this.CODE = code; + } + + public static DataSequenceErrorType valueOf(byte code) { + for (DataSequenceErrorType errorType : DataSequenceErrorType.values()) { + if (errorType.CODE == code) { + return errorType; + } + } + throw new IllegalArgumentException("Unsupported code[" + code + "] of errorType!"); + } + } + } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java index 3e7d060f..ee9a959b 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java @@ -1,6 +1,7 @@ package com.jd.blockchain.statetransfer.callback; import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; /** *数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态; @@ -14,12 +15,12 @@ public interface DataSequenceWriter { *更新数据序列的当前状态,一次更新多个高度的差异 * return void */ - int updateDSInfo(String id, DataSequenceElement[] diffContents); + int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents); /** *更新数据序列的当前状态,一次更新一个高度的差异 * return void */ - int updateDSInfo(String id, DataSequenceElement diffContents); + int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents); } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java new file mode 100644 index 00000000..53ad5a15 --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java @@ -0,0 +1,24 @@ +package com.jd.blockchain.statetransfer.comparator; + +import com.jd.blockchain.statetransfer.DataSequenceElement; + +import java.util.Comparator; + +/** + * + * + */ +public class DataSequenceComparator implements Comparator { + + // sort by data sequence height + @Override + public int compare(DataSequenceElement o1, DataSequenceElement o2) { + long height1; + long height2; + + height1 = o1.getHeight(); + height2 = o2.getHeight(); + + return (int) (height1 - height2); + } +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java new file mode 100644 index 00000000..95718487 --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java @@ -0,0 +1,15 @@ +package com.jd.blockchain.statetransfer.exception; + +public class DataSequenceException extends RuntimeException { + + private static final long serialVersionUID = -4090881296855827889L; + + + public DataSequenceException(String message) { + super(message); + } + public DataSequenceException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java index af40d6f2..fecfa5c3 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java @@ -127,7 +127,7 @@ public class DSProcessManager { } // step6: process data sequence diff response, update local data sequence state DataSequenceElement[] dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][])); - returnCode = dsWriter.updateDSInfo(dsInfo.getId(), dataSequenceElements); + returnCode = dsWriter.updateDSInfo(dsInfo, dataSequenceElements); // data sequence transfer complete, close all sessions, end process life cycle