Browse Source

Refactored dataset of accounts;

tags/1.1.2^2
huanghaiquan 4 years ago
parent
commit
e3436dfb25
33 changed files with 878 additions and 604 deletions
  1. +4
    -4
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/AccountDecorator.java
  2. +8
    -8
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/ContractAccount.java
  3. +3
    -4
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerAccount.java
  4. +10
    -10
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerQueryService.java
  5. +264
    -198
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleAccount.java
  6. +35
    -50
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleAccountSet.java
  7. +1
    -1
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleDataCluster.java
  8. +2
    -2
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleDataEntry.java
  9. +66
    -29
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleDataSet.java
  10. +3
    -3
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/RolePrivilegeDataset.java
  11. +6
    -10
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/UserAccount.java
  12. +3
    -3
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/UserRoleDataset.java
  13. +2
    -1
      source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/handles/DataAccountKVSetOperationHandle.java
  14. +2
    -2
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/AccountSetTest.java
  15. +6
    -6
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/BaseAccountTest.java
  16. +1
    -1
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/ContractInvokingTest.java
  17. +20
    -30
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/LedgerAccountTest.java
  18. +3
    -11
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/LedgerEditorTest.java
  19. +44
    -37
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/MerkleDataSetTest.java
  20. +8
    -8
      source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/TransactionBatchProcessorTest.java
  21. +3
    -3
      source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/RedisVerioningStorage.java
  22. +3
    -3
      source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBVersioningStorage.java
  23. +2
    -2
      source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/VersioningKVStorage.java
  24. +4
    -4
      source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/BufferedKVStorage.java
  25. +2
    -2
      source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java
  26. +2
    -2
      source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/VersioningKVData.java
  27. +3
    -3
      source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/VersioningKVStorageMap.java
  28. +46
    -24
      source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationTestAll4Redis.java
  29. +17
    -0
      source/utils/utils-common/src/main/java/com/jd/blockchain/utils/DataEntry.java
  30. +4
    -4
      source/utils/utils-common/src/main/java/com/jd/blockchain/utils/Dataset.java
  31. +301
    -0
      source/utils/utils-common/src/main/java/com/jd/blockchain/utils/DatasetHelper.java
  32. +0
    -122
      source/utils/utils-common/src/main/java/com/jd/blockchain/utils/RegionMap.java
  33. +0
    -17
      source/utils/utils-common/src/main/java/com/jd/blockchain/utils/VersioningKVEntry.java

+ 4
- 4
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/AccountDecorator.java View File

@@ -2,11 +2,11 @@ package com.jd.blockchain.ledger.core;

import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.ledger.BlockchainIdentity;
import com.jd.blockchain.ledger.BytesValue;
import com.jd.blockchain.ledger.HashProof;
import com.jd.blockchain.ledger.MerkleSnapshot;
import com.jd.blockchain.ledger.TypedValue;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningMap;
import com.jd.blockchain.utils.Dataset;

public class AccountDecorator implements LedgerAccount, HashProvable, MerkleSnapshot{
@@ -16,7 +16,7 @@ public class AccountDecorator implements LedgerAccount, HashProvable, MerkleSnap
this.mklAccount = mklAccount;
}
protected VersioningMap<Bytes, BytesValue> getHeaders() {
protected Dataset<String, TypedValue> getHeaders() {
return mklAccount.getHeaders();
}

@@ -37,7 +37,7 @@ public class AccountDecorator implements LedgerAccount, HashProvable, MerkleSnap
}

@Override
public VersioningMap<Bytes, BytesValue> getDataset() {
public Dataset<String, TypedValue> getDataset() {
return mklAccount.getDataset();
}



+ 8
- 8
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/ContractAccount.java View File

@@ -8,9 +8,9 @@ import com.jd.blockchain.utils.Bytes;
public class ContractAccount extends AccountDecorator implements ContractInfo {
private static final Bytes CONTRACT_INFO_PREFIX = Bytes.fromString("INFO" + LedgerConsts.KEY_SEPERATOR);
private static final String CONTRACT_INFO_PREFIX = "INFO" + LedgerConsts.KEY_SEPERATOR;
private static final Bytes CHAIN_CODE_KEY = Bytes.fromString("CHAIN-CODE");
private static final String CHAIN_CODE_KEY = "CHAIN-CODE";
public ContractAccount(MerkleAccount mklAccount) {
super(mklAccount);
@@ -35,7 +35,7 @@ public class ContractAccount extends AccountDecorator implements ContractInfo {
// }
public long setChaincode(byte[] chaincode, long version) {
BytesValue bytesValue = TypedValue.fromBytes(chaincode);
TypedValue bytesValue = TypedValue.fromBytes(chaincode);
return getHeaders().setValue(CHAIN_CODE_KEY, bytesValue, version);
}
@@ -51,22 +51,22 @@ public class ContractAccount extends AccountDecorator implements ContractInfo {
return getHeaders().getVersion(CHAIN_CODE_KEY);
}
public long setProperty(Bytes key, String value, long version) {
BytesValue bytesValue = TypedValue.fromText(value);
public long setProperty(String key, String value, long version) {
TypedValue bytesValue = TypedValue.fromText(value);
return getHeaders().setValue(encodePropertyKey(key), bytesValue, version);
}
public String getProperty(Bytes key) {
public String getProperty(String key) {
BytesValue bytesValue = getHeaders().getValue(encodePropertyKey(key));
return TypedValue.wrap(bytesValue).stringValue();
}
public String getProperty(Bytes key, long version) {
public String getProperty(String key, long version) {
BytesValue bytesValue = getHeaders().getValue(encodePropertyKey(key), version);
return TypedValue.wrap(bytesValue).stringValue();
}
private Bytes encodePropertyKey(Bytes key) {
private String encodePropertyKey(String key) {
return CONTRACT_INFO_PREFIX.concat(key);
}

+ 3
- 4
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerAccount.java View File

@@ -1,14 +1,13 @@
package com.jd.blockchain.ledger.core;

import com.jd.blockchain.ledger.BlockchainIdentity;
import com.jd.blockchain.ledger.BytesValue;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningMap;
import com.jd.blockchain.ledger.TypedValue;
import com.jd.blockchain.utils.Dataset;

public interface LedgerAccount {
BlockchainIdentity getID();
VersioningMap<Bytes, BytesValue> getDataset();
Dataset<String, TypedValue> getDataset();
}

+ 10
- 10
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/LedgerQueryService.java View File

@@ -29,16 +29,16 @@ import com.jd.blockchain.utils.QueryUtil;
public class LedgerQueryService implements BlockchainQueryService {

private static final KVDataEntry[] EMPTY_ENTRIES = new KVDataEntry[0];
private HashDigest[] ledgerHashs;

private LedgerQuery ledger;

public LedgerQueryService(LedgerQuery ledger) {
this.ledger = ledger;
this.ledgerHashs = new HashDigest[] {ledger.getHash()};
this.ledgerHashs = new HashDigest[] { ledger.getHash() };
}
private void checkLedgerHash(HashDigest ledgerHash) {
if (!ledgerHashs[0].equals(ledgerHash)) {
throw new LedgerException("Unsupport cross chain query!");
@@ -59,7 +59,7 @@ public class LedgerQueryService implements BlockchainQueryService {
ledgerInfo.setLatestBlockHeight(ledger.getLatestBlockHeight());
return ledgerInfo;
}
@Override
public LedgerAdminInfo getLedgerAdminInfo(HashDigest ledgerHash) {
checkLedgerHash(ledgerHash);
@@ -345,12 +345,12 @@ public class LedgerQueryService implements BlockchainQueryService {
if (ver < 0) {
entries[i] = new KVDataObject(keys[i], -1, null);
} else {
if (dataAccount.getDataset().getDataEntriesTotalCount() == 0
|| dataAccount.getBytes(Bytes.fromString(keys[i]), ver) == null) {
if (dataAccount.getDataset().getDataCount() == 0
|| dataAccount.getDataset().getValue(keys[i], ver) == null) {
// is the address is not exist; the result is null;
entries[i] = new KVDataObject(keys[i], -1, null);
} else {
BytesValue value = dataAccount.getBytes(Bytes.fromString(keys[i]), ver);
BytesValue value = dataAccount.getDataset().getValue(keys[i], ver);
entries[i] = new KVDataObject(keys[i], ver, value);
}
}
@@ -366,8 +366,8 @@ public class LedgerQueryService implements BlockchainQueryService {
DataAccountQuery dataAccountSet = ledger.getDataAccountSet(block);
DataAccount dataAccount = dataAccountSet.getAccount(Bytes.fromBase58(address));

int pages[] = QueryUtil.calFromIndexAndCount(fromIndex, count, (int) dataAccount.getDataEntriesTotalCount());
return dataAccount.getDataEntries(pages[0], pages[1]);
int pages[] = QueryUtil.calFromIndexAndCount(fromIndex, count, (int) dataAccount.getDataset().getDataCount());
return dataAccount.getDataset()..getDataEntries(pages[0], pages[1]);
}

@Override
@@ -377,7 +377,7 @@ public class LedgerQueryService implements BlockchainQueryService {
DataAccountQuery dataAccountSet = ledger.getDataAccountSet(block);
DataAccount dataAccount = dataAccountSet.getAccount(Bytes.fromBase58(address));

return dataAccount.getDataEntriesTotalCount();
return dataAccount.getDataset().getDataCount();
}

@Override


+ 264
- 198
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleAccount.java View File

@@ -14,9 +14,11 @@ import com.jd.blockchain.ledger.TypedValue;
import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.Dataset;
import com.jd.blockchain.utils.DatasetHelper;
import com.jd.blockchain.utils.DatasetHelper.DataChangedListener;
import com.jd.blockchain.utils.DatasetHelper.TypeMapper;
import com.jd.blockchain.utils.Transactional;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.VersioningMap;

/**
* 事务性的基础账户;
@@ -29,21 +31,25 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
private static final Bytes HEADER_PREFIX = Bytes.fromString("HD/");
private static final Bytes DATA_PREFIX = Bytes.fromString("DT/");

private static final Bytes KEY_PUBKEY = Bytes.fromString("PUBKEY");

private static final Bytes KEY_HEADER_ROOT = Bytes.fromString("HEADER");

private static final Bytes KEY_DATA_ROOT = Bytes.fromString("DATA");

private static final String KEY_PUBKEY = "PUBKEY";

private BlockchainIdentity accountID;

private MerkleDataSet rootDS;
private MerkleDataSet rootDataset;

private MerkleDataSet headerDataset;

private MerkleDatasetAdapter headerDS;
private MerkleDataSet dataDataset;

private MerkleDatasetAdapter dataDS;
private Dataset<String, TypedValue> typedHeader;

protected long version;
private Dataset<String, TypedValue> typedData;

private long version;

/**
* Create a new Account with the specified address and pubkey; <br>
@@ -103,40 +109,58 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
*/
private MerkleAccount(Bytes address, PubKey pubKey, long version, HashDigest rootHash, CryptoSetting cryptoSetting,
Bytes keyPrefix, ExPolicyKVStorage exStorage, VersioningKVStorage verStorage, boolean readonly) {
// 初始化账户的身份;
this.accountID = new AutoloadingID(address, pubKey);

this.version = version;

// 加载“根数据集”
this.rootDS = new MerkleDataSet(rootHash, cryptoSetting, keyPrefix, exStorage, verStorage, readonly);
this.rootDataset = new MerkleDataSet(rootHash, cryptoSetting, keyPrefix, exStorage, verStorage, readonly);

// 初始化数据修改监听器;
DataChangedListener dataChangedListener = new DataChangedListener() {
DataChangedListener<String, TypedValue> dataChangedListener = new DataChangedListener<String, TypedValue>() {
@Override
public void onChanged(String key, TypedValue value, long expectedVersion, long newVersion) {
onUpdated(key, value, expectedVersion, newVersion);
}
};

TypeMapper<byte[], TypedValue> valueMapper = new TypeMapper<byte[], TypedValue>() {

@Override
public byte[] encode(TypedValue t2) {
return BinaryProtocol.encode(t2, BytesValue.class);
}

@Override
public void onChanged(Bytes key, BytesValue value, long newVersion) {
onUpdated(keyPrefix, value, newVersion);
public TypedValue decode(byte[] t1) {
BytesValue v = BinaryProtocol.decodeAs(t1, BytesValue.class);
return TypedValue.wrap(v);
}
};

// 加载“头数据集”;
HashDigest headerRoot = loadHeaderRoot();
Bytes headerPrefix = keyPrefix.concat(HEADER_PREFIX);
MerkleDataSet headerDataset = new MerkleDataSet(headerRoot, cryptoSetting, headerPrefix, exStorage, verStorage,
this.headerDataset = new MerkleDataSet(headerRoot, cryptoSetting, headerPrefix, exStorage, verStorage,
readonly);
this.headerDS = new MerkleDatasetAdapter(headerDataset, dataChangedListener);
this.typedHeader = DatasetHelper.listen(DatasetHelper.map(headerDataset, valueMapper), dataChangedListener);

// 加载“主数据集”
HashDigest dataRoot = loadDataRoot();
Bytes dataPrefix = keyPrefix.concat(DATA_PREFIX);
MerkleDataSet dataDataset = new MerkleDataSet(dataRoot, cryptoSetting, dataPrefix, exStorage, verStorage,
readonly);
this.dataDS = new MerkleDatasetAdapter(dataDataset, dataChangedListener);
this.dataDataset = new MerkleDataSet(dataRoot, cryptoSetting, dataPrefix, exStorage, verStorage, readonly);
this.typedData = DatasetHelper.listen(DatasetHelper.map(dataDataset, valueMapper), dataChangedListener);

// 初始化账户的身份;
if (pubKey == null) {
if (version < 0) {
throw new IllegalArgumentException("Specified a null PubKey for newly Account!");
}
pubKey = loadPubKey();
}
this.accountID = new AccountID(address, pubKey);
}

private HashDigest loadHeaderRoot() {
byte[] hashBytes = rootDS.getValue(KEY_HEADER_ROOT);
byte[] hashBytes = rootDataset.getValue(KEY_HEADER_ROOT);
if (hashBytes == null) {
return null;
}
@@ -144,7 +168,7 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
}

private HashDigest loadDataRoot() {
byte[] hashBytes = rootDS.getValue(KEY_DATA_ROOT);
byte[] hashBytes = rootDataset.getValue(KEY_DATA_ROOT);
if (hashBytes == null) {
return null;
}
@@ -152,11 +176,11 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
}

private long getHeaderRootVersion() {
return rootDS.getVersion(KEY_HEADER_ROOT);
return rootDataset.getVersion(KEY_HEADER_ROOT);
}

private long getDataRootVersion() {
return rootDS.getVersion(KEY_DATA_ROOT);
return rootDataset.getVersion(KEY_DATA_ROOT);
}

public Bytes getAddress() {
@@ -172,13 +196,13 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
return accountID;
}

protected VersioningMap<Bytes, BytesValue> getHeaders() {
return headerDS;
protected Dataset<String, TypedValue> getHeaders() {
return typedHeader;
}

@Override
public VersioningMap<Bytes, BytesValue> getDataset() {
return dataDS;
public Dataset<String, TypedValue> getDataset() {
return typedData;
}

/*
@@ -188,16 +212,16 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
*/
@Override
public HashDigest getRootHash() {
return rootDS.getRootHash();
return rootDataset.getRootHash();
}

@Override
public HashProof getProof(Bytes key) {
MerkleProof dataProof = dataDS.getDataset().getProof(key);
MerkleProof dataProof = dataDataset.getProof(key);
if (dataProof == null) {
return null;
}
MerkleProof rootProof = rootDS.getProof(KEY_DATA_ROOT);
MerkleProof rootProof = rootDataset.getProof(KEY_DATA_ROOT);
if (rootProof == null) {
return null;
}
@@ -212,11 +236,7 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
* @return
*/
public boolean isReadonly() {
return dataDS.getDataset().isReadonly();
}

public long getDataCount() {
return dataDS.getDataset().getDataCount();
return dataDataset.isReadonly() || headerDataset.isReadonly();
}

/**
@@ -225,7 +245,7 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
* @param pubKey
*/
private void initPubKey(PubKey pubKey) {
long v = headerDS.setValue(KEY_PUBKEY, TypedValue.fromPubKey(pubKey), -1);
long v = typedHeader.setValue(KEY_PUBKEY, TypedValue.fromPubKey(pubKey), -1);
if (v < 0) {
throw new LedgerException("PubKey storage conflict!");
}
@@ -237,8 +257,8 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
* @return
*/
private PubKey loadPubKey() {
BytesValue bytesValue = headerDS.getValue(KEY_PUBKEY);
return TypedValue.wrap(bytesValue).pubKeyValue();
TypedValue value = typedHeader.getValue(KEY_PUBKEY);
return value.pubKeyValue();
}

/**
@@ -248,7 +268,7 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho
* @param value
* @param newVersion
*/
protected void onUpdated(Bytes key, BytesValue value, long newVersion) {
protected void onUpdated(String key, TypedValue value, long expectedVersion, long newVersion) {
}

/**
@@ -266,44 +286,44 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho

@Override
public boolean isUpdated() {
return dataDS.getDataset().isUpdated() || headerDS.getDataset().isUpdated() || rootDS.isUpdated();
return headerDataset.isUpdated() || dataDataset.isUpdated() || rootDataset.isUpdated();
}

@Override
public void commit() {
if (headerDS.dataset.isUpdated()) {
headerDS.getDataset().commit();
if (headerDataset.isUpdated()) {
headerDataset.commit();
long version = getHeaderRootVersion();
rootDS.setValue(KEY_HEADER_ROOT, headerDS.dataset.getRootHash().toBytes(), version);
rootDataset.setValue(KEY_HEADER_ROOT, headerDataset.getRootHash().toBytes(), version);
}
if (dataDS.dataset.isUpdated()) {
if (dataDataset.isUpdated()) {
long version = getDataRootVersion();
dataDS.getDataset().commit();
rootDS.setValue(KEY_DATA_ROOT, dataDS.dataset.getRootHash().toBytes(), version);
dataDataset.commit();
rootDataset.setValue(KEY_DATA_ROOT, dataDataset.getRootHash().toBytes(), version);
}

if (rootDS.isUpdated()) {
rootDS.commit();
this.version = onCommited(rootDS.getRootHash(), version);
if (rootDataset.isUpdated()) {
rootDataset.commit();
this.version = onCommited(rootDataset.getRootHash(), version);
}
}

@Override
public void cancel() {
headerDS.getDataset().cancel();
dataDS.getDataset().cancel();
rootDS.cancel();
headerDataset.cancel();
dataDataset.cancel();
rootDataset.cancel();
}

// ----------------------

private class AutoloadingID implements BlockchainIdentity {
private class AccountID implements BlockchainIdentity {

private Bytes address;

private PubKey pubKey;

public AutoloadingID(Bytes address, PubKey pubKey) {
public AccountID(Bytes address, PubKey pubKey) {
this.address = address;
this.pubKey = pubKey;
}
@@ -315,155 +335,201 @@ public class MerkleAccount implements LedgerAccount, HashProvable, MerkleSnapsho

@Override
public PubKey getPubKey() {
if (pubKey == null) {
pubKey = loadPubKey();
}
return pubKey;
}

}

private static class MerkleDatasetAdapter implements VersioningMap<Bytes, BytesValue> {

private static DataChangedListener NULL_LISTENER = new DataChangedListener() {
@Override
public void onChanged(Bytes key, BytesValue value, long newVersion) {
}
};

private DataChangedListener changedListener;

private MerkleDataSet dataset;

public MerkleDataSet getDataset() {
return dataset;
}

public MerkleDatasetAdapter(MerkleDataSet dataset) {
this(dataset, NULL_LISTENER);
}

public MerkleDatasetAdapter(MerkleDataSet dataset, DataChangedListener listener) {
this.dataset = dataset;
this.changedListener = listener == null ? NULL_LISTENER : listener;
}

@Override
public VersioningKVEntry<Bytes, BytesValue> getDataEntry(Bytes key) {
return new VersioningKVEntryWraper(dataset.getDataEntry(key));
}

@Override
public VersioningKVEntry<Bytes, BytesValue> getDataEntry(Bytes key, long version) {
return new VersioningKVEntryWraper(dataset.getDataEntry(key, version));
}

/**
* Create or update the value associated the specified key if the version
* checking is passed.<br>
*
* The value of the key will be updated only if it's latest version equals the
* specified version argument. <br>
* If the key doesn't exist, the version checking will be ignored, and key will
* be created with a new sequence number as id. <br>
* It also could specify the version argument to -1 to ignore the version
* checking.
* <p>
* If updating is performed, the version of the key increase by 1. <br>
* If creating is performed, the version of the key initialize by 0. <br>
*
* @param key The key of data;
* @param value The value of data;
* @param version The expected version of the key.
* @return The new version of the key. <br>
* If the key is new created success, then return 0; <br>
* If the key is updated success, then return the new version;<br>
* If this operation fail by version checking or other reason, then
* return -1;
*/
@Override
public long setValue(Bytes key, BytesValue value, long version) {
byte[] bytesValue = BinaryProtocol.encode(value, BytesValue.class);
long v = dataset.setValue(key, bytesValue, version);
if (v > -1) {
changedListener.onChanged(key, value, v);
}
return v;
}

/**
* Return the latest version entry associated the specified key; If the key
* doesn't exist, then return -1;
*
* @param key
* @return
*/
@Override
public long getVersion(Bytes key) {
return dataset.getVersion(key);
}

/**
* return the latest version's value;
*
* @param key
* @return return null if not exist;
*/
@Override
public BytesValue getValue(Bytes key) {
byte[] bytesValue = dataset.getValue(key);
if (bytesValue == null) {
return null;
}
return BinaryProtocol.decodeAs(bytesValue, BytesValue.class);
}

/**
* Return the specified version's value;
*
* @param key
* @param version
* @return return null if not exist;
*/
@Override
public BytesValue getValue(Bytes key, long version) {
byte[] bytesValue = dataset.getValue(key, version);
if (bytesValue == null) {
return null;
}
return BinaryProtocol.decodeAs(bytesValue, BytesValue.class);
}
}

private static interface DataChangedListener {

void onChanged(Bytes key, BytesValue value, long newVersion);

public long getVersion() {
return version;
}

private static class VersioningKVEntryWraper implements VersioningKVEntry<Bytes, BytesValue> {

private VersioningKVEntry<Bytes, byte[]> kv;

public VersioningKVEntryWraper(VersioningKVEntry<Bytes, byte[]> kv) {
this.kv = kv;
}

@Override
public Bytes getKey() {
return kv.getKey();
}

@Override
public long getVersion() {
return kv.getVersion();
}

@Override
public BytesValue getValue() {
return BinaryProtocol.decodeAs(kv.getValue(), BytesValue.class);
}

}
// private static class MerkleDatasetAdapter implements Dataset<String, BytesValue> {
//
// private static DataChangedListener NULL_LISTENER = new DataChangedListener() {
// @Override
// public void onChanged(Bytes key, BytesValue value, long newVersion) {
// }
// };
//
// private DataChangedListener changedListener;
//
// private MerkleDataSet dataset;
//
// public MerkleDataSet getDataset() {
// return dataset;
// }
//
// @SuppressWarnings("unused")
// public MerkleDatasetAdapter(MerkleDataSet dataset) {
// this(dataset, NULL_LISTENER);
// }
//
// public MerkleDatasetAdapter(MerkleDataSet dataset, DataChangedListener listener) {
// this.dataset = dataset;
// this.changedListener = listener == null ? NULL_LISTENER : listener;
// }
//
// @Override
// public DataEntry<String, BytesValue> getDataEntry(String key) {
// return new VersioningKVEntryWraper(dataset.getDataEntry(Bytes.fromString(key)));
// }
//
// @Override
// public DataEntry<String, BytesValue> getDataEntry(String key, long version) {
// return new VersioningKVEntryWraper(dataset.getDataEntry(Bytes.fromString(key), version));
// }
//
// /**
// * Create or update the value associated the specified key if the version
// * checking is passed.<br>
// *
// * The value of the key will be updated only if it's latest version equals the
// * specified version argument. <br>
// * If the key doesn't exist, the version checking will be ignored, and key will
// * be created with a new sequence number as id. <br>
// * It also could specify the version argument to -1 to ignore the version
// * checking.
// * <p>
// * If updating is performed, the version of the key increase by 1. <br>
// * If creating is performed, the version of the key initialize by 0. <br>
// *
// * @param key The key of data;
// * @param value The value of data;
// * @param version The expected version of the key.
// * @return The new version of the key. <br>
// * If the key is new created success, then return 0; <br>
// * If the key is updated success, then return the new version;<br>
// * If this operation fail by version checking or other reason, then
// * return -1;
// */
// @Override
// public long setValue(Bytes key, BytesValue value, long version) {
// byte[] bytesValue = BinaryProtocol.encode(value, BytesValue.class);
// long v = dataset.setValue(key, bytesValue, version);
// if (v > -1) {
// changedListener.onChanged(key, value, v);
// }
// return v;
// }
//
// /**
// * Return the latest version entry associated the specified key; If the key
// * doesn't exist, then return -1;
// *
// * @param key
// * @return
// */
// @Override
// public long getVersion(Bytes key) {
// return dataset.getVersion(key);
// }
//
// /**
// * return the latest version's value;
// *
// * @param key
// * @return return null if not exist;
// */
// @Override
// public BytesValue getValue(Bytes key) {
// byte[] bytesValue = dataset.getValue(key);
// if (bytesValue == null) {
// return null;
// }
// return BinaryProtocol.decodeAs(bytesValue, BytesValue.class);
// }
//
// /**
// * Return the specified version's value;
// *
// * @param key
// * @param version
// * @return return null if not exist;
// */
// @Override
// public BytesValue getValue(Bytes key, long version) {
// byte[] bytesValue = dataset.getValue(key, version);
// if (bytesValue == null) {
// return null;
// }
// return BinaryProtocol.decodeAs(bytesValue, BytesValue.class);
// }
//
// @Override
// public long getDataCount() {
// return dataset.getDataCount();
// }
//
// @Override
// public long setValue(String key, BytesValue value, long version) {
// byte[] bytesValue = BinaryProtocol.encode(value, BytesValue.class);
// return dataset.setValue(key, bytesValue, version);
// }
//
// @Override
// public BytesValue getValue(String key, long version) {
// byte[] bytesValue = dataset.getValue(key, version);
// if (bytesValue == null) {
// return null;
// }
// return BinaryProtocol.decodeAs(bytesValue, BytesValue.class);
// }
//
// @Override
// public BytesValue getValue(String key) {
// byte[] bytesValue = dataset.getValue(key);
// if (bytesValue == null) {
// return null;
// }
// return BinaryProtocol.decodeAs(bytesValue, BytesValue.class);
// }
//
// @Override
// public long getVersion(String key) {
// return dataset.getVersion(key);
// }
//
// @Override
// public DataEntry<String, BytesValue> getDataEntry(String key) {
// return new VersioningKVEntryWraper<String>(dataset.getDataEntry(key));
// }
//
// @Override
// public DataEntry<String, BytesValue> getDataEntry(String key, long version) {
// return new VersioningKVEntryWraper<String>(dataset.getDataEntry(key, version));
// }
// }

// private static interface DataChangedListener {
//
// void onChanged(Bytes key, BytesValue value, long newVersion);
//
// }

// private static class VersioningKVEntryWraper implements DataEntry<String, BytesValue> {
//
// private DataEntry<Bytes, byte[]> kv;
//
// public VersioningKVEntryWraper(DataEntry<Bytes, byte[]> kv) {
// this.kv = kv;
// }
//
// @Override
// public String getKey() {
// return kv.getKey().toUTF8String();
// }
//
// @Override
// public long getVersion() {
// return kv.getVersion();
// }
//
// @Override
// public BytesValue getValue() {
// return BinaryProtocol.decodeAs(kv.getValue(), BytesValue.class);
// }
//
// }

}

+ 35
- 50
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleAccountSet.java View File

@@ -3,27 +3,25 @@ package com.jd.blockchain.ledger.core;
import java.util.HashMap;
import java.util.Map;
import com.jd.blockchain.binaryproto.BinaryProtocol;
import com.jd.blockchain.binaryproto.DataContractRegistry;
import com.jd.blockchain.crypto.AddressEncoding;
import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.crypto.PubKey;
import com.jd.blockchain.ledger.BlockchainIdentity;
import com.jd.blockchain.ledger.BlockchainIdentityData;
import com.jd.blockchain.ledger.BytesValue;
import com.jd.blockchain.ledger.CryptoSetting;
import com.jd.blockchain.ledger.LedgerException;
import com.jd.blockchain.ledger.MerkleProof;
import com.jd.blockchain.ledger.MerkleSnapshot;
import com.jd.blockchain.ledger.TypedValue;
import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.DataEntry;
import com.jd.blockchain.utils.Transactional;
public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQuery<MerkleAccount> {
// private static final Bytes ACCOUNT_ROOT_PREFIX = Bytes.fromString("ROOT/");
static {
DataContractRegistry.register(MerkleSnapshot.class);
DataContractRegistry.register(BlockchainIdentity.class);
@@ -92,28 +90,14 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
@Override
public BlockchainIdentity[] getHeaders(int fromIndex, int count) {
byte[][] results = merkleDataset.getLatestValues(fromIndex, count);
DataEntry<Bytes, byte[]>[] results = merkleDataset.getLatestDataEntries(fromIndex, count);
BlockchainIdentity[] accounts = new BlockchainIdentity[results.length];
BlockchainIdentity[] ids = new BlockchainIdentity[results.length];
for (int i = 0; i < results.length; i++) {
accounts[i] = deserialize(results[i]);
InnerMerkleAccount account = createAccount(results[i].getKey(), new HashDigest(results[i].getValue()), results[i].getVersion(), true);
ids[i] = account.getID();
}
return accounts;
}
// private VersioningAccount deserialize(byte[] txBytes) {
//// return BinaryEncodingUtils.decode(txBytes, null, Account.class);
// AccountHeaderData accInfo = BinaryEncodingUtils.decode(txBytes);
//// return new BaseAccount(accInfo.getAddress(), accInfo.getPubKey(), null,
// cryptoSetting,
//// baseExStorage, baseVerStorage, true, accessPolicy);
// return new VersioningAccount(accInfo.getAddress(), accInfo.getPubKey(),
// accInfo.getRootHash(), cryptoSetting,
// keyPrefix, baseExStorage, baseVerStorage, true, accessPolicy, accInfo.);
// }
private BlockchainIdentity deserialize(byte[] txBytes) {
return BinaryProtocol.decodeAs(txBytes, BlockchainIdentity.class);
return ids;
}
/**
@@ -158,9 +142,9 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
/**
* 返回指定账户的版本; <br>
* 如果账户已经注册,则返回该账户的最新版本,值大于等于 0; <br>
* 如果账户不存在,则返回 -1; <br>
* 如果指定的账户已经注册(通过 {@link #register(String, PubKey)} 方法),但尚未提交(通过
* {@link #commit()} 方法),此方法对该账户仍然返回 0;
* 如果账户不存在,则返回 -1;<br>
* 如果账户已经注册(通过 {@link #register(String, PubKey)} 方法),但尚未提交(通过 {@link #commit()}
* 方法),则返回 -1; <br>
*
* @param address
* @return
@@ -169,7 +153,7 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
InnerMerkleAccount acc = latestAccountsCache.get(address);
if (acc != null) {
// 已注册尚未提交,也返回 -1;
return acc.version == -1 ? 0 : acc.version;
return acc.getVersion();
}
return merkleDataset.getVersion(address);
@@ -189,7 +173,7 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
InnerMerkleAccount acc = latestAccountsCache.get(address);
if (acc != null && version == -1) {
return acc;
} else if (acc != null && acc.version == version) {
} else if (acc != null && acc.getVersion() == version) {
return acc;
}
@@ -203,7 +187,7 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
}
// 如果是不存在的,或者刚刚新增未提交的账户,则前面一步查询到的 latestVersion 小于 0, 代码不会执行到此;
if (acc != null && acc.version != latestVersion) {
if (acc != null && acc.getVersion() != latestVersion) {
// 当执行到此处时,并且缓冲列表中缓存了最新的版本,
// 如果当前缓存的最新账户的版本和刚刚从存储中检索得到的最新版本不一致,可能存在外部的并发更新,这超出了系统设计的逻辑;
@@ -256,7 +240,7 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
InnerMerkleAccount cachedAcc = latestAccountsCache.get(address);
if (cachedAcc != null) {
if (cachedAcc.version < 0) {
if (cachedAcc.getVersion() < 0) {
// 同一个新账户已经注册,但尚未提交,所以重复注册不会引起任何变化;
return cachedAcc;
}
@@ -272,14 +256,6 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
throw new LedgerException("Account Registering was rejected for the access policy!");
}
// String prefix = address.concat(LedgerConsts.KEY_SEPERATOR);
// ExPolicyKVStorage accExStorage = PrefixAppender.prefix(prefix,
// baseExStorage);
// VersioningKVStorage accVerStorage = PrefixAppender.prefix(prefix,
// baseVerStorage);
// BaseAccount accDS = createInstance(address, pubKey, cryptoSetting,
// accExStorage, accVerStorage);
Bytes prefix = keyPrefix.concat(address);
InnerMerkleAccount acc = createInstance(accountId, cryptoSetting, prefix);
latestAccountsCache.put(address, acc);
@@ -300,17 +276,21 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
}
private InnerMerkleAccount loadAccount(Bytes address, boolean readonly, long version) {
// prefix;
Bytes prefix = keyPrefix.concat(address);
byte[] rootHashBytes = merkleDataset.getValue(address, version);
if (rootHashBytes == null) {
return null;
}
HashDigest rootHash = new HashDigest(rootHashBytes);
return createAccount(address, rootHash, version, readonly);
}
private InnerMerkleAccount createAccount(Bytes address,HashDigest rootHash, long version, boolean readonly) {
// prefix;
Bytes prefix = keyPrefix.concat(address);
return new InnerMerkleAccount(address, version, rootHash, cryptoSetting, prefix, baseExStorage, baseVerStorage,
readonly);
}
// TODO:优化:区块链身份(地址+公钥)与其Merkle树根哈希分开独立存储;
@@ -324,14 +304,9 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
*/
private long saveAccount(InnerMerkleAccount account) {
// 提交更改,更新哈希;
long version = account.version;
account.commit();
long newVersion = merkleDataset.setValue(account.getAddress(), account.getRootHash().toBytes(), version);
if (newVersion < 0) {
// Update fail;
throw new LedgerException("Account updating fail! --[Address=" + account.getAddress() + "]");
}
return newVersion;
return account.getVersion();
}
@Override
@@ -347,7 +322,7 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
try {
for (InnerMerkleAccount acc : latestAccountsCache.values()) {
// updated or new created;
if (acc.isUpdated() || acc.version < 0) {
if (acc.isUpdated() || acc.getVersion() < 0) {
saveAccount(acc);
}
}
@@ -394,10 +369,20 @@ public class MerkleAccountSet implements Transactional, MerkleProvable, AccountQ
}
@Override
protected void onUpdated(Bytes key, BytesValue value, long newVersion) {
protected void onUpdated(String key, TypedValue value, long expectedVersion, long newVersion) {
updated = true;
}
@Override
protected long onCommited(HashDigest newRootHash, long currentVersion) {
long newVersion = merkleDataset.setValue(this.getAddress(), newRootHash.toBytes(), currentVersion);
if (newVersion < 0) {
// Update fail;
throw new LedgerException("Account updating fail! --[Address=" + this.getAddress() + "]");
}
return newVersion;
}
}
}

+ 1
- 1
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleDataCluster.java View File

@@ -10,7 +10,7 @@ import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.Transactional;
import com.jd.blockchain.utils.VersioningMap;
import com.jd.blockchain.utils.Dataset;

public class MerkleDataCluster implements Transactional, MerkleSnapshot {



+ 2
- 2
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleDataEntry.java View File

@@ -2,11 +2,11 @@ package com.jd.blockchain.ledger.core;

import com.jd.blockchain.ledger.MerkleProof;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;

public interface MerkleDataEntry {
VersioningKVEntry<Bytes, byte[]> getData();
DataEntry<Bytes, byte[]> getData();
MerkleProof getProof();
}

+ 66
- 29
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/MerkleDataSet.java View File

@@ -12,8 +12,8 @@ import com.jd.blockchain.storage.service.utils.BufferedKVStorage;
import com.jd.blockchain.storage.service.utils.VersioningKVData;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.Transactional;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.VersioningMap;
import com.jd.blockchain.utils.DataEntry;
import com.jd.blockchain.utils.Dataset;
import com.jd.blockchain.utils.io.BytesUtils;
/**
@@ -24,7 +24,7 @@ import com.jd.blockchain.utils.io.BytesUtils;
* @author huanghaiquan
*
*/
public class MerkleDataSet implements Transactional, MerkleProvable, VersioningMap<Bytes, byte[]> {
public class MerkleDataSet implements Transactional, MerkleProvable, Dataset<Bytes, byte[]> {
/**
* 4 MB MaxSize of value;
@@ -70,6 +70,18 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
*/
public MerkleDataSet(CryptoSetting setting, String keyPrefix, ExPolicyKVStorage exPolicyStorage,
VersioningKVStorage versioningStorage) {
this(setting, Bytes.fromString(keyPrefix), exPolicyStorage, versioningStorage);
}
/**
* 创建一个新的 MerkleDataSet;
*
* @param setting 密码设置;
* @param exPolicyStorage 默克尔树的存储;
* @param versioningStorage 数据的存储;
*/
public MerkleDataSet(CryptoSetting setting, Bytes keyPrefix, ExPolicyKVStorage exPolicyStorage,
VersioningKVStorage versioningStorage) {
// 缓冲对KV的写入;
this.bufferedStorage = new BufferedKVStorage(exPolicyStorage, versioningStorage, false);
@@ -78,20 +90,34 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
// bufferedStorage);
// this.snStorage = PrefixAppender.prefix(SN_PREFIX, (ExPolicyKVStorage)
// bufferedStorage);
snKeyPrefix = Bytes.fromString(keyPrefix + SN_PREFIX);
dataKeyPrefix = Bytes.fromString(keyPrefix + DATA_PREFIX);
snKeyPrefix = keyPrefix.concat(SN_PREFIX);
dataKeyPrefix = keyPrefix.concat(DATA_PREFIX);
this.valueStorage = bufferedStorage;
this.snStorage = bufferedStorage;
// MerkleTree 本身是可缓冲的;
// ExPolicyKVStorage merkleTreeStorage =
// PrefixAppender.prefix(MERKLE_TREE_PREFIX, exPolicyStorage);
merkleKeyPrefix = Bytes.fromString(keyPrefix + MERKLE_TREE_PREFIX);
merkleKeyPrefix = keyPrefix.concat(MERKLE_TREE_PREFIX);
ExPolicyKVStorage merkleTreeStorage = exPolicyStorage;
this.merkleTree = new MerkleTree(setting, merkleKeyPrefix, merkleTreeStorage);
this.snGenerator = new MerkleSequenceSNGenerator(merkleTree);
}
/**
* 从指定的 Merkle 根构建的 MerkleDataSet;
*
* @param dataStorage
* @param defaultMerkleHashAlgorithm
* @param verifyMerkleHashOnLoad
* @param merkleTreeStorage
* @param snGenerator
*/
public MerkleDataSet(HashDigest merkleRootHash, CryptoSetting setting, String keyPrefix,
ExPolicyKVStorage exPolicyStorage, VersioningKVStorage versioningStorage, boolean readonly) {
this(merkleRootHash, setting, Bytes.fromString(keyPrefix), exPolicyStorage, versioningStorage, readonly);
}
/**
* 从指定的 Merkle 根构建的 MerkleDataSet;
*
@@ -131,6 +157,7 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
this.readonly = true;
}
@Override
public long getDataCount() {
return merkleTree.getDataCount();
}
@@ -155,7 +182,7 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
return values;
}
public VersioningKVEntry<Bytes, byte[]>[] getLatestDataEntries(int fromIndex, int count) {
public DataEntry<Bytes, byte[]>[] getLatestDataEntries(int fromIndex, int count) {
if (count > LedgerConsts.MAX_LIST_COUNT) {
throw new IllegalArgumentException("Count exceed the upper limit[" + LedgerConsts.MAX_LIST_COUNT + "]!");
}
@@ -163,7 +190,7 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
throw new IllegalArgumentException("Index out of bound!");
}
@SuppressWarnings("unchecked")
VersioningKVEntry<Bytes, byte[]>[] values = new VersioningKVEntry[count];
DataEntry<Bytes, byte[]>[] values = new DataEntry[count];
byte[] bytesValue;
for (int i = 0; i < count; i++) {
MerkleDataNode dataNode = merkleTree.getData(fromIndex + i);
@@ -194,7 +221,7 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
*/
public String getKeyAtIndex(int fromIndex) {
MerkleDataNode dataNode = merkleTree.getData(fromIndex);
//TODO: 未去掉前缀;
// TODO: 未去掉前缀;
return dataNode.getKey().toUTF8String();
}
@@ -428,8 +455,9 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
return getMerkleVersion(key);
}
// public VersioningKVEntry getDataEntry(String key) {
// return getDataEntry(Bytes.fromString(key));
// @Override
// public VersioningKVEntry<String, byte[]> getDataEntry(String key) {
// return getDataEntry(key, -1);
// }
/**
@@ -438,21 +466,30 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
* @return Null if the key doesn't exist!
*/
@Override
public VersioningKVEntry<Bytes, byte[]> getDataEntry(Bytes key) {
long latestVersion = getMerkleVersion(key);
if (latestVersion < 0) {
return null;
}
Bytes dataKey = encodeDataKey(key);
byte[] value = valueStorage.get(dataKey, latestVersion);
if (value == null) {
return null;
}
return new VersioningKVData<Bytes, byte[]>(key, latestVersion, value);
public DataEntry<Bytes, byte[]> getDataEntry(Bytes key) {
return getDataEntry(key, -1);
}
// @Override
// public VersioningKVEntry<String, byte[]> getDataEntry(String key, long version) {
// Bytes keyBytes = Bytes.fromString(key);
// long latestVersion = getMerkleVersion(keyBytes);
// if (latestVersion < 0 || version > latestVersion) {
// // key not exist, or the specified version is out of the latest version indexed
// // by the current merkletree;
// return null;
// }
// version = version < 0 ? latestVersion : version;
// Bytes dataKey = encodeDataKey(keyBytes);
// byte[] value = valueStorage.get(dataKey, version);
// if (value == null) {
// return null;
// }
// return new VersioningKVData<String, byte[]>(key, version, value);
// }
@Override
public VersioningKVEntry<Bytes, byte[]> getDataEntry(Bytes key, long version) {
public DataEntry<Bytes, byte[]> getDataEntry(Bytes key, long version) {
long latestVersion = getMerkleVersion(key);
if (latestVersion < 0 || version > latestVersion) {
// key not exist, or the specified version is out of the latest version indexed
@@ -469,7 +506,7 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
}
public MerkleDataEntry getMerkleEntry(Bytes key, long version) {
VersioningKVEntry<Bytes, byte[]> dataEntry = getDataEntry(key, version);
DataEntry<Bytes, byte[]> dataEntry = getDataEntry(key, version);
if (dataEntry == null) {
return null;
}
@@ -478,7 +515,7 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
}
public MerkleDataEntry getMerkleEntry(Bytes key) {
VersioningKVEntry<Bytes, byte[]> dataEntry = getDataEntry(key);
DataEntry<Bytes, byte[]> dataEntry = getDataEntry(key);
if (dataEntry == null) {
return null;
}
@@ -505,23 +542,23 @@ public class MerkleDataSet implements Transactional, MerkleProvable, VersioningM
}
/**
* A wrapper for {@link VersioningKVEntry} and {@link MerkleProof};
* A wrapper for {@link DataEntry} and {@link MerkleProof};
*
* @author huanghaiquan
*
*/
private static class MerkleDataEntryWrapper implements MerkleDataEntry {
private VersioningKVEntry<Bytes, byte[]> data;
private DataEntry<Bytes, byte[]> data;
private MerkleProof proof;
public MerkleDataEntryWrapper(VersioningKVEntry<Bytes, byte[]> data, MerkleProof proof) {
public MerkleDataEntryWrapper(DataEntry<Bytes, byte[]> data, MerkleProof proof) {
this.data = data;
this.proof = proof;
}
@Override
public VersioningKVEntry<Bytes, byte[]> getData() {
public DataEntry<Bytes, byte[]> getData() {
return data;
}


+ 3
- 3
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/RolePrivilegeDataset.java View File

@@ -17,7 +17,7 @@ import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.Transactional;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;

public class RolePrivilegeDataset implements Transactional, MerkleProvable, RolePrivilegeSettings {

@@ -256,7 +256,7 @@ public class RolePrivilegeDataset implements Transactional, MerkleProvable, Role
public RolePrivileges getRolePrivilege(String roleName) {
// 只返回最新版本;
Bytes key = encodeKey(roleName);
VersioningKVEntry<Bytes, byte[]> kv = dataset.getDataEntry(key);
DataEntry<Bytes, byte[]> kv = dataset.getDataEntry(key);
if (kv == null) {
return null;
}
@@ -266,7 +266,7 @@ public class RolePrivilegeDataset implements Transactional, MerkleProvable, Role

@Override
public RolePrivileges[] getRolePrivileges(int index, int count) {
VersioningKVEntry<Bytes, byte[]>[] kvEntries = dataset.getLatestDataEntries(index, count);
DataEntry<Bytes, byte[]>[] kvEntries = dataset.getLatestDataEntries(index, count);
RolePrivileges[] pns = new RolePrivileges[kvEntries.length];
PrivilegeSet privilege;
for (int i = 0; i < pns.length; i++) {


+ 6
- 10
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/UserAccount.java View File

@@ -15,9 +15,9 @@ import com.jd.blockchain.utils.Bytes;
*/
public class UserAccount extends AccountDecorator implements UserInfo { // implements UserInfo {
private static final Bytes USER_INFO_PREFIX = Bytes.fromString("PROP" + LedgerConsts.KEY_SEPERATOR);
private static final String USER_INFO_PREFIX = "PROP" + LedgerConsts.KEY_SEPERATOR;
private static final Bytes DATA_PUB_KEY = Bytes.fromString("DATA-PUBKEY");
private static final String DATA_PUB_KEY = "DATA-PUBKEY";
public UserAccount(MerkleAccount mklAccount) {
super(mklAccount);
@@ -64,25 +64,21 @@ public class UserAccount extends AccountDecorator implements UserInfo { // imple
}
public long setProperty(String key, String value, long version) {
return setProperty(Bytes.fromString(key), value, version);
}
public long setProperty(Bytes key, String value, long version) {
return getHeaders().setValue(encodePropertyKey(key), TypedValue.fromText(value), version);
}
public String getProperty(Bytes key) {
public String getProperty(String key) {
BytesValue value = getHeaders().getValue(encodePropertyKey(key));
return value == null ? null : value.getValue().toUTF8String();
}
public String getProperty(Bytes key, long version) {
public String getProperty(String key, long version) {
BytesValue value = getHeaders().getValue(encodePropertyKey(key), version);
return value == null ? null : value.getValue().toUTF8String();
}
private Bytes encodePropertyKey(Bytes key) {
return USER_INFO_PREFIX.concat(key);
private String encodePropertyKey(String key) {
return USER_INFO_PREFIX+key;
}

+ 3
- 3
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/UserRoleDataset.java View File

@@ -16,7 +16,7 @@ import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.Transactional;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;

/**
* User-Role authorization data set;
@@ -168,7 +168,7 @@ public class UserRoleDataset implements Transactional, MerkleProvable, UserAutho
@Override
public UserRoles getUserRoles(Bytes userAddress) {
// 只返回最新版本;
VersioningKVEntry<Bytes, byte[]> kv = dataset.getDataEntry(userAddress);
DataEntry<Bytes, byte[]> kv = dataset.getDataEntry(userAddress);
if (kv == null) {
return null;
}
@@ -178,7 +178,7 @@ public class UserRoleDataset implements Transactional, MerkleProvable, UserAutho

@Override
public UserRoles[] getUserRoles() {
VersioningKVEntry<Bytes, byte[]>[] kvEntries = dataset.getLatestDataEntries(0, (int) dataset.getDataCount());
DataEntry<Bytes, byte[]>[] kvEntries = dataset.getLatestDataEntries(0, (int) dataset.getDataCount());
UserRoles[] pns = new UserRoles[kvEntries.length];
RoleSet roleset;
for (int i = 0; i < pns.length; i++) {


+ 2
- 1
source/ledger/ledger-core/src/main/java/com/jd/blockchain/ledger/core/handles/DataAccountKVSetOperationHandle.java View File

@@ -5,6 +5,7 @@ import com.jd.blockchain.ledger.DataAccountKVSetOperation;
import com.jd.blockchain.ledger.DataAccountKVSetOperation.KVWriteEntry;
import com.jd.blockchain.ledger.DataVersionConflictException;
import com.jd.blockchain.ledger.LedgerPermission;
import com.jd.blockchain.ledger.TypedValue;
import com.jd.blockchain.ledger.core.DataAccount;
import com.jd.blockchain.ledger.core.LedgerDataset;
import com.jd.blockchain.ledger.core.LedgerQuery;
@@ -37,7 +38,7 @@ public class DataAccountKVSetOperationHandle extends AbstractLedgerOperationHand
KVWriteEntry[] writeSet = kvWriteOp.getWriteSet();
long v = -1L;
for (KVWriteEntry kvw : writeSet) {
v = account.getDataset().setValue(Bytes.fromString(kvw.getKey()), kvw.getValue(), kvw.getExpectedVersion());
v = account.getDataset().setValue(kvw.getKey(), TypedValue.wrap(kvw.getValue()), kvw.getExpectedVersion());
if (v < 0) {
throw new DataVersionConflictException();
}


+ 2
- 2
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/AccountSetTest.java View File

@@ -14,9 +14,9 @@ import com.jd.blockchain.crypto.service.classic.ClassicCryptoService;
import com.jd.blockchain.crypto.service.sm.SMCryptoService;
import com.jd.blockchain.ledger.BlockchainKeyGenerator;
import com.jd.blockchain.ledger.BlockchainKeypair;
import com.jd.blockchain.ledger.core.MerkleAccountSet;
import com.jd.blockchain.ledger.core.MerkleAccount;
import com.jd.blockchain.ledger.core.CryptoConfig;
import com.jd.blockchain.ledger.core.MerkleAccount;
import com.jd.blockchain.ledger.core.MerkleAccountSet;
import com.jd.blockchain.ledger.core.OpeningAccessPolicy;
import com.jd.blockchain.storage.service.utils.MemoryKVStorage;
import com.jd.blockchain.utils.Bytes;


+ 6
- 6
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/BaseAccountTest.java View File

@@ -55,33 +55,33 @@ public class BaseAccountTest {
assertFalse(baseAccount.isReadonly());

// 在空白状态下写入数据;
long v = baseAccount.getDataset().setValue(Bytes.fromString("A"), TypedValue.fromText("VALUE_A"), 0);
long v = baseAccount.getDataset().setValue("A", TypedValue.fromText("VALUE_A"), 0);
// 预期失败;
assertEquals(-1, v);

v = baseAccount.getDataset().setValue(Bytes.fromString("A"), TypedValue.fromText("VALUE_A"), 1);
v = baseAccount.getDataset().setValue("A", TypedValue.fromText("VALUE_A"), 1);
// 预期失败;
assertEquals(-1, v);

v = baseAccount.getDataset().setValue(Bytes.fromString("A"), TypedValue.fromText("VALUE_A"), -1);
v = baseAccount.getDataset().setValue("A", TypedValue.fromText("VALUE_A"), -1);
// 预期成功;
assertEquals(0, v);

v = baseAccount.getDataset().setValue(Bytes.fromString("A"), TypedValue.fromText("VALUE_A-1"), -1);
v = baseAccount.getDataset().setValue("A", TypedValue.fromText("VALUE_A-1"), -1);
// 已经存在版本,指定版本号-1,预期导致失败;
assertEquals(-1, v);

baseAccount.commit();
v = 0;
for (int i = 0; i < 10; i++) {
long s = baseAccount.getDataset().setValue(Bytes.fromString("A"), TypedValue.fromText("VALUE_A_" + i), v);
long s = baseAccount.getDataset().setValue("A", TypedValue.fromText("VALUE_A_" + i), v);
baseAccount.commit();
// 预期成功;
assertEquals(v + 1, s);
v++;
}

v = baseAccount.getDataset().setValue(Bytes.fromString("A"), TypedValue.fromText("VALUE_A_" + v), v + 1);
v = baseAccount.getDataset().setValue("A", TypedValue.fromText("VALUE_A_" + v), v + 1);
// 预期成功;
assertEquals(-1, v);



+ 1
- 1
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/ContractInvokingTest.java View File

@@ -301,7 +301,7 @@ public class ContractInvokingTest {
}
});
// 预期数据都能够正常写入;
KVDataEntry kv1 = ledgerRepo.getDataAccountSet().getAccount(kpDataAccount.getAddress()).getDataEntry("K1",
KVDataEntry kv1 = ledgerRepo.getDataAccountSet().getAccount(kpDataAccount.getAddress()).getDataset().getDataEntry("K1",
0);
KVDataEntry kv2 = ledgerRepo.getDataAccountSet().getAccount(kpDataAccount.getAddress()).getDataEntry("K2",
0);


+ 20
- 30
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/LedgerAccountTest.java View File

@@ -1,22 +1,12 @@
package test.com.jd.blockchain.ledger.core;

import static org.junit.Assert.assertEquals;

import java.util.Random;

import org.junit.Before;
import org.junit.Test;

import com.jd.blockchain.binaryproto.BinaryProtocol;
import com.jd.blockchain.binaryproto.DataContractRegistry;
import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.crypto.PubKey;
import com.jd.blockchain.crypto.service.classic.ClassicAlgorithm;
import com.jd.blockchain.crypto.service.sm.SMAlgorithm;
import com.jd.blockchain.ledger.AccountHeader;
import com.jd.blockchain.ledger.BlockchainIdentity;
import com.jd.blockchain.ledger.UserInfo;
import com.jd.blockchain.ledger.core.MerkleAccountSet;
import com.jd.blockchain.utils.Bytes;

/**
* Created by zhangshuang3 on 2018/9/3.
@@ -35,27 +25,27 @@ public class LedgerAccountTest {
rand.nextBytes(seed);
rand.nextBytes(settingValue);
rand.nextBytes(rawDigestBytes);
DataContractRegistry.register(AccountHeader.class);
DataContractRegistry.register(BlockchainIdentity.class);
DataContractRegistry.register(UserInfo.class);
}

@Test
public void testSerialize_AccountHeader() {
String address = "xxxxxxxxxxxx";
PubKey pubKey = new PubKey(SMAlgorithm.SM2, rawDigestBytes);
HashDigest hashDigest = new HashDigest(ClassicAlgorithm.SHA256, rawDigestBytes);
MerkleAccountSet.AccountHeaderData accountHeaderData = new MerkleAccountSet.AccountHeaderData(Bytes.fromString(address),
pubKey, hashDigest);
// encode and decode
byte[] encodeBytes = BinaryProtocol.encode(accountHeaderData, AccountHeader.class);
AccountHeader deAccountHeaderData = BinaryProtocol.decode(encodeBytes);
// verify start
assertEquals(accountHeaderData.getAddress(), deAccountHeaderData.getAddress());
assertEquals(accountHeaderData.getPubKey(), deAccountHeaderData.getPubKey());
assertEquals(accountHeaderData.getRootHash(), deAccountHeaderData.getRootHash());
}
// @Test
// public void testSerialize_AccountHeader() {
// String address = "xxxxxxxxxxxx";
// PubKey pubKey = new PubKey(SMAlgorithm.SM2, rawDigestBytes);
// HashDigest hashDigest = new HashDigest(ClassicAlgorithm.SHA256, rawDigestBytes);
// MerkleAccountSet.AccountHeaderData accountHeaderData = new MerkleAccountSet.AccountHeaderData(Bytes.fromString(address),
// pubKey, hashDigest);
//
// // encode and decode
// byte[] encodeBytes = BinaryProtocol.encode(accountHeaderData, AccountHeader.class);
// AccountHeader deAccountHeaderData = BinaryProtocol.decode(encodeBytes);
//
// // verify start
// assertEquals(accountHeaderData.getAddress(), deAccountHeaderData.getAddress());
// assertEquals(accountHeaderData.getPubKey(), deAccountHeaderData.getPubKey());
// assertEquals(accountHeaderData.getRootHash(), deAccountHeaderData.getRootHash());
// }

}

+ 3
- 11
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/LedgerEditorTest.java View File

@@ -8,12 +8,9 @@ import org.junit.Before;
import org.junit.Test;

import com.jd.blockchain.binaryproto.DataContractRegistry;
import com.jd.blockchain.crypto.AddressEncoding;
import com.jd.blockchain.crypto.AsymmetricKeypair;
import com.jd.blockchain.crypto.Crypto;
import com.jd.blockchain.crypto.CryptoProvider;
import com.jd.blockchain.crypto.SignatureFunction;
import com.jd.blockchain.crypto.service.classic.ClassicAlgorithm;
import com.jd.blockchain.crypto.service.classic.ClassicCryptoService;
import com.jd.blockchain.crypto.service.sm.SMCryptoService;
import com.jd.blockchain.ledger.BlockchainKeyGenerator;
@@ -25,7 +22,7 @@ import com.jd.blockchain.ledger.LedgerInitSetting;
import com.jd.blockchain.ledger.LedgerTransaction;
import com.jd.blockchain.ledger.TransactionRequest;
import com.jd.blockchain.ledger.TransactionState;
import com.jd.blockchain.ledger.core.CryptoConfig;
import com.jd.blockchain.ledger.TypedValue;
import com.jd.blockchain.ledger.core.DataAccount;
import com.jd.blockchain.ledger.core.LedgerDataset;
import com.jd.blockchain.ledger.core.LedgerEditor;
@@ -33,11 +30,6 @@ import com.jd.blockchain.ledger.core.LedgerTransactionContext;
import com.jd.blockchain.ledger.core.LedgerTransactionalEditor;
import com.jd.blockchain.ledger.core.UserAccount;
import com.jd.blockchain.storage.service.utils.MemoryKVStorage;
import com.jd.blockchain.transaction.ConsensusParticipantData;
import com.jd.blockchain.transaction.LedgerInitData;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.io.BytesUtils;
import com.jd.blockchain.utils.net.NetworkAddress;

public class LedgerEditorTest {

@@ -102,7 +94,7 @@ public class LedgerEditorTest {

DataAccount dataAccount = ldgDS.getDataAccountSet().register(dataKP.getAddress(), dataKP.getPubKey(), null);

dataAccount.setBytes(Bytes.fromString("A"), "abc", -1);
dataAccount.getDataset().setValue("A", TypedValue.fromText("abc"), -1);

LedgerTransaction tx = genisisTxCtx.commit(TransactionState.SUCCESS);
LedgerBlock block = ldgEdt.prepare();
@@ -115,7 +107,7 @@ public class LedgerEditorTest {
assertEquals(0, block.getHeight());

// 验证数据读写的一致性;
BytesValue bytes = dataAccount.getBytes("A");
BytesValue bytes = dataAccount.getDataset().getValue("A");
assertEquals(DataType.TEXT, bytes.getType());
String textValue = bytes.getValue().toUTF8String();
assertEquals("abc", textValue);


+ 44
- 37
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/MerkleDataSetTest.java View File

@@ -25,7 +25,9 @@ import com.jd.blockchain.ledger.core.CryptoConfig;
import com.jd.blockchain.ledger.core.MerkleDataSet;
import com.jd.blockchain.storage.service.utils.MemoryKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;
import com.jd.blockchain.utils.Dataset;
import com.jd.blockchain.utils.DatasetHelper;
import com.jd.blockchain.utils.io.BytesUtils;

public class MerkleDataSetTest {
@@ -53,9 +55,9 @@ public class MerkleDataSetTest {
MemoryKVStorage storage = new MemoryKVStorage();
MerkleDataSet mds = new MerkleDataSet(cryptoConfig, keyPrefix, storage, storage);
mds.setValue("A", "A".getBytes(), -1);
mds.setValue("B", "B".getBytes(), -1);
mds.setValue("C", "C".getBytes(), -1);
mds.setValue(Bytes.fromString("A"), "A".getBytes(), -1);
mds.setValue(Bytes.fromString("B"), "B".getBytes(), -1);
mds.setValue(Bytes.fromString("C"), "C".getBytes(), -1);
mds.commit();
@@ -85,22 +87,23 @@ public class MerkleDataSetTest {
MemoryKVStorage storage = new MemoryKVStorage();

MerkleDataSet mds = new MerkleDataSet(cryptoConfig, keyPrefix, storage, storage);
mds.setValue("A", "A".getBytes(), -1);
mds.setValue("B", "B".getBytes(), -1);
mds.setValue("C", "C".getBytes(), -1);
Dataset<String, byte[]> ds = DatasetHelper.map(mds);
ds.setValue("A", "A".getBytes(), -1);
ds.setValue("B", "B".getBytes(), -1);
ds.setValue("C", "C".getBytes(), -1);

mds.commit();

byte[] va = mds.getValue("A");
byte[] va = ds.getValue("A");
assertNotNull(va);
assertEquals("A", new String(va));
byte[] vc = mds.getValue("C");
VersioningKVEntry ventry = mds.getDataEntry("C");
byte[] vc = ds.getValue("C");
DataEntry<String, byte[]> ventry = ds.getDataEntry("C");
assertNotNull(vc);
assertNotNull(ventry);
assertEquals("C", new String(vc));
assertEquals("C", ventry.getKey().toUTF8String());
assertEquals("C", ventry.getKey());

HashDigest root1 = mds.getRootHash();
@@ -111,8 +114,8 @@ public class MerkleDataSetTest {
int expStorageCount = 10;
assertEquals(expStorageCount, storage.getStorageCount());

mds.setValue("B", "B".getBytes(), 0);
mds.setValue("C", "C".getBytes(), 0);
ds.setValue("B", "B".getBytes(), 0);
ds.setValue("C", "C".getBytes(), 0);
mds.commit();
HashDigest root2 = mds.getRootHash();
assertNotEquals(root1, root2);
@@ -122,7 +125,7 @@ public class MerkleDataSetTest {
expStorageCount = expStorageCount + 3;
assertEquals(expStorageCount, storage.getStorageCount());

mds.setValue("D", "DValue".getBytes(), -1);
ds.setValue("D", "DValue".getBytes(), -1);
mds.commit();
HashDigest root3 = mds.getRootHash();
assertNotEquals(root2, root3);
@@ -135,31 +138,31 @@ public class MerkleDataSetTest {
assertEquals(expStorageCount, storage.getStorageCount());

// Check rollback function: Add some keys, and then rollback;
long v = mds.setValue("E", "E-values".getBytes(), -1);
long v = ds.setValue("E", "E-values".getBytes(), -1);
assertEquals(v, 0);
String expEValue = new String(mds.getValue("E"));
String expEValue = new String(ds.getValue("E"));
assertEquals(expEValue, "E-values");

v = mds.setValue("F", "F-values".getBytes(), -1);
v = ds.setValue("F", "F-values".getBytes(), -1);
assertEquals(v, 0);
String expFValue = new String(mds.getValue("F"));
String expFValue = new String(ds.getValue("F"));
assertEquals(expFValue, "F-values");

v = mds.setValue("E", "E-values-1".getBytes(), 0);
v = ds.setValue("E", "E-values-1".getBytes(), 0);
assertEquals(v, 1);
expEValue = new String(mds.getValue("E"));
expEValue = new String(ds.getValue("E"));
assertEquals(expEValue, "E-values-1");

mds.cancel();

byte[] bv = mds.getValue("E");
byte[] bv = ds.getValue("E");
assertNull(bv);
bv = mds.getValue("F");
bv = ds.getValue("F");
assertNull(bv);

v = mds.getVersion("E");
v = ds.getVersion("E");
assertEquals(-1, v);
v = mds.getVersion("F");
v = ds.getVersion("F");
assertEquals(-1, v);

// Expect that states has been recover;
@@ -194,10 +197,11 @@ public class MerkleDataSetTest {
MemoryKVStorage storage = new MemoryKVStorage();

MerkleDataSet mds = new MerkleDataSet(cryptoConfig, keyPrefix, storage, storage);
Dataset<String, byte[]> ds = DatasetHelper.map(mds);

// 初始的时候没有任何数据,总是返回 null;
VersioningKVEntry verKVEntry = mds.getDataEntry("NULL_KEY");
byte[] vbytes = mds.getValue("NULL_KEY");
DataEntry verKVEntry = ds.getDataEntry("NULL_KEY");
byte[] vbytes = ds.getValue("NULL_KEY");
assertNull(verKVEntry);
assertNull(vbytes);

@@ -217,7 +221,7 @@ public class MerkleDataSetTest {
for (int i = 0; i < count; i++) {
key = "data" + i;
rand.nextBytes(data);
v = mds.setValue(key, data, -1);
v = ds.setValue(key, data, -1);
dataVersions.put(key, v);
dataValues.put(key + "_" + v, data);
assertEquals(v, 0);
@@ -237,7 +241,7 @@ public class MerkleDataSetTest {

KeySnapshot ks = new KeySnapshot();
ks.proof = proof;
ks.maxVersion = mds.getVersion(key);
ks.maxVersion = ds.getVersion(key);

snapshot.put(key, ks);
}
@@ -271,7 +275,7 @@ public class MerkleDataSetTest {
key = "data" + i;
rand.nextBytes(data);
expVer = dataVersions.get(key);
v = mds.setValue(key, data, expVer);
v = ds.setValue(key, data, expVer);

assertEquals(v, expVer + 1);

@@ -300,7 +304,7 @@ public class MerkleDataSetTest {

KeySnapshot ks = new KeySnapshot();
ks.proof = proof;
ks.maxVersion = mds.getVersion(key);
ks.maxVersion = ds.getVersion(key);
snapshot.put(key, ks);
}
history.put(rootHash, snapshot);
@@ -316,6 +320,7 @@ public class MerkleDataSetTest {

MerkleDataSet mdsReload = new MerkleDataSet(hisRootHash, cryptoConfig, keyPrefix, storage, storage,
true);
Dataset<String, byte[]> dsReload = DatasetHelper.map(mdsReload);
assertEquals(hisRootHash, mdsReload.getRootHash());

// verify every keys;
@@ -323,7 +328,7 @@ public class MerkleDataSetTest {
key = "data" + i;
// 最新版本一致;
long expLatestVersion = snapshot.get(key).maxVersion;
long actualLatestVersion = mdsReload.getVersion(key);
long actualLatestVersion = dsReload.getVersion(key);
assertEquals(expLatestVersion, actualLatestVersion);

// 数据证明一致;
@@ -339,7 +344,7 @@ public class MerkleDataSetTest {
for (long j = 0; j < actualLatestVersion; j++) {
String keyver = key + "_" + j;
byte[] expValue = dataValues.get(keyver);
byte[] actualValue = mdsReload.getValue(key, j);
byte[] actualValue = dsReload.getValue(key, j);
assertTrue(BytesUtils.equals(expValue, actualValue));
}
}
@@ -365,10 +370,11 @@ public class MerkleDataSetTest {
MemoryKVStorage storage = new MemoryKVStorage();

MerkleDataSet mds = new MerkleDataSet(cryptoConfig, keyPrefix, storage, storage);
Dataset<String, byte[]> ds = DatasetHelper.map(mds);

// 初始的时候没有任何数据,总是返回 null;
VersioningKVEntry verKVEntry = mds.getDataEntry("NULL_KEY");
byte[] vbytes = mds.getValue("NULL_KEY");
DataEntry verKVEntry = ds.getDataEntry("NULL_KEY");
byte[] vbytes = ds.getValue("NULL_KEY");
assertNull(verKVEntry);
assertNull(vbytes);

@@ -388,7 +394,7 @@ public class MerkleDataSetTest {
MerkleProof proof;
for (int i = 0; i < count; i++) {
key = "data" + i;
v = mds.setValue(key, data, -1);
v = ds.setValue(key, data, -1);
dataVersions.put(key, v);
// dataValues.put(key + "_" + v, data);
assertEquals(v, 0);
@@ -408,7 +414,7 @@ public class MerkleDataSetTest {

KeySnapshot ks = new KeySnapshot();
ks.proof = proof;
ks.maxVersion = mds.getVersion(key);
ks.maxVersion = ds.getVersion(key);

snapshot.put(key, ks);
}
@@ -418,6 +424,7 @@ public class MerkleDataSetTest {
// verify;
{
MerkleDataSet mdsReload = new MerkleDataSet(rootHash, cryptoConfig, keyPrefix, storage, storage, true);
Dataset<String, byte[]> dsReload = DatasetHelper.map(mdsReload);
// verify every keys;
Map<String, KeySnapshot> snapshot = history.get(rootHash);
MerkleProof expProof;
@@ -429,7 +436,7 @@ public class MerkleDataSetTest {
expProof = snapshot.get(key).proof;
assertEquals(expProof.toString(), proof.toString());

byte[] value = mdsReload.getValue(key);
byte[] value = dsReload.getValue(key);
assertTrue(BytesUtils.equals(data, value));
}
}


+ 8
- 8
source/ledger/ledger-core/src/test/java/test/com/jd/blockchain/ledger/core/TransactionBatchProcessorTest.java View File

@@ -332,13 +332,13 @@ public class TransactionBatchProcessorTest {
newBlock = newBlockEditor.prepare();
newBlockEditor.commit();

BytesValue v1_0 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getValue("K1",
BytesValue v1_0 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getDataset().getValue("K1",
0);
BytesValue v1_1 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getValue("K1",
BytesValue v1_1 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getDataset().getValue("K1",
1);
BytesValue v2 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getValue("K2",
BytesValue v2 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getDataset().getValue("K2",
0);
BytesValue v3 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getValue("K3",
BytesValue v3 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getDataset().getValue("K3",
0);

assertNotNull(v1_0);
@@ -376,16 +376,16 @@ public class TransactionBatchProcessorTest {
newBlock = newBlockEditor.prepare();
newBlockEditor.commit();

BytesValue v1 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getValue("K1");
v3 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getValue("K3");
BytesValue v1 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getDataset().getValue("K1");
v3 = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress()).getDataset().getValue("K3");

// k1 的版本仍然为1,没有更新;
long k1_version = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress())
.getDataVersion("K1");
.getDataset().getVersion("K1");
assertEquals(1, k1_version);

long k3_version = ledgerRepo.getDataAccountSet().getAccount(dataAccountKeypair.getAddress())
.getDataVersion("K3");
.getDataset().getVersion("K3");
assertEquals(1, k3_version);

assertNotNull(v1);


+ 3
- 3
source/storage/storage-redis/src/main/java/com/jd/blockchain/storage/service/impl/redis/RedisVerioningStorage.java View File

@@ -2,7 +2,7 @@ package com.jd.blockchain.storage.service.impl.redis;

import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
@@ -45,7 +45,7 @@ public class RedisVerioningStorage implements VersioningKVStorage {
}

@Override
public VersioningKVEntry getEntry(Bytes key, long version) {
public DataEntry getEntry(Bytes key, long version) {
byte[] value = get(key, version);
if (value == null) {
return null;
@@ -101,7 +101,7 @@ public class RedisVerioningStorage implements VersioningKVStorage {
}


private static class VersioningKVData implements VersioningKVEntry{
private static class VersioningKVData implements DataEntry{

private Bytes key;



+ 3
- 3
source/storage/storage-rocksdb/src/main/java/com/jd/blockchain/storage/service/impl/rocksdb/RocksDBVersioningStorage.java View File

@@ -9,7 +9,7 @@ import org.rocksdb.*;

import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;
import com.jd.blockchain.utils.io.BytesUtils;

/**
@@ -127,7 +127,7 @@ public class RocksDBVersioningStorage implements VersioningKVStorage {
}

@Override
public VersioningKVEntry getEntry(Bytes key, long version) {
public DataEntry getEntry(Bytes key, long version) {
byte[] value = get(key, version);
if (value == null) {
return null;
@@ -226,7 +226,7 @@ public class RocksDBVersioningStorage implements VersioningKVStorage {
}
}

private static class VersioningKVData implements VersioningKVEntry {
private static class VersioningKVData implements DataEntry {

private Bytes key;



+ 2
- 2
source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/VersioningKVStorage.java View File

@@ -1,7 +1,7 @@
package com.jd.blockchain.storage.service;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;
/**
* Versioning Key-Value Storage
@@ -41,7 +41,7 @@ public interface VersioningKVStorage extends BatchStorageService {
* @param version
* @return
*/
VersioningKVEntry<Bytes, byte[]> getEntry(Bytes key, long version);
DataEntry<Bytes, byte[]> getEntry(Bytes key, long version);
/**
* Return the specified verson's value; <br>


+ 4
- 4
source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/BufferedKVStorage.java View File

@@ -10,7 +10,7 @@ import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.Transactional;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;

/**
* {@link BufferedKVStorage} 缓冲写入的KV存储;<br>
@@ -79,7 +79,7 @@ public class BufferedKVStorage implements VersioningKVStorage, ExPolicyKVStorage
}
@Override
public VersioningKVEntry<Bytes, byte[]> getEntry(Bytes key, long version) {
public DataEntry<Bytes, byte[]> getEntry(Bytes key, long version) {
VersioningWritingSet ws = versioningCache.get(key);
if (ws == null) {
return origVersioningStorage.getEntry(key, version);
@@ -484,7 +484,7 @@ public class BufferedKVStorage implements VersioningKVStorage, ExPolicyKVStorage
return startingVersion;
}

public VersioningKVEntry<Bytes, byte[]> getEntry(long version) {
public DataEntry<Bytes, byte[]> getEntry(long version) {
byte[] value = get(version);
if (value == null) {
return null;
@@ -505,7 +505,7 @@ public class BufferedKVStorage implements VersioningKVStorage, ExPolicyKVStorage
}
}

private static class VersioningKVData implements VersioningKVEntry<Bytes, byte[]> {
private static class VersioningKVData implements DataEntry<Bytes, byte[]> {

private Bytes key;



+ 2
- 2
source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/MemoryKVStorage.java View File

@@ -7,7 +7,7 @@ import com.jd.blockchain.storage.service.ExPolicyKVStorage;
import com.jd.blockchain.storage.service.KVStorageService;
import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;
import com.jd.blockchain.utils.io.BytesMap;

public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage, KVStorageService, BytesMap<Bytes> {
@@ -21,7 +21,7 @@ public class MemoryKVStorage implements ExPolicyKVStorage, VersioningKVStorage,
}

@Override
public VersioningKVEntry getEntry(Bytes key, long version) {
public DataEntry getEntry(Bytes key, long version) {
return verStorage.getEntry(key, version);
}



+ 2
- 2
source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/VersioningKVData.java View File

@@ -1,8 +1,8 @@
package com.jd.blockchain.storage.service.utils;

import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;

public class VersioningKVData<K, V> implements VersioningKVEntry<K, V> {
public class VersioningKVData<K, V> implements DataEntry<K, V> {

private K key;



+ 3
- 3
source/storage/storage-service/src/main/java/com/jd/blockchain/storage/service/utils/VersioningKVStorageMap.java View File

@@ -7,7 +7,7 @@ import java.util.concurrent.ConcurrentHashMap;

import com.jd.blockchain.storage.service.VersioningKVStorage;
import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.VersioningKVEntry;
import com.jd.blockchain.utils.DataEntry;
import com.jd.blockchain.utils.io.BytesMap;

public class VersioningKVStorageMap implements VersioningKVStorage, BytesMap<Bytes> {
@@ -42,7 +42,7 @@ public class VersioningKVStorageMap implements VersioningKVStorage, BytesMap<Byt
}

@Override
public VersioningKVEntry getEntry(Bytes key, long version) {
public DataEntry getEntry(Bytes key, long version) {
VersioningWritingSet ws = versioningCache.get(key);
if (ws == null) {
return null;
@@ -195,7 +195,7 @@ public class VersioningKVStorageMap implements VersioningKVStorage, BytesMap<Byt
return startingVersion;
}

public VersioningKVEntry getEntry(long version) {
public DataEntry getEntry(long version) {
byte[] value = get(version);
if (value == null) {
return null;


+ 46
- 24
source/test/test-integration/src/test/java/test/com/jd/blockchain/intgr/IntegrationTestAll4Redis.java View File

@@ -1,8 +1,39 @@
package test.com.jd.blockchain.intgr;

import com.jd.blockchain.crypto.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import com.jd.blockchain.crypto.AddressEncoding;
import com.jd.blockchain.crypto.AsymmetricKeypair;
import com.jd.blockchain.crypto.Crypto;
import com.jd.blockchain.crypto.HashDigest;
import com.jd.blockchain.crypto.KeyGenUtils;
import com.jd.blockchain.crypto.PrivKey;
import com.jd.blockchain.crypto.PubKey;
import com.jd.blockchain.gateway.GatewayConfigProperties.KeyPairConfig;
import com.jd.blockchain.ledger.*;
import com.jd.blockchain.ledger.BlockchainKeyGenerator;
import com.jd.blockchain.ledger.BlockchainKeypair;
import com.jd.blockchain.ledger.BytesValue;
import com.jd.blockchain.ledger.DataAccountKVSetOperation;
import com.jd.blockchain.ledger.KVDataEntry;
import com.jd.blockchain.ledger.LedgerBlock;
import com.jd.blockchain.ledger.LedgerInfo;
import com.jd.blockchain.ledger.LedgerInitProperties;
import com.jd.blockchain.ledger.PreparedTransaction;
import com.jd.blockchain.ledger.TransactionResponse;
import com.jd.blockchain.ledger.TransactionState;
import com.jd.blockchain.ledger.TransactionTemplate;
import com.jd.blockchain.ledger.core.DataAccount;
import com.jd.blockchain.ledger.core.DataAccountQuery;
import com.jd.blockchain.ledger.core.LedgerManage;
@@ -17,19 +48,10 @@ import com.jd.blockchain.utils.Bytes;
import com.jd.blockchain.utils.codec.HexUtils;
import com.jd.blockchain.utils.concurrent.ThreadInvoker.AsyncCallback;
import com.jd.blockchain.utils.net.NetworkAddress;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import test.com.jd.blockchain.intgr.contract.AssetContract;
import test.com.jd.blockchain.intgr.initializer.LedgerInitializeWeb4SingleStepsTest;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import static org.junit.Assert.*;

public class IntegrationTestAll4Redis {

public static final String PASSWORD = "abc";
@@ -214,21 +236,21 @@ public class IntegrationTestAll4Redis {
assertEquals(ledgerRepository.retrieveLatestBlockHeight(), txResp.getBlockHeight());

assertEquals("Value_A_0", ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getBytes("A").getValue().toUTF8String());
.getAccount(dataKey.getAddress()).getDataset().getValue("A").getValue().toUTF8String());
assertEquals("Value_B_0", ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getBytes("B").getValue().toUTF8String());
.getAccount(dataKey.getAddress()).getDataset().getValue("B").getValue().toUTF8String());
assertEquals("Value_C_0", ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getBytes("C").getValue().toUTF8String());
.getAccount(dataKey.getAddress()).getDataset().getValue("C").getValue().toUTF8String());
assertEquals("Value_D_0", ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getBytes("D").getValue().toUTF8String());
.getAccount(dataKey.getAddress()).getDataset().getValue("D").getValue().toUTF8String());
assertEquals(0, ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getDataVersion("A"));
.getAccount(dataKey.getAddress()).getDataset().getVersion("A"));
assertEquals(0, ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getDataVersion("B"));
.getAccount(dataKey.getAddress()).getDataset().getVersion("B"));
assertEquals(0, ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getDataVersion("C"));
.getAccount(dataKey.getAddress()).getDataset().getVersion("C"));
assertEquals(0, ledgerRepository.getDataAccountSet(ledgerRepository.retrieveLatestBlock())
.getAccount(dataKey.getAddress()).getDataVersion("D"));
.getAccount(dataKey.getAddress()).getDataset().getVersion("D"));

return;
}
@@ -449,9 +471,9 @@ public class IntegrationTestAll4Redis {
AsymmetricKeypair key = Crypto.getSignatureFunction("ED25519").generateKeypair();
PubKey pubKey = key.getPubKey();
Bytes dataAddress = AddressEncoding.generateAddress(pubKey);
assertEquals(dataAddress, dataAccountSet.getAccount(dataAddress).getAddress());
assertEquals(dataAddress, dataAccountSet.getAccount(dataAddress).getID().getAddress());
assertEquals("hello",
dataAccountSet.getAccount(dataAddress).getBytes(KEY_TOTAL, -1).getValue().toUTF8String());
dataAccountSet.getAccount(dataAddress).getDataset().getValue(KEY_TOTAL, -1).getValue().toUTF8String());

// 验证userAccount,从合约内部赋值,然后外部验证;内部定义动态key,外部不便于得到,临时屏蔽;
// UserAccountSet userAccountSet =
@@ -479,9 +501,9 @@ public class IntegrationTestAll4Redis {
// 验证结果;
LedgerBlock block = ledgerRepository.getBlock(txResp.getBlockHeight());
BytesValue val1InDb = ledgerRepository.getDataAccountSet(block).getAccount(contractDataKey.getAddress())
.getBytes("A");
.getDataset().getValue("A");
BytesValue val2InDb = ledgerRepository.getDataAccountSet(block).getAccount(contractDataKey.getAddress())
.getBytes(KEY_TOTAL);
.getDataset().getValue(KEY_TOTAL);
assertEquals("Value_A_0", val1InDb.getValue().toUTF8String());
assertEquals("total value,dataAccount", val2InDb.getValue().toUTF8String());
}


+ 17
- 0
source/utils/utils-common/src/main/java/com/jd/blockchain/utils/DataEntry.java View File

@@ -0,0 +1,17 @@
package com.jd.blockchain.utils;
/**
* 版本化的键值数据项;
*
* @author huanghaiquan
*
*/
public interface DataEntry<K, V> {
public K getKey();
public long getVersion();
public V getValue();
}

source/utils/utils-common/src/main/java/com/jd/blockchain/utils/VersioningMap.java → source/utils/utils-common/src/main/java/com/jd/blockchain/utils/Dataset.java View File

@@ -1,8 +1,8 @@
package com.jd.blockchain.utils;

public interface VersioningMap<K, V> {
public interface Dataset<K, V> {
// long getDataCount();
long getDataCount();

// /**
// * Create or update the value associated the specified key if the version
@@ -109,8 +109,8 @@ public interface VersioningMap<K, V> {
* @param key
* @return Null if the key doesn't exist!
*/
VersioningKVEntry<K, V> getDataEntry(K key);
DataEntry<K, V> getDataEntry(K key);

VersioningKVEntry<K, V> getDataEntry(K key, long version);
DataEntry<K, V> getDataEntry(K key, long version);

}

+ 301
- 0
source/utils/utils-common/src/main/java/com/jd/blockchain/utils/DatasetHelper.java View File

@@ -0,0 +1,301 @@
package com.jd.blockchain.utils;

public class DatasetHelper {

public static final TypeMapper<Bytes, String> UTF8_STRING_BYTES_MAPPER = new TypeMapper<Bytes, String>() {

@Override
public Bytes encode(String t2) {
return Bytes.fromString(t2);
}

@Override
public String decode(Bytes t1) {
return t1.toUTF8String();
}
};

public static final TypeMapper<String, Bytes> BYTES_UTF8_STRING_MAPPER = new TypeMapper<String, Bytes>() {

@Override
public String encode(Bytes t1) {
return t1.toUTF8String();
}

@Override
public Bytes decode(String t2) {
return Bytes.fromString(t2);
}
};
/**
* 适配两个不同类型参数的数据集;
*
* @param <K1> 适配输入的 键 类型;
* @param <K2> 适配输出的 键 类型;
* @param <V1> 适配输入的 值 类型;
* @param <V2> 适配输出的 值 类型;
* @param dataset 数据集;
* @param keyMapper 键的映射配置;
* @param valueMapper 值的映射配置;
* @return
*/
public static <V> Dataset<String, V> map(Dataset<Bytes, V> dataset) {
return new TypeAdapter<Bytes, String, V, V>(dataset, UTF8_STRING_BYTES_MAPPER, new EmptyMapper<V>());
}

/**
* 适配两个不同类型参数的数据集;
*
* @param <K1> 适配输入的 键 类型;
* @param <K2> 适配输出的 键 类型;
* @param <V1> 适配输入的 值 类型;
* @param <V2> 适配输出的 值 类型;
* @param dataset 数据集;
* @param keyMapper 键的映射配置;
* @param valueMapper 值的映射配置;
* @return
*/
public static <V1, V2> Dataset<String, V2> map(Dataset<Bytes, V1> dataset, TypeMapper<V1, V2> valueMapper) {
return new TypeAdapter<Bytes, String, V1, V2>(dataset, UTF8_STRING_BYTES_MAPPER, valueMapper);
}

/**
* 适配两个不同类型参数的数据集;
*
* @param <K1> 适配输入的 键 类型;
* @param <K2> 适配输出的 键 类型;
* @param <V1> 适配输入的 值 类型;
* @param <V2> 适配输出的 值 类型;
* @param dataset 数据集;
* @param keyMapper 键的映射配置;
* @param valueMapper 值的映射配置;
* @return
*/
public static <K1, K2, V1, V2> Dataset<K2, V2> map(Dataset<K1, V1> dataset, TypeMapper<K1, K2> keyMapper,
TypeMapper<V1, V2> valueMapper) {
return new TypeAdapter<K1, K2, V1, V2>(dataset, keyMapper, valueMapper);
}

/**
* 监听对数据集的变更;
*
* @param <K> 键 类型;
* @param <V> 值 类型;
* @param dataset 要监听的数据集;
* @param listener 要植入的监听器;
* @return 植入监听器的数据集实例;
*/
public static <K, V> Dataset<K, V> listen(Dataset<K, V> dataset, DataChangedListener<K, V> listener) {
return new DatasetUpdatingMonitor<K, V>(dataset, listener);
}

/**
* 数据修改监听器;
*
* @author huanghaiquan
*
* @param <K>
* @param <V>
*/
public static interface DataChangedListener<K, V> {

void onChanged(K key, V value, long expectedVersion, long newVersion);

}

/**
* 类型映射接口;
*
* @author huanghaiquan
*
* @param <T1>
* @param <T2>
*/
public static interface TypeMapper<T1, T2> {

T1 encode(T2 t2);

T2 decode(T1 t1);

}
private static class EmptyMapper<T> implements TypeMapper<T, T>{

@Override
public T encode(T t) {
return t;
}

@Override
public T decode(T t) {
return t;
}
}

private static class DatasetUpdatingMonitor<K, V> implements Dataset<K, V> {

private Dataset<K, V> dataset;

private DataChangedListener<K, V> listener;

public DatasetUpdatingMonitor(Dataset<K, V> dataset, DataChangedListener<K, V> listener) {
this.dataset = dataset;
this.listener = listener;
}

@Override
public long getDataCount() {
return dataset.getDataCount();
}

@Override
public long setValue(K key, V value, long version) {
long newVersion = dataset.setValue(key, value, version);
if (newVersion > -1) {
listener.onChanged(key, value, version, newVersion);
}
return newVersion;
}

@Override
public V getValue(K key, long version) {
return dataset.getValue(key, version);
}

@Override
public V getValue(K key) {
return dataset.getValue(key);
}

@Override
public long getVersion(K key) {
return dataset.getVersion(key);
}

@Override
public DataEntry<K, V> getDataEntry(K key) {
return dataset.getDataEntry(key);
}

@Override
public DataEntry<K, V> getDataEntry(K key, long version) {
return dataset.getDataEntry(key, version);
}

}

/**
* 类型适配器;
*
* @author huanghaiquan
*
* @param <K1>
* @param <K2>
* @param <V1>
* @param <V2>
*/
private static class TypeAdapter<K1, K2, V1, V2> implements Dataset<K2, V2> {
private Dataset<K1, V1> dataset;
private TypeMapper<K1, K2> keyMapper;
private TypeMapper<V1, V2> valueMapper;

public TypeAdapter(Dataset<K1, V1> dataset, TypeMapper<K1, K2> keyMapper, TypeMapper<V1, V2> valueMapper) {
this.dataset = dataset;
this.keyMapper = keyMapper;
this.valueMapper = valueMapper;
}

@Override
public long getDataCount() {
return dataset.getDataCount();
}

@Override
public long setValue(K2 key, V2 value, long version) {
K1 key1 = keyMapper.encode(key);
V1 value1 = valueMapper.encode(value);
return dataset.setValue(key1, value1, version);
}

@Override
public V2 getValue(K2 key, long version) {
K1 k = keyMapper.encode(key);
V1 v = dataset.getValue(k, version);
if (v == null) {
return null;
}
return valueMapper.decode(v);
}

@Override
public V2 getValue(K2 key) {
K1 k = keyMapper.encode(key);
V1 v = dataset.getValue(k);
if (v == null) {
return null;
}
return valueMapper.decode(v);
}

@Override
public long getVersion(K2 key) {
K1 k = keyMapper.encode(key);
return dataset.getVersion(k);
}

@Override
public DataEntry<K2, V2> getDataEntry(K2 key) {
K1 k = keyMapper.encode(key);
DataEntry<K1, V1> entry = dataset.getDataEntry(k);
if (entry == null) {
return null;
}
V2 v = valueMapper.decode(entry.getValue());
return new KeyValueEntry<K2, V2>(key, v, entry.getVersion());
}

@Override
public DataEntry<K2, V2> getDataEntry(K2 key, long version) {
K1 k = keyMapper.encode(key);
DataEntry<K1, V1> entry = dataset.getDataEntry(k, version);
if (entry == null) {
return null;
}
V2 v = valueMapper.decode(entry.getValue());
return new KeyValueEntry<K2, V2>(key, v, entry.getVersion());
}

}

private static class KeyValueEntry<K, V> implements DataEntry<K, V> {

private K key;

private V value;

private long version;

public KeyValueEntry(K key, V value, long version) {
this.key = key;
this.value = value;
this.version = version;
}

public K getKey() {
return key;
}

public long getVersion() {
return version;
}

public V getValue() {
return value;
}

}
}

+ 0
- 122
source/utils/utils-common/src/main/java/com/jd/blockchain/utils/RegionMap.java View File

@@ -1,122 +0,0 @@
package com.jd.blockchain.utils;

public abstract class RegionMap<K, V> implements VersioningMap<K, V> {

private K region;

private VersioningMap<K, V> dataMap;

public RegionMap(K region, VersioningMap<K, V> dataMap) {
this.region = region;
this.dataMap = dataMap;
}

@Override
public long setValue(K key, V value, long version) {
K dataKey = concatKey(region, key);
return dataMap.setValue(dataKey, value, version);
}

@Override
public V getValue(K key, long version) {
K dataKey = concatKey(region, key);
return dataMap.getValue(dataKey, version);
}

@Override
public V getValue(K key) {
K dataKey = concatKey(region, key);
return dataMap.getValue(dataKey);
}

@Override
public long getVersion(K key) {
K dataKey = concatKey(region, key);
return dataMap.getVersion(dataKey);
}

@Override
public VersioningKVEntry<K, V> getDataEntry(K key) {
K dataKey = concatKey(region, key);
VersioningKVEntry<K, V> entry = dataMap.getDataEntry(dataKey);
return new KVEntryWrapper<K, V>(key, entry);
}

@Override
public VersioningKVEntry<K, V> getDataEntry(K key, long version) {
K dataKey = concatKey(region, key);
VersioningKVEntry<K, V> entry = dataMap.getDataEntry(dataKey, version);
return new KVEntryWrapper<K, V>(key, entry);
}

/**
* 以指定的前缀组成新的key;
*
* @param prefix
* @param key
* @return
*/
protected abstract K concatKey(K prefix, K key);

public static <V> VersioningMap<Bytes, V> newRegion(Bytes region, VersioningMap<Bytes, V> dataMap) {
return new BytesKeyRegionMap<V>(region, dataMap);
}
public static <V> VersioningMap<String, V> newRegion(String region, VersioningMap<String, V> dataMap) {
return new StringKeyRegionMap<V>(region, dataMap);
}

private static class BytesKeyRegionMap<V> extends RegionMap<Bytes, V> {

public BytesKeyRegionMap(Bytes region, VersioningMap<Bytes, V> dataMap) {
super(region, dataMap);
}

@Override
protected Bytes concatKey(Bytes prefix, Bytes key) {
return prefix.concat(key);
}

}

private static class StringKeyRegionMap<V> extends RegionMap<String, V> {

public StringKeyRegionMap(String region, VersioningMap<String, V> dataMap) {
super(region, dataMap);
}

@Override
protected String concatKey(String prefix, String key) {
return prefix + key;
}

}

private static class KVEntryWrapper<K, V> implements VersioningKVEntry<K, V> {

private K key;

private VersioningKVEntry<K, V> entry;

public KVEntryWrapper(K key, VersioningKVEntry<K, V> entry) {
this.key = key;
this.entry = entry;
}

@Override
public K getKey() {
return key;
}

@Override
public long getVersion() {
return entry.getVersion();
}

@Override
public V getValue() {
return entry.getValue();
}

}
}

+ 0
- 17
source/utils/utils-common/src/main/java/com/jd/blockchain/utils/VersioningKVEntry.java View File

@@ -1,17 +0,0 @@
package com.jd.blockchain.utils;
/**
* 版本化的键值数据项;
*
* @author huanghaiquan
*
*/
public interface VersioningKVEntry<K, V>{
K getKey();
long getVersion();
V getValue();
}

Loading…
Cancel
Save