diff --git a/spring-boot-demo-mq-rabbitmq/README.md b/spring-boot-demo-mq-rabbitmq/README.md index 50ee33a..7abc695 100644 --- a/spring-boot-demo-mq-rabbitmq/README.md +++ b/spring-boot-demo-mq-rabbitmq/README.md @@ -52,3 +52,423 @@ 11. 启动容器:`docker start rabbitmq:3.7.7-management` +## pom.xml + +```xml + + + 4.0.0 + + spring-boot-demo-mq-rabbitmq + 1.0.0-SNAPSHOT + jar + + spring-boot-demo-mq-rabbitmq + Demo project for Spring Boot + + + com.xkcoding + spring-boot-demo + 1.0.0-SNAPSHOT + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.projectlombok + lombok + true + + + + cn.hutool + hutool-all + + + + com.google.guava + guava + + + + + spring-boot-demo-mq-rabbitmq + + + org.springframework.boot + spring-boot-maven-plugin + + + + + +``` + +## application.yml + +```yaml +server: + port: 8080 + servlet: + context-path: /demo +spring: + rabbitmq: + host: localhost + port: 5672 + username: guest + password: guest + virtual-host: / + # 手动提交消息 + listener: + simple: + acknowledge-mode: manual + direct: + acknowledge-mode: manual +``` + +## RabbitConsts.java + +```java +/** + *

+ * RabbitMQ常量池 + *

+ * + * @package: com.xkcoding.mq.rabbitmq.constants + * @description: RabbitMQ常量池 + * @author: yangkai.shen + * @date: Created in 2018-12-29 17:08 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +public interface RabbitConsts { + /** + * 直接模式1 + */ + String DIRECT_MODE_QUEUE_ONE = "queue.direct.1"; + + /** + * 队列2 + */ + String QUEUE_TWO = "queue.2"; + + /** + * 队列3 + */ + String QUEUE_THREE = "3.queue"; + + /** + * 分列模式 + */ + String FANOUT_MODE_QUEUE = "fanout.mode"; + + /** + * 主题模式 + */ + String TOPIC_MODE_QUEUE = "topic.mode"; + + /** + * 路由1 + */ + String TOPIC_ROUTING_KEY_ONE = "queue.#"; + + /** + * 路由2 + */ + String TOPIC_ROUTING_KEY_TWO = "*.queue"; + + /** + * 路由3 + */ + String TOPIC_ROUTING_KEY_THREE = "3.queue"; + + /** + * 延迟队列 + */ + String DELAY_QUEUE = "delay.queue"; + + /** + * 延迟队列交换器 + */ + String DELAY_MODE_QUEUE = "delay.mode"; +} +``` + +## RabbitMqConfig.java + +> RoutingKey规则 +> +> - 路由格式必须以 `.` 分隔,比如 `user.email` 或者 `user.aaa.email` +> - 通配符 `*` ,代表一个占位符,或者说一个单词,比如路由为 `user.*`,那么 **`user.email`** 可以匹配,但是 *`user.aaa.email`* 就匹配不了 +> - 通配符 `#` ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 `user.#`,那么 **`user.email`** 可以匹配,**`user.aaa.email `** 也可以匹配 + +```java +/** + *

+ * RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 + *

+ * + * @package: com.xkcoding.mq.rabbitmq.config + * @description: RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 + * @author: yangkai.shen + * @date: Created in 2018-12-29 17:03 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Slf4j +@Configuration +public class RabbitMqConfig { + + @Bean + public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { + connectionFactory.setPublisherConfirms(true); + connectionFactory.setPublisherReturns(true); + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMandatory(true); + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); + rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); + return rabbitTemplate; + } + + /** + * 直接模式队列1 + */ + @Bean + public Queue directOneQueue() { + return new Queue(RabbitConsts.DIRECT_MODE_QUEUE_ONE); + } + + /** + * 队列2 + */ + @Bean + public Queue queueTwo() { + return new Queue(RabbitConsts.QUEUE_TWO); + } + + /** + * 队列3 + */ + @Bean + public Queue queueThree() { + return new Queue(RabbitConsts.QUEUE_THREE); + } + + /** + * 分列模式队列 + */ + @Bean + public FanoutExchange fanoutExchange() { + return new FanoutExchange(RabbitConsts.FANOUT_MODE_QUEUE); + } + + /** + * 分列模式绑定队列1 + * + * @param directOneQueue 绑定队列1 + * @param fanoutExchange 分列模式交换器 + */ + @Bean + public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) { + return BindingBuilder.bind(directOneQueue).to(fanoutExchange); + } + + /** + * 分列模式绑定队列2 + * + * @param queueTwo 绑定队列2 + * @param fanoutExchange 分列模式交换器 + */ + @Bean + public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queueTwo).to(fanoutExchange); + } + + /** + * 主题模式队列 + *
  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
  • + *
  • 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
  • + *
  • 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
  • + */ + @Bean + public TopicExchange topicExchange() { + return new TopicExchange(RabbitConsts.TOPIC_MODE_QUEUE); + } + + + /** + * 主题模式绑定分列模式 + * + * @param fanoutExchange 分列模式交换器 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) { + return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_ONE); + } + + /** + * 主题模式绑定队列2 + * + * @param queueTwo 队列2 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) { + return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_TWO); + } + + /** + * 主题模式绑定队列3 + * + * @param queueThree 队列3 + * @param topicExchange 主题模式交换器 + */ + @Bean + public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) { + return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConsts.TOPIC_ROUTING_KEY_THREE); + } + + /** + * 延迟队列 + */ + @Bean + public Queue delayQueue() { + return new Queue(RabbitConsts.DELAY_QUEUE, true); + } + + /** + * 延迟队列交换器, x-delayed-type 和 x-delayed-message 固定 + */ + @Bean + public CustomExchange delayExchange() { + Map args = Maps.newHashMap(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitConsts.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args); + } + + /** + * 延迟队列绑定自定义交换器 + * + * @param delayQueue 队列 + * @param delayExchange 延迟交换器 + */ + @Bean + public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConsts.DELAY_QUEUE).noargs(); + } + +} +``` + +## 消息处理器 + +> 只展示直接队列模式的消息处理,其余模式请看源码 +> +> 需要注意:如果 `spring.rabbitmq.listener.direct.acknowledge-mode: auto`,则会自动Ack,否则需要手动Ack + +### DirectQueueOneHandler.java + +```java +/** + *

    + * 直接队列1 处理器 + *

    + * + * @package: com.xkcoding.mq.rabbitmq.handler + * @description: 直接队列1 处理器 + * @author: yangkai.shen + * @date: Created in 2019-01-04 15:42 + * @copyright: Copyright (c) 2019 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Slf4j +@RabbitListener(queues = RabbitConsts.DIRECT_MODE_QUEUE_ONE) +@Component +public class DirectQueueOneHandler { + + /** + * 如果 spring.rabbitmq.listener.direct.acknowledge-mode: auto,则可以用这个方式,会自动ack + */ + // @RabbitHandler + public void directHandlerAutoAck(MessageStruct message) { + log.info("直接队列处理器,接收消息:{}", JSONUtil.toJsonStr(message)); + } + + @RabbitHandler + public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { + // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 + final long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + log.info("直接队列1,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct)); + // 通知 MQ 消息已被成功消费,可以ACK了 + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + try { + // 处理失败,重新压入MQ + channel.basicRecover(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} +``` + +## 运行效果 + +### 直接模式 + +![image-20190107103229408](assets/image-20190107103229408-6828349.png) + +### 分列模式 + +![image-20190107103258291](assets/image-20190107103258291-6828378.png) + +### 主题模式 + +#### RoutingKey:`queue.#` + +![image-20190107103358744](assets/image-20190107103358744-6828438.png) + +#### RoutingKey:`*.queue` + +![image-20190107103429430](assets/image-20190107103429430-6828469.png) + +#### RoutingKey:`3.queue` + +![image-20190107103451240](assets/image-20190107103451240-6828491.png) + +### 延迟队列 + +![image-20190107103509943](assets/image-20190107103509943-6828509.png) + +## 参考 + +1. Spring AMQP 官方文档:https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/ +2. RabbitMQ延迟队列:https://www.cnblogs.com/vipstone/p/9967649.html \ No newline at end of file diff --git a/spring-boot-demo-mq-rabbitmq/assets/image-20190107103229408-6828349.png b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103229408-6828349.png new file mode 100644 index 0000000..c2c03c0 Binary files /dev/null and b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103229408-6828349.png differ diff --git a/spring-boot-demo-mq-rabbitmq/assets/image-20190107103258291-6828378.png b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103258291-6828378.png new file mode 100644 index 0000000..522f805 Binary files /dev/null and b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103258291-6828378.png differ diff --git a/spring-boot-demo-mq-rabbitmq/assets/image-20190107103358744-6828438.png b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103358744-6828438.png new file mode 100644 index 0000000..1214326 Binary files /dev/null and b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103358744-6828438.png differ diff --git a/spring-boot-demo-mq-rabbitmq/assets/image-20190107103429430-6828469.png b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103429430-6828469.png new file mode 100644 index 0000000..36a373a Binary files /dev/null and b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103429430-6828469.png differ diff --git a/spring-boot-demo-mq-rabbitmq/assets/image-20190107103451240-6828491.png b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103451240-6828491.png new file mode 100644 index 0000000..cca4eaa Binary files /dev/null and b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103451240-6828491.png differ diff --git a/spring-boot-demo-mq-rabbitmq/assets/image-20190107103509943-6828509.png b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103509943-6828509.png new file mode 100644 index 0000000..e4a11d5 Binary files /dev/null and b/spring-boot-demo-mq-rabbitmq/assets/image-20190107103509943-6828509.png differ