Browse Source

add test examples for state transfer layer

tags/1.0.0
zhangshuang 5 years ago
parent
commit
efedca1a08
7 changed files with 408 additions and 22 deletions
  1. +43
    -7
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java
  2. +20
    -13
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java
  3. +2
    -2
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java
  4. +70
    -0
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java
  5. +60
    -0
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java
  6. +58
    -0
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java
  7. +155
    -0
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java

+ 43
- 7
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java View File

@@ -1,7 +1,9 @@
package com.jd.blockchain.peer.statetransfer; package com.jd.blockchain.peer.statetransfer;


import com.jd.blockchain.binaryproto.BinaryEncodingUtils;
import com.jd.blockchain.crypto.hash.HashDigest; import com.jd.blockchain.crypto.hash.HashDigest;
import com.jd.blockchain.ledger.LedgerBlock; import com.jd.blockchain.ledger.LedgerBlock;
import com.jd.blockchain.ledger.LedgerTransaction;
import com.jd.blockchain.ledger.core.LedgerManage; import com.jd.blockchain.ledger.core.LedgerManage;
import com.jd.blockchain.ledger.core.LedgerRepository; import com.jd.blockchain.ledger.core.LedgerRepository;
import com.jd.blockchain.ledger.core.TransactionSet; import com.jd.blockchain.ledger.core.TransactionSet;
@@ -38,8 +40,8 @@ public class DataSequenceReaderImpl implements DataSequenceReader {




/** /**
*
*
* @param id 账本哈希的Base58编码
* @return DataSequenceInfo 数据序列信息
*/ */
@Override @Override
public DataSequenceInfo getDSInfo(String id) { public DataSequenceInfo getDSInfo(String id) {
@@ -58,7 +60,10 @@ public class DataSequenceReaderImpl implements DataSequenceReader {


/** /**
* *
*
* @param id 账本哈希的Base58编码
* @param from 数据序列复制的起始高度
* @param to 数据序列复制的结束高度
* @return DataSequenceElement【】数据序列差异数据元素的数组
*/ */
@Override @Override
public DataSequenceElement[] getDSDiffContent(String id, long from, long to) { public DataSequenceElement[] getDSDiffContent(String id, long from, long to) {
@@ -72,12 +77,27 @@ public class DataSequenceReaderImpl implements DataSequenceReader {
} }


/** /**
*
*
* 账本交易序列化
* @param transaction 账本交易
* @return byte[] 对账本交易进行序列化的结果
*/
private byte[] serialize(LedgerTransaction transaction) {
return BinaryEncodingUtils.encode(transaction, LedgerTransaction.class);
}

/**
* 获得账本某一高度区块上的所有交易
* @param id 账本哈希的Base58编码
* @param height 账本的某个区块高度
* @return DataSequenceElement 数据序列差异数据元素
*/ */
@Override @Override
public DataSequenceElement getDSDiffContent(String id, long height) { public DataSequenceElement getDSDiffContent(String id, long height) {


int lastHeightTxTotalNums = 0;

byte[][] transacionDatas = null;

byte[] hashBytes = Base58Utils.decode(id); byte[] hashBytes = Base58Utils.decode(id);


HashDigest ledgerHash = new HashDigest(hashBytes); HashDigest ledgerHash = new HashDigest(hashBytes);
@@ -89,9 +109,25 @@ public class DataSequenceReaderImpl implements DataSequenceReader {


LedgerBlock ledgerBlock = ledgerRepository.getBlock(height); LedgerBlock ledgerBlock = ledgerRepository.getBlock(height);
TransactionSet transactionSet = ledgerRepository.getTransactionSet(ledgerBlock); TransactionSet transactionSet = ledgerRepository.getTransactionSet(ledgerBlock);
//todo


if (height > 0) {
lastHeightTxTotalNums = (int) ledgerRepository.getTransactionSet(ledgerRepository.getBlock(height - 1)).getTotalCount();
}

int currentHeightTxTotalNums = (int)ledgerRepository.getTransactionSet(ledgerRepository.getBlock(height)).getTotalCount();

// get all transactions from current height block
int currentHeightTxNums = currentHeightTxTotalNums - lastHeightTxTotalNums;

LedgerTransaction[] transactions = transactionSet.getTxs(lastHeightTxTotalNums , currentHeightTxNums);


return null;
for (int i = 0; i < transactions.length; i++) {
byte[] transactionData = serialize(transactions[i]);
transacionDatas[i] = transactionData;
}

return new DataSequenceElement(id, height, transacionDatas);
} }


} }

+ 20
- 13
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java View File

@@ -30,8 +30,10 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
} }


/** /**
* check height to data sequence diff elements
*
* 检查数据序列差异元素中的高度是否合理;
* @param currHeight 当前结点的账本高度
* @param dsUpdateElements 需要更新到本地结点的数据序列元素List
* @return
*/ */
private int checkElementsHeight(long currHeight, ArrayList<DataSequenceElement> dsUpdateElements) { private int checkElementsHeight(long currHeight, ArrayList<DataSequenceElement> dsUpdateElements) {
boolean lossMiddleElements = false; boolean lossMiddleElements = false;
@@ -65,10 +67,12 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
} }


/** /**
*
*
* 对本地结点执行账本更新
* @param realmName 账本哈希的Base58编码
* @return void
*/ */
private void exeUpdate(String realmName) { private void exeUpdate(String realmName) {

for (int i = 0; i < deceidedElements.size(); i++) { for (int i = 0; i < deceidedElements.size(); i++) {
byte[][] element = deceidedElements.get(i).getData(); byte[][] element = deceidedElements.get(i).getData();


@@ -78,6 +82,7 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
for (byte[] txContent : element) { for (byte[] txContent : element) {
batchMessageHandle.processOrdered(msgId++, txContent, realmName, batchId); batchMessageHandle.processOrdered(msgId++, txContent, realmName, batchId);
} }
// 结块
batchMessageHandle.completeBatch(realmName, batchId); batchMessageHandle.completeBatch(realmName, batchId);
batchMessageHandle.commitBatch(realmName, batchId); batchMessageHandle.commitBatch(realmName, batchId);
} catch (Exception e) { } catch (Exception e) {
@@ -89,18 +94,19 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
} }


/** /**
*
*
* @param dsInfo 当前结点的数据序列信息
* @param diffContents 数据序列差异的数据元素数组
* @return int 更新结果码
*/ */
@Override @Override
public int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents) {
public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents) {
int result = 0; int result = 0;


try { try {
ArrayList<DataSequenceElement> dsUpdateElements = new ArrayList<DataSequenceElement>(); ArrayList<DataSequenceElement> dsUpdateElements = new ArrayList<DataSequenceElement>();
//remove unexpected elements //remove unexpected elements
for (int i = 0 ; i < diffContents.length; i++) { for (int i = 0 ; i < diffContents.length; i++) {
if (diffContents[i].getId().equals(id.getId())) {
if (diffContents[i].getId().equals(dsInfo.getId())) {
dsUpdateElements.add(diffContents[i]); dsUpdateElements.add(diffContents[i]);
} }
} }
@@ -108,7 +114,7 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
// sort elements by height // sort elements by height
Collections.sort(dsUpdateElements, new DataSequenceComparator()); Collections.sort(dsUpdateElements, new DataSequenceComparator());


currHeight = id.getHeight();
currHeight = dsInfo.getHeight();


// check element's height // check element's height
result = checkElementsHeight(currHeight, dsUpdateElements); result = checkElementsHeight(currHeight, dsUpdateElements);
@@ -119,7 +125,7 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
} }
// exe elements update // exe elements update
else { else {
exeUpdate(id.getId());
exeUpdate(dsInfo.getId());
return result; return result;
} }
} catch (Exception e) { } catch (Exception e) {
@@ -131,14 +137,15 @@ public class DataSequenceWriterImpl implements DataSequenceWriter {
} }


@Override @Override
public int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents) {
public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement diffContents) {
return 0; return 0;
} }




/** /**
* data sequence transfer error type
*
* 数据序列更新错误码
* @param
* @return
*/ */
public enum DataSequenceErrorType { public enum DataSequenceErrorType {
DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1), DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1),


+ 2
- 2
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java View File

@@ -15,12 +15,12 @@ public interface DataSequenceWriter {
*更新数据序列的当前状态,一次更新多个高度的差异 *更新数据序列的当前状态,一次更新多个高度的差异
* return void * return void
*/ */
int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents);
int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents);


/** /**
*更新数据序列的当前状态,一次更新一个高度的差异 *更新数据序列的当前状态,一次更新一个高度的差异
* return void * return void
*/ */
int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents);
int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContent);


} }

+ 70
- 0
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java View File

@@ -0,0 +1,70 @@
package test.com.jd.blockchain.statetransfer;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;

import java.net.InetSocketAddress;
import java.util.LinkedList;

public class DataSequence {

private InetSocketAddress address;
private String id;

private static LinkedList<DataSequenceElement> dataSequenceElements = new LinkedList<>();


public DataSequence(InetSocketAddress address, String id) {
this.address = address;
this.id = id;
}

public String getId() {
return id;
}

public InetSocketAddress getAddress() {
return address;
}

public void addElements(DataSequenceElement[] elements) {
for (DataSequenceElement element : elements) {
addElement(element);
}
}

public void addElement(DataSequenceElement element) {
try {
if (dataSequenceElements.size() == 0) {
if (element.getHeight() != 0) {
throw new IllegalArgumentException("Data sequence add element height error!");
}
dataSequenceElements.addLast(element);
}
else {
if (dataSequenceElements.getLast().getHeight() != element.getHeight() - 1) {
throw new IllegalArgumentException("Data sequence add element height error!");
}
dataSequenceElements.addLast(element);
}

} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}
}

public LinkedList<DataSequenceElement> getDataSequenceElements() {
return dataSequenceElements;
}

public DataSequenceInfo getDSInfo() {
if (dataSequenceElements.size() == 0) {
return new DataSequenceInfo(id, -1);
}
else {
return new DataSequenceInfo(id, dataSequenceElements.getLast().getHeight());
}
}

}

+ 60
- 0
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java View File

@@ -0,0 +1,60 @@
package test.com.jd.blockchain.statetransfer;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;

import java.net.InetSocketAddress;
import java.util.LinkedList;

/**
*数据序列差异的提供者需要使用的回调接口实现类
* @author zhangshuang
* @create 2019/4/22
* @since 1.0.0
*
*/

public class DataSequenceReaderImpl implements DataSequenceReader {

DataSequence currDataSequence;

public DataSequenceReaderImpl(DataSequence currDataSequence) {
this.currDataSequence = currDataSequence;
}

@Override
public DataSequenceInfo getDSInfo(String id) {
return currDataSequence.getDSInfo();
}

@Override
public DataSequenceElement[] getDSDiffContent(String id, long from, long to) {
DataSequenceElement[] elements = new DataSequenceElement[(int)(to - from + 1)];

int i = 0;
LinkedList<DataSequenceElement> dataSequenceElements = currDataSequence.getDataSequenceElements();
for (DataSequenceElement element : dataSequenceElements) {
if (element.getHeight() < from || element.getHeight() > to) {
continue;
}
else {
elements[i++] = element;
}
}

return elements;

}

@Override
public DataSequenceElement getDSDiffContent(String id, long height) {
for(DataSequenceElement dataSequenceElement : currDataSequence.getDataSequenceElements()) {
if (dataSequenceElement.getHeight() == height) {
return dataSequenceElement;

}
}
return null;
}
}

+ 58
- 0
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java View File

@@ -0,0 +1,58 @@
package test.com.jd.blockchain.statetransfer;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;

/**
*数据序列差异的请求者需要使用的回调接口实现类
* @author zhangshuang
* @create 2019/4/22
* @since 1.0.0
*
*/
public class DataSequenceWriterImpl implements DataSequenceWriter {

DataSequence currDataSequence;

public DataSequenceWriterImpl(DataSequence currDataSequence) {
this.currDataSequence = currDataSequence;
}

@Override
public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents) {

currDataSequence.addElements(diffContents);

return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE;

}

@Override
public int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContent) {
currDataSequence.addElement(diffContent);
return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE;
}

public enum DataSequenceErrorType {
DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1),
DATA_SEQUENCE_LOSS_MIDDLE_ELEMENT((byte) 0x2),
DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL((byte) 0x3),
;
public final int CODE;

private DataSequenceErrorType(byte code) {
this.CODE = code;
}

public static DataSequenceErrorType valueOf(byte code) {
for (DataSequenceErrorType errorType : DataSequenceErrorType.values()) {
if (errorType.CODE == code) {
return errorType;
}
}
throw new IllegalArgumentException("Unsupported code[" + code + "] of errorType!");
}
}

}

+ 155
- 0
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java View File

@@ -0,0 +1,155 @@
package test.com.jd.blockchain.statetransfer;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.process.DSProcessManager;
import com.jd.blockchain.utils.codec.Base58Utils;
import org.junit.Before;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StateTransferLayerTest {

private final int[] listenPorts = new int[]{9000, 9010, 9020, 9030};

private String localIp = "127.0.0.1";

private int DataSequenceNum = 1;

private int nodesNum = 4;

private byte[] idBytes = new byte[20];

private Random rand = new Random();

private String[] dataSequenceIds;

private DSProcessManager dsProcessManager;

private DataSequenceReader dataSequenceReader;

private DataSequenceWriter dataSequenceWriter;

private InetSocketAddress[] remoteNodeIps = new InetSocketAddress[nodesNum];

private final ExecutorService threadPool = Executors.newFixedThreadPool(8);

private static LinkedList<DataSequence> dataSequencesPerNode = new LinkedList<>();

// 假定每个数据序列元素里有四条记录数据
private byte[][] dsElementDatas = new byte[4][];


@Before
public void init() {

// 产生两个唯一的数据序列Id标识
for (int i = 0; i < DataSequenceNum; i++) {

dataSequenceIds[i] = new String();
rand.nextBytes(idBytes);
dataSequenceIds[i] = Base58Utils.encode(idBytes);
}

// 创建数据序列处理管理者实例
dsProcessManager = new DSProcessManager();


// 准备好所有的远端结点,包括监听者
for (int i = 0; i < nodesNum; i++) {
remoteNodeIps[i] = new InetSocketAddress(localIp, listenPorts[i]);
}

// 为数据序列的每个高度准备好内容,为了方便测试,每个高度的内容设置为一致
for (int i = 0; i < dsElementDatas.length; i++) {
rand.nextBytes(idBytes);
dsElementDatas[i] = idBytes;
}

// 为结点准备数据序列
for (String id : dataSequenceIds) {
for (int i = 0; i < remoteNodeIps.length; i++) {
DataSequence dataSequence = new DataSequence(remoteNodeIps[i], id);

// 为数据序列的0,1,2高度添加内容
for (int j = 0; j < 3; i++) {
dataSequence.addElement(new DataSequenceElement(id, i, dsElementDatas));
}
dataSequencesPerNode.addLast(dataSequence);
}

// 把其中一个结点的数据序列与其他结点区别开来
for (int i = 0; i < dataSequencesPerNode.size(); i++) {
DataSequence dataSequence = dataSequencesPerNode.get(i);
if (dataSequence.getAddress().getPort() != listenPorts[0]) {
// 为数据序列的3,4高度添加内容
for (int j = 3; j < 5; i++) {
dataSequence.addElement(new DataSequenceElement(id, j, dsElementDatas));
}
}
}
}
}

// 获得除监听结点之外的其他远端结点
InetSocketAddress[] getTargetNodesIp(InetSocketAddress listenIp, InetSocketAddress[] remoteNodeIps) {

InetSocketAddress[] targets = new InetSocketAddress[remoteNodeIps.length - 1];
int j = 0;

for (int i = 0; i < remoteNodeIps.length; i++) {
if ((remoteNodeIps[i].getHostName().equals(listenIp.getHostName())) && (remoteNodeIps[i].getPort() == listenIp.getPort())) {
continue;
}
targets[j++] = new InetSocketAddress(remoteNodeIps[i].getHostName(), remoteNodeIps[i].getPort());
}

return targets;

}

DataSequence findDataSequence(String id, InetSocketAddress listenNodeAddr) {
for (DataSequence dataSequence : dataSequencesPerNode) {
if ((dataSequence.getAddress().getPort() == listenNodeAddr.getPort() && (dataSequence.getAddress().getHostName().equals(listenNodeAddr.getHostName()))
&& (dataSequence.getId().equals(id)))) {
return dataSequence;
}
}
return null;
}


@Test
public void test() {

CountDownLatch countDownLatch = new CountDownLatch(nodesNum);

for (String id : dataSequenceIds) {
for (int i = 0; i < nodesNum; i++) {
InetSocketAddress listenNode = remoteNodeIps[i];
threadPool.execute(() -> {
DataSequence currDataSequence = findDataSequence(id, listenNode);
DataSequenceInfo dsInfo = currDataSequence.getDSInfo();
InetSocketAddress[] targets = getTargetNodesIp(listenNode, remoteNodeIps);
dsProcessManager.startDSProcess(dsInfo, listenNode, targets, new DataSequenceWriterImpl(currDataSequence), new DataSequenceReaderImpl(currDataSequence));
countDownLatch.countDown();
});
}
}

// 等待数据序列更新完成
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Loading…
Cancel
Save