@@ -35,9 +35,8 @@ public class DSDefaultMessageExecutor implements MessageExecutor { | |||
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data); | |||
if (object instanceof String) { | |||
String id = (String)object; | |||
byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, 0, 0); | |||
byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE, id, 0, 0); | |||
session.reply(key, new DataSequenceLoadMessage(respLoadMsg)); | |||
} | |||
else if (object instanceof DSDiffRequestResult) { | |||
@@ -43,7 +43,7 @@ public class DataSequenceMsgEncoder { | |||
loadMessage = new byte[dataLength]; | |||
System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); | |||
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); | |||
loadMessage[4] = msgType.CODE; | |||
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4); | |||
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize); | |||
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) { | |||
@@ -56,12 +56,12 @@ public class DataSequenceMsgEncoder { | |||
loadMessage = new byte[dataLength]; | |||
System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); | |||
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); | |||
loadMessage[4] = msgType.CODE; | |||
System.arraycopy(BytesUtils.toBytes(fromHeight), 0, loadMessage, 4 + msgTypeSize, heightSize); | |||
System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize); | |||
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4); | |||
System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize); | |||
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) { | |||
} else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE) { | |||
// CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height, | |||
// 4 bytes id length, id content size bytes | |||
@@ -71,7 +71,7 @@ public class DataSequenceMsgEncoder { | |||
loadMessage = new byte[dataLength]; | |||
System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); | |||
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); | |||
loadMessage[4] = msgType.CODE; | |||
System.arraycopy(BytesUtils.toBytes(dsReader.getDSInfo(id).getHeight()), 0, loadMessage, 4 + msgTypeSize, heightSize); | |||
System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4); | |||
@@ -94,7 +94,7 @@ public class DataSequenceMsgEncoder { | |||
loadMessage = new byte[dataLength]; | |||
System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); //total size | |||
System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); //msgType size | |||
loadMessage[4] = msgType.CODE; //msgType size | |||
System.arraycopy(BytesUtils.toBytes(diffElem.length), 0, loadMessage, 4 + msgTypeSize, 4); // diffElem size | |||
System.arraycopy(diffElem, 0, loadMessage, 4 + msgTypeSize + 4, diffElem.length); // diffElem bytes | |||
} | |||
@@ -4,6 +4,7 @@ import com.jd.blockchain.statetransfer.DataSequenceElement; | |||
import com.jd.blockchain.statetransfer.DataSequenceInfo; | |||
import com.jd.blockchain.statetransfer.callback.DataSequenceReader; | |||
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; | |||
import com.jd.blockchain.statetransfer.comparator.DataSequenceComparator; | |||
import com.jd.blockchain.statetransfer.message.DSDefaultMessageExecutor; | |||
import com.jd.blockchain.statetransfer.result.DSInfoResponseResult; | |||
import com.jd.blockchain.stp.communication.RemoteSession; | |||
@@ -11,11 +12,11 @@ import com.jd.blockchain.stp.communication.callback.CallBackBarrier; | |||
import com.jd.blockchain.stp.communication.callback.CallBackDataListener; | |||
import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; | |||
import com.jd.blockchain.stp.communication.node.LocalNode; | |||
import com.jd.blockchain.stp.communication.node.RemoteNode; | |||
import com.jd.blockchain.utils.concurrent.CompletableAsyncFuture; | |||
import java.net.InetSocketAddress; | |||
import java.util.LinkedList; | |||
import java.util.Map; | |||
import java.util.*; | |||
import java.util.concurrent.*; | |||
/** | |||
@@ -28,7 +29,7 @@ public class DSProcessManager { | |||
private static Map<String, DSTransferProcess> dSProcessMap = new ConcurrentHashMap<>(); | |||
private RemoteSession[] remoteSessions; | |||
private long dsInfoResponseTimeout = 2000; | |||
private long dsInfoResponseTimeout = 20000; | |||
private ExecutorService writeExecutors = Executors.newFixedThreadPool(5); | |||
private int returnCode = 0; | |||
/** | |||
@@ -52,6 +53,10 @@ public class DSProcessManager { | |||
dSProcessMap.put(dsInfo.getId(), dsTransferProcess); | |||
try { | |||
//wait all listener nodes start | |||
Thread.sleep(10000); | |||
// start network connections with targets | |||
dsTransferProcess.start(); | |||
@@ -62,52 +67,58 @@ public class DSProcessManager { | |||
CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, dsInfoResponseTimeout); | |||
// response message manage map | |||
Map<RemoteSession, CallBackDataListener> dsInfoResponses = new ConcurrentHashMap<>(); | |||
LinkedList<CallBackDataListener> dsInfoResponses = new LinkedList<>(); | |||
System.out.println("Async send CMD_DSINFO_REQUEST msg to targets will start!"); | |||
// step1: send get dsInfo request, then hold | |||
for (RemoteSession remoteSession : remoteSessions) { | |||
CallBackDataListener dsInfoResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST, remoteSession, 0, 0, callBackBarrier); | |||
dsInfoResponses.put(remoteSession, dsInfoResponse); | |||
dsInfoResponses.addLast(dsInfoResponse); | |||
} | |||
System.out.println("Wait CMD_DSINFO_RESPONSE msg from targets!"); | |||
// step2: collect get dsInfo response | |||
Map<RemoteSession, byte[]> receiveResponses = new ConcurrentHashMap<>(); | |||
LinkedList<CallBackDataListener> receiveResponses = new LinkedList<>(); | |||
if (callBackBarrier.tryCall()) { | |||
for (RemoteSession remoteSession : dsInfoResponses.keySet()) { | |||
CallBackDataListener asyncFuture = dsInfoResponses.get(remoteSession); | |||
// if really done | |||
if (asyncFuture.isDone()) { | |||
receiveResponses.put(remoteSession, asyncFuture.getCallBackData()); | |||
Iterator<CallBackDataListener> iterator = dsInfoResponses.iterator(); | |||
while (iterator.hasNext()) { | |||
CallBackDataListener receiveResponse = iterator.next(); | |||
if (receiveResponse.isDone()) { | |||
receiveResponses.addLast(receiveResponse); | |||
} | |||
} | |||
} | |||
System.out.println("Compute diff info!"); | |||
// step3: process received responses | |||
DSInfoResponseResult diffResult = dsTransferProcess.computeDiffInfo(receiveResponses); | |||
System.out.println("Diff info result height = " + diffResult.getMaxHeight() + "!"); | |||
// height diff | |||
long diff = dsInfo.getHeight() - diffResult.getMaxHeight(); | |||
if (diff == 0 || diff > 0) { | |||
System.out.println("No duplication is required!"); | |||
// no duplication is required, life cycle ends | |||
dsTransferProcess.close(); | |||
// dsTransferProcess.close(); | |||
dSProcessMap.remove(dsInfo.getId()); | |||
return returnCode; | |||
} | |||
else { | |||
System.out.println("Duplication is required!"); | |||
// step4: async send get data sequence diff request | |||
// single step get diff | |||
// async message send process | |||
CallBackBarrier callBackBarrierDiff = CallBackBarrier.newCallBackBarrier((int)(diffResult.getMaxHeight() - dsInfo.getHeight()), dsInfoResponseTimeout); | |||
LinkedList<CallBackDataListener> dsDiffResponses = new LinkedList<>(); | |||
RemoteSession responseSession = findResponseSession(diffResult.getMaxHeightRemoteNode(), remoteSessions); | |||
System.out.println("Async send CMD_GETDSDIFF_REQUEST msg to targets will start!"); | |||
// step5: collect get data sequence diff response | |||
for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) { | |||
CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, diffResult.getMaxHeightSession(), height, height, callBackBarrierDiff); | |||
CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, responseSession, height, height, callBackBarrierDiff); | |||
dsDiffResponses.addLast(dsDiffResponse); | |||
} | |||
@@ -116,6 +127,7 @@ public class DSProcessManager { | |||
// | |||
// }); | |||
System.out.println("Wait CMD_GETDSDIFF_RESPONSE msg from targets!"); | |||
LinkedList<byte[]> receiveDiffResponses = new LinkedList<>(); | |||
if (callBackBarrierDiff.tryCall()) { | |||
for (int i = 0; i < dsDiffResponses.size(); i++) { | |||
@@ -125,13 +137,18 @@ public class DSProcessManager { | |||
} | |||
} | |||
} | |||
System.out.println("ReceiveDiffResponses size = "+ receiveDiffResponses.size()); | |||
// step6: process data sequence diff response, update local data sequence state | |||
DataSequenceElement[] dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][])); | |||
returnCode = dsWriter.updateDSInfo(dsInfo, dataSequenceElements); | |||
System.out.println("Compute diff elements!"); | |||
ArrayList<DataSequenceElement> dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][])); | |||
System.out.println("Update local data sequence!"); | |||
Collections.sort(dataSequenceElements, new DataSequenceComparator()); | |||
returnCode = dsWriter.updateDSInfo(dsInfo, dataSequenceElements.toArray(new DataSequenceElement[dataSequenceElements.size()])); | |||
// data sequence transfer complete, close all sessions, end process life cycle | |||
dsTransferProcess.close(); | |||
System.out.println("Close all sessions"); | |||
// dsTransferProcess.close(); | |||
dSProcessMap.remove(dsInfo.getId()); | |||
} | |||
@@ -142,7 +159,14 @@ public class DSProcessManager { | |||
return returnCode; | |||
} | |||
RemoteSession findResponseSession(RemoteNode remoteNode, RemoteSession[] remoteSessions) { | |||
for (RemoteSession remoteSession : remoteSessions) { | |||
if (remoteSession.remoteNode().equals(remoteNode)) { | |||
return remoteSession; | |||
} | |||
} | |||
return null; | |||
} | |||
/** | |||
* | |||
* | |||
@@ -15,6 +15,8 @@ import com.jd.blockchain.stp.communication.node.RemoteNode; | |||
import com.jd.blockchain.utils.IllegalDataException; | |||
import java.net.InetSocketAddress; | |||
import java.util.ArrayList; | |||
import java.util.LinkedList; | |||
import java.util.Map; | |||
/** | |||
@@ -78,12 +80,14 @@ public class DSTransferProcess { | |||
* | |||
* | |||
*/ | |||
public DataSequenceElement[] computeDiffElement(byte[][] diffArray) { | |||
DataSequenceElement[] dataSequenceElements = new DataSequenceElement[diffArray.length]; | |||
for (int i = 0 ; i < dataSequenceElements.length; i++) { | |||
public ArrayList<DataSequenceElement> computeDiffElement(byte[][] diffArray) { | |||
ArrayList<DataSequenceElement> dataSequenceElements = new ArrayList<>(); | |||
for (int i = 0 ; i < diffArray.length; i++) { | |||
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(diffArray[i]); | |||
if (object instanceof DataSequenceElement) { | |||
dataSequenceElements[i] = (DataSequenceElement) object; | |||
dataSequenceElements.add((DataSequenceElement) object); | |||
} | |||
else { | |||
throw new IllegalDataException("Unknown instance object!"); | |||
@@ -97,27 +101,36 @@ public class DSTransferProcess { | |||
* | |||
* | |||
*/ | |||
public DSInfoResponseResult computeDiffInfo(Map<RemoteSession, byte[]> responseMap) { | |||
public DSInfoResponseResult computeDiffInfo(LinkedList<CallBackDataListener> receiveResponses) { | |||
long maxHeight = 0; | |||
RemoteSession maxHeightSession = null; | |||
for (RemoteSession remoteSession : responseMap.keySet()) { | |||
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(responseMap.get(remoteSession)); | |||
if (object instanceof DataSequenceInfo) { | |||
DataSequenceInfo dsInfo = (DataSequenceInfo) object; | |||
long height = dsInfo.getHeight(); | |||
if (maxHeight < height) { | |||
maxHeight = height; | |||
maxHeightSession = remoteSession; | |||
} | |||
} | |||
else { | |||
throw new IllegalDataException("Unknown instance object!"); | |||
} | |||
RemoteNode maxHeightRemoteNode = null; | |||
System.out.println("ComputeDiffInfo receiveResponses size = "+ receiveResponses.size()); | |||
try { | |||
for (CallBackDataListener receiveResponse : receiveResponses) { | |||
Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(receiveResponse.getCallBackData()); | |||
// System.out.println("ComputeDiffInfo object = "+object); | |||
if (object instanceof DataSequenceInfo) { | |||
DataSequenceInfo dsInfo = (DataSequenceInfo) object; | |||
long height = dsInfo.getHeight(); | |||
// System.out.println("ComputeDiffInfo height = " +height); | |||
if (maxHeight < height) { | |||
maxHeight = height; | |||
maxHeightRemoteNode = receiveResponse.remoteNode(); | |||
} | |||
} | |||
else { | |||
throw new IllegalDataException("Unknown instance object!"); | |||
} | |||
} | |||
} catch (Exception e) { | |||
System.out.println(e.getMessage()); | |||
e.printStackTrace(); | |||
} | |||
return new DSInfoResponseResult(maxHeight, maxHeightSession); | |||
return new DSInfoResponseResult(maxHeight, maxHeightRemoteNode); | |||
} | |||
/** | |||
@@ -1,6 +1,7 @@ | |||
package com.jd.blockchain.statetransfer.result; | |||
import com.jd.blockchain.stp.communication.RemoteSession; | |||
import com.jd.blockchain.stp.communication.node.RemoteNode; | |||
/** | |||
* | |||
@@ -11,27 +12,27 @@ import com.jd.blockchain.stp.communication.RemoteSession; | |||
public class DSInfoResponseResult { | |||
long maxHeight; | |||
RemoteSession maxHeightSession; | |||
RemoteNode maxHeightRemoteNode; | |||
public DSInfoResponseResult(long maxHeight, RemoteSession maxHeightSession) { | |||
public DSInfoResponseResult(long maxHeight, RemoteNode maxHeightRemoteNode) { | |||
this.maxHeight = maxHeight; | |||
this.maxHeightSession = maxHeightSession; | |||
this.maxHeightRemoteNode = maxHeightRemoteNode; | |||
} | |||
public long getMaxHeight() { | |||
return maxHeight; | |||
} | |||
public RemoteSession getMaxHeightSession() { | |||
return maxHeightSession; | |||
public RemoteNode getMaxHeightRemoteNode() { | |||
return maxHeightRemoteNode; | |||
} | |||
public void setMaxHeight(long maxHeight) { | |||
this.maxHeight = maxHeight; | |||
} | |||
public void setMaxHeightSession(RemoteSession maxHeightSession) { | |||
this.maxHeightSession = maxHeightSession; | |||
public void setMaxHeightRemoteNode(RemoteNode maxHeightRemoteNode) { | |||
this.maxHeightRemoteNode = maxHeightRemoteNode; | |||
} | |||
} |
@@ -11,7 +11,7 @@ public class DataSequence { | |||
private InetSocketAddress address; | |||
private String id; | |||
private static LinkedList<DataSequenceElement> dataSequenceElements = new LinkedList<>(); | |||
private LinkedList<DataSequenceElement> dataSequenceElements = new LinkedList<>(); | |||
public DataSequence(InetSocketAddress address, String id) { | |||
@@ -22,8 +22,19 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { | |||
@Override | |||
public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents) { | |||
if (diffContents == null) { | |||
throw new IllegalArgumentException("Update diffContents is null!"); | |||
} | |||
System.out.println("Old data sequence state: "); | |||
System.out.println(" Current height = " + currDataSequence.getDataSequenceElements().getLast().getHeight()); | |||
currDataSequence.addElements(diffContents); | |||
System.out.println("Update diffContents is completed!"); | |||
System.out.println("New data sequence state: "); | |||
System.out.println(" Current height = " + currDataSequence.getDataSequenceElements().getLast().getHeight()); | |||
return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE; | |||
} | |||
@@ -2,8 +2,6 @@ package test.com.jd.blockchain.statetransfer; | |||
import com.jd.blockchain.statetransfer.DataSequenceElement; | |||
import com.jd.blockchain.statetransfer.DataSequenceInfo; | |||
import com.jd.blockchain.statetransfer.callback.DataSequenceReader; | |||
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; | |||
import com.jd.blockchain.statetransfer.process.DSProcessManager; | |||
import com.jd.blockchain.utils.codec.Base58Utils; | |||
import org.junit.Before; | |||
@@ -30,13 +28,7 @@ public class StateTransferLayerTest { | |||
private Random rand = new Random(); | |||
private String[] dataSequenceIds; | |||
private DSProcessManager dsProcessManager; | |||
private DataSequenceReader dataSequenceReader; | |||
private DataSequenceWriter dataSequenceWriter; | |||
private String[] dataSequenceIds = new String[DataSequenceNum]; | |||
private InetSocketAddress[] remoteNodeIps = new InetSocketAddress[nodesNum]; | |||
@@ -59,10 +51,6 @@ public class StateTransferLayerTest { | |||
dataSequenceIds[i] = Base58Utils.encode(idBytes); | |||
} | |||
// 创建数据序列处理管理者实例 | |||
dsProcessManager = new DSProcessManager(); | |||
// 准备好所有的远端结点,包括监听者 | |||
for (int i = 0; i < nodesNum; i++) { | |||
remoteNodeIps[i] = new InetSocketAddress(localIp, listenPorts[i]); | |||
@@ -80,8 +68,8 @@ public class StateTransferLayerTest { | |||
DataSequence dataSequence = new DataSequence(remoteNodeIps[i], id); | |||
// 为数据序列的0,1,2高度添加内容 | |||
for (int j = 0; j < 3; i++) { | |||
dataSequence.addElement(new DataSequenceElement(id, i, dsElementDatas)); | |||
for (int j = 0; j < 3; j++) { | |||
dataSequence.addElement(new DataSequenceElement(id, j, dsElementDatas)); | |||
} | |||
dataSequencesPerNode.addLast(dataSequence); | |||
} | |||
@@ -91,7 +79,7 @@ public class StateTransferLayerTest { | |||
DataSequence dataSequence = dataSequencesPerNode.get(i); | |||
if (dataSequence.getAddress().getPort() != listenPorts[0]) { | |||
// 为数据序列的3,4高度添加内容 | |||
for (int j = 3; j < 5; i++) { | |||
for (int j = 3; j < 5; j++) { | |||
dataSequence.addElement(new DataSequenceElement(id, j, dsElementDatas)); | |||
} | |||
} | |||
@@ -136,6 +124,8 @@ public class StateTransferLayerTest { | |||
for (int i = 0; i < nodesNum; i++) { | |||
InetSocketAddress listenNode = remoteNodeIps[i]; | |||
threadPool.execute(() -> { | |||
// 创建数据序列处理管理者实例 | |||
DSProcessManager dsProcessManager = new DSProcessManager(); | |||
DataSequence currDataSequence = findDataSequence(id, listenNode); | |||
DataSequenceInfo dsInfo = currDataSequence.getDSInfo(); | |||
InetSocketAddress[] targets = getTargetNodesIp(listenNode, remoteNodeIps); | |||
@@ -147,6 +137,7 @@ public class StateTransferLayerTest { | |||
// 等待数据序列更新完成 | |||
try { | |||
Thread.sleep(60000); | |||
countDownLatch.await(); | |||
} catch (Exception e) { | |||
e.printStackTrace(); | |||