Browse Source

add state transfer layer code

tags/1.0.0
zhangshuang 5 years ago
parent
commit
9b2fa94463
18 changed files with 822 additions and 206 deletions
  1. +11
    -0
      source/state-transfer/pom.xml
  2. +0
    -45
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java
  3. +0
    -84
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java
  4. +5
    -1
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java
  5. +0
    -37
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java
  6. +0
    -22
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java
  7. +0
    -17
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java
  8. +33
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java
  9. +25
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java
  10. +75
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java
  11. +15
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java
  12. +24
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java
  13. +95
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java
  14. +116
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java
  15. +155
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java
  16. +192
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java
  17. +39
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java
  18. +37
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java

+ 11
- 0
source/state-transfer/pom.xml View File

@@ -15,6 +15,17 @@
<artifactId>stp-communication</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>utils-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.jd.blockchain</groupId>
<artifactId>utils-serialize</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>


+ 0
- 45
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSProcessManager.java View File

@@ -1,45 +0,0 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSProcessManager {

private Map<String, DSTransferProcess> dSProcessMap = new ConcurrentHashMap<>();
private RemoteSession[] remoteSessions;

DSTransferProcess startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {

// RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listener.getPort());
RemoteSessionManager remoteSessionManager = null;
DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, remoteSessionManager, targets, dsWriter, dsReader);

dsTransferProcess.start();
remoteSessions = dsTransferProcess.getSessions();

for(RemoteSession session : remoteSessions) {
dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO, session);
}


dSProcessMap.put(dsInfo.getId(), dsTransferProcess);

return dsTransferProcess;
}

void setDSReader(DataSequenceReader reader) {

}


}

+ 0
- 84
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DSTransferProcess.java View File

@@ -1,84 +0,0 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.RemoteNode;

import java.net.InetSocketAddress;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSTransferProcess {

private InetSocketAddress[] targets;
private DataSequenceWriter dsWriter;
private DataSequenceReader dsReader;
private DataSequenceInfo dsInfo;
private RemoteSessionManager remoteSessionManager;
private RemoteSession[] remoteSessions;
private String id;


public DSTransferProcess(DataSequenceInfo dsInfo, RemoteSessionManager remoteSessionManager, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {

this.dsInfo = dsInfo;
this.targets = targets;
this.dsWriter = dsWriter;
this.dsReader = dsReader;
this.remoteSessionManager = remoteSessionManager;
this.id = dsInfo.getId();

}

void send(DataSequenceMsgType msgType, RemoteSession session) {

//session.send();

}

byte[] createMsg(DataSequenceMsgType msgType) {
return null;
}

public void computeDiff() {
//todo
}

public void getDSInfo(String id) {
//todo
}

public RemoteSession[] getSessions() {
//todo
return remoteSessions;
}

public void start() {

RemoteNode[] remoteNodes = new RemoteNode[targets.length];

for (int i = 0; i< remoteNodes.length; i++) {
remoteNodes[i] = new RemoteNode(targets[i].getHostName(), targets[i].getPort());
}

remoteSessions = remoteSessionManager.newSessions(remoteNodes);

for (int i = 0; i < remoteSessions.length; i++) {
DataSequenceMsgHandle msgHandle = new DataSequenceMsgHandle(dsReader, dsWriter);
remoteSessions[i].initExecutor(msgHandle);
remoteSessions[i].init();
}
}

enum DataSequenceMsgType {

CMD_DSINFO,
CMD_GETDSDIFF
}


}

+ 5
- 1
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceElement.java View File

@@ -1,12 +1,16 @@
package com.jd.blockchain.statetransfer;

import java.io.Serializable;

/**
*数据序列复制的元素或单位
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DataSequenceElement {
public class DataSequenceElement implements Serializable {

private static final long serialVersionUID = -719578198150380571L;

//数据序列的唯一标识符;
private String id;


+ 0
- 37
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceMsgHandle.java View File

@@ -1,37 +0,0 @@
package com.jd.blockchain.statetransfer;

import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DataSequenceMsgHandle implements MessageExecutor {

DataSequenceReader dsReader;
DataSequenceWriter dsWriter;

public DataSequenceMsgHandle(DataSequenceReader dsReader, DataSequenceWriter dsWriter) {
this.dsReader = dsReader;
this.dsWriter = dsWriter;
}

@Override
public byte[] receive(String key, byte[] data, RemoteSession session) {
return new byte[0];
}

@Override
public REPLY replyType() {
return REPLY.AUTO;
}

/**
*
*
*/

}

+ 0
- 22
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceReader.java View File

@@ -1,22 +0,0 @@
package com.jd.blockchain.statetransfer;
/**
*数据序列差异的提供者需要使用的回调接口
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public interface DataSequenceReader {

/**
* 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息;
*
*/
DataSequenceInfo getDSInfo(String id);


/**
* 差异的提供者根据输入的数据序列标识符以及起始,结束高度获得数据序列的差异内容;
*
*/
DataSequenceElement[] getDSContent(String id, long from, long to);
}

+ 0
- 17
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/DataSequenceWriter.java View File

@@ -1,17 +0,0 @@
package com.jd.blockchain.statetransfer;

/**
*数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public interface DataSequenceWriter {

/**
*更新数据序列的当前状态
* return void
*/
void updateDSInfo(String id, DataSequenceElement[] diffContents);

}

+ 33
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceReader.java View File

@@ -0,0 +1,33 @@
package com.jd.blockchain.statetransfer.callback;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;

/**
*数据序列差异的提供者需要使用的回调接口
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public interface DataSequenceReader {

/**
* 差异的提供者根据输入的数据序列标识符获取当前的数据序列信息;
*
*/
DataSequenceInfo getDSInfo(String id);


/**
* 差异的提供者根据输入的数据序列标识符以及起始,结束高度提供数据序列的差异内容;
*
*/
DataSequenceElement[] getDSDiffContent(String id, long from, long to);


/**
* 差异的提供者根据输入的数据序列标识符以及高度提供数据序列的差异内容;
*
*/
DataSequenceElement getDSDiffContent(String id, long height);
}

+ 25
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java View File

@@ -0,0 +1,25 @@
package com.jd.blockchain.statetransfer.callback;

import com.jd.blockchain.statetransfer.DataSequenceElement;

/**
*数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public interface DataSequenceWriter {

/**
*更新数据序列的当前状态,一次更新多个高度的差异
* return void
*/
int updateDSInfo(String id, DataSequenceElement[] diffContents);

/**
*更新数据序列的当前状态,一次更新一个高度的差异
* return void
*/
int updateDSInfo(String id, DataSequenceElement diffContents);

}

+ 75
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java View File

@@ -0,0 +1,75 @@
package com.jd.blockchain.statetransfer.message;

import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.process.DSTransferProcess;
import com.jd.blockchain.statetransfer.result.DSDiffRequestResult;
import com.jd.blockchain.stp.communication.MessageExecutor;
import com.jd.blockchain.stp.communication.RemoteSession;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSDefaultMessageExecutor implements MessageExecutor {

DataSequenceReader dsReader;
DataSequenceWriter dsWriter;

public DSDefaultMessageExecutor(DataSequenceReader dsReader, DataSequenceWriter dsWriter) {
this.dsReader = dsReader;
this.dsWriter = dsWriter;
}

/**
* 对状态机复制的请求进行响应
*
*/

@Override
public byte[] receive(String key, byte[] data, RemoteSession session) {

try {
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data);

if (object instanceof String) {

String id = (String)object;
byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, 0, 0);
session.reply(key, new DataSequenceLoadMessage(respLoadMsg));
}
else if (object instanceof DSDiffRequestResult) {

DSDiffRequestResult requestResult = (DSDiffRequestResult)object;
String id = requestResult.getId();
long fromHeight = requestResult.getFromHeight();
long toHeight = requestResult.getToHeight();
for (long i = fromHeight; i < toHeight + 1; i++) {
byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, i, i);
session.reply(key, new DataSequenceLoadMessage(respLoadMsg));
}
}
else {
throw new IllegalArgumentException("Receive data exception, unknown message type!");
}

} catch (Exception e) {
e.printStackTrace();
}

return null;
}

@Override
public REPLY replyType() {
return REPLY.MANUAL;
}

/**
*
*
*/

}

+ 15
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSMsgResolverFactory.java View File

@@ -0,0 +1,15 @@
package com.jd.blockchain.statetransfer.message;

import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;

public class DSMsgResolverFactory {

public static DataSequenceMsgEncoder getEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
return new DataSequenceMsgEncoder(dsWriter, dsReader);
}

public static DataSequenceMsgDecoder getDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
return new DataSequenceMsgDecoder(dsWriter, dsReader);
}
}

+ 24
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceLoadMessage.java View File

@@ -0,0 +1,24 @@
package com.jd.blockchain.statetransfer.message;

import com.jd.blockchain.stp.communication.message.LoadMessage;

/**
*
*
*/
public class DataSequenceLoadMessage implements LoadMessage {
byte[] bytes;

public DataSequenceLoadMessage(byte[] bytes) {
this.bytes = bytes;
}

public void setBytes(byte[] bytes) {
this.bytes = bytes;
}

@Override
public byte[] toBytes() {
return bytes;
}
}

+ 95
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgDecoder.java View File

@@ -0,0 +1,95 @@
package com.jd.blockchain.statetransfer.message;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.process.DSTransferProcess;
import com.jd.blockchain.statetransfer.result.DSDiffRequestResult;
import com.jd.blockchain.utils.io.BytesUtils;
import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;

public class DataSequenceMsgDecoder {

private int heightSize = 8;
private int msgTypeSize = 1;

private long respHeight;
private long fromHeight;
private long toHeight;
private int idSize;
private byte[] idBytes;
private String id;
private int diffElemSize;
private byte[] diffElem;
DataSequenceElement dsElement;

private DataSequenceWriter dsWriter;
private DataSequenceReader dsReader;

public DataSequenceMsgDecoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
this.dsWriter = dsWriter;
this.dsReader = dsReader;
}


/**
*
*
*/
public Object decode(byte[] loadMessage) {

try {
if (loadMessage.length <= 5) {
System.out.println("LoadMessage size is less than 5!");
throw new IllegalArgumentException();
}

int dataLength = BytesUtils.toInt(loadMessage, 0, 4);
byte msgCode = loadMessage[4];

if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE.CODE) {
respHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize);
idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize, 4);
idBytes = new byte[idSize];
System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + 4, idBytes, 0, idSize);
id = new String(idBytes);
return new DataSequenceInfo(id, respHeight);
} else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE.CODE) {
diffElemSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4);
diffElem = new byte[diffElemSize];
System.arraycopy(loadMessage, 4 + msgTypeSize + 4, diffElem, 0, diffElemSize);
dsElement = BinarySerializeUtils.deserialize(diffElem);
return dsElement;
} else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST.CODE) {
idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize, 4);
idBytes = new byte[idSize];
System.arraycopy(loadMessage, 4 + msgTypeSize + 4, idBytes, 0, idSize);
id = new String(idBytes);
return id;
} else if (msgCode == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST.CODE) {
fromHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize);
toHeight = BytesUtils.toLong(loadMessage, 4 + msgTypeSize + heightSize);
idSize = BytesUtils.toInt(loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4);
idBytes = new byte[idSize];
System.arraycopy(loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idBytes, 0, idSize);
id = new String(idBytes);
return new DSDiffRequestResult(id, fromHeight, toHeight);
}
else {
System.out.println("Unknown message type!");
throw new IllegalArgumentException();
}

} catch (Exception e) {
System.out.println("Error to decode message: " + e.getMessage() + "!");
e.printStackTrace();

}

return null;
}



}

+ 116
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java View File

@@ -0,0 +1,116 @@
package com.jd.blockchain.statetransfer.message;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.process.DSTransferProcess;
import com.jd.blockchain.utils.io.BytesUtils;
import com.jd.blockchain.utils.serialize.binary.BinarySerializeUtils;

public class DataSequenceMsgEncoder {

private int heightSize = 8;
private int msgTypeSize = 1;

private DataSequenceWriter dsWriter;
private DataSequenceReader dsReader;

public DataSequenceMsgEncoder(DataSequenceWriter dsWriter, DataSequenceReader dsReader) {
this.dsWriter = dsWriter;
this.dsReader = dsReader;
}

/**
* 目前暂时考虑fromHeight与toHeight相同的情况,即每次只对一个高度的差异编码并响应
*
*/
public byte[] encode(DSTransferProcess.DataSequenceMsgType msgType, String id, long fromHeight, long toHeight) {

try {

int dataLength;
int idSize = id.getBytes().length;
byte[] loadMessage = null;

// different encoding methods for different message types
if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST) {

// CMD_DSINFO_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe,
// 4 bytes id length, id content size bytes

dataLength = 4 + msgTypeSize + 4 + idSize;

loadMessage = new byte[dataLength];

System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4);
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize);
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4);
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize);
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) {

// CMD_GETDSDIFF_REQUEST Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes from height,
// 8 bytes to height, 4 bytes id length, id content size bytes

dataLength = 4 + msgTypeSize + heightSize + heightSize + 4 + idSize;

loadMessage = new byte[dataLength];

System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4);
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize);
System.arraycopy(BytesUtils.toBytes(fromHeight), 0, loadMessage, 4 + msgTypeSize, heightSize);
System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize);
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4);
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize);
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) {

// CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height,
// 4 bytes id length, id content size bytes

dataLength = 4 + msgTypeSize + heightSize + 4 + idSize;

loadMessage = new byte[dataLength];

System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4);
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize);
System.arraycopy(BytesUtils.toBytes(dsReader.getDSInfo(id).getHeight()), 0, loadMessage, 4 + msgTypeSize, heightSize);

System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4);
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + 4, idSize);

} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) {
if (fromHeight != toHeight) {
throw new IllegalArgumentException("Height parameter error!");
}

// CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe,
// 4 bytes diffElem size, diff content size;

// 回调reader,获得这个高度上的所有差异的数据序列内容,并组织成DataSequenceElement结构
DataSequenceElement element = dsReader.getDSDiffContent(id, fromHeight);

byte[] diffElem = BinarySerializeUtils.serialize(element);

dataLength = 4 + msgTypeSize + 4 + diffElem.length;
loadMessage = new byte[dataLength];

System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); //total size
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); //msgType size
System.arraycopy(BytesUtils.toBytes(diffElem.length), 0, loadMessage, 4 + msgTypeSize, 4); // diffElem size
System.arraycopy(diffElem, 0, loadMessage, 4 + msgTypeSize + 4, diffElem.length); // diffElem bytes
}
else {
System.out.println("Unknown message type!");
throw new IllegalArgumentException();
}

return loadMessage;

} catch (Exception e) {
System.out.println("Error to encode message type : " + msgType + "!");
e.printStackTrace();
}

return null;
}

}

+ 155
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java View File

@@ -0,0 +1,155 @@
package com.jd.blockchain.statetransfer.process;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.message.DSDefaultMessageExecutor;
import com.jd.blockchain.statetransfer.result.DSInfoResponseResult;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.LocalNode;
import com.jd.blockchain.utils.concurrent.CompletableAsyncFuture;

import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.*;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSProcessManager {

private static Map<String, DSTransferProcess> dSProcessMap = new ConcurrentHashMap<>();
private RemoteSession[] remoteSessions;
private long dsInfoResponseTimeout = 2000;
private ExecutorService writeExecutors = Executors.newFixedThreadPool(5);
private int returnCode = 0;
/**
*
*
*/
public int startDSProcess(DataSequenceInfo dsInfo, InetSocketAddress listener, InetSocketAddress[] targets, DataSequenceWriter dsWriter, DataSequenceReader dsReader) {

// create remote sessions manager, add listener

LocalNode listenNode = new LocalNode(listener.getHostName(), listener.getPort(), new DSDefaultMessageExecutor(dsReader, dsWriter));

RemoteSessionManager remoteSessionManager = new RemoteSessionManager(listenNode);

// data sequence transfer process life cycle start
DSTransferProcess dsTransferProcess = new DSTransferProcess(dsInfo, targets);
dsTransferProcess.setDSReader(dsReader);
dsTransferProcess.setDSWriter(dsWriter);
dsTransferProcess.setRemoteSessionManager(remoteSessionManager);

dSProcessMap.put(dsInfo.getId(), dsTransferProcess);

try {
// start network connections with targets
dsTransferProcess.start();

//get all target sessions
remoteSessions = dsTransferProcess.getSessions();

// async message send process
CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, dsInfoResponseTimeout);

// response message manage map
Map<RemoteSession, CallBackDataListener> dsInfoResponses = new ConcurrentHashMap<>();

// step1: send get dsInfo request, then hold
for (RemoteSession remoteSession : remoteSessions) {

CallBackDataListener dsInfoResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST, remoteSession, 0, 0, callBackBarrier);

dsInfoResponses.put(remoteSession, dsInfoResponse);
}

// step2: collect get dsInfo response
Map<RemoteSession, byte[]> receiveResponses = new ConcurrentHashMap<>();
if (callBackBarrier.tryCall()) {
for (RemoteSession remoteSession : dsInfoResponses.keySet()) {
CallBackDataListener asyncFuture = dsInfoResponses.get(remoteSession);
// if really done
if (asyncFuture.isDone()) {
receiveResponses.put(remoteSession, asyncFuture.getCallBackData());
}
}
}

// step3: process received responses
DSInfoResponseResult diffResult = dsTransferProcess.computeDiffInfo(receiveResponses);

// height diff
long diff = dsInfo.getHeight() - diffResult.getMaxHeight();

if (diff == 0 || diff > 0) {
// no duplication is required, life cycle ends
dsTransferProcess.close();
dSProcessMap.remove(dsInfo.getId());
return returnCode;

}
else {

// step4: async send get data sequence diff request
// single step get diff
// async message send process
CallBackBarrier callBackBarrierDiff = CallBackBarrier.newCallBackBarrier((int)(diffResult.getMaxHeight() - dsInfo.getHeight()), dsInfoResponseTimeout);
LinkedList<CallBackDataListener> dsDiffResponses = new LinkedList<>();

// step5: collect get data sequence diff response
for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) {
CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, diffResult.getMaxHeightSession(), height, height, callBackBarrierDiff);
dsDiffResponses.addLast(dsDiffResponse);
}

// 考虑性能
// writeExecutors.execute(() -> {
//
// });

LinkedList<byte[]> receiveDiffResponses = new LinkedList<>();
if (callBackBarrierDiff.tryCall()) {
for (int i = 0; i < dsDiffResponses.size(); i++) {
CallBackDataListener asyncFutureDiff = dsDiffResponses.get(i);
if (asyncFutureDiff.isDone()) {
receiveDiffResponses.addLast(asyncFutureDiff.getCallBackData());
}
}
}
// step6: process data sequence diff response, update local data sequence state
DataSequenceElement[] dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][]));
returnCode = dsWriter.updateDSInfo(dsInfo.getId(), dataSequenceElements);

// data sequence transfer complete, close all sessions, end process life cycle

dsTransferProcess.close();
dSProcessMap.remove(dsInfo.getId());
}


} catch (Exception e) {
e.printStackTrace();
}
return returnCode;
}


/**
*
*
*/
void setDSReader(DataSequenceReader reader) {

}


}

+ 192
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java View File

@@ -0,0 +1,192 @@
package com.jd.blockchain.statetransfer.process;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.message.DSMsgResolverFactory;
import com.jd.blockchain.statetransfer.message.DataSequenceLoadMessage;
import com.jd.blockchain.statetransfer.result.DSInfoResponseResult;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.callback.CallBackBarrier;
import com.jd.blockchain.stp.communication.callback.CallBackDataListener;
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager;
import com.jd.blockchain.stp.communication.node.RemoteNode;
import com.jd.blockchain.utils.IllegalDataException;

import java.net.InetSocketAddress;
import java.util.Map;

/**
*
* @author zhangshuang
* @create 2019/4/11
* @since 1.0.0
*/
public class DSTransferProcess {

private InetSocketAddress[] targets;
private DataSequenceWriter dsWriter;
private DataSequenceReader dsReader;
private DataSequenceInfo dsInfo;
private RemoteSessionManager remoteSessionManager;
private RemoteSession[] remoteSessions;
private String id;

/**
*
*
*/
public DSTransferProcess(DataSequenceInfo dsInfo, InetSocketAddress[] targets) {
this.dsInfo = dsInfo;
this.targets = targets;
this.id = dsInfo.getId();
}

public void setDSWriter(DataSequenceWriter dsWriter) {
this.dsWriter = dsWriter;
}

public void setDSReader(DataSequenceReader dsReader) {
this.dsReader = dsReader;
}

public void setRemoteSessionManager(RemoteSessionManager remoteSessionManager) {
this.remoteSessionManager = remoteSessionManager;
}


/**
* get unique id from data sequence transfer process
*
*/
public String getId() {
return id;
}

/**
*
*
*/
CallBackDataListener send(DataSequenceMsgType msgType, RemoteSession remoteSession, long fromHeight, long toHeight, CallBackBarrier callBackBarrier) {

byte[] loadMessage = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(msgType, id, fromHeight, toHeight);

return remoteSession.asyncRequest(new DataSequenceLoadMessage(loadMessage), callBackBarrier);
}
/**
*
*
*/
public DataSequenceElement[] computeDiffElement(byte[][] diffArray) {
DataSequenceElement[] dataSequenceElements = new DataSequenceElement[diffArray.length];
for (int i = 0 ; i < dataSequenceElements.length; i++) {
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(diffArray[i]);
if (object instanceof DataSequenceElement) {
dataSequenceElements[i] = (DataSequenceElement) object;
}
else {
throw new IllegalDataException("Unknown instance object!");
}
}

return dataSequenceElements;
}

/**
*
*
*/
public DSInfoResponseResult computeDiffInfo(Map<RemoteSession, byte[]> responseMap) {
long maxHeight = 0;
RemoteSession maxHeightSession = null;

for (RemoteSession remoteSession : responseMap.keySet()) {
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(responseMap.get(remoteSession));
if (object instanceof DataSequenceInfo) {
DataSequenceInfo dsInfo = (DataSequenceInfo) object;
long height = dsInfo.getHeight();
if (maxHeight < height) {
maxHeight = height;
maxHeightSession = remoteSession;
}
}
else {
throw new IllegalDataException("Unknown instance object!");
}

}

return new DSInfoResponseResult(maxHeight, maxHeightSession);
}

/**
*
*
*/
public void getDSInfo(String id) {

}

/**
*
*
*/
public RemoteSession[] getSessions() {
return remoteSessions;
}

/**
* close all sessions
*
*/
public void close() {
for (RemoteSession session : remoteSessions) {
session.closeAll();
}
}

/**
* establish connections with target remote nodes
*
*/
public void start() {

RemoteNode[] remoteNodes = new RemoteNode[targets.length];

for (int i = 0; i < remoteNodes.length; i++) {
remoteNodes[i] = new RemoteNode(targets[i].getHostName(), targets[i].getPort());
}

remoteSessions = remoteSessionManager.newSessions(remoteNodes);
}


/**
* data sequence transfer message type
*
*/
public enum DataSequenceMsgType {
CMD_DSINFO_REQUEST((byte) 0x1),
CMD_DSINFO_RESPONSE((byte) 0x2),
CMD_GETDSDIFF_REQUEST((byte) 0x3),
CMD_GETDSDIFF_RESPONSE((byte) 0x4),
;
public final byte CODE;

private DataSequenceMsgType(byte code) {
this.CODE = code;
}

public static DataSequenceMsgType valueOf(byte code) {
for (DataSequenceMsgType msgType : DataSequenceMsgType.values()) {
if (msgType.CODE == code) {
return msgType;
}
}
throw new IllegalArgumentException("Unsupported code[" + code + "] of msgType!");
}
}


}

+ 39
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSDiffRequestResult.java View File

@@ -0,0 +1,39 @@
package com.jd.blockchain.statetransfer.result;

public class DSDiffRequestResult {

String id;
long fromHeight;
long toHeight;

public DSDiffRequestResult(String id ,long fromHeight, long toHeight) {
this.id = id;
this.fromHeight = fromHeight;
this.toHeight = toHeight;
}

public String getId() {
return id;
}

public long getFromHeight() {
return fromHeight;
}

public long getToHeight() {
return toHeight;
}

public void setId(String id) {
this.id = id;
}

public void setFromHeight(long fromHeight) {
this.fromHeight = fromHeight;
}

public void setToHeight(long toHeight) {
this.toHeight = toHeight;
}

}

+ 37
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java View File

@@ -0,0 +1,37 @@
package com.jd.blockchain.statetransfer.result;

import com.jd.blockchain.stp.communication.RemoteSession;

/**
*
*
*
*
*/
public class DSInfoResponseResult {

long maxHeight;
RemoteSession maxHeightSession;

public DSInfoResponseResult(long maxHeight, RemoteSession maxHeightSession) {
this.maxHeight = maxHeight;
this.maxHeightSession = maxHeightSession;
}

public long getMaxHeight() {
return maxHeight;
}

public RemoteSession getMaxHeightSession() {
return maxHeightSession;
}

public void setMaxHeight(long maxHeight) {
this.maxHeight = maxHeight;
}

public void setMaxHeightSession(RemoteSession maxHeightSession) {
this.maxHeightSession = maxHeightSession;
}

}

Loading…
Cancel
Save