Browse Source

add state transfer layer frame code

tags/1.0.0
zhangshuang 6 years ago
parent
commit
99f8063c87
12 changed files with 361 additions and 0 deletions
  1. +5
    -0
      source/peer/pom.xml
  2. +25
    -0
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java
  3. +22
    -0
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java
  4. +1
    -0
      source/pom.xml
  5. +23
    -0
      source/state-transfer/pom.xml
  6. +44
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java
  7. +84
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java
  8. +49
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java
  9. +37
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java
  10. +32
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java
  11. +22
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java
  12. +17
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java

+ 5
- 0
source/peer/pom.xml View File

@@ -20,6 +20,11 @@
<artifactId>consensus-framework</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>state-transfer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>ledger-rpc</artifactId>


+ 25
- 0
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java View File

@@ -0,0 +1,25 @@
package com.jd.blockchain.peer.statetransfer;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.DataSequenceReader;

/**
*数据序列差异的提供者需要使用的回调接口实现类
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*
*/
public class DataSequenceReaderImpl implements DataSequenceReader {

@Override
public DataSequenceInfo getDSInfo(String id) {
return null;
}

@Override
public DataSequenceElement[] getDSContent(String id, long from, long to) {
return null;
}
}

+ 22
- 0
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java View File

@@ -0,0 +1,22 @@
package com.jd.blockchain.peer.statetransfer;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.DataSequenceReader;
import com.jd.blockchain.statetransfer.DataSequenceWriter;

/**
*数据序列差异的请求者需要使用的回调接口实现类
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*
*/
public class DataSequenceWriterImpl implements DataSequenceWriter {


@Override
public void updateDSInfo(String id, DataSequenceElement[] diffContents) {

}
}

+ 1
- 0
source/pom.xml View File

@@ -32,6 +32,7 @@
<module>storage</module>
<module>gateway</module>
<module>peer</module>
<module>state-transfer</module>
<module>sdk</module>
<module>tools</module>
<module>test</module>


+ 23
- 0
source/state-transfer/pom.xml View File

@@ -0,0 +1,23 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.jd.blockchain</groupId>
<artifactId>jdchain-root</artifactId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<artifactId>state-transfer</artifactId>
<dependencies>
<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>stp-communication</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

+ 44
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java View File

@@ -0,0 +1,44 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.RemoteSessionManager;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSProcessManager {

private Map<String, DSTransferProcess> dSProcessMap = new ConcurrentHashMap<>();
private RemoteSession[] remoteSessions;

DSTransferProcess startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {

RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listener.getPort());
DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, remoteSessionManager, targets, dsWriter, dsReader);

dsTransferProcess.start();
remoteSessions = dsTransferProcess.getSessions();

for(RemoteSession session : remoteSessions) {
dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO, session);
}


dSProcessMap.put(dsInfo.getId(), dsTransferProcess);

return dsTransferProcess;
}

void setDSReader(DataSequenceReader reader) {

}


}

+ 84
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java View File

@@ -0,0 +1,84 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.RemoteNode;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.RemoteSessionManager;

import java.net.InetSocketAddress;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSTransferProcess {

private InetSocketAddress[] targets;
private DataSequenceWriter dsWriter;
private DataSequenceReader dsReader;
private DataSequenceInfo dsInfo;
private RemoteSessionManager remoteSessionManager;
private RemoteSession[] remoteSessions;
private String id;


public DSTransferProcess(DataSequenceInfo dsInfo, RemoteSessionManager remoteSessionManager, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {

this.dsInfo = dsInfo;
this.targets = targets;
this.dsWriter = dsWriter;
this.dsReader = dsReader;
this.remoteSessionManager = remoteSessionManager;
this.id = dsInfo.getId();

}

void send(DataSequenceMsgType msgType, RemoteSession session) {

//session.send();

}

byte[] createMsg(DataSequenceMsgType msgType) {
return null;
}

public void computeDiff() {
//todo
}

public void getDSInfo(String id) {
//todo
}

public RemoteSession[] getSessions() {
//todo
return remoteSessions;
}

public void start() {

RemoteNode[] remoteNodes = new RemoteNode[targets.length];

for (int i = 0; i< remoteNodes.length; i++) {
remoteNodes[i] = new RemoteNode(targets[i].getHostName(), targets[i].getPort());
}

remoteSessions = remoteSessionManager.newSessions(remoteNodes);

for (int i = 0; i < remoteSessions.length; i++) {
DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter);
remoteSessions[i].initHandler(msgHandle);
remoteSessions[i].connect();
}
}

enum DataSequenceMsgType {

CMD_DSINFO,
CMD_GETDSDIFF
}


}

+ 49
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java View File

@@ -0,0 +1,49 @@
package com.jd.blockchain.statetransfer;

/**
*数据序列复制的元素或单位
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DataSequenceElement {

//数据序列的唯一标识符;
private String id;

//数据序列的某个高度;
private long height;

//对应某个高度的数据序列内容
private byte[][] data;

public DataSequenceElement(String id, long height, byte[][] data) {
this.id = id;
this.height = height;
this.data = data;
}

public long getHeight() {
return height;
}

public void setHeight(long height) {
this.height = height;
}

public String getId() {
return id;
}

public void setId(String id) {
id = id;
}

public byte[][] getData() {
return data;
}

public void setData(byte[][] data) {
this.data = data;
}
}

+ 37
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceInfo.java View File

@@ -0,0 +1,37 @@
package com.jd.blockchain.statetransfer;

/**
*共识结点上的某个数据序列的当前状态信息,每个共识结点可以对应任意个数据序列;
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DataSequenceInfo {

//数据序列的唯一标识
private String id;

//数据序列的当前高度
private long height;

public DataSequenceInfo(String id, long height) {
this.id = id;
this.height = height;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public long getHeight() {
return height;
}

public void setHeight(long height) {
this.height = height;
}
}

+ 32
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java View File

@@ -0,0 +1,32 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.MessageHandler;
import com.jd.blockchain.stp.communication.RemoteSession;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DataSequenceMsgHandle implements MessageHandler {

DataSequenceReader dsReader;
DataSequenceWriter dsWriter;

public DataSequenceMsgHandle(DataSequenceReader dsReader, DataSequenceWriter dsWriter) {
this.dsReader = dsReader;
this.dsWriter = dsWriter;
}

@Override
public void receive(byte[] key, byte[] data, RemoteSession session) {

}

/**
*
*
*/

}

+ 22
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java View File

@@ -0,0 +1,22 @@
package com.jd.blockchain.statetransfer;
/**
*数据序列差异的提供者需要使用的回调接口
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public interface DataSequenceReader {

/**
* 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息;
*
*/
DataSequenceInfo getDSInfo(String id);


/**
* 差异的提供者根据输入的数据序列标识符以及起始,结束高度获得数据序列的差异内容;
*
*/
DataSequenceElement[] getDSContent(String id, long from, long to);
}

+ 17
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java View File

@@ -0,0 +1,17 @@
package com.jd.blockchain.statetransfer;

/**
*数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public interface DataSequenceWriter {

/**
*更新数据序列的当前状态
* return void
*/
void updateDSInfo(String id, DataSequenceElement[] diffContents);

}

Loading…
Cancel
Save