Browse Source

add state transfer layer code

tags/1.0.0
zhangshuang 5 years ago
parent
commit
0b3b7f8109
7 changed files with 269 additions and 10 deletions
  1. +5
    -0
      source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java
  2. +76
    -4
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java
  3. +145
    -3
      source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java
  4. +3
    -2
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java
  5. +24
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java
  6. +15
    -0
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java
  7. +1
    -1
      source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java

+ 5
- 0
source/ledger/ledger-model/src/main/java/com/jd/blockchain/ledger/TransactionState.java View File

@@ -29,6 +29,11 @@ public enum TransactionState {
*/
LEDGER_ERROR((byte) 2),
/**
* 数据序列更新错误;
*/
DATA_SEQUENCE_UPDATE_ERROR((byte) 3),
/**
* 系统错误;
*/


+ 76
- 4
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceReaderImpl.java View File

@@ -1,8 +1,19 @@
package com.jd.blockchain.peer.statetransfer;

import com.jd.blockchain.crypto.hash.HashDigest;
import com.jd.blockchain.ledger.LedgerBlock;
import com.jd.blockchain.ledger.core.LedgerManage;
import com.jd.blockchain.ledger.core.LedgerRepository;
import com.jd.blockchain.ledger.core.TransactionSet;
import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.DataSequenceReader;
import com.jd.blockchain.statetransfer.callback.DataSequenceReader;
import com.jd.blockchain.storage.service.DbConnection;
import com.jd.blockchain.storage.service.DbConnectionFactory;
import com.jd.blockchain.tools.initializer.LedgerBindingConfig;
import com.jd.blockchain.utils.codec.Base58Utils;
import com.jd.blockchain.utils.codec.HexUtils;
import org.springframework.beans.factory.annotation.Autowired;

/**
*数据序列差异的提供者需要使用的回调接口实现类
@@ -13,13 +24,74 @@ import com.jd.blockchain.statetransfer.DataSequenceReader;
*/
public class DataSequenceReaderImpl implements DataSequenceReader {

private LedgerManage ledgerManager;

private DbConnectionFactory connFactory;

private LedgerBindingConfig config;

public DataSequenceReaderImpl(LedgerBindingConfig config, LedgerManage ledgerManager, DbConnectionFactory connFactory) {
this.config = config;
this.ledgerManager = ledgerManager;
this.connFactory = connFactory;
}


/**
*
*
*/
@Override
public DataSequenceInfo getDSInfo(String id) {
return null;

byte[] hashBytes = Base58Utils.decode(id);

HashDigest ledgerHash = new HashDigest(hashBytes);

LedgerBindingConfig.BindingConfig bindingConfig = config.getLedger(ledgerHash);
DbConnection dbConnNew = connFactory.connect(bindingConfig.getDbConnection().getUri(),
bindingConfig.getDbConnection().getPassword());
LedgerRepository ledgerRepository = ledgerManager.register(ledgerHash, dbConnNew.getStorageService());

return new DataSequenceInfo(id, ledgerRepository.getLatestBlockHeight());
}

/**
*
*
*/
@Override
public DataSequenceElement[] getDSContent(String id, long from, long to) {
return null;
public DataSequenceElement[] getDSDiffContent(String id, long from, long to) {

DataSequenceElement[] dataSequenceElements = new DataSequenceElement[(int)(to - from + 1)];
for (long i = from; i < to + 1; i++) {
dataSequenceElements[(int)(i - from)] = getDSDiffContent(id, i);
}

return dataSequenceElements;
}

/**
*
*
*/
@Override
public DataSequenceElement getDSDiffContent(String id, long height) {

byte[] hashBytes = Base58Utils.decode(id);

HashDigest ledgerHash = new HashDigest(hashBytes);

LedgerBindingConfig.BindingConfig bindingConfig = config.getLedger(ledgerHash);
DbConnection dbConnNew = connFactory.connect(bindingConfig.getDbConnection().getUri(),
bindingConfig.getDbConnection().getPassword());
LedgerRepository ledgerRepository = ledgerManager.register(ledgerHash, dbConnNew.getStorageService());

LedgerBlock ledgerBlock = ledgerRepository.getBlock(height);
TransactionSet transactionSet = ledgerRepository.getTransactionSet(ledgerBlock);
//todo


return null;
}
}

+ 145
- 3
source/peer/src/main/java/com/jd/blockchain/peer/statetransfer/DataSequenceWriterImpl.java View File

@@ -1,9 +1,14 @@
package com.jd.blockchain.peer.statetransfer;

import com.jd.blockchain.consensus.service.MessageHandle;
import com.jd.blockchain.ledger.TransactionState;
import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;
import com.jd.blockchain.statetransfer.DataSequenceReader;
import com.jd.blockchain.statetransfer.DataSequenceWriter;
import com.jd.blockchain.statetransfer.callback.DataSequenceWriter;
import com.jd.blockchain.statetransfer.comparator.DataSequenceComparator;

import java.util.ArrayList;
import java.util.Collections;

/**
*数据序列差异的请求者需要使用的回调接口实现类
@@ -14,9 +19,146 @@ import com.jd.blockchain.statetransfer.DataSequenceWriter;
*/
public class DataSequenceWriterImpl implements DataSequenceWriter {

private long currHeight;
private ArrayList<DataSequenceElement> deceidedElements = new ArrayList<DataSequenceElement>();

private MessageHandle batchMessageHandle;


public DataSequenceWriterImpl(MessageHandle batchMessageHandle) {
this.batchMessageHandle = batchMessageHandle;
}

/**
* check height to data sequence diff elements
*
*/
private int checkElementsHeight(long currHeight, ArrayList<DataSequenceElement> dsUpdateElements) {
boolean lossMiddleElements = false;

// lose first element
if (currHeight + 1 < dsUpdateElements.get(0).getHeight()){
System.out.println("Diff response loss first element error!");
return DataSequenceErrorType.DATA_SEQUENCE_LOSS_FIRST_ELEMENT.CODE;
}
else {
for (int i = 0; i < dsUpdateElements.size(); i++) {
if (dsUpdateElements.get(i).getHeight() == currHeight + 1 + i) {
deceidedElements.add(dsUpdateElements.get(i));
}
// lose middle elements
else {
lossMiddleElements = true;
break;
}
}

if (lossMiddleElements) {
System.out.println("Diff response loss middle elements error!");
return DataSequenceErrorType.DATA_SEQUENCE_LOSS_MIDDLE_ELEMENT.CODE;
}

System.out.println("Diff response elements height normal!");
return DataSequenceErrorType.DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL.CODE;
}

}

/**
*
*
*/
private void exeUpdate(String realmName) {
for (int i = 0; i < deceidedElements.size(); i++) {
byte[][] element = deceidedElements.get(i).getData();

String batchId = batchMessageHandle.beginBatch(realmName);
try {
int msgId = 0;
for (byte[] txContent : element) {
batchMessageHandle.processOrdered(msgId++, txContent, realmName, batchId);
}
batchMessageHandle.completeBatch(realmName, batchId);
batchMessageHandle.commitBatch(realmName, batchId);
} catch (Exception e) {
// todo 需要处理应答码 404
batchMessageHandle.rollbackBatch(realmName, batchId, TransactionState.DATA_SEQUENCE_UPDATE_ERROR.CODE);
}
}

}

/**
*
*
*/
@Override
public void updateDSInfo(String id, DataSequenceElement[] diffContents) {
public int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents) {
int result = 0;

try {
ArrayList<DataSequenceElement> dsUpdateElements = new ArrayList<DataSequenceElement>();
//remove unexpected elements
for (int i = 0 ; i < diffContents.length; i++) {
if (diffContents[i].getId().equals(id.getId())) {
dsUpdateElements.add(diffContents[i]);
}
}

// sort elements by height
Collections.sort(dsUpdateElements, new DataSequenceComparator());

currHeight = id.getHeight();

// check element's height
result = checkElementsHeight(currHeight, dsUpdateElements);

// cann't exe update
if (result == DataSequenceErrorType.DATA_SEQUENCE_LOSS_FIRST_ELEMENT.CODE) {
return result;
}
// exe elements update
else {
exeUpdate(id.getId());
return result;
}
} catch (Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
}

return result;
}

@Override
public int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents) {
return 0;
}


/**
* data sequence transfer error type
*
*/
public enum DataSequenceErrorType {
DATA_SEQUENCE_LOSS_FIRST_ELEMENT((byte) 0x1),
DATA_SEQUENCE_LOSS_MIDDLE_ELEMENT((byte) 0x2),
DATA_SEQUENCE_ELEMENT_HEIGHT_NORMAL((byte) 0x3),
;
public final int CODE;

private DataSequenceErrorType(byte code) {
this.CODE = code;
}

public static DataSequenceErrorType valueOf(byte code) {
for (DataSequenceErrorType errorType : DataSequenceErrorType.values()) {
if (errorType.CODE == code) {
return errorType;
}
}
throw new IllegalArgumentException("Unsupported code[" + code + "] of errorType!");
}
}

}

+ 3
- 2
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/callback/DataSequenceWriter.java View File

@@ -1,6 +1,7 @@
package com.jd.blockchain.statetransfer.callback;

import com.jd.blockchain.statetransfer.DataSequenceElement;
import com.jd.blockchain.statetransfer.DataSequenceInfo;

/**
*数据序列差异的请求者获得差异内容后需要回调该接口,通过接口提供的方法对指定数据序列执行差异内容的重放,并更新数据序列的当前状态;
@@ -14,12 +15,12 @@ public interface DataSequenceWriter {
*更新数据序列的当前状态,一次更新多个高度的差异
* return void
*/
int updateDSInfo(String id, DataSequenceElement[] diffContents);
int updateDSInfo(DataSequenceInfo id, DataSequenceElement[] diffContents);

/**
*更新数据序列的当前状态,一次更新一个高度的差异
* return void
*/
int updateDSInfo(String id, DataSequenceElement diffContents);
int updateDSInfo(DataSequenceInfo id, DataSequenceElement diffContents);

}

+ 24
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/comparator/DataSequenceComparator.java View File

@@ -0,0 +1,24 @@
package com.jd.blockchain.statetransfer.comparator;

import com.jd.blockchain.statetransfer.DataSequenceElement;

import java.util.Comparator;

/**
*
*
*/
public class DataSequenceComparator implements Comparator<DataSequenceElement> {

// sort by data sequence height
@Override
public int compare(DataSequenceElement o1, DataSequenceElement o2) {
long height1;
long height2;

height1 = o1.getHeight();
height2 = o2.getHeight();

return (int) (height1 - height2);
}
}

+ 15
- 0
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/exception/DataSequenceException.java View File

@@ -0,0 +1,15 @@
package com.jd.blockchain.statetransfer.exception;

public class DataSequenceException extends RuntimeException {

private static final long serialVersionUID = -4090881296855827889L;


public DataSequenceException(String message) {
super(message);
}
public DataSequenceException(String message, Throwable cause) {
super(message, cause);
}

}

+ 1
- 1
source/state-transfer/src/main/java/com/jd/blockchain/statetransfer/process/DSProcessManager.java View File

@@ -127,7 +127,7 @@ public class DSProcessManager {
}
// step6: process data sequence diff response, update local data sequence state
DataSequenceElement[] dataSequenceElements = dsTransferProcess.computeDiffElement(receiveDiffResponses.toArray(new byte[receiveDiffResponses.size()][]));
returnCode = dsWriter.updateDSInfo(dsInfo.getId(), dataSequenceElements);
returnCode = dsWriter.updateDSInfo(dsInfo, dataSequenceElements);

// data sequence transfer complete, close all sessions, end process life cycle



Loading…
Cancel
Save