Browse Source

add notes for code

tags/1.0.0
zhangshuang 6 years ago
parent
commit
46c1e2b20b
21 changed files with 206 additions and 91 deletions
  1. +0
    -1
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java
  2. +0
    -1
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java
  3. +1
    -1
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java
  4. +1
    -1
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java
  5. +13
    -7
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java
  6. +10
    -6
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java
  7. +10
    -2
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java
  8. +6
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java
  9. +14
    -8
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java
  10. +19
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java
  11. +5
    -1
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java
  12. +19
    -7
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java
  13. +22
    -5
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java
  14. +21
    -14
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java
  15. +38
    -27
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java
  16. +6
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java
  17. +4
    -5
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java
  18. +8
    -0
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java
  19. +1
    -2
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java
  20. +1
    -2
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java
  21. +7
    -1
      source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java

+ 0
- 1
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java View File

@@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Autowired;
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
*
*/ */
public class DataSequenceReaderImpl implements DataSequenceReader { public class DataSequenceReaderImpl implements DataSequenceReader {




+ 0
- 1
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java View File

@@ -15,7 +15,6 @@ import java.util.Collections;
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
*
*/ */
public class DataSequenceWriterImpl implements DataSequenceWriter { public class DataSequenceWriterImpl implements DataSequenceWriter {




+ 1
- 1
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java View File

@@ -3,7 +3,7 @@ package com.jd.blockchain.statetransfer;
import java.io.Serializable; import java.io.Serializable;


/** /**
*数据序列复制的元素或单位
* 数据序列需要复制内容的元素或单位
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0


+ 1
- 1
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java View File

@@ -1,7 +1,7 @@
package com.jd.blockchain.statetransfer; package com.jd.blockchain.statetransfer;


/** /**
*共识结点上的某个数据序列的当前状态信息,每个共识结点可以对应任意个数据序列;
* 共识结点上的某个数据序列的当前状态信息,每个共识结点可以对应任意个数据序列;
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0


+ 13
- 7
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java View File

@@ -4,7 +4,7 @@ import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo; import com.jd.blockchain.statetransfer.DataSequenceInfo;


/** /**
*数据序列差异提供者需要使用的回调接口
* 数据序列差异提供者需要使用的回调接口
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -12,22 +12,28 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo;
public interface DataSequenceReader { public interface DataSequenceReader {


/** /**
* 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息;
*
* 差异提供者根据数据序列标识符获取数据序列当前状态;
* @param id 数据序列标识符
* @return 数据序列当前状态信息
*/ */
DataSequenceInfo getDSInfo(String id); DataSequenceInfo getDSInfo(String id);




/** /**
* 差异的提供者根据输入的数据序列标识符以及起始,结束高度提供数据序列的差异内容;
*
* 差异提供者根据数据序列标识符以及起始,结束高度提供数据序列该范围的差异内容;
* @param id 数据序列标识符
* @param from 差异的起始高度
* @param to 差异的结束高度
* @return 差异元素组成的数组
*/ */
DataSequenceElement[] getDSDiffContent(String id, long from, long to); DataSequenceElement[] getDSDiffContent(String id, long from, long to);




/** /**
* 差异的提供者根据输入的数据序列标识符以及高度提供数据序列的差异内容;
*
* 差异提供者根据数据序列标识符以及高度提供数据序列的差异内容;
* @param id 数据序列标识符
* @param height 要获得哪个高度的差异元素
* @return 差异元素
*/ */
DataSequenceElement getDSDiffContent(String id, long height); DataSequenceElement getDSDiffContent(String id, long height);
} }

+ 10
- 6
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java View File

@@ -4,7 +4,7 @@ import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo; import com.jd.blockchain.statetransfer.DataSequenceInfo;


/** /**
*数据序列差异请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
* 数据序列差异请求者获得差异内容后需要回调该接口
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -12,15 +12,19 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo;
public interface DataSequenceWriter { public interface DataSequenceWriter {


/** /**
*更新数据序列的当前状态,一次更新多个高度的差异
* return void
* 差异请求者更新本地数据序列的状态,一次可以更新多个差异元素
* @param dsInfo 数据序列当前状态信息
* @param diffContents 需要更新的差异元素数组
* @return 更新结果编码
*/ */
int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents); int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents);


/** /**
*更新数据序列的当前状态,一次更新一个高度的差异
* return void
* 差异请求者更新本地数据序列的状态,一次只更新一个差异元素
* @param dsInfo 数据序列当前状态信息
* @param diffContent 需要更新的差异元素
* @return 更新结果编码
*/ */
int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContent);
int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement diffContent);


} }

+ 10
- 2
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java View File

@@ -5,12 +5,20 @@ import com.jd.blockchain.statetransfer.DataSequenceElement;
import java.util.Comparator; import java.util.Comparator;


/** /**
*
*
* 数据序列差异元素的高度比较器
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/ */
public class DataSequenceComparator implements Comparator<DataSequenceElement> { public class DataSequenceComparator implements Comparator<DataSequenceElement> {


// sort by data sequence height // sort by data sequence height
/**
* 对差异元素根据高度大小排序
* @param o1 差异元素1
* @param o2 差异元素2
* @return >0 or <0
*/
@Override @Override
public int compare(DataSequenceElement o1, DataSequenceElement o2) { public int compare(DataSequenceElement o1, DataSequenceElement o2) {
long height1; long height1;


+ 6
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java View File

@@ -1,5 +1,11 @@
package com.jd.blockchain.statetransfer.exception; package com.jd.blockchain.statetransfer.exception;


/**
* 数据序列异常处理
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/
public class DataSequenceException extends RuntimeException { public class DataSequenceException extends RuntimeException {


private static final long serialVersionUID = -4090881296855827889L; private static final long serialVersionUID = -4090881296855827889L;


+ 14
- 8
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java View File

@@ -8,7 +8,7 @@ import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.RemoteSession;


/** /**
*
* 数据序列差异提供者使用,解析收到的差异请求消息并产生响应
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -24,8 +24,11 @@ public class DSDefaultMessageExecutor implements MessageExecutor {
} }


/** /**
* 对状态机复制的请求进行响应
*
* 对状态机复制的差异请求进行响应
* @param key 请求消息的Key
* @param data 需要解码的字节数组
* @param session 指定响应需要使用的目标结点会话
* @return 配置为自动响应时,返回值为响应的字节数组,配置为手动响应时,不需要关注返回值
*/ */


@Override @Override
@@ -34,21 +37,25 @@ public class DSDefaultMessageExecutor implements MessageExecutor {
try { try {
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data); Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data);


// 解析CMD_DSINFO_REQUEST 请求的情况
if (object instanceof String) { if (object instanceof String) {
String id = (String)object; String id = (String)object;
byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE, id, 0, 0); byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE, id, 0, 0);
session.reply(key, new DataSequenceLoadMessage(respLoadMsg)); session.reply(key, new DataSequenceLoadMessage(respLoadMsg));
} }
// 解析CMD_GETDSDIFF_REQUEST 请求的情况
else if (object instanceof DSDiffRequestResult) { else if (object instanceof DSDiffRequestResult) {


DSDiffRequestResult requestResult = (DSDiffRequestResult)object; DSDiffRequestResult requestResult = (DSDiffRequestResult)object;
String id = requestResult.getId(); String id = requestResult.getId();
long fromHeight = requestResult.getFromHeight(); long fromHeight = requestResult.getFromHeight();
long toHeight = requestResult.getToHeight(); long toHeight = requestResult.getToHeight();
//每个高度的数据序列差异元素进行一次响应的情况
for (long i = fromHeight; i < toHeight + 1; i++) { for (long i = fromHeight; i < toHeight + 1; i++) {
byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, i, i); byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, i, i);
session.reply(key, new DataSequenceLoadMessage(respLoadMsg)); session.reply(key, new DataSequenceLoadMessage(respLoadMsg));
} }
//所有差异进行一次响应的情况
} }
else { else {
throw new IllegalArgumentException("Receive data exception, unknown message type!"); throw new IllegalArgumentException("Receive data exception, unknown message type!");
@@ -61,14 +68,13 @@ public class DSDefaultMessageExecutor implements MessageExecutor {
return null; return null;
} }


/**
* 响应类型设置
* 分手动响应,自动响应两种类型
*/
@Override @Override
public REPLY replyType() { public REPLY replyType() {
return REPLY.MANUAL; return REPLY.MANUAL;
} }


/**
*
*
*/

} }

+ 19
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java View File

@@ -3,12 +3,31 @@ package com.jd.blockchain.statetransfer.message;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader; import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;


/**
* 数据序列消息解析器工厂
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*
*/
public class DSMsgResolverFactory { public class DSMsgResolverFactory {


/**
* 获得数据序列消息编码器实例
* @param dsWriter 差异请求者执行数据序列更新的执行器
* @param dsReader 差异响应者执行数据序列读取的执行器
* @return 消息编码器实例
*/
public static DataSequenceMsgEncoder getEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) { public static DataSequenceMsgEncoder getEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
return new DataSequenceMsgEncoder(dsWriter, dsReader); return new DataSequenceMsgEncoder(dsWriter, dsReader);
} }


/**
* 获得数据序列消息解码器实例
* @param dsWriter 差异请求者执行数据序列更新的执行器
* @param dsReader 差异响应者执行数据序列读取的执行器
* @return 消息解码器实例
*/
public static DataSequenceMsgDecoder getDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) { public static DataSequenceMsgDecoder getDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
return new DataSequenceMsgDecoder(dsWriter, dsReader); return new DataSequenceMsgDecoder(dsWriter, dsReader);
} }


+ 5
- 1
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java View File

@@ -3,10 +3,14 @@ package com.jd.blockchain.statetransfer.message;
import com.jd.blockchain.stp.communication.message.LoadMessage; import com.jd.blockchain.stp.communication.message.LoadMessage;


/** /**
*
* 数据序列复制的负载消息
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
* *
*/ */
public class DataSequenceLoadMessage implements LoadMessage { public class DataSequenceLoadMessage implements LoadMessage {

byte[] bytes; byte[] bytes;


public DataSequenceLoadMessage(byte[] bytes) { public DataSequenceLoadMessage(byte[] bytes) {


+ 19
- 7
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java View File

@@ -9,6 +9,12 @@ import com.jd.blockchain.statetransfer.result.DSDiffRequestResult;
import com.jd.blockchain.utils.io.BytesUtils; import com.jd.blockchain.utils.io.BytesUtils;
import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils; import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;


/**
* 数据序列消息解码器
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/
public class DataSequenceMsgDecoder { public class DataSequenceMsgDecoder {


private int heightSize = 8; private int heightSize = 8;
@@ -34,8 +40,9 @@ public class DataSequenceMsgDecoder {




/** /**
*
*
* 对编过码的字节数组解码,还原成对象实例
* @param loadMessage 字节序列
* @return 解码后的对象
*/ */
public Object decode(byte[] loadMessage) { public Object decode(byte[] loadMessage) {


@@ -48,6 +55,7 @@ public class DataSequenceMsgDecoder {
int dataLength = BytesUtils.toInt(loadMessage, 0, 4); int dataLength = BytesUtils.toInt(loadMessage, 0, 4);
byte msgCode = loadMessage[4]; byte msgCode = loadMessage[4];


// send by diff provider, diff requester decode
if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE.CODE) { if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE.CODE) {
respHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize); respHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize);
idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize, 4); idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize, 4);
@@ -55,19 +63,25 @@ public class DataSequenceMsgDecoder {
System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + 4, idBytes, 0, idSize); System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + 4, idBytes, 0, idSize);
id = new String(idBytes); id = new String(idBytes);
return new DataSequenceInfo(id, respHeight); return new DataSequenceInfo(id, respHeight);
} else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE.CODE) {
}
// send by diff provider, diff requester decode
else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE.CODE) {
diffElemSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4); diffElemSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4);
diffElem = new byte[diffElemSize]; diffElem = new byte[diffElemSize];
System.arraycopy(loadMessage, 4 + msgTypeSize + 4, diffElem, 0, diffElemSize); System.arraycopy(loadMessage, 4 + msgTypeSize + 4, diffElem, 0, diffElemSize);
dsElement = BinarySerializeUtils.deserialize(diffElem); dsElement = BinarySerializeUtils.deserialize(diffElem);
return dsElement; return dsElement;
} else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST.CODE) {
}
// send by diff requester, diff provider decode
else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST.CODE) {
idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4); idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4);
idBytes = new byte[idSize]; idBytes = new byte[idSize];
System.arraycopy(loadMessage, 4 + msgTypeSize + 4, idBytes, 0, idSize); System.arraycopy(loadMessage, 4 + msgTypeSize + 4, idBytes, 0, idSize);
id = new String(idBytes); id = new String(idBytes);
return id; return id;
} else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST.CODE) {
}
// send by diff requester, diff provider decode
else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST.CODE) {
fromHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize); fromHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize);
toHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize + heightSize); toHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize + heightSize);
idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4); idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4);
@@ -90,6 +104,4 @@ public class DataSequenceMsgDecoder {
return null; return null;
} }




} }

+ 22
- 5
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java View File

@@ -7,6 +7,12 @@ import com.jd.blockchain.statetransfer.process.DSTransferProcess;
import com.jd.blockchain.utils.io.BytesUtils; import com.jd.blockchain.utils.io.BytesUtils;
import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils; import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;


/**
* 数据序列消息编码器
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/
public class DataSequenceMsgEncoder { public class DataSequenceMsgEncoder {


private int heightSize = 8; private int heightSize = 8;
@@ -21,8 +27,12 @@ public class DataSequenceMsgEncoder {
} }


/** /**
* 目前暂时考虑fromHeight与toHeight相同的情况,即每次只对一个高度的差异编码并响应
*
* 目前暂时考虑fromHeight与toHeight相同的情况,即每次只对一个高度的差异内容进行编码并响应
* 把消息编码成字节数组,再交给通信层传输
* @param msgType 数据序列状态复制消息类型
* @param id 数据序列唯一标识符
* @param fromHeight 差异元素起始高度
* @param toHeight 差异元素结束高度
*/ */
public byte[] encode(DSTransferProcess.DataSequenceMsgType msgType, String id, long fromHeight, long toHeight) { public byte[] encode(DSTransferProcess.DataSequenceMsgType msgType, String id, long fromHeight, long toHeight) {


@@ -33,6 +43,7 @@ public class DataSequenceMsgEncoder {
byte[] loadMessage = null; byte[] loadMessage = null;


// different encoding methods for different message types // different encoding methods for different message types
// send by diff requester, diff requester encode
if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST) { if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST) {


// CMD_DSINFO_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, // CMD_DSINFO_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe,
@@ -46,7 +57,9 @@ public class DataSequenceMsgEncoder {
loadMessage[4] = msgType.CODE; loadMessage[4] = msgType.CODE;
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4); System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4);
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize);
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) {
}
// send by diff requester, diff requester encode
else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) {


// CMD_GETDSDIFF_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes from height, // CMD_GETDSDIFF_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes from height,
// 8 bytes to height, 4 bytes id length, id content size bytes // 8 bytes to height, 4 bytes id length, id content size bytes
@@ -61,7 +74,9 @@ public class DataSequenceMsgEncoder {
System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize); System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize);
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4); System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4);
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize);
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE) {
}
// send by diff provider, diff provider encode
else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE) {


// CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height, // CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height,
// 4 bytes id length, id content size bytes // 4 bytes id length, id content size bytes
@@ -77,7 +92,9 @@ public class DataSequenceMsgEncoder {
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4); System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4);
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + 4, idSize); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + 4, idSize);


} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) {
}
// send by diff provider, diff provider encode
else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) {
if (fromHeight != toHeight) { if (fromHeight != toHeight) {
throw new IllegalArgumentException("Height parameter error!"); throw new IllegalArgumentException("Height parameter error!");
} }


+ 21
- 14
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java View File

@@ -13,17 +13,17 @@ import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.stp.communication.node.RemoteNode; import com.jd.blockchain.stp.communication.node.RemoteNode;
import com.jd.blockchain.utils.concurrent.CompletableAsyncFuture;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;


/** /**
*
* 数据序列状态复制过程管理器
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
*
*/ */
public class DSProcessManager { public class DSProcessManager {


@@ -32,14 +32,19 @@ public class DSProcessManager {
private long dsInfoResponseTimeout = 20000; private long dsInfoResponseTimeout = 20000;
private ExecutorService writeExecutors = Executors.newFixedThreadPool(5); private ExecutorService writeExecutors = Executors.newFixedThreadPool(5);
private int returnCode = 0; private int returnCode = 0;

/** /**
*
*
* 启动一个指定数据序列的状态复制过程
* @param dsInfo 数据序列当前状态信息
* @param listener 本地监听者
* @param targets 目标结点
* @param dsWriter 差异请求者执行数据序列更新的执行器
* @param dsReader 差异响应者执行数据序列读取的执行器
* @return returnCode 执行结果码
*/ */
public int startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) { public int startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {


// create remote sessions manager, add listener // create remote sessions manager, add listener

LocalNode listenNode = new LocalNode(listener.getHostName(), listener.getPort(), new DSDefaultMessageExecutor(dsReader, dsWriter)); LocalNode listenNode = new LocalNode(listener.getHostName(), listener.getPort(), new DSDefaultMessageExecutor(dsReader, dsWriter));


RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listenNode); RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listenNode);
@@ -116,17 +121,13 @@ public class DSProcessManager {


RemoteSession responseSession = findResponseSession(diffResult.getMaxHeightRemoteNode(), remoteSessions); RemoteSession responseSession = findResponseSession(diffResult.getMaxHeightRemoteNode(), remoteSessions);
System.out.println("Async send CMD_GETDSDIFF_REQUEST msg to targets will start!"); System.out.println("Async send CMD_GETDSDIFF_REQUEST msg to targets will start!");

// step5: collect get data sequence diff response // step5: collect get data sequence diff response
for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) { for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) {
CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, responseSession, height, height, callBackBarrierDiff); CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, responseSession, height, height, callBackBarrierDiff);
dsDiffResponses.addLast(dsDiffResponse); dsDiffResponses.addLast(dsDiffResponse);
} }

// 考虑性能
// writeExecutors.execute(() -> {
//
// });

// 上述发送不合理,考虑一次性发送请求
System.out.println("Wait CMD_GETDSDIFF_RESPONSE msg from targets!"); System.out.println("Wait CMD_GETDSDIFF_RESPONSE msg from targets!");
LinkedList<byte[]> receiveDiffResponses = new LinkedList<>(); LinkedList<byte[]> receiveDiffResponses = new LinkedList<>();
if (callBackBarrierDiff.tryCall()) { if (callBackBarrierDiff.tryCall()) {
@@ -159,6 +160,12 @@ public class DSProcessManager {
return returnCode; return returnCode;
} }


/**
* 根据远端结点找与远端结点建立的会话
* @param remoteNode 远端结点
* @param remoteSessions 本地维护的远端结点会话表
* @return 与远端结点对应的会话
*/
RemoteSession findResponseSession(RemoteNode remoteNode, RemoteSession[] remoteSessions) { RemoteSession findResponseSession(RemoteNode remoteNode, RemoteSession[] remoteSessions) {
for (RemoteSession remoteSession : remoteSessions) { for (RemoteSession remoteSession : remoteSessions) {
if (remoteSession.remoteNode().equals(remoteNode)) { if (remoteSession.remoteNode().equals(remoteNode)) {
@@ -171,9 +178,9 @@ public class DSProcessManager {
* *
* *
*/ */
void setDSReader(DataSequenceReader reader) {
}
// void setDSReader(DataSequenceReader reader) {
//
// }




} }

+ 38
- 27
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java View File

@@ -20,7 +20,7 @@ import java.util.LinkedList;
import java.util.Map; import java.util.Map;


/** /**
*
* 数据序列状态复制过程
* @author zhangshuang * @author zhangshuang
* @create 2019/4/11 * @create 2019/4/11
* @since 1.0.0 * @since 1.0.0
@@ -36,8 +36,8 @@ public class DSTransferProcess {
private String id; private String id;


/** /**
*
*
* @param dsInfo 数据序列当前状态信息
* @param targets 目标结点
*/ */
public DSTransferProcess(DataSequenceInfo dsInfo, InetSocketAddress[] targets) { public DSTransferProcess(DataSequenceInfo dsInfo, InetSocketAddress[] targets) {
this.dsInfo = dsInfo; this.dsInfo = dsInfo;
@@ -45,30 +45,46 @@ public class DSTransferProcess {
this.id = dsInfo.getId(); this.id = dsInfo.getId();
} }


/**
* @param dsWriter 差异请求者执行数据序列更新的执行器
* @return void
*/
public void setDSWriter(DataSequenceWriter dsWriter) { public void setDSWriter(DataSequenceWriter dsWriter) {
this.dsWriter = dsWriter; this.dsWriter = dsWriter;
} }


/**
* @param dsReader 差异响应者执行数据序列读取的执行器
* @return void
*/
public void setDSReader(DataSequenceReader dsReader) { public void setDSReader(DataSequenceReader dsReader) {
this.dsReader = dsReader; this.dsReader = dsReader;
} }


/**
* @param remoteSessionManager 远端会话管理器
* @return void
*/
public void setRemoteSessionManager(RemoteSessionManager remoteSessionManager) { public void setRemoteSessionManager(RemoteSessionManager remoteSessionManager) {
this.remoteSessionManager = remoteSessionManager; this.remoteSessionManager = remoteSessionManager;
} }




/** /**
* get unique id from data sequence transfer process
* *
* @return 数据序列标识符
*/ */
public String getId() { public String getId() {
return id; return id;
} }


/** /**
*
*
* @param msgType 数据序列差异请求消息类型
* @param remoteSession 目标结点对应的会话
* @param fromHeight 差异起始高度
* @param toHeight 差异结束高度
* @param callBackBarrier 异步回调
* @return 异步回调
*/ */
CallBackDataListener send(DataSequenceMsgType msgType, RemoteSession remoteSession, long fromHeight, long toHeight, CallBackBarrier callBackBarrier) { CallBackDataListener send(DataSequenceMsgType msgType, RemoteSession remoteSession, long fromHeight, long toHeight, CallBackBarrier callBackBarrier) {


@@ -76,9 +92,11 @@ public class DSTransferProcess {


return remoteSession.asyncRequest(new DataSequenceLoadMessage(loadMessage), callBackBarrier); return remoteSession.asyncRequest(new DataSequenceLoadMessage(loadMessage), callBackBarrier);
} }

/** /**
*
*
* 计算数据序列差异元素数组
* @param diffArray 差异的字节数组
* @return 对差异字节数组的解码结果
*/ */
public ArrayList<DataSequenceElement> computeDiffElement(byte[][] diffArray) { public ArrayList<DataSequenceElement> computeDiffElement(byte[][] diffArray) {


@@ -98,8 +116,9 @@ public class DSTransferProcess {
} }


/** /**
*
*
* 根据差异提供者响应的数据序列状态信息找到拥有最大数据序列高度的远端结点
* @param receiveResponses 数据序列差异请求者收到的远端结点状态的响应信息
* @return 得到远端数据序列的最大高度以及拥有者结点
*/ */
public DSInfoResponseResult computeDiffInfo(LinkedList<CallBackDataListener> receiveResponses) { public DSInfoResponseResult computeDiffInfo(LinkedList<CallBackDataListener> receiveResponses) {
long maxHeight = 0; long maxHeight = 0;
@@ -110,11 +129,10 @@ public class DSTransferProcess {
try { try {
for (CallBackDataListener receiveResponse : receiveResponses) { for (CallBackDataListener receiveResponse : receiveResponses) {
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(receiveResponse.getCallBackData()); Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(receiveResponse.getCallBackData());
// System.out.println("ComputeDiffInfo object = "+object);
if (object instanceof DataSequenceInfo) { if (object instanceof DataSequenceInfo) {
DataSequenceInfo dsInfo = (DataSequenceInfo) object; DataSequenceInfo dsInfo = (DataSequenceInfo) object;
long height = dsInfo.getHeight(); long height = dsInfo.getHeight();
// System.out.println("ComputeDiffInfo height = " +height);
// sava max height and its remote node
if (maxHeight < height) { if (maxHeight < height) {
maxHeight = height; maxHeight = height;
maxHeightRemoteNode = receiveResponse.remoteNode(); maxHeightRemoteNode = receiveResponse.remoteNode();
@@ -134,24 +152,17 @@ public class DSTransferProcess {
} }


/** /**
*
*
*/
public void getDSInfo(String id) {

}

/**
*
*
* 获取本复制过程维护的远端会话表
* @param
* @return 远端会话表数组
*/ */
public RemoteSession[] getSessions() { public RemoteSession[] getSessions() {
return remoteSessions; return remoteSessions;
} }


/** /**
* close all sessions
*
* 关闭本复制过程维护的所有远端会话
* @return void
*/ */
public void close() { public void close() {
for (RemoteSession session : remoteSessions) { for (RemoteSession session : remoteSessions) {
@@ -160,8 +171,8 @@ public class DSTransferProcess {
} }


/** /**
* establish connections with target remote nodes
*
* 建立与远端目标结点的连接,产生本地维护的远端会话表
* @return void
*/ */
public void start() { public void start() {


@@ -176,7 +187,7 @@ public class DSTransferProcess {




/** /**
* data sequence transfer message type
* 数据序列状态传输使用的消息类型
* *
*/ */
public enum DataSequenceMsgType { public enum DataSequenceMsgType {


+ 6
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java View File

@@ -1,5 +1,11 @@
package com.jd.blockchain.statetransfer.result; package com.jd.blockchain.statetransfer.result;


/**
* 数据序列差异提供者解码请求者"CMD_GETDSDIFF_REQUEST"消息时得到的结果
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/
public class DSDiffRequestResult { public class DSDiffRequestResult {


String id; String id;


+ 4
- 5
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java View File

@@ -1,13 +1,12 @@
package com.jd.blockchain.statetransfer.result; package com.jd.blockchain.statetransfer.result;


import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.node.RemoteNode; import com.jd.blockchain.stp.communication.node.RemoteNode;


/** /**
*
*
*
*
* 数据序列差异请求者解码提供者"CMD_DSINFO_RESPONSE"消息时得到的结果
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/ */
public class DSInfoResponseResult { public class DSInfoResponseResult {




+ 8
- 0
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java View File

@@ -6,11 +6,18 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.LinkedList; import java.util.LinkedList;


/**
* 测试过程建立的一个数据序列
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/
public class DataSequence { public class DataSequence {


private InetSocketAddress address; private InetSocketAddress address;
private String id; private String id;


// 每个数据序列维护了一系列的数据序列元素
private LinkedList<DataSequenceElement> dataSequenceElements = new LinkedList<>(); private LinkedList<DataSequenceElement> dataSequenceElements = new LinkedList<>();




@@ -27,6 +34,7 @@ public class DataSequence {
return address; return address;
} }



public void addElements(DataSequenceElement[] elements) { public void addElements(DataSequenceElement[] elements) {
for (DataSequenceElement element : elements) { for (DataSequenceElement element : elements) {
addElement(element); addElement(element);


+ 1
- 2
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java View File

@@ -8,11 +8,10 @@ import java.net.InetSocketAddress;
import java.util.LinkedList; import java.util.LinkedList;


/** /**
*数据序列差异的提供者需要使用的回调接口实现类
* 数据序列差异的提供者需要使用的回调接口实现类(测试)
* @author zhangshuang * @author zhangshuang
* @create 2019/4/22 * @create 2019/4/22
* @since 1.0.0 * @since 1.0.0
*
*/ */


public class DataSequenceReaderImpl implements DataSequenceReader { public class DataSequenceReaderImpl implements DataSequenceReader {


+ 1
- 2
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java View File

@@ -5,11 +5,10 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;


/** /**
*数据序列差异的请求者需要使用的回调接口实现类
* 数据序列差异的请求者需要使用的回调接口实现类(测试)
* @author zhangshuang * @author zhangshuang
* @create 2019/4/22 * @create 2019/4/22
* @since 1.0.0 * @since 1.0.0
*
*/ */
public class DataSequenceWriterImpl implements DataSequenceWriter { public class DataSequenceWriterImpl implements DataSequenceWriter {




+ 7
- 1
source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java View File

@@ -14,6 +14,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;


/**
* @author zhangshuang
* @create 2019/4/18
* @since 1.0.0
*/
public class StateTransferLayerTest { public class StateTransferLayerTest {


private final int[] listenPorts = new int[]{9000, 9010, 9020, 9030}; private final int[] listenPorts = new int[]{9000, 9010, 9020, 9030};
@@ -87,9 +92,9 @@ public class StateTransferLayerTest {
} }
} }


// 获得除监听结点之外的其他远端结点
InetSocketAddress[] getTargetNodesIp(InetSocketAddress listenIp, InetSocketAddress[] remoteNodeIps) { InetSocketAddress[] getTargetNodesIp(InetSocketAddress listenIp, InetSocketAddress[] remoteNodeIps) {


// 获得除监听结点之外的其他远端结点
InetSocketAddress[] targets = new InetSocketAddress[remoteNodeIps.length - 1]; InetSocketAddress[] targets = new InetSocketAddress[remoteNodeIps.length - 1];
int j = 0; int j = 0;


@@ -104,6 +109,7 @@ public class StateTransferLayerTest {


} }



DataSequence findDataSequence(String id, InetSocketAddress listenNodeAddr) { DataSequence findDataSequence(String id, InetSocketAddress listenNodeAddr) {
for (DataSequence dataSequence : dataSequencesPerNode) { for (DataSequence dataSequence : dataSequencesPerNode) {
if ((dataSequence.getAddress().getPort() == listenNodeAddr.getPort() && (dataSequence.getAddress().getHostName().equals(listenNodeAddr.getHostName())) if ((dataSequence.getAddress().getPort() == listenNodeAddr.getPort() && (dataSequence.getAddress().getHostName().equals(listenNodeAddr.getHostName()))


Loading…
Cancel
Save