From 27591f0fe93134e083583ca447e78aa6e7cdc942 Mon Sep 17 00:00:00 2001 From: zhangshuang Date: Wed, 24 Apr 2019 14:50:54 +0800 Subject: [PATCH] stp code debug --- .../message/DSDefaultMessageExecutor.java | 3 +- .../message/DataSequenceMsgEncoder.java | 10 +-- .../process/DSProcessManager.java | 66 +++++++++++++------ .../process/DSTransferProcess.java | 55 ++++++++++------ .../result/DSInfoResponseResult.java | 15 +++-- .../statetransfer/DataSequence.java | 2 +- .../statetransfer/DataSequenceWriterImpl.java | 11 ++++ .../statetransfer/StateTransferLayerTest.java | 23 ++----- 8 files changed, 112 insertions(+), 73 deletions(-) diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java index 258c5fdb..08464db2 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DSDefaultMessageExecutor.java @@ -35,9 +35,8 @@ public class DSDefaultMessageExecutor implements MessageExecutor { Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(data); if (object instanceof String) { - String id = (String)object; - byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE, id, 0, 0); + byte[] respLoadMsg = DSMsgResolverFactory.getEncoder(dsWriter, dsReader).encode(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE, id, 0, 0); session.reply(key, new DataSequenceLoadMessage(respLoadMsg)); } else if (object instanceof DSDiffRequestResult) { diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java index 9b1adad0..0de98605 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/message/DataSequenceMsgEncoder.java @@ -43,7 +43,7 @@ public class DataSequenceMsgEncoder { loadMessage = new byte[dataLength]; System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); - System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); + loadMessage[4] = msgType.CODE; System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize, 4); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + 4, idSize); } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST) { @@ -56,12 +56,12 @@ public class DataSequenceMsgEncoder { loadMessage = new byte[dataLength]; System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); - System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); + loadMessage[4] = msgType.CODE; System.arraycopy(BytesUtils.toBytes(fromHeight), 0, loadMessage, 4 + msgTypeSize, heightSize); System.arraycopy(BytesUtils.toBytes(toHeight), 0, loadMessage, 4 + msgTypeSize + heightSize, heightSize); System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize, 4); System.arraycopy(id.getBytes(), 0, loadMessage, 4 + msgTypeSize + heightSize + heightSize + 4, idSize); - } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_RESPONSE) { + } else if (msgType == DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_RESPONSE) { // CMD_DSINFO_RESPONSE Message parts : 4 bytes total message size, 1 byte message type coe, 8 bytes data sequence local height, // 4 bytes id length, id content size bytes @@ -71,7 +71,7 @@ public class DataSequenceMsgEncoder { loadMessage = new byte[dataLength]; System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); - System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); + loadMessage[4] = msgType.CODE; System.arraycopy(BytesUtils.toBytes(dsReader.getDSInfo(id).getHeight()), 0, loadMessage, 4 + msgTypeSize, heightSize); System.arraycopy(BytesUtils.toBytes(idSize), 0, loadMessage, 4 + msgTypeSize + heightSize, 4); @@ -94,7 +94,7 @@ public class DataSequenceMsgEncoder { loadMessage = new byte[dataLength]; System.arraycopy(BytesUtils.toBytes(dataLength), 0, loadMessage, 0, 4); //total size - System.arraycopy(msgType.CODE, 0, loadMessage, 4, msgTypeSize); //msgType size + loadMessage[4] = msgType.CODE; //msgType size System.arraycopy(BytesUtils.toBytes(diffElem.length), 0, loadMessage, 4 + msgTypeSize, 4); // diffElem size System.arraycopy(diffElem, 0, loadMessage, 4 + msgTypeSize + 4, diffElem.length); // diffElem bytes } diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java index fecfa5c3..6440a842 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java @@ -4,6 +4,7 @@ import com.jd.blockchain.statetransfer.DataSequenceElement; import com.jd.blockchain.statetransfer.DataSequenceInfo; import com.jd.blockchain.statetransfer.callback.DataSequenceReader; import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; +import com.jd.blockchain.statetransfer.comparator.DataSequenceComparator; import com.jd.blockchain.statetransfer.message.DSDefaultMessageExecutor; import com.jd.blockchain.statetransfer.result.DSInfoResponseResult; import com.jd.blockchain.stp.communication.RemoteSession; @@ -11,11 +12,11 @@ import com.jd.blockchain.stp.communication.callback.CallBackBarrier; import com.jd.blockchain.stp.communication.callback.CallBackDataListener; import com.jd.blockchain.stp.communication.manager.RemoteSessionManager; import com.jd.blockchain.stp.communication.node.LocalNode; +import com.jd.blockchain.stp.communication.node.RemoteNode; import com.jd.blockchain.utils.concurrent.CompletableAsyncFuture; import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; /** @@ -28,7 +29,7 @@ public class DSProcessManager { private static Map dSProcessMap = new ConcurrentHashMap<>(); private RemoteSession[] remoteSessions; - private long dsInfoResponseTimeout = 2000; + private long dsInfoResponseTimeout = 20000; private ExecutorService writeExecutors = Executors.newFixedThreadPool(5); private int returnCode = 0; /** @@ -52,6 +53,10 @@ public class DSProcessManager { dSProcessMap.put(dsInfo.getId(), dsTransferProcess); try { + + //wait all listener nodes start + Thread.sleep(10000); + // start network connections with targets dsTransferProcess.start(); @@ -62,52 +67,58 @@ public class DSProcessManager { CallBackBarrier callBackBarrier = CallBackBarrier.newCallBackBarrier(remoteSessions.length, dsInfoResponseTimeout); // response message manage map - Map dsInfoResponses = new ConcurrentHashMap<>(); + LinkedList dsInfoResponses = new LinkedList<>(); + System.out.println("Async send CMD_DSINFO_REQUEST msg to targets will start!"); // step1: send get dsInfo request, then hold for (RemoteSession remoteSession : remoteSessions) { - CallBackDataListener dsInfoResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_DSINFO_REQUEST, remoteSession, 0, 0, callBackBarrier); - - dsInfoResponses.put(remoteSession, dsInfoResponse); + dsInfoResponses.addLast(dsInfoResponse); } + System.out.println("Wait CMD_DSINFO_RESPONSE msg from targets!"); // step2: collect get dsInfo response - Map receiveResponses = new ConcurrentHashMap<>(); + LinkedList receiveResponses = new LinkedList<>(); if (callBackBarrier.tryCall()) { - for (RemoteSession remoteSession : dsInfoResponses.keySet()) { - CallBackDataListener asyncFuture = dsInfoResponses.get(remoteSession); - // if really done - if (asyncFuture.isDone()) { - receiveResponses.put(remoteSession, asyncFuture.getCallBackData()); + Iterator iterator = dsInfoResponses.iterator(); + while (iterator.hasNext()) { + CallBackDataListener receiveResponse = iterator.next(); + if (receiveResponse.isDone()) { + receiveResponses.addLast(receiveResponse); } } } + System.out.println("Compute diff info!"); // step3: process received responses DSInfoResponseResult diffResult = dsTransferProcess.computeDiffInfo(receiveResponses); + System.out.println("Diff info result height = " + diffResult.getMaxHeight() + "!"); + // height diff long diff = dsInfo.getHeight() - diffResult.getMaxHeight(); if (diff == 0 || diff > 0) { + System.out.println("No duplication is required!"); // no duplication is required, life cycle ends - dsTransferProcess.close(); +// dsTransferProcess.close(); dSProcessMap.remove(dsInfo.getId()); return returnCode; } else { - + System.out.println("Duplication is required!"); // step4: async send get data sequence diff request // single step get diff // async message send process CallBackBarrier callBackBarrierDiff = CallBackBarrier.newCallBackBarrier((int)(diffResult.getMaxHeight() - dsInfo.getHeight()), dsInfoResponseTimeout); LinkedList dsDiffResponses = new LinkedList<>(); + RemoteSession responseSession = findResponseSession(diffResult.getMaxHeightRemoteNode(), remoteSessions); + System.out.println("Async send CMD_GETDSDIFF_REQUEST msg to targets will start!"); // step5: collect get data sequence diff response for (long height = dsInfo.getHeight() + 1; height < diffResult.getMaxHeight() + 1; height++) { - CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, diffResult.getMaxHeightSession(), height, height, callBackBarrierDiff); + CallBackDataListener dsDiffResponse = dsTransferProcess.send(DSTransferProcess.DataSequenceMsgType.CMD_GETDSDIFF_REQUEST, responseSession, height, height, callBackBarrierDiff); dsDiffResponses.addLast(dsDiffResponse); } @@ -116,6 +127,7 @@ public class DSProcessManager { // // }); + System.out.println("Wait CMD_GETDSDIFF_RESPONSE msg from targets!"); LinkedList receiveDiffResponses = new LinkedList<>(); if (callBackBarrierDiff.tryCall()) { for (int i = 0; i < dsDiffResponses.size(); i++) { @@ -125,13 +137,18 @@ public class DSProcessManager { } } } + + System.out.println("ReceiveDiffResponses size = "+ receiveDiffResponses.size()); // step6: process data sequence diff response, update local data sequence state - DataSequenceElement[] dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][])); - returnCode = dsWriter.updateDSInfo(dsInfo, dataSequenceElements); + System.out.println("Compute diff elements!"); + ArrayList dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][])); + System.out.println("Update local data sequence!"); + Collections.sort(dataSequenceElements, new DataSequenceComparator()); + returnCode = dsWriter.updateDSInfo(dsInfo, dataSequenceElements.toArray(new DataSequenceElement[dataSequenceElements.size()])); // data sequence transfer complete, close all sessions, end process life cycle - - dsTransferProcess.close(); + System.out.println("Close all sessions"); +// dsTransferProcess.close(); dSProcessMap.remove(dsInfo.getId()); } @@ -142,7 +159,14 @@ public class DSProcessManager { return returnCode; } - + RemoteSession findResponseSession(RemoteNode remoteNode, RemoteSession[] remoteSessions) { + for (RemoteSession remoteSession : remoteSessions) { + if (remoteSession.remoteNode().equals(remoteNode)) { + return remoteSession; + } + } + return null; + } /** * * diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java index 90003712..b1e6b817 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSTransferProcess.java @@ -15,6 +15,8 @@ import com.jd.blockchain.stp.communication.node.RemoteNode; import com.jd.blockchain.utils.IllegalDataException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.Map; /** @@ -78,12 +80,14 @@ public class DSTransferProcess { * * */ - public DataSequenceElement[] computeDiffElement(byte[][] diffArray) { - DataSequenceElement[] dataSequenceElements = new DataSequenceElement[diffArray.length]; - for (int i = 0 ; i < dataSequenceElements.length; i++) { + public ArrayList computeDiffElement(byte[][] diffArray) { + + ArrayList dataSequenceElements = new ArrayList<>(); + + for (int i = 0 ; i < diffArray.length; i++) { Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(diffArray[i]); if (object instanceof DataSequenceElement) { - dataSequenceElements[i] = (DataSequenceElement) object; + dataSequenceElements.add((DataSequenceElement) object); } else { throw new IllegalDataException("Unknown instance object!"); @@ -97,27 +101,36 @@ public class DSTransferProcess { * * */ - public DSInfoResponseResult computeDiffInfo(Map responseMap) { + public DSInfoResponseResult computeDiffInfo(LinkedList receiveResponses) { long maxHeight = 0; - RemoteSession maxHeightSession = null; - - for (RemoteSession remoteSession : responseMap.keySet()) { - Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(responseMap.get(remoteSession)); - if (object instanceof DataSequenceInfo) { - DataSequenceInfo dsInfo = (DataSequenceInfo) object; - long height = dsInfo.getHeight(); - if (maxHeight < height) { - maxHeight = height; - maxHeightSession = remoteSession; - } - } - else { - throw new IllegalDataException("Unknown instance object!"); - } + RemoteNode maxHeightRemoteNode = null; + + System.out.println("ComputeDiffInfo receiveResponses size = "+ receiveResponses.size()); + + try { + for (CallBackDataListener receiveResponse : receiveResponses) { + Object object = DSMsgResolverFactory.getDecoder(dsWriter, dsReader).decode(receiveResponse.getCallBackData()); +// System.out.println("ComputeDiffInfo object = "+object); + if (object instanceof DataSequenceInfo) { + DataSequenceInfo dsInfo = (DataSequenceInfo) object; + long height = dsInfo.getHeight(); +// System.out.println("ComputeDiffInfo height = " +height); + if (maxHeight < height) { + maxHeight = height; + maxHeightRemoteNode = receiveResponse.remoteNode(); + } + } + else { + throw new IllegalDataException("Unknown instance object!"); + } + } + } catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); } - return new DSInfoResponseResult(maxHeight, maxHeightSession); + return new DSInfoResponseResult(maxHeight, maxHeightRemoteNode); } /** diff --git a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java index d0a57b26..9a7ea126 100644 --- a/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java +++ b/source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/result/DSInfoResponseResult.java @@ -1,6 +1,7 @@ package com.jd.blockchain.statetransfer.result; import com.jd.blockchain.stp.communication.RemoteSession; +import com.jd.blockchain.stp.communication.node.RemoteNode; /** * @@ -11,27 +12,27 @@ import com.jd.blockchain.stp.communication.RemoteSession; public class DSInfoResponseResult { long maxHeight; - RemoteSession maxHeightSession; + RemoteNode maxHeightRemoteNode; - public DSInfoResponseResult(long maxHeight, RemoteSession maxHeightSession) { + public DSInfoResponseResult(long maxHeight, RemoteNode maxHeightRemoteNode) { this.maxHeight = maxHeight; - this.maxHeightSession = maxHeightSession; + this.maxHeightRemoteNode = maxHeightRemoteNode; } public long getMaxHeight() { return maxHeight; } - public RemoteSession getMaxHeightSession() { - return maxHeightSession; + public RemoteNode getMaxHeightRemoteNode() { + return maxHeightRemoteNode; } public void setMaxHeight(long maxHeight) { this.maxHeight = maxHeight; } - public void setMaxHeightSession(RemoteSession maxHeightSession) { - this.maxHeightSession = maxHeightSession; + public void setMaxHeightRemoteNode(RemoteNode maxHeightRemoteNode) { + this.maxHeightRemoteNode = maxHeightRemoteNode; } } diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java index 4d138d04..ed41757b 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequence.java @@ -11,7 +11,7 @@ public class DataSequence { private InetSocketAddress address; private String id; - private static LinkedList dataSequenceElements = new LinkedList<>(); + private LinkedList dataSequenceElements = new LinkedList<>(); public DataSequence(InetSocketAddress address, String id) { diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java index 59ce5bbc..9da566c2 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/DataSequenceWriterImpl.java @@ -22,8 +22,19 @@ public class DataSequenceWriterImpl implements DataSequenceWriter { @Override public int updateDSInfo(DataSequenceInfo dsInfo, DataSequenceElement[] diffContents) { + if (diffContents == null) { + throw new IllegalArgumentException("Update diffContents is null!"); + } + + System.out.println("Old data sequence state: "); + System.out.println(" Current height = " + currDataSequence.getDataSequenceElements().getLast().getHeight()); currDataSequence.addElements(diffContents); + System.out.println("Update diffContents is completed!"); + System.out.println("New data sequence state: "); + System.out.println(" Current height = " + currDataSequence.getDataSequenceElements().getLast().getHeight()); + + return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE; } diff --git a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java index efcaa060..9a185d95 100644 --- a/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java +++ b/source/state-transfer/src/test/java/test/com/jd/blockchain/statetransfer/StateTransferLayerTest.java @@ -2,8 +2,6 @@ package test.com.jd.blockchain.statetransfer; import com.jd.blockchain.statetransfer.DataSequenceElement; import com.jd.blockchain.statetransfer.DataSequenceInfo; -import com.jd.blockchain.statetransfer.callback.DataSequenceReader; -import com.jd.blockchain.statetransfer.callback.DataSequenceWriter; import com.jd.blockchain.statetransfer.process.DSProcessManager; import com.jd.blockchain.utils.codec.Base58Utils; import org.junit.Before; @@ -30,13 +28,7 @@ public class StateTransferLayerTest { private Random rand = new Random(); - private String[] dataSequenceIds; - - private DSProcessManager dsProcessManager; - - private DataSequenceReader dataSequenceReader; - - private DataSequenceWriter dataSequenceWriter; + private String[] dataSequenceIds = new String[DataSequenceNum]; private InetSocketAddress[] remoteNodeIps = new InetSocketAddress[nodesNum]; @@ -59,10 +51,6 @@ public class StateTransferLayerTest { dataSequenceIds[i] = Base58Utils.encode(idBytes); } - // 创建数据序列处理管理者实例 - dsProcessManager = new DSProcessManager(); - - // 准备好所有的远端结点,包括监听者 for (int i = 0; i < nodesNum; i++) { remoteNodeIps[i] = new InetSocketAddress(localIp, listenPorts[i]); @@ -80,8 +68,8 @@ public class StateTransferLayerTest { DataSequence dataSequence = new DataSequence(remoteNodeIps[i], id); // 为数据序列的0,1,2高度添加内容 - for (int j = 0; j < 3; i++) { - dataSequence.addElement(new DataSequenceElement(id, i, dsElementDatas)); + for (int j = 0; j < 3; j++) { + dataSequence.addElement(new DataSequenceElement(id, j, dsElementDatas)); } dataSequencesPerNode.addLast(dataSequence); } @@ -91,7 +79,7 @@ public class StateTransferLayerTest { DataSequence dataSequence = dataSequencesPerNode.get(i); if (dataSequence.getAddress().getPort() != listenPorts[0]) { // 为数据序列的3,4高度添加内容 - for (int j = 3; j < 5; i++) { + for (int j = 3; j < 5; j++) { dataSequence.addElement(new DataSequenceElement(id, j, dsElementDatas)); } } @@ -136,6 +124,8 @@ public class StateTransferLayerTest { for (int i = 0; i < nodesNum; i++) { InetSocketAddress listenNode = remoteNodeIps[i]; threadPool.execute(() -> { + // 创建数据序列处理管理者实例 + DSProcessManager dsProcessManager = new DSProcessManager(); DataSequence currDataSequence = findDataSequence(id, listenNode); DataSequenceInfo dsInfo = currDataSequence.getDSInfo(); InetSocketAddress[] targets = getTargetNodesIp(listenNode, remoteNodeIps); @@ -147,6 +137,7 @@ public class StateTransferLayerTest { // 等待数据序列更新完成 try { + Thread.sleep(60000); countDownLatch.await(); } catch (Exception e) { e.printStackTrace();