diff --git a/spring-boot-demo-mq-kafka/pom.xml b/spring-boot-demo-mq-kafka/pom.xml index f082901..6e0cb55 100644 --- a/spring-boot-demo-mq-kafka/pom.xml +++ b/spring-boot-demo-mq-kafka/pom.xml @@ -38,6 +38,22 @@ spring-boot-starter-test test + + + org.projectlombok + lombok + true + + + + cn.hutool + hutool-all + + + + com.google.guava + guava + diff --git a/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplication.java b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplication.java index 286d222..cbbee84 100644 --- a/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplication.java +++ b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplication.java @@ -3,6 +3,19 @@ package com.xkcoding.mq.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +/** + *

+ * 启动器 + *

+ * + * @package: com.xkcoding.mq.kafka + * @description: 启动器 + * @author: yangkai.shen + * @date: Created in 2019-01-07 14:43 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ @SpringBootApplication public class SpringBootDemoMqKafkaApplication { diff --git a/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/config/KafkaConfig.java b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/config/KafkaConfig.java new file mode 100644 index 0000000..730cb07 --- /dev/null +++ b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/config/KafkaConfig.java @@ -0,0 +1,68 @@ +package com.xkcoding.mq.kafka.config; + +import com.xkcoding.mq.kafka.constants.KafkaConsts; +import lombok.AllArgsConstructor; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ContainerProperties; + +/** + *

+ * kafka配置类 + *

+ * + * @package: com.xkcoding.mq.kafka.config + * @description: kafka配置类 + * @author: yangkai.shen + * @date: Created in 2019-01-07 14:49 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Configuration +@EnableConfigurationProperties({KafkaProperties.class}) +@EnableKafka +@AllArgsConstructor +public class KafkaConfig { + private final KafkaProperties kafkaProperties; + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties()); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM); + factory.setBatchListener(true); + factory.getContainerProperties().setPollTimeout(3000); + return factory; + } + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); + } + + @Bean("ackContainerFactory") + public ConcurrentKafkaListenerContainerFactory ackContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM); + return factory; + } + +} diff --git a/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/constants/KafkaConsts.java b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/constants/KafkaConsts.java new file mode 100644 index 0000000..48518d7 --- /dev/null +++ b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/constants/KafkaConsts.java @@ -0,0 +1,26 @@ +package com.xkcoding.mq.kafka.constants; + +/** + *

+ * kafka 常量池 + *

+ * + * @package: com.xkcoding.mq.kafka.constants + * @description: kafka 常量池 + * @author: yangkai.shen + * @date: Created in 2019-01-07 14:52 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +public interface KafkaConsts { + /** + * 默认分区大小 + */ + Integer DEFAULT_PARTITION_NUM = 3; + + /** + * Topic 名称 + */ + String TOPIC_TEST = "test"; +} diff --git a/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/handler/MessageHandler.java b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/handler/MessageHandler.java new file mode 100644 index 0000000..a55552e --- /dev/null +++ b/spring-boot-demo-mq-kafka/src/main/java/com/xkcoding/mq/kafka/handler/MessageHandler.java @@ -0,0 +1,39 @@ +package com.xkcoding.mq.kafka.handler; + +import com.xkcoding.mq.kafka.constants.KafkaConsts; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; + +/** + *

+ * 消息处理器 + *

+ * + * @package: com.xkcoding.mq.kafka.handler + * @description: 消息处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-07 14:58 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Component +@Slf4j +public class MessageHandler { + + @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory") + public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) { + try { + String message = (String) record.value(); + log.info("收到消息: {}", message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + // 手动提交 offset + acknowledgment.acknowledge(); + } + } +} diff --git a/spring-boot-demo-mq-kafka/src/main/resources/application.properties b/spring-boot-demo-mq-kafka/src/main/resources/application.properties deleted file mode 100644 index e69de29..0000000 diff --git a/spring-boot-demo-mq-kafka/src/main/resources/application.yml b/spring-boot-demo-mq-kafka/src/main/resources/application.yml new file mode 100644 index 0000000..ffc54e7 --- /dev/null +++ b/spring-boot-demo-mq-kafka/src/main/resources/application.yml @@ -0,0 +1,27 @@ +server: + port: 8080 + servlet: + context-path: /demo +spring: + kafka: + bootstrap-servers: localhost:9092 + producer: + retries: 0 + batch-size: 16384 + buffer-memory: 33554432 + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + consumer: + group-id: spring-boot-demo + # 手动提交 + enable-auto-commit: false + auto-offset-reset: latest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + session.timeout.ms: 60000 + listener: + log-container-config: false + concurrency: 5 + # 手动提交 + ack-mode: manual_immediate diff --git a/spring-boot-demo-mq-kafka/src/test/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplicationTests.java b/spring-boot-demo-mq-kafka/src/test/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplicationTests.java index 1aee5a0..5b4bf60 100644 --- a/spring-boot-demo-mq-kafka/src/test/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplicationTests.java +++ b/spring-boot-demo-mq-kafka/src/test/java/com/xkcoding/mq/kafka/SpringBootDemoMqKafkaApplicationTests.java @@ -1,16 +1,25 @@ package com.xkcoding.mq.kafka; +import com.xkcoding.mq.kafka.constants.KafkaConsts; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringBootDemoMqKafkaApplicationTests { + @Autowired + private KafkaTemplate kafkaTemplate; + /** + * 测试发送消息 + */ @Test - public void contextLoads() { + public void testSend() { + kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka..."); } }