diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java index f41bfe9e..eeb1819e 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java @@ -1,7 +1,9 @@ package com.jd.blockchain.peer.statetransfer; +import com.jd.blockchain.binaryproto.BinaryEncodingUtils; import com.jd.blockchain.crypto.hash.HashDigest; import com.jd.blockchain.ledger.LedgerBlock; +import com.jd.blockchain.ledger.LedgerTransaction; import com.jd.blockchain.ledger.core.LedgerManage; import com.jd.blockchain.ledger.core.LedgerRepository; import com.jd.blockchain.ledger.core.TransactionSet; @@ -38,8 +40,8 @@ public class DataSequenceReaderImpl implements DataSequenceReader { /** - * - * + * @param id 账本哈希的Base58编码 + * @return DataSequenceInfo 数据序列信息 */ @Override public DataSequenceInfo getDSInfo(String id) { @@ -58,7 +60,10 @@ public class DataSequenceReaderImpl implements DataSequenceReader { /** * - * + * @param id 账本哈希的Base58编码 + * @param from 数据序列复制的起始高度 + * @param to 数据序列复制的结束高度 + * @return DataSequenceElement【】数据序列差异数据元素的数组 */ @Override public DataSequenceElement[] getDSDiffContent(String id, long from, long to) { @@ -72,12 +77,27 @@ public class DataSequenceReaderImpl implements DataSequenceReader { } /** - * - * + * 账本交易序列化 + * @param transaction 账本交易 + * @return byte[] 对账本交易进行序列化的结果 + */ + private byte[] serialize(LedgerTransaction transaction) { + return BinaryEncodingUtils.encode(transaction, LedgerTransaction.class); + } + + /** + * 获得账本某一高度区块上的所有交易 + * @param id 账本哈希的Base58编码 + * @param height 账本的某个区块高度 + * @return DataSequenceElement 数据序列差异数据元素 */ @Override public DataSequenceElement getDSDiffContent(String id, long height) { + int lastHeightTxTotalNums = 0; + + byte[][] transacionDatas = null; + byte[] hashBytes = Base58Utils.decode(id); HashDigest ledgerHash = new HashDigest(hashBytes); @@ -89,9 +109,25 @@ public class DataSequenceReaderImpl implements DataSequenceReader { LedgerBlock ledgerBlock = ledgerRepository.getBlock(height); TransactionSet transactionSet = ledgerRepository.getTransactionSet(ledgerBlock); - //todo + if (height > 0) { + lastHeightTxTotalNums = (int) ledgerRepository.getTransactionSet(ledgerRepository.getBlock(height - 1)).getTotalCount(); + } + + int currentHeightTxTotalNums = (int)ledgerRepository.getTransactionSet(ledgerRepository.getBlock(height)).getTotalCount(); + + // get all transactions from current height block + int currentHeightTxNums = currentHeightTxTotalNums - lastHeightTxTotalNums; + + LedgerTransaction[] transactions = transactionSet.getTxs(lastHeightTxTotalNums , currentHeightTxNums); - return null; + for (int i = 0; i < transactions.length; i++) { + byte[] transactionData = serialize(transactions[i]); + transacionDatas[i] = transactionData; + } + + return new DataSequenceElement(id, height, transacionDatas); } + + } diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java index 589c7d88..56a67358 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java @@ -30,8 +30,10 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { } /** - * check height to data sequence diff elements - * + * 检查数据序列差异元素中的高度是否合理; + * @param currHeight 当前结点的账本高度 + * @param dsUpdateElements 需要更新到本地结点的数据序列元素List + * @return */ private int checkElementsHeight(long currHeight, ArrayList dsUpdateElements) { boolean lossMiddleElements = false; @@ -65,10 +67,12 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { } /** - * - * + * 对本地结点执行账本更新 + * @param realmName 账本哈希的Base58编码 + * @return void */ private void exeUpdate(String realmName) { + for (int i = 0; i < deceidedElements.size(); i++) { byte[][] element = deceidedElements.get(i).getData(); @@ -78,6 +82,7 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { for (byte[] txContent : element) { batchMessageHandle.processOrdered(msgId++, txContent, realmName, batchId); } + // 结块 batchMessageHandle.completeBatch(realmName, batchId); batchMessageHandle.commitBatch(realmName, batchId); } catch (Exception e) { @@ -89,18 +94,19 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { } /** - * - * + * @param dsInfo 当前结点的数据序列信息 + * @param diffContents 数据序列差异的数据元素数组 + * @return int 更新结果码 */ @Override - public int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents) { + public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents) { int result = 0; try { ArrayList dsUpdateElements = new ArrayList(); //remove unexpected elements for (int i = 0 ; i < diffContents.length; i++) { - if (diffContents[i].getId().equals(id.getId())) { + if (diffContents[i].getId().equals(dsInfo.getId())) { dsUpdateElements.add(diffContents[i]); } } @@ -108,7 +114,7 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { // sort elements by height Collections.sort(dsUpdateElements, new DataSequenceComparator()); - currHeight = id.getHeight(); + currHeight = dsInfo.getHeight(); // check element's height result = checkElementsHeight(currHeight, dsUpdateElements); @@ -119,7 +125,7 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { } // exe elements update else { - exeUpdate(id.getId()); + exeUpdate(dsInfo.getId()); return result; } } catch (Exception e) { @@ -131,14 +137,15 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { } @Override - public int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents) { + public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement diffContents) { return 0; } /** - * data sequence transfer error type - * + * 数据序列更新错误码 + * @param + * @return */ public enum DataSequenceErrorType { DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1), diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java index ee9a959b..73f5d45d 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java @@ -15,12 +15,12 @@ public interface DataSequenceWriter { *更新数据序列的当前状态,一次更新多个高度的差异 * return void */ - int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents); + int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents); /** *更新数据序列的当前状态,一次更新一个高度的差异 * return void */ - int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents); + int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContent); } diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java new file mode 100644 index 00000000..4d138d04 --- /dev/null +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java @@ -0,0 +1,70 @@ +package test.com.jd.blockchain.statetransfer; + +import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; + +import java.net.InetSocketAddress; +import java.util.LinkedList; + +public class DataSequence { + + private InetSocketAddress address; + private String id; + + private static LinkedList dataSequenceElements = new LinkedList<>(); + + + public DataSequence(InetSocketAddress address, String id) { + this.address = address; + this.id = id; + } + + public String getId() { + return id; + } + + public InetSocketAddress getAddress() { + return address; + } + + public void addElements(DataSequenceElement[] elements) { + for (DataSequenceElement element : elements) { + addElement(element); + } + } + + public void addElement(DataSequenceElement element) { + try { + if (dataSequenceElements.size() == 0) { + if (element.getHeight() != 0) { + throw new IllegalArgumentException("Data sequence add element height error!"); + } + dataSequenceElements.addLast(element); + } + else { + if (dataSequenceElements.getLast().getHeight() != element.getHeight() - 1) { + throw new IllegalArgumentException("Data sequence add element height error!"); + } + dataSequenceElements.addLast(element); + } + + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + } + } + + public LinkedList getDataSequenceElements() { + return dataSequenceElements; + } + + public DataSequenceInfo getDSInfo() { + if (dataSequenceElements.size() == 0) { + return new DataSequenceInfo(id, -1); + } + else { + return new DataSequenceInfo(id, dataSequenceElements.getLast().getHeight()); + } + } + +} diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java new file mode 100644 index 00000000..cbb97172 --- /dev/null +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java @@ -0,0 +1,60 @@ +package test.com.jd.blockchain.statetransfer; + +import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; +import com.jd.blockchain.statetransfer.callback.DataSequenceReader; + +import java.net.InetSocketAddress; +import java.util.LinkedList; + +/** + *数据序列差异的提供者需要使用的回调接口实现类 + * @author zhangshuang + * @create 2019/4/22 + * @since 1.0.0 + * + */ + +public class DataSequenceReaderImpl implements DataSequenceReader { + + DataSequence currDataSequence; + + public DataSequenceReaderImpl(DataSequence currDataSequence) { + this.currDataSequence = currDataSequence; + } + + @Override + public DataSequenceInfo getDSInfo(String id) { + return currDataSequence.getDSInfo(); + } + + @Override + public DataSequenceElement[] getDSDiffContent(String id, long from, long to) { + DataSequenceElement[] elements = new DataSequenceElement[(int)(to - from + 1)]; + + int i = 0; + LinkedList dataSequenceElements = currDataSequence.getDataSequenceElements(); + for (DataSequenceElement element : dataSequenceElements) { + if (element.getHeight() < from || element.getHeight() > to) { + continue; + } + else { + elements[i++] = element; + } + } + + return elements; + + } + + @Override + public DataSequenceElement getDSDiffContent(String id, long height) { + for(DataSequenceElement dataSequenceElement : currDataSequence.getDataSequenceElements()) { + if (dataSequenceElement.getHeight() == height) { + return dataSequenceElement; + + } + } + return null; + } +} diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java new file mode 100644 index 00000000..59ce5bbc --- /dev/null +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java @@ -0,0 +1,58 @@ +package test.com.jd.blockchain.statetransfer; + +import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; +import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; + +/** + *数据序列差异的请求者需要使用的回调接口实现类 + * @author zhangshuang + * @create 2019/4/22 + * @since 1.0.0 + * + */ +public class DataSequenceWriterImpl implements DataSequenceWriter { + + DataSequence currDataSequence; + + public DataSequenceWriterImpl(DataSequence currDataSequence) { + this.currDataSequence = currDataSequence; + } + + @Override + public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents) { + + currDataSequence.addElements(diffContents); + + return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE; + + } + + @Override + public int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContent) { + currDataSequence.addElement(diffContent); + return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE; + } + + public enum DataSequenceErrorType { + DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1), + DATA_SEQUENCE_LOSS_MIDDLE_ELEMENT((byte) 0x2), + DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL((byte) 0x3), + ; + public final int CODE; + + private DataSequenceErrorType(byte code) { + this.CODE = code; + } + + public static DataSequenceErrorType valueOf(byte code) { + for (DataSequenceErrorType errorType : DataSequenceErrorType.values()) { + if (errorType.CODE == code) { + return errorType; + } + } + throw new IllegalArgumentException("Unsupported code[" + code + "] of errorType!"); + } + } + +} diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java new file mode 100644 index 00000000..efcaa060 --- /dev/null +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java @@ -0,0 +1,155 @@ +package test.com.jd.blockchain.statetransfer; + +import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; +import com.jd.blockchain.statetransfer.callback.DataSequenceReader; +import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; +import com.jd.blockchain.statetransfer.process.DSProcessManager; +import com.jd.blockchain.utils.codec.Base58Utils; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class StateTransferLayerTest { + + private final int[] listenPorts = new int[]{9000, 9010, 9020, 9030}; + + private String localIp = "127.0.0.1"; + + private int DataSequenceNum = 1; + + private int nodesNum = 4; + + private byte[] idBytes = new byte[20]; + + private Random rand = new Random(); + + private String[] dataSequenceIds; + + private DSProcessManager dsProcessManager; + + private DataSequenceReader dataSequenceReader; + + private DataSequenceWriter dataSequenceWriter; + + private InetSocketAddress[] remoteNodeIps = new InetSocketAddress[nodesNum]; + + private final ExecutorService threadPool = Executors.newFixedThreadPool(8); + + private static LinkedList dataSequencesPerNode = new LinkedList<>(); + + // 假定每个数据序列元素里有四条记录数据 + private byte[][] dsElementDatas = new byte[4][]; + + + @Before + public void init() { + + // 产生两个唯一的数据序列Id标识 + for (int i = 0; i < DataSequenceNum; i++) { + + dataSequenceIds[i] = new String(); + rand.nextBytes(idBytes); + dataSequenceIds[i] = Base58Utils.encode(idBytes); + } + + // 创建数据序列处理管理者实例 + dsProcessManager = new DSProcessManager(); + + + // 准备好所有的远端结点,包括监听者 + for (int i = 0; i < nodesNum; i++) { + remoteNodeIps[i] = new InetSocketAddress(localIp, listenPorts[i]); + } + + // 为数据序列的每个高度准备好内容,为了方便测试,每个高度的内容设置为一致 + for (int i = 0; i < dsElementDatas.length; i++) { + rand.nextBytes(idBytes); + dsElementDatas[i] = idBytes; + } + + // 为结点准备数据序列 + for (String id : dataSequenceIds) { + for (int i = 0; i < remoteNodeIps.length; i++) { + DataSequence dataSequence = new DataSequence(remoteNodeIps[i], id); + + // 为数据序列的0,1,2高度添加内容 + for (int j = 0; j < 3; i++) { + dataSequence.addElement(new DataSequenceElement(id, i, dsElementDatas)); + } + dataSequencesPerNode.addLast(dataSequence); + } + + // 把其中一个结点的数据序列与其他结点区别开来 + for (int i = 0; i < dataSequencesPerNode.size(); i++) { + DataSequence dataSequence = dataSequencesPerNode.get(i); + if (dataSequence.getAddress().getPort() != listenPorts[0]) { + // 为数据序列的3,4高度添加内容 + for (int j = 3; j < 5; i++) { + dataSequence.addElement(new DataSequenceElement(id, j, dsElementDatas)); + } + } + } + } + } + + // 获得除监听结点之外的其他远端结点 + InetSocketAddress[] getTargetNodesIp(InetSocketAddress listenIp, InetSocketAddress[] remoteNodeIps) { + + InetSocketAddress[] targets = new InetSocketAddress[remoteNodeIps.length - 1]; + int j = 0; + + for (int i = 0; i < remoteNodeIps.length; i++) { + if ((remoteNodeIps[i].getHostName().equals(listenIp.getHostName())) && (remoteNodeIps[i].getPort() == listenIp.getPort())) { + continue; + } + targets[j++] = new InetSocketAddress(remoteNodeIps[i].getHostName(), remoteNodeIps[i].getPort()); + } + + return targets; + + } + + DataSequence findDataSequence(String id, InetSocketAddress listenNodeAddr) { + for (DataSequence dataSequence : dataSequencesPerNode) { + if ((dataSequence.getAddress().getPort() == listenNodeAddr.getPort() && (dataSequence.getAddress().getHostName().equals(listenNodeAddr.getHostName())) + && (dataSequence.getId().equals(id)))) { + return dataSequence; + } + } + return null; + } + + + @Test + public void test() { + + CountDownLatch countDownLatch = new CountDownLatch(nodesNum); + + for (String id : dataSequenceIds) { + for (int i = 0; i < nodesNum; i++) { + InetSocketAddress listenNode = remoteNodeIps[i]; + threadPool.execute(() -> { + DataSequence currDataSequence = findDataSequence(id, listenNode); + DataSequenceInfo dsInfo = currDataSequence.getDSInfo(); + InetSocketAddress[] targets = getTargetNodesIp(listenNode, remoteNodeIps); + dsProcessManager.startDSProcess(dsInfo, listenNode, targets, new DataSequenceWriterImpl(currDataSequence), new DataSequenceReaderImpl(currDataSequence)); + countDownLatch.countDown(); + }); + } + } + + // 等待数据序列更新完成 + try { + countDownLatch.await(); + } catch (Exception e) { + e.printStackTrace(); + } + } +}