Browse Source

spring-boot-demo-mq-kafka 完成

pull/1/head
Yangkai.Shen 5 years ago
parent
commit
5394cfdb60
8 changed files with 199 additions and 1 deletions
  1. +16
    -0
      spring-boot-demo-mq-kafka/pom.xml
  2. +13
    -0
      spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplication.java
  3. +68
    -0
      spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/config/KafkaConfig.java
  4. +26
    -0
      spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/constants/KafkaConsts.java
  5. +39
    -0
      spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/handler/MessageHandler.java
  6. +0
    -0
      spring-boot-demo-mq-kafka/src/main/resources/application.properties
  7. +27
    -0
      spring-boot-demo-mq-kafka/src/main/resources/application.yml
  8. +10
    -1
      spring-boot-demo-mq-kafka/src/test/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplicationTests.java

+ 16
- 0
spring-boot-demo-mq-kafka/pom.xml View File

@@ -38,6 +38,22 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>

<build>


+ 13
- 0
spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplication.java View File

@@ -3,6 +3,19 @@ package com.xkcoding.mq.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* <p>
* 启动器
* </p>
*
* @package: com.xkcoding.mq.kafka
* @description: 启动器
* @author: yangkai.shen
* @date: Created in 2019-01-07 14:43
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@SpringBootApplication
public class SpringBootDemoMqKafkaApplication {



+ 68
- 0
spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/config/KafkaConfig.java View File

@@ -0,0 +1,68 @@
package com.xkcoding.mq.kafka.config;

import com.xkcoding.mq.kafka.constants.KafkaConsts;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

/**
* <p>
* kafka配置类
* </p>
*
* @package: com.xkcoding.mq.kafka.config
* @description: kafka配置类
* @author: yangkai.shen
* @date: Created in 2019-01-07 14:49
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
private final KafkaProperties kafkaProperties;

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}

@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
return factory;
}

}

+ 26
- 0
spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/constants/KafkaConsts.java View File

@@ -0,0 +1,26 @@
package com.xkcoding.mq.kafka.constants;

/**
* <p>
* kafka 常量池
* </p>
*
* @package: com.xkcoding.mq.kafka.constants
* @description: kafka 常量池
* @author: yangkai.shen
* @date: Created in 2019-01-07 14:52
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
public interface KafkaConsts {
/**
* 默认分区大小
*/
Integer DEFAULT_PARTITION_NUM = 3;

/**
* Topic 名称
*/
String TOPIC_TEST = "test";
}

+ 39
- 0
spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/handler/MessageHandler.java View File

@@ -0,0 +1,39 @@
package com.xkcoding.mq.kafka.handler;

import com.xkcoding.mq.kafka.constants.KafkaConsts;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
* <p>
* 消息处理器
* </p>
*
* @package: com.xkcoding.mq.kafka.handler
* @description: 消息处理器
* @author: yangkai.shen
* @date: Created in 2019-01-07 14:58
* @copyright: Copyright (c) 2019
* @version: V1.0
* @modified: yangkai.shen
*/
@Component
@Slf4j
public class MessageHandler {

@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("收到消息: {}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 手动提交 offset
acknowledgment.acknowledge();
}
}
}

+ 0
- 0
spring-boot-demo-mq-kafka/src/main/resources/application.properties View File


+ 27
- 0
spring-boot-demo-mq-kafka/src/main/resources/application.yml View File

@@ -0,0 +1,27 @@
server:
port: 8080
servlet:
context-path: /demo
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: spring-boot-demo
# 手动提交
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 60000
listener:
log-container-config: false
concurrency: 5
# 手动提交
ack-mode: manual_immediate

+ 10
- 1
spring-boot-demo-mq-kafka/src/test/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplicationTests.java View File

@@ -1,16 +1,25 @@
package com.xkcoding.mq.kafka;

import com.xkcoding.mq.kafka.constants.KafkaConsts;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqKafkaApplicationTests {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 测试发送消息
*/
@Test
public void contextLoads() {
public void testSend() {
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");
}

}


Loading…
Cancel
Save