From 99f8063c8740ce7d5e324b750b75048a7ceac6d9 Mon Sep 17 00:00:00 2001 From: zhangshuang Date: Thu, 11 Apr 2019 16:51:04 +0800 Subject: [PATCH] add state transfer layer frame code --- source/peer/pom.xml | 5 ++ .../statetransfer/DataSequenceReaderImpl.java | 25 ++++++ .../statetransfer/DataSequenceWriterImpl.java | 22 +++++ source/pom.xml | 1 + source/state-transfer/pom.xml | 23 +++++ .../statetransfer/DSProcessManager.java | 44 ++++++++++ .../statetransfer/DSTransferProcess.java | 84 +++++++++++++++++++ .../statetransfer/DataSequenceElement.java | 49 +++++++++++ .../statetransfer/DataSequenceInfo.java | 37 ++++++++ .../statetransfer/DataSequenceMsgHandle.java | 32 +++++++ .../statetransfer/DataSequenceReader.java | 22 +++++ .../statetransfer/DataSequenceWriter.java | 17 ++++ 12 files changed, 361 insertions(+) create mode 100644 source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java create mode 100644 source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java create mode 100644 source/state-transfer/pom.xml create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java create mode 100644 source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java diff --git a/source/peer/pom.xml b/source/peer/pom.xml index 01311d5c..51746e7d 100644 --- a/source/peer/pom.xml +++ b/source/peer/pom.xml @@ -20,6 +20,11 @@ consensus-framework ${project.version} + + com.jd.blockchain + state-transfer + ${project.version} + com.jd.blockchain ledger-rpc diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java new file mode 100644 index 00000000..cd2677ac --- /dev/null +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java @@ -0,0 +1,25 @@ +package com.jd.blockchain.peer.statetransfer; + +import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; +import com.jd.blockchain.statetransfer.DataSequenceReader; + +/** + *数据序列差异的提供者需要使用的回调接口实现类 + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + * + */ +public class DataSequenceReaderImpl implements DataSequenceReader { + + @Override + public DataSequenceInfo getDSInfo(String id) { + return null; + } + + @Override + public DataSequenceElement[] getDSContent(String id, long from, long to) { + return null; + } +} diff --git a/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java new file mode 100644 index 00000000..e2c48601 --- /dev/null +++ b/source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java @@ -0,0 +1,22 @@ +package com.jd.blockchain.peer.statetransfer; + +import com.jd.blockchain.statetransfer.DataSequenceElement; +import com.jd.blockchain.statetransfer.DataSequenceInfo; +import com.jd.blockchain.statetransfer.DataSequenceReader; +import com.jd.blockchain.statetransfer.DataSequenceWriter; + +/** + *数据序列差异的请求者需要使用的回调接口实现类 + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + * + */ +public class DataSequenceWriterImpl implements DataSequenceWriter { + + + @Override + public void updateDSInfo(String id, DataSequenceElement[] diffContents) { + + } +} diff --git a/source/pom.xml b/source/pom.xml index cee45061..0a03b26c 100644 --- a/source/pom.xml +++ b/source/pom.xml @@ -32,6 +32,7 @@ storage gateway peer + state-transfer sdk tools test diff --git a/source/state-transfer/pom.xml b/source/state-transfer/pom.xml new file mode 100644 index 00000000..9c3f04be --- /dev/null +++ b/source/state-transfer/pom.xml @@ -0,0 +1,23 @@ + + 4.0.0 + + com.jd.blockchain + jdchain-root + 0.9.0-SNAPSHOT + + state-transfer + + + + com.jd.blockchain + stp-communication + ${project.version} + + + + + + + diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java new file mode 100644 index 00000000..cc849eba --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java @@ -0,0 +1,44 @@ +package com.jd.blockchain.statetransfer; + +import com.jd.blockchain.stp.communication.RemoteSession; +import com.jd.blockchain.stp.communication.RemoteSessionManager; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public class DSProcessManager { + + private Map dSProcessMap = new ConcurrentHashMap<>(); + private RemoteSession[] remoteSessions; + + DSTransferProcess startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) { + + RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listener.getPort()); + DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, remoteSessionManager, targets, dsWriter, dsReader); + + dsTransferProcess.start(); + remoteSessions = dsTransferProcess.getSessions(); + + for(RemoteSession session : remoteSessions) { + dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO, session); + } + + + dSProcessMap.put(dsInfo.getId(), dsTransferProcess); + + return dsTransferProcess; + } + + void setDSReader(DataSequenceReader reader) { + + } + + +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java new file mode 100644 index 00000000..55a1402c --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java @@ -0,0 +1,84 @@ +package com.jd.blockchain.statetransfer; + +import com.jd.blockchain.stp.communication.RemoteNode; +import com.jd.blockchain.stp.communication.RemoteSession; +import com.jd.blockchain.stp.communication.RemoteSessionManager; + +import java.net.InetSocketAddress; + +/** + * + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public class DSTransferProcess { + + private InetSocketAddress[] targets; + private DataSequenceWriter dsWriter; + private DataSequenceReader dsReader; + private DataSequenceInfo dsInfo; + private RemoteSessionManager remoteSessionManager; + private RemoteSession[] remoteSessions; + private String id; + + + public DSTransferProcess(DataSequenceInfo dsInfo, RemoteSessionManager remoteSessionManager, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) { + + this.dsInfo = dsInfo; + this.targets = targets; + this.dsWriter = dsWriter; + this.dsReader = dsReader; + this.remoteSessionManager = remoteSessionManager; + this.id = dsInfo.getId(); + + } + + void send(DataSequenceMsgType msgType, RemoteSession session) { + + //session.send(); + + } + + byte[] createMsg(DataSequenceMsgType msgType) { + return null; + } + + public void computeDiff() { + //todo + } + + public void getDSInfo(String id) { + //todo + } + + public RemoteSession[] getSessions() { + //todo + return remoteSessions; + } + + public void start() { + + RemoteNode[] remoteNodes = new RemoteNode[targets.length]; + + for (int i = 0; i< remoteNodes.length; i++) { + remoteNodes[i] = new RemoteNode(targets[i].getHostName(), targets[i].getPort()); + } + + remoteSessions = remoteSessionManager.newSessions(remoteNodes); + + for (int i = 0; i < remoteSessions.length; i++) { + DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter); + remoteSessions[i].initHandler(msgHandle); + remoteSessions[i].connect(); + } + } + + enum DataSequenceMsgType { + + CMD_DSINFO, + CMD_GETDSDIFF + } + + +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java new file mode 100644 index 00000000..e1e904ed --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java @@ -0,0 +1,49 @@ +package com.jd.blockchain.statetransfer; + +/** + *数据序列复制的元素或单位 + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public class DataSequenceElement { + + //数据序列的唯一标识符; + private String id; + + //数据序列的某个高度; + private long height; + + //对应某个高度的数据序列内容 + private byte[][] data; + + public DataSequenceElement(String id, long height, byte[][] data) { + this.id = id; + this.height = height; + this.data = data; + } + + public long getHeight() { + return height; + } + + public void setHeight(long height) { + this.height = height; + } + + public String getId() { + return id; + } + + public void setId(String id) { + id = id; + } + + public byte[][] getData() { + return data; + } + + public void setData(byte[][] data) { + this.data = data; + } +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java new file mode 100644 index 00000000..8aa45566 --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java @@ -0,0 +1,37 @@ +package com.jd.blockchain.statetransfer; + +/** + *共识结点上的某个数据序列的当前状态信息,每个共识结点可以对应任意个数据序列; + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public class DataSequenceInfo { + + //数据序列的唯一标识 + private String id; + + //数据序列的当前高度 + private long height; + + public DataSequenceInfo(String id, long height) { + this.id = id; + this.height = height; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public long getHeight() { + return height; + } + + public void setHeight(long height) { + this.height = height; + } +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java new file mode 100644 index 00000000..878869ad --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java @@ -0,0 +1,32 @@ +package com.jd.blockchain.statetransfer; + +import com.jd.blockchain.stp.communication.MessageHandler; +import com.jd.blockchain.stp.communication.RemoteSession; + +/** + * + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public class DataSequenceMsgHandle implements MessageHandler { + + DataSequenceReader dsReader; + DataSequenceWriter dsWriter; + + public DataSequenceMsgHandle(DataSequenceReader dsReader, DataSequenceWriter dsWriter) { + this.dsReader = dsReader; + this.dsWriter = dsWriter; + } + + @Override + public void receive(byte[] key, byte[] data, RemoteSession session) { + + } + + /** + * + * + */ + +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java new file mode 100644 index 00000000..5662c117 --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java @@ -0,0 +1,22 @@ +package com.jd.blockchain.statetransfer; +/** + *数据序列差异的提供者需要使用的回调接口 + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public interface DataSequenceReader { + + /** + * 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息; + * + */ + DataSequenceInfo getDSInfo(String id); + + + /** + * 差异的提供者根据输入的数据序列标识符以及起始,结束高度获得数据序列的差异内容; + * + */ + DataSequenceElement[] getDSContent(String id, long from, long to); +} diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java new file mode 100644 index 00000000..0c45de18 --- /dev/null +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java @@ -0,0 +1,17 @@ +package com.jd.blockchain.statetransfer; + +/** + *数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态; + * @author zhangshuang + * @create 2019/4/11 + * @since 1.0.0 + */ +public interface DataSequenceWriter { + + /** + *更新数据序列的当前状态 + * return void + */ + void updateDSInfo(String id, DataSequenceElement[] diffContents); + +}