From 4c68195c7aa625f4ba229d3f0dc2a60ea73c708d Mon Sep 17 00:00:00 2001 From: "Yangkai.Shen" <237497819@qq.com> Date: Tue, 8 Jan 2019 10:27:59 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20spring-boot-demo-mq-kafka=20?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- spring-boot-demo-mq-kafka/README.md | 265 ++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) diff --git a/spring-boot-demo-mq-kafka/README.md b/spring-boot-demo-mq-kafka/README.md index e69de29..0c28892 100644 --- a/spring-boot-demo-mq-kafka/README.md +++ b/spring-boot-demo-mq-kafka/README.md @@ -0,0 +1,265 @@ +# spring-boot-demo-mq-kafka + +> 本 demo 主要演示了 Spring Boot 如何集成 kafka,实现消息的发送和接收。 + +## 环境准备 + +> 注意:本 demo 基于 Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0 + +创建一个名为 `test` 的Topic + +```bash +./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test +``` + +## pom.xml + +```xml + + + 4.0.0 + + spring-boot-demo-mq-kafka + 1.0.0-SNAPSHOT + jar + + spring-boot-demo-mq-kafka + Demo project for Spring Boot + + + com.xkcoding + spring-boot-demo + 1.0.0-SNAPSHOT + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.kafka + spring-kafka + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.projectlombok + lombok + true + + + + cn.hutool + hutool-all + + + + com.google.guava + guava + + + + + spring-boot-demo-mq-kafka + + + org.springframework.boot + spring-boot-maven-plugin + + + + + +``` + +## application.yml + +```yaml +server: + port: 8080 + servlet: + context-path: /demo +spring: + kafka: + bootstrap-servers: localhost:9092 + producer: + retries: 0 + batch-size: 16384 + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + consumer: + group-id: spring-boot-demo + # 手动提交 + enable-auto-commit: false + auto-offset-reset: latest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session.timeout.ms: 60000 + listener: + log-container-config: false + concurrency: 5 + # 手动提交 + ack-mode: manual_immediate +``` + +## KafkaConfig.java + +```java +/** + *

+ * kafka配置类 + *

+ * + * @package: com.xkcoding.mq.kafka.config + * @description: kafka配置类 + * @author: yangkai.shen + * @date: Created in 2019-01-07 14:49 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Configuration +@EnableConfigurationProperties({KafkaProperties.class}) +@EnableKafka +@AllArgsConstructor +public class KafkaConfig { + private final KafkaProperties kafkaProperties; + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM); + factory.setBatchListener(true); + factory.getContainerProperties().setPollTimeout(3000); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); + } + + @Bean("ackContainerFactory") + public ConcurrentKafkaListenerContainerFactory ackContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM); + return factory; + } + +} +``` + +## MessageHandler.java + +```java +/** + *

+ * 消息处理器 + *

+ * + * @package: com.xkcoding.mq.kafka.handler + * @description: 消息处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-07 14:58 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Component +@Slf4j +public class MessageHandler { + + @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory") + public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) { + try { + String message = (String) record.value(); + log.info("收到消息: {}", message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + // 手动提交 offset + acknowledgment.acknowledge(); + } + } +} +``` + +## SpringBootDemoMqKafkaApplicationTests.java + +```java +@RunWith(SpringRunner.class) +@SpringBootTest +public class SpringBootDemoMqKafkaApplicationTests { + @Autowired + private KafkaTemplate kafkaTemplate; + + /** + * 测试发送消息 + */ + @Test + public void testSend() { + kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka..."); + } + +} +``` + +## 参考 + +1. Spring Boot 版本和 Spring-Kafka 的版本对应关系:https://spring.io/projects/spring-kafka + + | Spring for Apache Kafka Version | Spring Integration for Apache Kafka Version | kafka-clients | + | ------------------------------- | ------------------------------------------- | ------------------- | + | 2.2.x | 3.1.x | 2.0.0, 2.1.0 | + | 2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 | + | 2.0.x | 3.0.x | 0.11.0.x, 1.0.x | + | 1.3.x | 2.3.x | 0.11.0.x, 1.0.x | + | 1.2.x | 2.2.x | 0.10.2.x | + | 1.1.x | 2.1.x | 0.10.0.x, 0.10.1.x | + | 1.0.x | 2.0.x | 0.9.x.x | + | N/A* | 1.3.x | 0.8.2.2 | + + > **IMPORTANT:** This matrix is client compatibility; in most cases (since 0.10.2.0) newer clients can communicate with older brokers. All users with brokers >= 0.10.x.x **(and all spring boot 1.5.x users)** are recommended to use spring-kafka version 1.3.x or higher due to its simpler threading model thanks to [KIP-62](https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread). For a complete discussion about client/broker compatibility, see the Kafka [Compatibility Matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix) + > + > - Spring Integration Kafka versions prior to 2.0 pre-dated the Spring for Apache Kafka project and therefore were not based on it. + > + > These versions will be referenced transitively when using maven or gradle for version management. For the 1.1.x version, the 0.10.1.x is the default. + > + > 2.1.x uses the 1.1.x kafka-clients by default. When overriding the kafka-clients for 2.1.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.1.x/reference/html/deps-for-11x.html). + > + > 2.2.x uses the 2.0.x kafka-clients by default. When overriding the kafka-clients for 2.2.x see [the documentation appendix](https://docs.spring.io/spring-kafka/docs/2.2.1.BUILD-SNAPSHOT/reference/html/deps-for-21x.html). + > + > - Spring Boot 1.5 users should use 1.3.x (Boot dependency management will use 1.1.x by default so this should be overridden). + > - Spring Boot 2.0 users should use 2.0.x (Boot dependency management will use the correct version). + > - Spring Boot 2.1 users should use 2.2.x (Boot dependency management will use the correct version). + +2. Spring-Kafka 官方文档:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/