Browse Source

Add tikv-client module and modify dbconnection to auto check way!

tags/1.0.0
shaozhuguang 6 years ago
parent
commit
1689a50ced
12 changed files with 574 additions and 0 deletions
  1. +1
    -0
      source/pom.xml
  2. +21
    -0
      source/stp/pom.xml
  3. +35
    -0
      source/stp/stp-communication/pom.xml
  4. +21
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java
  5. +44
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteNode.java
  6. +94
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java
  7. +98
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java
  8. +27
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java
  9. +35
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java
  10. +21
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java
  11. +46
    -0
      source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java
  12. +131
    -0
      source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java

+ 1
- 0
source/pom.xml View File

@@ -36,6 +36,7 @@
<module>tools</module>
<module>test</module>
<module>deployment</module>
<module>stp</module>
</modules>

<properties>


+ 21
- 0
source/stp/pom.xml View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>jdchain-root</artifactId>
<groupId>com.jd.blockchain</groupId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>stp</artifactId>
<packaging>pom</packaging>
<modules>
<module>stp-communication</module>
</modules>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

+ 35
- 0
source/stp/stp-communication/pom.xml View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>stp</artifactId>
<groupId>com.jd.blockchain</groupId>
<version>0.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>stp-communication</artifactId>

<name>stp-communication</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
</dependencies>
</project>

+ 21
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/MessageHandler.java View File

@@ -0,0 +1,21 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.MessageHandler
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public interface MessageHandler {

void receive(byte[] key, byte[] data, RemoteSession session);
}

+ 44
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteNode.java View File

@@ -0,0 +1,44 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.RemoteNode
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午3:40
* Description:
*/
package com.jd.blockchain.stp.communication;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class RemoteNode {

private int port;

private String hostName;

public RemoteNode(String hostName, int port) {
this.port = port;
this.hostName = hostName;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getHostName() {
return hostName;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}
}

+ 94
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSession.java View File

@@ -0,0 +1,94 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.RemoteSession
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午11:15
* Description:
*/
package com.jd.blockchain.stp.communication;

import com.jd.blockchain.stp.communication.inner.Receiver;
import com.jd.blockchain.stp.communication.inner.Sender;
import com.jd.blockchain.stp.communication.message.LoadMessage;

import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class RemoteSession {

private String id;

private RemoteNode remoteNode;

private Sender sender;

private Receiver receiver;

private MessageHandler messageHandler;

public void initHandler(MessageHandler messageHandler) {

}

public void connect() {

}

public byte[] send(LoadMessage loadMessage) {

return null;
}

public Future<byte[]> asyncSend(LoadMessage loadMessage) {
return null;
}

public Future<byte[]> asyncSend(LoadMessage loadMessage, CountDownLatch countDownLatch) {
return null;
}

public byte[] send(byte[] loadMessage) {

return null;
}

public Future<byte[]> asyncSend(byte[] loadMessage) {

return null;
}

public Future<byte[]> asyncSend(byte[] loadMessage, CountDownLatch countDownLatch) {

return null;
}

public void reply(byte[] key, LoadMessage loadMessage) {

}

public void asyncReply(byte[] key, LoadMessage loadMessage) {

}

public void reply(byte[] key, byte[] loadMessage) {

}

public void asyncReply(byte[] key, byte[] loadMessage) {

}

public void close() {

}
}

+ 98
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/RemoteSessionManager.java View File

@@ -0,0 +1,98 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.RemoteSessionManager
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午11:22
* Description:
*/
package com.jd.blockchain.stp.communication;

import com.jd.blockchain.stp.communication.inner.Receiver;

import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class RemoteSessionManager {

private static Map<Integer, Receiver> receiverMap = new ConcurrentHashMap<>();

private static final Lock lock = new ReentrantLock();

public RemoteSessionManager(int listenPort) {
if (listenPort <= 0) {
throw new IllegalArgumentException("Illegal port, please check !!!");
}

if (!receiverMap.containsKey(listenPort)) {
try {
lock.lock();
if (!receiverMap.containsKey(listenPort)) {
Receiver receiver = initReceiver(listenPort);
receiverMap.put(listenPort, receiver);
}
} finally {
lock.unlock();
}
}
}

public RemoteSession newSession(RemoteNode remoteNode) {
return newSession(toRemoteId(remoteNode), remoteNode);
}

public RemoteSession newSession(String remoteId, RemoteNode remoteNode) {
return newSession(remoteId, remoteNode, null);
}

public RemoteSession newSession(RemoteNode remoteNode, MessageHandler messageHandler) {
return newSession(toRemoteId(remoteNode), remoteNode, messageHandler);
}

public RemoteSession newSession(String remoteId, RemoteNode remoteNode, MessageHandler messageHandler) {


return null;
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes) {


return null;
}

public RemoteSession[] newSessions(String[] remoteIds, RemoteNode[] remoteNodes) {

return null;
}

public RemoteSession[] newSessions(RemoteNode[] remoteNodes, MessageHandler messageHandler) {

return null;
}

public RemoteSession[] newSessions(String[] remoteIds, RemoteNode[] remoteNodes, MessageHandler messageHandler) {

return null;
}

public String toRemoteId(RemoteNode remoteNode) {

return null;
}

private Receiver initReceiver(int port) {

return null;
}
}

+ 27
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Receiver.java View File

@@ -0,0 +1,27 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.inner.Receiver
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication.inner;

import com.jd.blockchain.stp.communication.MessageHandler;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Receiver {

private MessageHandler messageHandler;

public void listen() {

}
}

+ 35
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/inner/Sender.java View File

@@ -0,0 +1,35 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.inner.Sender
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:58
* Description:
*/
package com.jd.blockchain.stp.communication.inner;

import java.net.InetAddress;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class Sender {

private InetAddress inetAddress;

public void connect() {

}

public void send(String message) {

}

public void send(byte[] message) {

}
}

+ 21
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/LoadMessage.java View File

@@ -0,0 +1,21 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.message.LoadMessage
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午10:59
* Description:
*/
package com.jd.blockchain.stp.communication.message;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public interface LoadMessage {

byte[] toBytes();
}

+ 46
- 0
source/stp/stp-communication/src/main/java/com/jd/blockchain/stp/communication/message/TransferMessage.java View File

@@ -0,0 +1,46 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.stp.communication.message.TransferMessage
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 上午11:00
* Description:
*/
package com.jd.blockchain.stp.communication.message;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class TransferMessage {

private byte[] key;

private byte[] load;

public byte[] getKey() {
return key;
}

public void setKey(byte[] key) {
this.key = key;
}

public byte[] getLoad() {
return load;
}

public void setLoad(byte[] load) {
this.load = load;
}

public String toBase64() {



return null;
}
}

+ 131
- 0
source/stp/stp-communication/src/test/java/com/jd/blockchain/StpTest.java View File

@@ -0,0 +1,131 @@
/**
* Copyright: Copyright 2016-2020 JD.COM All Right Reserved
* FileName: com.jd.blockchain.StpTest
* Author: shaozhuguang
* Department: Y事业部
* Date: 2019/4/11 下午3:31
* Description:
*/
package com.jd.blockchain;

import com.jd.blockchain.stp.communication.MessageHandler;
import com.jd.blockchain.stp.communication.RemoteNode;
import com.jd.blockchain.stp.communication.RemoteSession;
import com.jd.blockchain.stp.communication.RemoteSessionManager;
import com.jd.blockchain.stp.communication.message.LoadMessage;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
*
* @author shaozhuguang
* @create 2019/4/11
* @since 1.0.0
*/

public class StpTest {

private int listenPort = 6000;

private int maxWaitTime = 2000;

private RemoteNode[] remoteNodes = new RemoteNode[3];

@Before
public void init() {
for (int i = 0; i < remoteNodes.length; i++) {
remoteNodes[i] = new RemoteNode("127.0.0.1", 6001 + i);
}
}


@Test
public void test() {
// 创建RemoteSessionManager对象
RemoteSessionManager sessionManager = new RemoteSessionManager(listenPort);

// 创建RemoteSession[]对象
RemoteSession[] remoteSessions = sessionManager.newSessions(remoteNodes);

// 设置MessageHandler并连接
for (RemoteSession remoteSession : remoteSessions) {

// 设置MessageHandler
remoteSession.initHandler(new StpMessageHandler());

// session连接
remoteSession.connect();
}

// 生成请求对象
LoadMessage loadMessage = new StpLoadMessage();

// 异步发送处理过程

CountDownLatch countDownLatch = new CountDownLatch(remoteSessions.length);

// 发送请求至remotes
LinkedList<Future<byte[]>> responses = new LinkedList<>();
for (RemoteSession remoteSession : remoteSessions) {
Future<byte[]> response = remoteSession.asyncSend(loadMessage, countDownLatch);
responses.addLast(response);
}

// 超时判断
try {
if (countDownLatch.await(maxWaitTime, TimeUnit.MILLISECONDS)) {
// 汇总异步消息结果
LinkedList<byte[]> receiveResponses = new LinkedList<>();
// 通过迭代器遍历链表
Iterator<Future<byte[]>> iterator = responses.iterator();
while (iterator.hasNext()) {
Future<byte[]> response = iterator.next();
// 判断是否已完成,对于没有完成的直接放弃(因为已经超时)
if (response.isDone()) {
receiveResponses.addLast(response.get());
}
}

//TODO 检查汇总后的应答消息,准备开始新一轮的请求




}
} catch (Exception e) {

}
}


public static class StpMessageHandler implements MessageHandler {

@Override
public void receive(byte[] key, byte[] data, RemoteSession session) {
// 作为Receiver接收到请求消息后需要发送应答
// 生成应答消息
LoadMessage replyLoadMessage = new StpLoadMessage();

// 发送应答消息(注意key必须保持一致)
// 异步发送应答消息可使用asyncReply
session.reply(key, replyLoadMessage);
}
}

public static class StpLoadMessage implements LoadMessage {

@Override
public byte[] toBytes() {
return new byte[0];
}
}
}

Loading…
Cancel
Save