diff --git a/source/state-transfer/pom.xml b/source/state-transfer/pom.xml
index 9c3f04be..d17c8e03 100644
--- a/source/state-transfer/pom.xml
+++ b/source/state-transfer/pom.xml
@@ -15,6 +15,17 @@
stp-communication
${project.version}
+
+ com.jd.blockchain
+ utils-common
+ ${project.version}
+
+
+
+ com.jd.blockchain
+ utils-serialize
+ ${project.version}
+
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java
deleted file mode 100644
index 63573700..00000000
--- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.jd.blockchain.statetransfer;
-
-import com.jd.blockchain.stp.communication.RemoteSession;
-import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- *
- * @author zhangshuang
- * @create 2019/4/11
- * @since 1.0.0
- */
-public class DSProcessManager {
-
- private Map dSProcessMap = new ConcurrentHashMap<>();
- private RemoteSession[] remoteSessions;
-
- DSTransferProcess startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
-
-// RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listener.getPort());
- RemoteSessionManager remoteSessionManager = null;
- DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, remoteSessionManager, targets, dsWriter, dsReader);
-
- dsTransferProcess.start();
- remoteSessions = dsTransferProcess.getSessions();
-
- for(RemoteSession session : remoteSessions) {
- dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO, session);
- }
-
-
- dSProcessMap.put(dsInfo.getId(), dsTransferProcess);
-
- return dsTransferProcess;
- }
-
- void setDSReader(DataSequenceReader reader) {
-
- }
-
-
-}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java
deleted file mode 100644
index 6d5b699f..00000000
--- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.jd.blockchain.statetransfer;
-
-import com.jd.blockchain.stp.communication.RemoteSession;
-import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
-import com.jd.blockchain.stp.communication.node.RemoteNode;
-
-import java.net.InetSocketAddress;
-
-/**
- *
- * @author zhangshuang
- * @create 2019/4/11
- * @since 1.0.0
- */
-public class DSTransferProcess {
-
- private InetSocketAddress[] targets;
- private DataSequenceWriter dsWriter;
- private DataSequenceReader dsReader;
- private DataSequenceInfo dsInfo;
- private RemoteSessionManager remoteSessionManager;
- private RemoteSession[] remoteSessions;
- private String id;
-
-
- public DSTransferProcess(DataSequenceInfo dsInfo, RemoteSessionManager remoteSessionManager, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
-
- this.dsInfo = dsInfo;
- this.targets = targets;
- this.dsWriter = dsWriter;
- this.dsReader = dsReader;
- this.remoteSessionManager = remoteSessionManager;
- this.id = dsInfo.getId();
-
- }
-
- void send(DataSequenceMsgType msgType, RemoteSession session) {
-
- //session.send();
-
- }
-
- byte[] createMsg(DataSequenceMsgType msgType) {
- return null;
- }
-
- public void computeDiff() {
- //todo
- }
-
- public void getDSInfo(String id) {
- //todo
- }
-
- public RemoteSession[] getSessions() {
- //todo
- return remoteSessions;
- }
-
- public void start() {
-
- RemoteNode[] remoteNodes = new RemoteNode[targets.length];
-
- for (int i = 0; i< remoteNodes.length; i++) {
- remoteNodes[i] = new RemoteNode(targets[i].getHostName(), targets[i].getPort());
- }
-
- remoteSessions = remoteSessionManager.newSessions(remoteNodes);
-
- for (int i = 0; i < remoteSessions.length; i++) {
- DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter);
- remoteSessions[i].initExecutor(msgHandle);
- remoteSessions[i].init();
- }
- }
-
- enum DataSequenceMsgType {
-
- CMD_DSINFO,
- CMD_GETDSDIFF
- }
-
-
-}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java
index e1e904ed..b64bd963 100644
--- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java
@@ -1,12 +1,16 @@
package com.jd.blockchain.statetransfer;
+import java.io.Serializable;
+
/**
*数据序列复制的元素或单位
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
-public class DataSequenceElement {
+public class DataSequenceElement implements Serializable {
+
+ private static final long serialVersionUID = -719578198150380571L;
//数据序列的唯一标识符;
private String id;
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java
deleted file mode 100644
index d396ff3e..00000000
--- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.jd.blockchain.statetransfer;
-
-import com.jd.blockchain.stp.communication.MessageExecutor;
-import com.jd.blockchain.stp.communication.RemoteSession;
-
-/**
- *
- * @author zhangshuang
- * @create 2019/4/11
- * @since 1.0.0
- */
-public class DataSequenceMsgHandle implements MessageExecutor {
-
- DataSequenceReader dsReader;
- DataSequenceWriter dsWriter;
-
- public DataSequenceMsgHandle(DataSequenceReader dsReader, DataSequenceWriter dsWriter) {
- this.dsReader = dsReader;
- this.dsWriter = dsWriter;
- }
-
- @Override
- public byte[] receive(String key, byte[] data, RemoteSession session) {
- return new byte[0];
- }
-
- @Override
- public REPLY replyType() {
- return REPLY.AUTO;
- }
-
- /**
- *
- *
- */
-
-}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java
deleted file mode 100644
index 5662c117..00000000
--- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.jd.blockchain.statetransfer;
-/**
- *数据序列差异的提供者需要使用的回调接口
- * @author zhangshuang
- * @create 2019/4/11
- * @since 1.0.0
- */
-public interface DataSequenceReader {
-
- /**
- * 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息;
- *
- */
- DataSequenceInfo getDSInfo(String id);
-
-
- /**
- * 差异的提供者根据输入的数据序列标识符以及起始,结束高度获得数据序列的差异内容;
- *
- */
- DataSequenceElement[] getDSContent(String id, long from, long to);
-}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java
deleted file mode 100644
index 0c45de18..00000000
--- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.jd.blockchain.statetransfer;
-
-/**
- *数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
- * @author zhangshuang
- * @create 2019/4/11
- * @since 1.0.0
- */
-public interface DataSequenceWriter {
-
- /**
- *更新数据序列的当前状态
- * return void
- */
- void updateDSInfo(String id, DataSequenceElement[] diffContents);
-
-}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java
new file mode 100644
index 00000000..8e0dda19
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java
@@ -0,0 +1,33 @@
+package com.jd.blockchain.statetransfer.callback;
+
+import com.jd.blockchain.statetransfer.DataSequenceElement;
+import com.jd.blockchain.statetransfer.DataSequenceInfo;
+
+/**
+ *数据序列差异的提供者需要使用的回调接口
+ * @author zhangshuang
+ * @create 2019/4/11
+ * @since 1.0.0
+ */
+public interface DataSequenceReader {
+
+ /**
+ * 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息;
+ *
+ */
+ DataSequenceInfo getDSInfo(String id);
+
+
+ /**
+ * 差异的提供者根据输入的数据序列标识符以及起始,结束高度提供数据序列的差异内容;
+ *
+ */
+ DataSequenceElement[] getDSDiffContent(String id, long from, long to);
+
+
+ /**
+ * 差异的提供者根据输入的数据序列标识符以及高度提供数据序列的差异内容;
+ *
+ */
+ DataSequenceElement getDSDiffContent(String id, long height);
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java
new file mode 100644
index 00000000..3e7d060f
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java
@@ -0,0 +1,25 @@
+package com.jd.blockchain.statetransfer.callback;
+
+import com.jd.blockchain.statetransfer.DataSequenceElement;
+
+/**
+ *数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
+ * @author zhangshuang
+ * @create 2019/4/11
+ * @since 1.0.0
+ */
+public interface DataSequenceWriter {
+
+ /**
+ *更新数据序列的当前状态,一次更新多个高度的差异
+ * return void
+ */
+ int updateDSInfo(String id, DataSequenceElement[] diffContents);
+
+ /**
+ *更新数据序列的当前状态,一次更新一个高度的差异
+ * return void
+ */
+ int updateDSInfo(String id, DataSequenceElement diffContents);
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java
new file mode 100644
index 00000000..258c5fdb
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java
@@ -0,0 +1,75 @@
+package com.jd.blockchain.statetransfer.message;
+
+import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
+import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
+import com.jd.blockchain.statetransfer.process.DSTransferProcess;
+import com.jd.blockchain.statetransfer.result.DSDiffRequestResult;
+import com.jd.blockchain.stp.communication.MessageExecutor;
+import com.jd.blockchain.stp.communication.RemoteSession;
+
+/**
+ *
+ * @author zhangshuang
+ * @create 2019/4/11
+ * @since 1.0.0
+ */
+public class DSDefaultMessageExecutor implements MessageExecutor {
+
+ DataSequenceReader dsReader;
+ DataSequenceWriter dsWriter;
+
+ public DSDefaultMessageExecutor(DataSequenceReader dsReader, DataSequenceWriter dsWriter) {
+ this.dsReader = dsReader;
+ this.dsWriter = dsWriter;
+ }
+
+ /**
+ * 对状态机复制的请求进行响应
+ *
+ */
+
+ @Override
+ public byte[] receive(String key, byte[] data, RemoteSession session) {
+
+ try {
+ Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data);
+
+ if (object instanceof String) {
+
+ String id = (String)object;
+ byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, 0, 0);
+ session.reply(key, new DataSequenceLoadMessage(respLoadMsg));
+ }
+ else if (object instanceof DSDiffRequestResult) {
+
+ DSDiffRequestResult requestResult = (DSDiffRequestResult)object;
+ String id = requestResult.getId();
+ long fromHeight = requestResult.getFromHeight();
+ long toHeight = requestResult.getToHeight();
+ for (long i = fromHeight; i < toHeight + 1; i++) {
+ byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, i, i);
+ session.reply(key, new DataSequenceLoadMessage(respLoadMsg));
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Receive data exception, unknown message type!");
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+ @Override
+ public REPLY replyType() {
+ return REPLY.MANUAL;
+ }
+
+ /**
+ *
+ *
+ */
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java
new file mode 100644
index 00000000..874ef367
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java
@@ -0,0 +1,15 @@
+package com.jd.blockchain.statetransfer.message;
+
+import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
+import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
+
+public class DSMsgResolverFactory {
+
+ public static DataSequenceMsgEncoder getEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
+ return new DataSequenceMsgEncoder(dsWriter, dsReader);
+ }
+
+ public static DataSequenceMsgDecoder getDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
+ return new DataSequenceMsgDecoder(dsWriter, dsReader);
+ }
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java
new file mode 100644
index 00000000..ee38f880
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java
@@ -0,0 +1,24 @@
+package com.jd.blockchain.statetransfer.message;
+
+import com.jd.blockchain.stp.communication.message.LoadMessage;
+
+/**
+ *
+ *
+ */
+public class DataSequenceLoadMessage implements LoadMessage {
+ byte[] bytes;
+
+ public DataSequenceLoadMessage(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ public void setBytes(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public byte[] toBytes() {
+ return bytes;
+ }
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java
new file mode 100644
index 00000000..81df552f
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java
@@ -0,0 +1,95 @@
+package com.jd.blockchain.statetransfer.message;
+
+import com.jd.blockchain.statetransfer.DataSequenceElement;
+import com.jd.blockchain.statetransfer.DataSequenceInfo;
+import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
+import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
+import com.jd.blockchain.statetransfer.process.DSTransferProcess;
+import com.jd.blockchain.statetransfer.result.DSDiffRequestResult;
+import com.jd.blockchain.utils.io.BytesUtils;
+import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;
+
+public class DataSequenceMsgDecoder {
+
+ private int heightSize = 8;
+ private int msgTypeSize = 1;
+
+ private long respHeight;
+ private long fromHeight;
+ private long toHeight;
+ private int idSize;
+ private byte[] idBytes;
+ private String id;
+ private int diffElemSize;
+ private byte[] diffElem;
+ DataSequenceElement dsElement;
+
+ private DataSequenceWriter dsWriter;
+ private DataSequenceReader dsReader;
+
+ public DataSequenceMsgDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
+ this.dsWriter = dsWriter;
+ this.dsReader = dsReader;
+ }
+
+
+ /**
+ *
+ *
+ */
+ public Object decode(byte[] loadMessage) {
+
+ try {
+ if (loadMessage.length <= 5) {
+ System.out.println("LoadMessage size is less than 5!");
+ throw new IllegalArgumentException();
+ }
+
+ int dataLength = BytesUtils.toInt(loadMessage, 0, 4);
+ byte msgCode = loadMessage[4];
+
+ if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE.CODE) {
+ respHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize);
+ idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize, 4);
+ idBytes = new byte[idSize];
+ System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + 4, idBytes, 0, idSize);
+ id = new String(idBytes);
+ return new DataSequenceInfo(id, respHeight);
+ } else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE.CODE) {
+ diffElemSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4);
+ diffElem = new byte[diffElemSize];
+ System.arraycopy(loadMessage, 4 + msgTypeSize + 4, diffElem, 0, diffElemSize);
+ dsElement = BinarySerializeUtils.deserialize(diffElem);
+ return dsElement;
+ } else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST.CODE) {
+ idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4);
+ idBytes = new byte[idSize];
+ System.arraycopy(loadMessage, 4 + msgTypeSize + 4, idBytes, 0, idSize);
+ id = new String(idBytes);
+ return id;
+ } else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST.CODE) {
+ fromHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize);
+ toHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize + heightSize);
+ idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4);
+ idBytes = new byte[idSize];
+ System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idBytes, 0, idSize);
+ id = new String(idBytes);
+ return new DSDiffRequestResult(id, fromHeight, toHeight);
+ }
+ else {
+ System.out.println("Unknown message type!");
+ throw new IllegalArgumentException();
+ }
+
+ } catch (Exception e) {
+ System.out.println("Error to decode message: " + e.getMessage() + "!");
+ e.printStackTrace();
+
+ }
+
+ return null;
+ }
+
+
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java
new file mode 100644
index 00000000..9b1adad0
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java
@@ -0,0 +1,116 @@
+package com.jd.blockchain.statetransfer.message;
+
+import com.jd.blockchain.statetransfer.DataSequenceElement;
+import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
+import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
+import com.jd.blockchain.statetransfer.process.DSTransferProcess;
+import com.jd.blockchain.utils.io.BytesUtils;
+import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;
+
+public class DataSequenceMsgEncoder {
+
+ private int heightSize = 8;
+ private int msgTypeSize = 1;
+
+ private DataSequenceWriter dsWriter;
+ private DataSequenceReader dsReader;
+
+ public DataSequenceMsgEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
+ this.dsWriter = dsWriter;
+ this.dsReader = dsReader;
+ }
+
+ /**
+ * 目前暂时考虑fromHeight与toHeight相同的情况,即每次只对一个高度的差异编码并响应
+ *
+ */
+ public byte[] encode(DSTransferProcess.DataSequenceMsgType msgType, String id, long fromHeight, long toHeight) {
+
+ try {
+
+ int dataLength;
+ int idSize = id.getBytes().length;
+ byte[] loadMessage = null;
+
+ // different encoding methods for different message types
+ if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST) {
+
+ // CMD_DSINFO_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe,
+ // 4 bytes id length, id content size bytes
+
+ dataLength = 4 + msgTypeSize + 4 + idSize;
+
+ loadMessage = new byte[dataLength];
+
+ System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4);
+ System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize);
+ System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4);
+ System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize);
+ } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) {
+
+ // CMD_GETDSDIFF_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes from height,
+ // 8 bytes to height, 4 bytes id length, id content size bytes
+
+ dataLength = 4 + msgTypeSize + heightSize + heightSize + 4 + idSize;
+
+ loadMessage = new byte[dataLength];
+
+ System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4);
+ System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize);
+ System.arraycopy(BytesUtils.toBytes(fromHeight), 0, loadMessage, 4 + msgTypeSize, heightSize);
+ System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize);
+ System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4);
+ System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize);
+ } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) {
+
+ // CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height,
+ // 4 bytes id length, id content size bytes
+
+ dataLength = 4 + msgTypeSize + heightSize + 4 + idSize;
+
+ loadMessage = new byte[dataLength];
+
+ System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4);
+ System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize);
+ System.arraycopy(BytesUtils.toBytes(dsReader.getDSInfo(id).getHeight()), 0, loadMessage, 4 + msgTypeSize, heightSize);
+
+ System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4);
+ System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + 4, idSize);
+
+ } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) {
+ if (fromHeight != toHeight) {
+ throw new IllegalArgumentException("Height parameter error!");
+ }
+
+ // CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe,
+ // 4 bytes diffElem size, diff content size;
+
+ // 回调reader,获得这个高度上的所有差异的数据序列内容,并组织成DataSequenceElement结构
+ DataSequenceElement element = dsReader.getDSDiffContent(id, fromHeight);
+
+ byte[] diffElem = BinarySerializeUtils.serialize(element);
+
+ dataLength = 4 + msgTypeSize + 4 + diffElem.length;
+ loadMessage = new byte[dataLength];
+
+ System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); //total size
+ System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); //msgType size
+ System.arraycopy(BytesUtils.toBytes(diffElem.length), 0, loadMessage, 4 + msgTypeSize, 4); // diffElem size
+ System.arraycopy(diffElem, 0, loadMessage, 4 + msgTypeSize + 4, diffElem.length); // diffElem bytes
+ }
+ else {
+ System.out.println("Unknown message type!");
+ throw new IllegalArgumentException();
+ }
+
+ return loadMessage;
+
+ } catch (Exception e) {
+ System.out.println("Error to encode message type : " + msgType + "!");
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java
new file mode 100644
index 00000000..af40d6f2
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java
@@ -0,0 +1,155 @@
+package com.jd.blockchain.statetransfer.process;
+
+import com.jd.blockchain.statetransfer.DataSequenceElement;
+import com.jd.blockchain.statetransfer.DataSequenceInfo;
+import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
+import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
+import com.jd.blockchain.statetransfer.message.DSDefaultMessageExecutor;
+import com.jd.blockchain.statetransfer.result.DSInfoResponseResult;
+import com.jd.blockchain.stp.communication.RemoteSession;
+import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
+import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
+import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
+import com.jd.blockchain.stp.communication.node.LocalNode;
+import com.jd.blockchain.utils.concurrent.CompletableAsyncFuture;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.*;
+
+/**
+ *
+ * @author zhangshuang
+ * @create 2019/4/11
+ * @since 1.0.0
+ */
+public class DSProcessManager {
+
+ private static Map dSProcessMap = new ConcurrentHashMap<>();
+ private RemoteSession[] remoteSessions;
+ private long dsInfoResponseTimeout = 2000;
+ private ExecutorService writeExecutors = Executors.newFixedThreadPool(5);
+ private int returnCode = 0;
+ /**
+ *
+ *
+ */
+ public int startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
+
+ // create remote sessions manager, add listener
+
+ LocalNode listenNode = new LocalNode(listener.getHostName(), listener.getPort(), new DSDefaultMessageExecutor(dsReader, dsWriter));
+
+ RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listenNode);
+
+ // data sequence transfer process life cycle start
+ DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, targets);
+ dsTransferProcess.setDSReader(dsReader);
+ dsTransferProcess.setDSWriter(dsWriter);
+ dsTransferProcess.setRemoteSessionManager(remoteSessionManager);
+
+ dSProcessMap.put(dsInfo.getId(), dsTransferProcess);
+
+ try {
+ // start network connections with targets
+ dsTransferProcess.start();
+
+ //get all target sessions
+ remoteSessions = dsTransferProcess.getSessions();
+
+ // async message send process
+ CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, dsInfoResponseTimeout);
+
+ // response message manage map
+ Map dsInfoResponses = new ConcurrentHashMap<>();
+
+ // step1: send get dsInfo request, then hold
+ for (RemoteSession remoteSession : remoteSessions) {
+
+ CallBackDataListener dsInfoResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST, remoteSession, 0, 0, callBackBarrier);
+
+ dsInfoResponses.put(remoteSession, dsInfoResponse);
+ }
+
+ // step2: collect get dsInfo response
+ Map receiveResponses = new ConcurrentHashMap<>();
+ if (callBackBarrier.tryCall()) {
+ for (RemoteSession remoteSession : dsInfoResponses.keySet()) {
+ CallBackDataListener asyncFuture = dsInfoResponses.get(remoteSession);
+ // if really done
+ if (asyncFuture.isDone()) {
+ receiveResponses.put(remoteSession, asyncFuture.getCallBackData());
+ }
+ }
+ }
+
+ // step3: process received responses
+ DSInfoResponseResult diffResult = dsTransferProcess.computeDiffInfo(receiveResponses);
+
+ // height diff
+ long diff = dsInfo.getHeight() - diffResult.getMaxHeight();
+
+ if (diff == 0 || diff > 0) {
+ // no duplication is required, life cycle ends
+ dsTransferProcess.close();
+ dSProcessMap.remove(dsInfo.getId());
+ return returnCode;
+
+ }
+ else {
+
+ // step4: async send get data sequence diff request
+ // single step get diff
+ // async message send process
+ CallBackBarrier callBackBarrierDiff = CallBackBarrier.newCallBackBarrier((int)(diffResult.getMaxHeight() - dsInfo.getHeight()), dsInfoResponseTimeout);
+ LinkedList dsDiffResponses = new LinkedList<>();
+
+ // step5: collect get data sequence diff response
+ for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) {
+ CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, diffResult.getMaxHeightSession(), height, height, callBackBarrierDiff);
+ dsDiffResponses.addLast(dsDiffResponse);
+ }
+
+ // 考虑性能
+// writeExecutors.execute(() -> {
+//
+// });
+
+ LinkedList receiveDiffResponses = new LinkedList<>();
+ if (callBackBarrierDiff.tryCall()) {
+ for (int i = 0; i < dsDiffResponses.size(); i++) {
+ CallBackDataListener asyncFutureDiff = dsDiffResponses.get(i);
+ if (asyncFutureDiff.isDone()) {
+ receiveDiffResponses.addLast(asyncFutureDiff.getCallBackData());
+ }
+ }
+ }
+ // step6: process data sequence diff response, update local data sequence state
+ DataSequenceElement[] dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][]));
+ returnCode = dsWriter.updateDSInfo(dsInfo.getId(), dataSequenceElements);
+
+ // data sequence transfer complete, close all sessions, end process life cycle
+
+ dsTransferProcess.close();
+ dSProcessMap.remove(dsInfo.getId());
+ }
+
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return returnCode;
+ }
+
+
+ /**
+ *
+ *
+ */
+ void setDSReader(DataSequenceReader reader) {
+
+ }
+
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java
new file mode 100644
index 00000000..90003712
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java
@@ -0,0 +1,192 @@
+package com.jd.blockchain.statetransfer.process;
+
+import com.jd.blockchain.statetransfer.DataSequenceElement;
+import com.jd.blockchain.statetransfer.DataSequenceInfo;
+import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
+import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
+import com.jd.blockchain.statetransfer.message.DSMsgResolverFactory;
+import com.jd.blockchain.statetransfer.message.DataSequenceLoadMessage;
+import com.jd.blockchain.statetransfer.result.DSInfoResponseResult;
+import com.jd.blockchain.stp.communication.RemoteSession;
+import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
+import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
+import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
+import com.jd.blockchain.stp.communication.node.RemoteNode;
+import com.jd.blockchain.utils.IllegalDataException;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+/**
+ *
+ * @author zhangshuang
+ * @create 2019/4/11
+ * @since 1.0.0
+ */
+public class DSTransferProcess {
+
+ private InetSocketAddress[] targets;
+ private DataSequenceWriter dsWriter;
+ private DataSequenceReader dsReader;
+ private DataSequenceInfo dsInfo;
+ private RemoteSessionManager remoteSessionManager;
+ private RemoteSession[] remoteSessions;
+ private String id;
+
+ /**
+ *
+ *
+ */
+ public DSTransferProcess(DataSequenceInfo dsInfo, InetSocketAddress[] targets) {
+ this.dsInfo = dsInfo;
+ this.targets = targets;
+ this.id = dsInfo.getId();
+ }
+
+ public void setDSWriter(DataSequenceWriter dsWriter) {
+ this.dsWriter = dsWriter;
+ }
+
+ public void setDSReader(DataSequenceReader dsReader) {
+ this.dsReader = dsReader;
+ }
+
+ public void setRemoteSessionManager(RemoteSessionManager remoteSessionManager) {
+ this.remoteSessionManager = remoteSessionManager;
+ }
+
+
+ /**
+ * get unique id from data sequence transfer process
+ *
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ *
+ *
+ */
+ CallBackDataListener send(DataSequenceMsgType msgType, RemoteSession remoteSession, long fromHeight, long toHeight, CallBackBarrier callBackBarrier) {
+
+ byte[] loadMessage = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(msgType, id, fromHeight, toHeight);
+
+ return remoteSession.asyncRequest(new DataSequenceLoadMessage(loadMessage), callBackBarrier);
+ }
+ /**
+ *
+ *
+ */
+ public DataSequenceElement[] computeDiffElement(byte[][] diffArray) {
+ DataSequenceElement[] dataSequenceElements = new DataSequenceElement[diffArray.length];
+ for (int i = 0 ; i < dataSequenceElements.length; i++) {
+ Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(diffArray[i]);
+ if (object instanceof DataSequenceElement) {
+ dataSequenceElements[i] = (DataSequenceElement) object;
+ }
+ else {
+ throw new IllegalDataException("Unknown instance object!");
+ }
+ }
+
+ return dataSequenceElements;
+ }
+
+ /**
+ *
+ *
+ */
+ public DSInfoResponseResult computeDiffInfo(Map responseMap) {
+ long maxHeight = 0;
+ RemoteSession maxHeightSession = null;
+
+ for (RemoteSession remoteSession : responseMap.keySet()) {
+ Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(responseMap.get(remoteSession));
+ if (object instanceof DataSequenceInfo) {
+ DataSequenceInfo dsInfo = (DataSequenceInfo) object;
+ long height = dsInfo.getHeight();
+ if (maxHeight < height) {
+ maxHeight = height;
+ maxHeightSession = remoteSession;
+ }
+ }
+ else {
+ throw new IllegalDataException("Unknown instance object!");
+ }
+
+ }
+
+ return new DSInfoResponseResult(maxHeight, maxHeightSession);
+ }
+
+ /**
+ *
+ *
+ */
+ public void getDSInfo(String id) {
+
+ }
+
+ /**
+ *
+ *
+ */
+ public RemoteSession[] getSessions() {
+ return remoteSessions;
+ }
+
+ /**
+ * close all sessions
+ *
+ */
+ public void close() {
+ for (RemoteSession session : remoteSessions) {
+ session.closeAll();
+ }
+ }
+
+ /**
+ * establish connections with target remote nodes
+ *
+ */
+ public void start() {
+
+ RemoteNode[] remoteNodes = new RemoteNode[targets.length];
+
+ for (int i = 0; i < remoteNodes.length; i++) {
+ remoteNodes[i] = new RemoteNode(targets[i].getHostName(), targets[i].getPort());
+ }
+
+ remoteSessions = remoteSessionManager.newSessions(remoteNodes);
+ }
+
+
+ /**
+ * data sequence transfer message type
+ *
+ */
+ public enum DataSequenceMsgType {
+ CMD_DSINFO_REQUEST((byte) 0x1),
+ CMD_DSINFO_RESPONSE((byte) 0x2),
+ CMD_GETDSDIFF_REQUEST((byte) 0x3),
+ CMD_GETDSDIFF_RESPONSE((byte) 0x4),
+ ;
+ public final byte CODE;
+
+ private DataSequenceMsgType(byte code) {
+ this.CODE = code;
+ }
+
+ public static DataSequenceMsgType valueOf(byte code) {
+ for (DataSequenceMsgType msgType : DataSequenceMsgType.values()) {
+ if (msgType.CODE == code) {
+ return msgType;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported code[" + code + "] of msgType!");
+ }
+ }
+
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java
new file mode 100644
index 00000000..f71ebbfb
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java
@@ -0,0 +1,39 @@
+package com.jd.blockchain.statetransfer.result;
+
+public class DSDiffRequestResult {
+
+ String id;
+ long fromHeight;
+ long toHeight;
+
+ public DSDiffRequestResult(String id ,long fromHeight, long toHeight) {
+ this.id = id;
+ this.fromHeight = fromHeight;
+ this.toHeight = toHeight;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public long getFromHeight() {
+ return fromHeight;
+ }
+
+ public long getToHeight() {
+ return toHeight;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setFromHeight(long fromHeight) {
+ this.fromHeight = fromHeight;
+ }
+
+ public void setToHeight(long toHeight) {
+ this.toHeight = toHeight;
+ }
+
+}
diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java
new file mode 100644
index 00000000..d0a57b26
--- /dev/null
+++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java
@@ -0,0 +1,37 @@
+package com.jd.blockchain.statetransfer.result;
+
+import com.jd.blockchain.stp.communication.RemoteSession;
+
+/**
+ *
+ *
+ *
+ *
+ */
+public class DSInfoResponseResult {
+
+ long maxHeight;
+ RemoteSession maxHeightSession;
+
+ public DSInfoResponseResult(long maxHeight, RemoteSession maxHeightSession) {
+ this.maxHeight = maxHeight;
+ this.maxHeightSession = maxHeightSession;
+ }
+
+ public long getMaxHeight() {
+ return maxHeight;
+ }
+
+ public RemoteSession getMaxHeightSession() {
+ return maxHeightSession;
+ }
+
+ public void setMaxHeight(long maxHeight) {
+ this.maxHeight = maxHeight;
+ }
+
+ public void setMaxHeightSession(RemoteSession maxHeightSession) {
+ this.maxHeightSession = maxHeightSession;
+ }
+
+}