From 46c1e2b20b6f4a26e9cc7cc3dbba42438f189432 Mon Sep 17 00:00:00 2001 From: zhangshuang Date: Thu, 25 Apr 2019 11:07:35 +0800 Subject: [PATCH] add notes for code --- .../statetransfer/DataSequenceReaderImpl.java | 1 - .../statetransfer/DataSequenceWriterImpl.java | 1 - .../statetransfer/DataSequenceElement.java | 2 +- .../statetransfer/DataSequenceInfo.java | 2 +- .../callback/DataSequenceReader.java | 20 ++++-- .../callback/DataSequenceWriter.java | 16 +++-- .../comparator/DataSequenceComparator.java | 12 +++- .../exception/DataSequenceException.java | 6 ++ .../message/DSDefaultMessageExecutor.java | 22 ++++--- .../message/DSMsgResolverFactory.java | 19 ++++++ .../message/DataSequenceLoadMessage.java | 6 +- .../message/DataSequenceMsgDecoder.java | 26 ++++++-- .../message/DataSequenceMsgEncoder.java | 27 ++++++-- .../process/DSProcessManager.java | 35 ++++++---- .../process/DSTransferProcess.java | 65 +++++++++++-------- .../result/DSDiffRequestResult.java | 6 ++ .../result/DSInfoResponseResult.java | 9 ++- .../statetransfer/DataSequence.java | 8 +++ .../statetransfer/DataSequenceReaderImpl.java | 3 +- .../statetransfer/DataSequenceWriterImpl.java | 3 +- .../statetransfer/StateTransferLayerTest.java | 8 ++- 21 files changed, 206 insertions(+), 91 deletions(-) diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java index eeb1819e..620efcc5 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java @@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Autowired; * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 - * */ public class DataSequenceReaderImpl implements DataSequenceReader { diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java index 56a67358..61cfde0f 100644 --- a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java @@ -15,7 +15,6 @@ import java.util.Collections; * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 - * */ public class DataSequenceWriterImpl implements DataSequenceWriter { diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java index b64bd963..aaf6e7f5 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java @@ -3,7 +3,7 @@ package com.jd.blockchain.statetransfer; import java.io.Serializable; /** - *数据序列复制的元素或单位 + * 数据序列需要复制内容的元素或单位 * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java index 8aa45566..6f0f2e10 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java @@ -1,7 +1,7 @@ package com.jd.blockchain.statetransfer; /** - *共识结点上的某个数据序列的当前状态信息,每个共识结点可以对应任意个数据序列; + * 共识结点上的某个数据序列的当前状态信息,每个共识结点可以对应任意个数据序列; * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java index 8e0dda19..e137b929 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java @@ -4,7 +4,7 @@ import com.jd.blockchain.statetransfer.DataSequenceElement; import com.jd.blockchain.statetransfer.DataSequenceInfo; /** - *数据序列差异的提供者需要使用的回调接口 + * 数据序列差异提供者需要使用的回调接口 * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 @@ -12,22 +12,28 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo; public interface DataSequenceReader { /** - * 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息; - * + * 差异提供者根据数据序列标识符获取数据序列当前状态; + * @param id 数据序列标识符 + * @return 数据序列当前状态信息 */ DataSequenceInfo getDSInfo(String id); /** - * 差异的提供者根据输入的数据序列标识符以及起始,结束高度提供数据序列的差异内容; - * + * 差异提供者根据数据序列标识符以及起始,结束高度提供数据序列该范围的差异内容; + * @param id 数据序列标识符 + * @param from 差异的起始高度 + * @param to 差异的结束高度 + * @return 差异元素组成的数组 */ DataSequenceElement[] getDSDiffContent(String id, long from, long to); /** - * 差异的提供者根据输入的数据序列标识符以及高度提供数据序列的差异内容; - * + * 差异提供者根据数据序列标识符以及高度提供数据序列的差异内容; + * @param id 数据序列标识符 + * @param height 要获得哪个高度的差异元素 + * @return 差异元素 */ DataSequenceElement getDSDiffContent(String id, long height); } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java index 73f5d45d..a38362c7 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java @@ -4,7 +4,7 @@ import com.jd.blockchain.statetransfer.DataSequenceElement; import com.jd.blockchain.statetransfer.DataSequenceInfo; /** - *数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态; + * 数据序列差异请求者获得差异内容后需要回调该接口 * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 @@ -12,15 +12,19 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo; public interface DataSequenceWriter { /** - *更新数据序列的当前状态,一次更新多个高度的差异 - * return void + * 差异请求者更新本地数据序列的状态,一次可以更新多个差异元素 + * @param dsInfo 数据序列当前状态信息 + * @param diffContents 需要更新的差异元素数组 + * @return 更新结果编码 */ int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents); /** - *更新数据序列的当前状态,一次更新一个高度的差异 - * return void + * 差异请求者更新本地数据序列的状态,一次只更新一个差异元素 + * @param dsInfo 数据序列当前状态信息 + * @param diffContent 需要更新的差异元素 + * @return 更新结果编码 */ - int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContent); + int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement diffContent); } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java index 53ad5a15..9e0afbe5 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java @@ -5,12 +5,20 @@ import com.jd.blockchain.statetransfer.DataSequenceElement; import java.util.Comparator; /** - * - * + * 数据序列差异元素的高度比较器 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 */ public class DataSequenceComparator implements Comparator { // sort by data sequence height + /** + * 对差异元素根据高度大小排序 + * @param o1 差异元素1 + * @param o2 差异元素2 + * @return >0 or <0 + */ @Override public int compare(DataSequenceElement o1, DataSequenceElement o2) { long height1; diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java index 95718487..5e6248c0 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java @@ -1,5 +1,11 @@ package com.jd.blockchain.statetransfer.exception; +/** + * 数据序列异常处理 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + */ public class DataSequenceException extends RuntimeException { private static final long serialVersionUID = -4090881296855827889L; diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java index 08464db2..6c67e8c8 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java @@ -8,7 +8,7 @@ import com.jd.blockchain.stp.communication.MessageExecutor; import com.jd.blockchain.stp.communication.RemoteSession; /** - * + * 数据序列差异提供者使用,解析收到的差异请求消息并产生响应 * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 @@ -24,8 +24,11 @@ public class DSDefaultMessageExecutor implements MessageExecutor { } /** - * 对状态机复制的请求进行响应 - * + * 对状态机复制的差异请求进行响应 + * @param key 请求消息的Key + * @param data 需要解码的字节数组 + * @param session 指定响应需要使用的目标结点会话 + * @return 配置为自动响应时,返回值为响应的字节数组,配置为手动响应时,不需要关注返回值 */ @Override @@ -34,21 +37,25 @@ public class DSDefaultMessageExecutor implements MessageExecutor { try { Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data); + // 解析CMD_DSINFO_REQUEST 请求的情况 if (object instanceof String) { String id = (String)object; byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE, id, 0, 0); session.reply(key, new DataSequenceLoadMessage(respLoadMsg)); } + // 解析CMD_GETDSDIFF_REQUEST 请求的情况 else if (object instanceof DSDiffRequestResult) { DSDiffRequestResult requestResult = (DSDiffRequestResult)object; String id = requestResult.getId(); long fromHeight = requestResult.getFromHeight(); long toHeight = requestResult.getToHeight(); + //每个高度的数据序列差异元素进行一次响应的情况 for (long i = fromHeight; i < toHeight + 1; i++) { byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, i, i); session.reply(key, new DataSequenceLoadMessage(respLoadMsg)); } + //所有差异进行一次响应的情况 } else { throw new IllegalArgumentException("Receive data exception, unknown message type!"); @@ -61,14 +68,13 @@ public class DSDefaultMessageExecutor implements MessageExecutor { return null; } + /** + * 响应类型设置 + * 分手动响应,自动响应两种类型 + */ @Override public REPLY replyType() { return REPLY.MANUAL; } - /** - * - * - */ - } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java index 874ef367..c0dfae8d 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java @@ -3,12 +3,31 @@ package com.jd.blockchain.statetransfer.message; import com.jd.blockchain.statetransfer.callback.DataSequenceReader; import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; +/** + * 数据序列消息解析器工厂 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + * + */ public class DSMsgResolverFactory { + /** + * 获得数据序列消息编码器实例 + * @param dsWriter 差异请求者执行数据序列更新的执行器 + * @param dsReader 差异响应者执行数据序列读取的执行器 + * @return 消息编码器实例 + */ public static DataSequenceMsgEncoder getEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) { return new DataSequenceMsgEncoder(dsWriter, dsReader); } + /** + * 获得数据序列消息解码器实例 + * @param dsWriter 差异请求者执行数据序列更新的执行器 + * @param dsReader 差异响应者执行数据序列读取的执行器 + * @return 消息解码器实例 + */ public static DataSequenceMsgDecoder getDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) { return new DataSequenceMsgDecoder(dsWriter, dsReader); } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java index ee38f880..a4131e61 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java @@ -3,10 +3,14 @@ package com.jd.blockchain.statetransfer.message; import com.jd.blockchain.stp.communication.message.LoadMessage; /** - * + * 数据序列复制的负载消息 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 * */ public class DataSequenceLoadMessage implements LoadMessage { + byte[] bytes; public DataSequenceLoadMessage(byte[] bytes) { diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java index 81df552f..6bfa4c97 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java @@ -9,6 +9,12 @@ import com.jd.blockchain.statetransfer.result.DSDiffRequestResult; import com.jd.blockchain.utils.io.BytesUtils; import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils; +/** + * 数据序列消息解码器 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + */ public class DataSequenceMsgDecoder { private int heightSize = 8; @@ -34,8 +40,9 @@ public class DataSequenceMsgDecoder { /** - * - * + * 对编过码的字节数组解码,还原成对象实例 + * @param loadMessage 字节序列 + * @return 解码后的对象 */ public Object decode(byte[] loadMessage) { @@ -48,6 +55,7 @@ public class DataSequenceMsgDecoder { int dataLength = BytesUtils.toInt(loadMessage, 0, 4); byte msgCode = loadMessage[4]; + // send by diff provider, diff requester decode if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE.CODE) { respHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize); idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize, 4); @@ -55,19 +63,25 @@ public class DataSequenceMsgDecoder { System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + 4, idBytes, 0, idSize); id = new String(idBytes); return new DataSequenceInfo(id, respHeight); - } else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE.CODE) { + } + // send by diff provider, diff requester decode + else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE.CODE) { diffElemSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4); diffElem = new byte[diffElemSize]; System.arraycopy(loadMessage, 4 + msgTypeSize + 4, diffElem, 0, diffElemSize); dsElement = BinarySerializeUtils.deserialize(diffElem); return dsElement; - } else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST.CODE) { + } + // send by diff requester, diff provider decode + else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST.CODE) { idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4); idBytes = new byte[idSize]; System.arraycopy(loadMessage, 4 + msgTypeSize + 4, idBytes, 0, idSize); id = new String(idBytes); return id; - } else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST.CODE) { + } + // send by diff requester, diff provider decode + else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST.CODE) { fromHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize); toHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize + heightSize); idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4); @@ -90,6 +104,4 @@ public class DataSequenceMsgDecoder { return null; } - - } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java index 0de98605..71b3ef4c 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java @@ -7,6 +7,12 @@ import com.jd.blockchain.statetransfer.process.DSTransferProcess; import com.jd.blockchain.utils.io.BytesUtils; import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils; +/** + * 数据序列消息编码器 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + */ public class DataSequenceMsgEncoder { private int heightSize = 8; @@ -21,8 +27,12 @@ public class DataSequenceMsgEncoder { } /** - * 目前暂时考虑fromHeight与toHeight相同的情况,即每次只对一个高度的差异编码并响应 - * + * 目前暂时考虑fromHeight与toHeight相同的情况,即每次只对一个高度的差异内容进行编码并响应 + * 把消息编码成字节数组,再交给通信层传输 + * @param msgType 数据序列状态复制消息类型 + * @param id 数据序列唯一标识符 + * @param fromHeight 差异元素起始高度 + * @param toHeight 差异元素结束高度 */ public byte[] encode(DSTransferProcess.DataSequenceMsgType msgType, String id, long fromHeight, long toHeight) { @@ -33,6 +43,7 @@ public class DataSequenceMsgEncoder { byte[] loadMessage = null; // different encoding methods for different message types + // send by diff requester, diff requester encode if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST) { // CMD_DSINFO_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, @@ -46,7 +57,9 @@ public class DataSequenceMsgEncoder { loadMessage[4] = msgType.CODE; System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize); - } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) { + } + // send by diff requester, diff requester encode + else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) { // CMD_GETDSDIFF_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes from height, // 8 bytes to height, 4 bytes id length, id content size bytes @@ -61,7 +74,9 @@ public class DataSequenceMsgEncoder { System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize); System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize); - } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE) { + } + // send by diff provider, diff provider encode + else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE) { // CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height, // 4 bytes id length, id content size bytes @@ -77,7 +92,9 @@ public class DataSequenceMsgEncoder { System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + 4, idSize); - } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) { + } + // send by diff provider, diff provider encode + else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) { if (fromHeight != toHeight) { throw new IllegalArgumentException("Height parameter error!"); } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java index cab88add..97b14602 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java @@ -13,17 +13,17 @@ import com.jd.blockchain.stp.communication.callback.CallBackDataListener; import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; import com.jd.blockchain.stp.communication.node.LocalNode; import com.jd.blockchain.stp.communication.node.RemoteNode; -import com.jd.blockchain.utils.concurrent.CompletableAsyncFuture; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; /** - * + * 数据序列状态复制过程管理器 * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 + * */ public class DSProcessManager { @@ -32,14 +32,19 @@ public class DSProcessManager { private long dsInfoResponseTimeout = 20000; private ExecutorService writeExecutors = Executors.newFixedThreadPool(5); private int returnCode = 0; + /** - * - * + * 启动一个指定数据序列的状态复制过程 + * @param dsInfo 数据序列当前状态信息 + * @param listener 本地监听者 + * @param targets 目标结点 + * @param dsWriter 差异请求者执行数据序列更新的执行器 + * @param dsReader 差异响应者执行数据序列读取的执行器 + * @return returnCode 执行结果码 */ public int startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) { // create remote sessions manager, add listener - LocalNode listenNode = new LocalNode(listener.getHostName(), listener.getPort(), new DSDefaultMessageExecutor(dsReader, dsWriter)); RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listenNode); @@ -116,17 +121,13 @@ public class DSProcessManager { RemoteSession responseSession = findResponseSession(diffResult.getMaxHeightRemoteNode(), remoteSessions); System.out.println("Async send CMD_GETDSDIFF_REQUEST msg to targets will start!"); + // step5: collect get data sequence diff response for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) { CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, responseSession, height, height, callBackBarrierDiff); dsDiffResponses.addLast(dsDiffResponse); } - - // 考虑性能 -// writeExecutors.execute(() -> { -// -// }); - + // 上述发送不合理,考虑一次性发送请求 System.out.println("Wait CMD_GETDSDIFF_RESPONSE msg from targets!"); LinkedList receiveDiffResponses = new LinkedList<>(); if (callBackBarrierDiff.tryCall()) { @@ -159,6 +160,12 @@ public class DSProcessManager { return returnCode; } + /** + * 根据远端结点找与远端结点建立的会话 + * @param remoteNode 远端结点 + * @param remoteSessions 本地维护的远端结点会话表 + * @return 与远端结点对应的会话 + */ RemoteSession findResponseSession(RemoteNode remoteNode, RemoteSession[] remoteSessions) { for (RemoteSession remoteSession : remoteSessions) { if (remoteSession.remoteNode().equals(remoteNode)) { @@ -171,9 +178,9 @@ public class DSProcessManager { * * */ - void setDSReader(DataSequenceReader reader) { - - } +// void setDSReader(DataSequenceReader reader) { +// +// } } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java index b1e6b817..45fc89fb 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java @@ -20,7 +20,7 @@ import java.util.LinkedList; import java.util.Map; /** - * + * 数据序列状态复制过程 * @author zhangshuang * @create 2019/4/11 * @since 1.0.0 @@ -36,8 +36,8 @@ public class DSTransferProcess { private String id; /** - * - * + * @param dsInfo 数据序列当前状态信息 + * @param targets 目标结点 */ public DSTransferProcess(DataSequenceInfo dsInfo, InetSocketAddress[] targets) { this.dsInfo = dsInfo; @@ -45,30 +45,46 @@ public class DSTransferProcess { this.id = dsInfo.getId(); } + /** + * @param dsWriter 差异请求者执行数据序列更新的执行器 + * @return void + */ public void setDSWriter(DataSequenceWriter dsWriter) { this.dsWriter = dsWriter; } + /** + * @param dsReader 差异响应者执行数据序列读取的执行器 + * @return void + */ public void setDSReader(DataSequenceReader dsReader) { this.dsReader = dsReader; } + /** + * @param remoteSessionManager 远端会话管理器 + * @return void + */ public void setRemoteSessionManager(RemoteSessionManager remoteSessionManager) { this.remoteSessionManager = remoteSessionManager; } /** - * get unique id from data sequence transfer process * + * @return 数据序列标识符 */ public String getId() { return id; } /** - * - * + * @param msgType 数据序列差异请求消息类型 + * @param remoteSession 目标结点对应的会话 + * @param fromHeight 差异起始高度 + * @param toHeight 差异结束高度 + * @param callBackBarrier 异步回调 + * @return 异步回调 */ CallBackDataListener send(DataSequenceMsgType msgType, RemoteSession remoteSession, long fromHeight, long toHeight, CallBackBarrier callBackBarrier) { @@ -76,9 +92,11 @@ public class DSTransferProcess { return remoteSession.asyncRequest(new DataSequenceLoadMessage(loadMessage), callBackBarrier); } + /** - * - * + * 计算数据序列差异元素数组 + * @param diffArray 差异的字节数组 + * @return 对差异字节数组的解码结果 */ public ArrayList computeDiffElement(byte[][] diffArray) { @@ -98,8 +116,9 @@ public class DSTransferProcess { } /** - * - * + * 根据差异提供者响应的数据序列状态信息找到拥有最大数据序列高度的远端结点 + * @param receiveResponses 数据序列差异请求者收到的远端结点状态的响应信息 + * @return 得到远端数据序列的最大高度以及拥有者结点 */ public DSInfoResponseResult computeDiffInfo(LinkedList receiveResponses) { long maxHeight = 0; @@ -110,11 +129,10 @@ public class DSTransferProcess { try { for (CallBackDataListener receiveResponse : receiveResponses) { Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(receiveResponse.getCallBackData()); -// System.out.println("ComputeDiffInfo object = "+object); if (object instanceof DataSequenceInfo) { DataSequenceInfo dsInfo = (DataSequenceInfo) object; long height = dsInfo.getHeight(); -// System.out.println("ComputeDiffInfo height = " +height); + // sava max height and its remote node if (maxHeight < height) { maxHeight = height; maxHeightRemoteNode = receiveResponse.remoteNode(); @@ -134,24 +152,17 @@ public class DSTransferProcess { } /** - * - * - */ - public void getDSInfo(String id) { - - } - - /** - * - * + * 获取本复制过程维护的远端会话表 + * @param + * @return 远端会话表数组 */ public RemoteSession[] getSessions() { return remoteSessions; } /** - * close all sessions - * + * 关闭本复制过程维护的所有远端会话 + * @return void */ public void close() { for (RemoteSession session : remoteSessions) { @@ -160,8 +171,8 @@ public class DSTransferProcess { } /** - * establish connections with target remote nodes - * + * 建立与远端目标结点的连接,产生本地维护的远端会话表 + * @return void */ public void start() { @@ -176,7 +187,7 @@ public class DSTransferProcess { /** - * data sequence transfer message type + * 数据序列状态传输使用的消息类型 * */ public enum DataSequenceMsgType { diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java index f71ebbfb..af62bd6c 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java @@ -1,5 +1,11 @@ package com.jd.blockchain.statetransfer.result; +/** + * 数据序列差异提供者解码请求者"CMD_GETDSDIFF_REQUEST"消息时得到的结果 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + */ public class DSDiffRequestResult { String id; diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java index 9a7ea126..7f6d48cc 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java @@ -1,13 +1,12 @@ package com.jd.blockchain.statetransfer.result; -import com.jd.blockchain.stp.communication.RemoteSession; import com.jd.blockchain.stp.communication.node.RemoteNode; /** - * - * - * - * + * 数据序列差异请求者解码提供者"CMD_DSINFO_RESPONSE"消息时得到的结果 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 */ public class DSInfoResponseResult { diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java index ed41757b..fcb932c3 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java @@ -6,11 +6,18 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo; import java.net.InetSocketAddress; import java.util.LinkedList; +/** + * 测试过程建立的一个数据序列 + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + */ public class DataSequence { private InetSocketAddress address; private String id; + // 每个数据序列维护了一系列的数据序列元素 private LinkedList dataSequenceElements = new LinkedList<>(); @@ -27,6 +34,7 @@ public class DataSequence { return address; } + public void addElements(DataSequenceElement[] elements) { for (DataSequenceElement element : elements) { addElement(element); diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java index cbb97172..84b5477c 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceReaderImpl.java @@ -8,11 +8,10 @@ import java.net.InetSocketAddress; import java.util.LinkedList; /** - *数据序列差异的提供者需要使用的回调接口实现类 + * 数据序列差异的提供者需要使用的回调接口实现类(测试) * @author zhangshuang * @create 2019/4/22 * @since 1.0.0 - * */ public class DataSequenceReaderImpl implements DataSequenceReader { diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java index 9da566c2..c3fb037e 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java @@ -5,11 +5,10 @@ import com.jd.blockchain.statetransfer.DataSequenceInfo; import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; /** - *数据序列差异的请求者需要使用的回调接口实现类 + * 数据序列差异的请求者需要使用的回调接口实现类(测试) * @author zhangshuang * @create 2019/4/22 * @since 1.0.0 - * */ public class DataSequenceWriterImpl implements DataSequenceWriter { diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java index 9a185d95..b42c0b4e 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java @@ -14,6 +14,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +/** + * @author zhangshuang + * @create 2019/4/18 + * @since 1.0.0 + */ public class StateTransferLayerTest { private final int[] listenPorts = new int[]{9000, 9010, 9020, 9030}; @@ -87,9 +92,9 @@ public class StateTransferLayerTest { } } - // 获得除监听结点之外的其他远端结点 InetSocketAddress[] getTargetNodesIp(InetSocketAddress listenIp, InetSocketAddress[] remoteNodeIps) { + // 获得除监听结点之外的其他远端结点 InetSocketAddress[] targets = new InetSocketAddress[remoteNodeIps.length - 1]; int j = 0; @@ -104,6 +109,7 @@ public class StateTransferLayerTest { } + DataSequence findDataSequence(String id, InetSocketAddress listenNodeAddr) { for (DataSequence dataSequence : dataSequencesPerNode) { if ((dataSequence.getAddress().getPort() == listenNodeAddr.getPort() && (dataSequence.getAddress().getHostName().equals(listenNodeAddr.getHostName()))