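Message loss and message duplication: RabbitMQ offers mature solutions for both message persistence and duplicate delivery.
AMQP (Advanced Message Queuing Protocol) defines a specification for messaging systems. It describes how the subsystems of a distributed system interact through messages. RabbitMQ is an Erlang-based implementation of AMQP. AMQP isolates the subsystems of a distributed system from one another, so they no longer depend on each other directly; each subsystem depends only on messages. A subsystem cares neither about who sent a message nor about who receives it.
- When the inventory system fails, the order fails.
- The order system and the inventory system are tightly coupled.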

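Order system: after the user places an order, the order system persists it, writes a message to the message queue, and tells the user that the order was placed successfully.
Inventory system: subscribes to order messages, receives them, and updates the inventory. Even if the inventory system fails, the message queue still guarantees reliable delivery, so no message is lost.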
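
The decoupling described above can be sketched without a broker at all. The following is only an illustration: a `LinkedBlockingQueue` stands in for the message queue, and `OrderDecouplingSketch`, `placeOrder`, and `consumePending` are hypothetical names that do not come from RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the broker; every name here is hypothetical.
class OrderDecouplingSketch {
    // Plays the role of the message queue between the two systems.
    static final BlockingQueue<String> orderQueue = new LinkedBlockingQueue<>();

    // Order system: persist the order (omitted), enqueue a message, return at once.
    static String placeOrder(String orderId) {
        orderQueue.offer("order-created:" + orderId);
        return "order " + orderId + " accepted";
    }

    // Inventory system: picks up whatever is waiting, whenever it is available.
    static List<String> consumePending() {
        List<String> handled = new ArrayList<>();
        orderQueue.drainTo(handled);
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(placeOrder("1001"));
        System.out.println(placeOrder("1002"));
        // The inventory side can run later; the messages are still waiting.
        System.out.println(consumePending());
    }
}
```

The order side never calls the inventory side directly, so an inventory outage only delays consumption instead of failing the order.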

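Scenario: after a user registers, a registration email and a registration SMS must be sent. There are two traditional approaches: 1. serial 2. parallel
Serial approach: write the registration data to the database, send the registration email, then send the registration SMS, and return to the client only after all three tasks have finished. The problem is that the email and the SMS are not essential; they are only notifications, yet this approach makes the client wait for work it has no need to wait for.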




    version: "3.1"
    services:
      rabbitmq:
        image: daocloud.io/library/rabbitmq:management
        restart: always
        container_name: rabbitmq
        ports:
          - 5672:5672
          - 15672:15672
        volumes:
          - ./data:/var/lib/rabbitmq
    [root@192 ~]# cd /opt
    [root@192 opt]# mkdir docker_rabbitmq
    [root@192 opt]# cd docker_rabbitmq/
    [root@192 docker_rabbitmq]# vim docker-compose.yml
    [root@192 docker_rabbitmq]# docker-compose up -d
    Creating network "docker_rabbitmq_default" with the default driver
    Pulling rabbitmq (daocloud.io/library/rabbitmq:management)...
    management: Pulling from library/rabbitmq
    01bf7da0a88c: Pull complete
    f3b4a5f15c7a: Pull complete
    57ffbe87baa1: Pull complete
    5ef3ef76b1b5: Pull complete
    82a3ce07c0eb: Pull complete
    1da219d9bd70: Pull complete
    446554ac749d: Pull complete
    8e4c09e200e7: Pull complete
    7a8620611ebf: Pull complete
    c70a2924b273: Pull complete
    3b0b9e36b4e9: Pull complete
    7619a9a42512: Pull complete
    965a8e1f1b1c: Pull complete
    Digest: sha256:4cc2267788b21e0f34523b4f2d9b32ee1c2867bf2de75d572158d6115349658c
    Status: Downloaded newer image for daocloud.io/library/rabbitmq:management
    Creating rabbitmq ... done
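Open http://ip:15672 in a browser (note: ip is the address of your cloud server; remember to open ports 15672 and 5672 on it)
4.1 Official simple architecture diagram
Publisher - the producer: publishes messages to an Exchange in RabbitMQ
Consumer - the consumer: listens for messages in a RabbitMQ Queue
Exchange - the exchange: accepts connections from producers and receives their messages
Queue - the queue: the Exchange distributes messages to the designated Queue, and the Queue interacts with consumers
Routes - routing: the policy by which an exchange delivers messages to Queues
[Figure: simple architecture diagram]
4.2 Complete RabbitMQ architecture diagram
[Figure: complete architecture diagram]
4.3 RabbitMQ communication patterns
4.4 Hello-World example
- Import the dependency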
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.15.0</version>
    </dependency>
- Create the producer, Publisher
    package com.couture.rabbitmq;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Publisher {
        public static void main(String[] args) throws Exception {
            System.out.println("Publisher...");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(""); // broker address
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments
            channel.queueDeclare("helloworldQueue", false, false, false, null);
            // An empty exchange name selects the default exchange; the routing key is the queue name
            channel.basicPublish("", "helloworldQueue", null, "helloworld".getBytes());
            channel.close();
            connection.close();
        }
    }
- Create the consumer, Consumer
    package com.couture.rabbitmq;

    import com.rabbitmq.client.*;
    import java.io.IOException;

    public class Consumer {
        public static void main(String[] args) throws Exception {
            System.out.println("Consumer...");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(""); // broker address
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Message from the producer: " + new String(body));
                }
            };
            // The second argument enables automatic acknowledgement
            channel.basicConsume("helloworldQueue", true, defaultConsumer);
            // Keep the process alive so deliveries can arrive; press Enter to exit
            System.in.read();
            channel.close();
            connection.close();
        }
    }
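4.5 Basic principles
- Broker: provides a transport service. Its role is to maintain a route from producer to consumer and to ensure that data is transmitted in the specified way.
- ConnectionFactory: the manager for connections to the RabbitMQ server.
- Connection: a TCP connection to the RabbitMQ server.
- Channel: a connection to the Exchange; one Connection can contain multiple Channels. Channels exist because establishing and tearing down TCP connections is expensive, so several Channels are multiplexed over one connection. RabbitMQ recommends that client threads not share a Channel, but that they share a Connection wherever possible.
- Queue: the carrier of messages; every message is delivered to one or more queues.
- Exchange: receives messages from producers and routes them to queues on the server according to the message's RoutingKey, the BindingKey bound to the Exchange, and the Binding rules. The ExchangeType determines how an Exchange routes messages. RabbitMQ provides four ExchangeTypes (direct, fanout, topic, and headers), and each type routes differently; this article uses the first three.
- Message Queue: stores messages that have not yet been consumed.
- Message: consists of a Header and a Body. The Header is a set of properties added by the producer, including whether the Message is persisted, which Message Queue receives it, and its priority. The Body is the application data that actually needs to be transmitted.
- RoutingKey: specified by the Producer when sending a Message; it designates who receives the message.
- BindingKey: specified by the Consumer when binding an Exchange to a Message Queue; it determines which RoutingKeys under this Exchange are dispatched to the bound Queue.
- Binding: links an Exchange to a Message Queue. After an Exchange is bound to several Message Queues it holds a routing table whose entries are the conditions each Message Queue places on the messages it wants, that is, the Binding Keys. When the Exchange receives a Message it parses the Header to obtain the Routing Key, then routes the Message to Message Queues according to the Routing Key and the Exchange Type. The Binding Key is specified by the Consumer at binding time, the Routing Key by the Producer at send time, and the Exchange Type decides how the two are matched.
- Server: the process that accepts client connections and implements AMQP queueing and routing.
- Virtual Host: a virtual concept, similar to a permission group. Permissions on a Virtual Host can be granted to users through commands. The default guest user has administrator privileges, and the initial Virtual Host is /. A Virtual Host can contain any number of Exchanges and Queues, but the smallest unit of permission control is the Virtual Host.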
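
To make the relationship between Exchange, Binding, BindingKey, and RoutingKey concrete, here is a minimal in-memory sketch. It is not how RabbitMQ is implemented; `ExchangeSketch`, `bind`, and `route` are hypothetical names, only the direct and fanout types are modelled, and each queue is limited to one binding for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of an exchange and its routing table (hypothetical names).
class ExchangeSketch {
    private final String type; // "direct" or "fanout"
    // Routing table: queue name -> binding key (one binding per queue in this sketch)
    private final Map<String, String> bindings = new LinkedHashMap<>();

    ExchangeSketch(String type) {
        this.type = type;
    }

    void bind(String queue, String bindingKey) {
        bindings.put(queue, bindingKey);
    }

    // Returns the queues that would receive a message carrying this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            // fanout ignores the key; direct requires an exact match
            boolean match = type.equals("fanout") || binding.getValue().equals(routingKey);
            if (match) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeSketch direct = new ExchangeSketch("direct");
        direct.bind("directQueue1", "zhangsan");
        direct.bind("directQueue2", "lisi");
        System.out.println(direct.route("lisi"));
    }
}
```

The same routing key produces different results depending on the exchange type, which is the point of the "matching is decided by Exchange Type" remark in the Binding entry.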
5.1 Import the dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
5.2 Add configuration to application.properties
    spring.rabbitmq.host=
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
5.3 Hello-World simple queue
[Figure: structure diagram]
    package com.couture.simple;

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class SimpleQueueConfig {

        // Declares the queue used by the simple-queue example
        @Bean
        public Queue simple() {
            return new Queue("simpleQueue");
        }
    }

    package com.couture.simple;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleQueueProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            System.out.println("SimpleQueueProducer");
            // With no exchange given, the first argument is used as the routing key (the queue name)
            rabbitTemplate.convertAndSend("simpleQueue", "simple mode");
        }
    }

    package com.couture.simple;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleQueueCustomer {

        @RabbitListener(queues = "simpleQueue")
        public void receive(String content) {
            System.out.println("SimpleQueueCustomer");
            System.out.println("Message from SimpleQueueProducer: " + content);
        }
    }

    package com.couture;

    import com.couture.simple.SimpleQueueProducer;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class Rabbitmq01ApplicationTests {

        @Test
        void contextLoads() {
        }

        @Autowired
        private SimpleQueueProducer simpleQueueProducer;

        @Test
        public void testSimpleQueueProducer() {
            simpleQueueProducer.send();
        }
    }
If the payload is a JavaBean, the entity class must implement the serialization interface. The steps are as follows:
- Import the lombok dependency and create the User class
    package com.couture.pojo;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    import java.io.Serializable;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class User implements Serializable {
        private String username;
        private String password;
    }
- Change the producer code
    package com.couture.simple;

    import com.couture.pojo.User;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class SimplePublisher {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            System.out.println("SimplePublisher...");
            rabbitTemplate.convertAndSend("", "simpleQueue", new User("Zhang San", "123"));
        }
    }
- Change the consumer code
    package com.couture.simple;

    import com.couture.pojo.User;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleConsumer {

        @RabbitListener(queues = "simpleQueue")
        public void receive(User user) {
            System.out.println("SimpleConsumer...");
            System.out.println("Message from SimplePublisher: " + user);
        }
    }
- Run the test class.
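
Why the entity must be Serializable can be seen with plain JDK serialization, which is what the default message converter in Spring AMQP falls back to for Serializable payloads. The sketch below is an illustration only: `SerializationSketch`, `toBytes`, and `fromBytes` are hypothetical names, and the nested `User` is a hand-written stand-in for the Lombok-based class above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {
    // Hand-written stand-in for the Lombok-based User class.
    static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String username;
        final String password;

        User(String username, String password) {
            this.username = username;
            this.password = password;
        }
    }

    // Producer side: turn the object into the bytes of a message body.
    static byte[] toBytes(Object payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload); // fails for classes that are not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side: rebuild the object from the message body.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        User copy = (User) fromBytes(toBytes(new User("zhangsan", "123")));
        System.out.println(copy.username);
    }
}
```

Passing an object whose class does not implement Serializable to `toBytes` raises an exception, which mirrors the failure a non-serializable payload would cause when sent this way.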
5.4 Work queue
[Figure: structure diagram]
    package com.couture.work;

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class WorkQueueConfig {

        @Bean
        public Queue work() {
            return new Queue("workQueue");
        }
    }

    package com.couture.work;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class WorkQueueProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            System.out.println("WorkQueueProducer");
            rabbitTemplate.convertAndSend("workQueue", "work queue mode");
        }
    }

    package com.couture.work;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // First of two consumers listening on the same queue
    @Component
    public class WorkQueueCustomer_01 {

        @RabbitListener(queues = "workQueue")
        public void receive(String content) {
            System.out.println("WorkQueueCustomer_01:" + content);
        }
    }

    package com.couture.work;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Second consumer on the same queue
    @Component
    public class WorkQueueCustomer_02 {

        @RabbitListener(queues = "workQueue")
        public void receive(String content) {
            System.out.println("WorkQueueCustomer_02:" + content);
        }
    }

    // Add to the test class
    @Autowired
    private WorkQueueProducer workQueueProducer;

    @Test
    public void testWorkQueueProducer() {
        // Send 100 messages so that both consumers receive a share
        for (int i = 0; i < 100; i++) {
            workQueueProducer.send();
        }
    }
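
The test above sends 100 messages to workQueue while two listeners consume from it. RabbitMQ hands messages on one queue to its consumers in round-robin order by default, so each message goes to exactly one consumer. The sketch below only illustrates that idea; `RoundRobinSketch` and `dispatch` are hypothetical names, and a real run may not alternate strictly because prefetch settings and consumer speed also play a part.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {
    // Assigns message i to consumer (i mod consumerCount); returns one list per consumer.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            perConsumer.get(i % consumerCount).add(i);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = dispatch(100, 2);
        System.out.println("consumer 1 received " + result.get(0).size() + " messages");
        System.out.println("consumer 2 received " + result.get(1).size() + " messages");
    }
}
```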
5.5 Publish/Subscribe
[Figure: structure diagram]
    package com.couture.fanout;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class FanoutConfig {

        @Bean
        public Queue fanoutQueue1() {
            return new Queue("fanoutQueue1");
        }

        @Bean
        public Queue fanoutQueue2() {
            return new Queue("fanoutQueue2");
        }

        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        // A fanout binding needs no routing key
        @Bean
        public Binding bindingFanoutQueue1() {
            return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
        }

        @Bean
        public Binding bindingFanoutQueue2() {
            return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
        }
    }

    package com.couture.fanout;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class FanoutProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            System.out.println("FanoutProducer");
            // The routing key is left empty; a fanout exchange ignores it
            rabbitTemplate.convertAndSend("fanoutExchange", "", "publish/subscribe");
        }
    }

    package com.couture.fanout;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class FanoutCustomer_01 {

        @RabbitListener(queues = "fanoutQueue1")
        public void receive(String content) {
            System.out.println("FanoutCustomer_01:" + content);
        }
    }

    package com.couture.fanout;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class FanoutCustomer_02 {

        @RabbitListener(queues = "fanoutQueue2")
        public void receive(String content) {
            System.out.println("FanoutCustomer_02:" + content);
        }
    }

    // Add to the test class
    @Autowired
    private FanoutProducer fanoutProducer;

    @Test
    public void testFanoutProducer() {
        fanoutProducer.send();
    }
5.6 Routing
[Figure: structure diagram]
    package com.couture.direct;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class DirectConfig {

        @Bean
        public Queue directQueue1() {
            return new Queue("directQueue1");
        }

        @Bean
        public Queue directQueue2() {
            return new Queue("directQueue2");
        }

        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("directExchange");
        }

        // directQueue1 receives messages whose routing key is exactly "zhangsan"
        @Bean
        public Binding bingDirectQueue1() {
            return BindingBuilder.bind(directQueue1()).to(directExchange()).with("zhangsan");
        }

        // directQueue2 receives messages whose routing key is exactly "lisi"
        @Bean
        public Binding bingDirectQueue2() {
            return BindingBuilder.bind(directQueue2()).to(directExchange()).with("lisi");
        }
    }

    package com.couture.direct;

    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class DirectProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private DirectExchange directExchange;

        public void send() {
            System.out.println("DirectProducer");
            rabbitTemplate.convertAndSend(directExchange.getName(), "zhangsan", "zhangsanContent");
            rabbitTemplate.convertAndSend(directExchange.getName(), "lisi", "lisiContent");
        }
    }

    package com.couture.direct;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class DirectCustomer_01 {

        @RabbitListener(queues = "directQueue1")
        public void receive(String content) {
            System.out.println("DirectCustomer_01:" + content);
        }
    }

    package com.couture.direct;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class DirectCustomer_02 {

        @RabbitListener(queues = "directQueue2")
        public void receive(String content) {
            System.out.println("DirectCustomer_02:" + content);
        }
    }

    // Add to the test class
    @Autowired
    private DirectProducer directProducer;

    @Test
    public void testDirectProducer() {
        directProducer.send();
    }
5.7 Topic
[Figure: structure diagram]
    package com.couture.topic;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class TopicConfig {

        @Bean
        public Queue topicQueue1() {
            return new Queue("topicQueue1");
        }

        @Bean
        public Queue topicQueue2() {
            return new Queue("topicQueue2");
        }

        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }

        // "*" matches exactly one word
        @Bean
        public Binding bingTopicQueue1() {
            return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("wangwu.*");
        }

        // "#" matches zero or more words
        @Bean
        public Binding bingTopicQueue2() {
            return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("zhaoliu.#");
        }
    }

    package com.couture.topic;

    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class TopicProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private TopicExchange topicExchange;

        public void send() {
            System.out.println("TopicProducer");
            rabbitTemplate.convertAndSend(topicExchange.getName(), "wangwu.abc", "wangwuContent");
            rabbitTemplate.convertAndSend(topicExchange.getName(), "zhaoliu.xyz.qwer", "zhaoliuContent");
        }
    }

    package com.couture.topic;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class TopicCustomer_01 {

        @RabbitListener(queues = "topicQueue1")
        public void receive(String content) {
            System.out.println("TopicCustomer_01:" + content);
        }
    }

    package com.couture.topic;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class TopicCustomer_02 {

        @RabbitListener(queues = "topicQueue2")
        public void receive(String content) {
            System.out.println("TopicCustomer_02:" + content);
        }
    }

    // Add to the test class
    @Autowired
    private TopicProducer topicProducer;

    @Test
    public void testTopicProducer() {
        topicProducer.send();
    }
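
The binding keys `wangwu.*` and `zhaoliu.#` used above rely on the two topic wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The matcher below is a small sketch of those rules, not RabbitMQ's own implementation; `TopicMatcher` and `matches` are hypothetical names.

```java
class TopicMatcher {
    // True when routingKey satisfies bindingKey under the topic wildcard rules.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words from index i against routing-key words from index j.
    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // both must be used up together
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("wangwu.*", "wangwu.abc"));
        System.out.println(matches("zhaoliu.#", "zhaoliu.xyz.qwer"));
        System.out.println(matches("wangwu.*", "wangwu.abc.def"));
    }
}
```

Under these rules the first message sent by TopicProducer reaches topicQueue1 and the second reaches topicQueue2, while a key such as `wangwu.abc.def` would match neither binding.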
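Ack in RabbitMQ: an acknowledgement tells the server that the consumer has finished with a message, so the server can remove it from the queue. spring-boot-starter-amqp acknowledges automatically by default, which means the message is acknowledged for us and then deleted from the queue without any action in our code. This can cause problems: if the consumer needs a long time to process a message, or an exception occurs while consuming it, things go wrong. With manual Ack the consumer decides when a message is acknowledged or rejected, which helps avoid both losing messages and consuming the same message repeatedly.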
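
The bookkeeping behind manual acknowledgement can be pictured with a small in-memory model: a delivered message stays in an "unacked" set until the consumer acks it, and a nack either requeues or discards it. This is a sketch under simplifying assumptions, not broker code; `AckSketch` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();      // waiting for delivery
    private final Map<Long, String> unacked = new HashMap<>();   // delivered, not yet acked
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Delivers the next ready message and returns its delivery tag, or -1 if none is ready.
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        unacked.put(nextTag, body);
        return nextTag++;
    }

    // Positive acknowledgement: the message is gone for good.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // Negative acknowledgement: requeue the message or drop it.
    void nack(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("hello");
        long tag = queue.deliver();
        queue.nack(tag, true); // processing failed: put the message back
        System.out.println("ready=" + queue.readyCount() + ", unacked=" + queue.unackedCount());
    }
}
```

The examples that follow call `basicNack` with requeue set to false, so a failed message is discarded (or dead-lettered, if the queue is configured for it) instead of being redelivered.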
5.8.1 Testing with the native client
    package com.couture.helloworld;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("Consumer started...");

            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setHost(""); // broker address
            factory.setPort(5672);

            Connection connection = factory.newConnection();

            Channel channel = connection.createChannel();

            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        System.out.println(new String(body, "UTF-8"));
                        int i = 1 / 0; // simulate a failure while processing
                        // Acknowledge only after processing has succeeded
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        // Reject the message without requeueing it
                        channel.basicNack(envelope.getDeliveryTag(), false, false);
                        e.printStackTrace();
                        connection.close();
                    }
                }
            };

            // autoAck=false switches the consumer to manual acknowledgement
            channel.basicConsume("helloworldQueue", false, defaultConsumer);
        }
    }
5.8.2 Testing in Spring Boot
1. Add the configuration to application.properties
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
2. Add an AckCustomer to any of the patterns tested earlier
    package com.couture.simple;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    @RabbitListener(queues = "simpleQueue")
    public class AckCustomer {

        @RabbitHandler
        public void receive(String message, Channel channel, Message msg) {
            System.out.println("AckCustomer...");
            if (message != null && message.length() > 0) {
                try {
                    System.out.println("Message received: " + message);
                    int i = 1 / 0; // simulate a failure while processing
                    long deliveryTag = msg.getMessageProperties().getDeliveryTag();
                    System.out.println("deliveryTag:" + deliveryTag);
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    System.out.println("Handling failed message...");
                    try {
                        // Reject the message without requeueing it
                        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
                        e.printStackTrace();
                        channel.getConnection().close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            } else {
                System.out.println("No message");
            }
        }
    }
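6.1 Message reliability

6.2 RabbitMQ transactions
    channel.txSelect();   // put the channel into transaction mode
    channel.txCommit();   // commit the transaction
    channel.txRollback(); // roll the transaction back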
6.3 Confirm mechanism
6.3.1 Create a utility class
    package com.couture.utils;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class RabbitMQUtils {

        private static ConnectionFactory connectionFactory;

        static {
            connectionFactory = new ConnectionFactory();
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            connectionFactory.setHost(""); // broker address
        }

        // Returns a new connection, or null if connecting failed
        public static Connection getConnection() {
            Connection connection = null;
            try {
                connection = connectionFactory.newConnection();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return connection;
        }

        // Closes the channel first, then the connection
        public static void close(Channel channel, Connection connection) {
            try {
                if (null != channel) {
                    channel.close();
                }
                if (null != connection) {
                    connection.close();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
6.3.2 Confirming a single message
    channel.confirmSelect();   // enable publisher confirms on this channel
    channel.waitForConfirms(); // confirms a single message; true means success

    package com.couture.confirm;

    import com.couture.utils.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class ComfirmTest1 {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            channel.basicPublish("myExchange", "my", null, "message content".getBytes());

            // Blocks until the broker has confirmed the message
            if (channel.waitForConfirms()) {
                System.out.println("Message reached the exchange");
            }

            RabbitMQUtils.close(channel, connection);
        }
    }
6.3.3 Confirming messages in batches
    channel.waitForConfirmsOrDie(); // batch confirmation; throws an exception if any message was not sent successfully

    package com.couture.confirm;

    import com.couture.utils.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class ComfirmTest2 {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.confirmSelect();
            for (int i = 0; i < 10; i++) {
                if (i == 5) {
                    // Deliberately publish one message to a different exchange
                    channel.basicPublish("myExchange2", "my", null, ("message content" + i).getBytes());
                    continue;
                }
                channel.basicPublish("myExchange", "my", null, ("message content" + i).getBytes());
            }

            // Wait for the whole batch; throws if any message was not confirmed
            channel.waitForConfirmsOrDie();

            RabbitMQUtils.close(channel, connection);
        }
    }
6.3.4 Confirming through callbacks
    package com.couture.confirm;

    import com.couture.utils.RabbitMQUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;

    public class ComfirmTest3 {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.confirmSelect();

            // The listener is invoked asynchronously when the broker confirms or rejects
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Reached the exchange");
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Did not reach the exchange");
                }
            });

            channel.basicPublish("myExchange", "my", null, "message content".getBytes());
            Thread.sleep(1000); // give the callback time to fire before closing

            RabbitMQUtils.close(channel, connection);
        }
    }
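
The `multiple` flag passed to `handleAck` and `handleNack` means that every message up to and including `deliveryTag` is covered by the callback. A publisher that wants to know which messages are still unconfirmed usually keeps them in a sorted map keyed by sequence number. The sketch below shows only that bookkeeping; `ConfirmTrackerSketch` and its methods are hypothetical names, and in a real publisher the sequence number would come from `channel.getNextPublishSeqNo()` before each publish.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConfirmTrackerSketch {
    // sequence number -> message body still waiting for a confirm
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call right before publishing a message with this sequence number.
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    // Call from handleAck (or handleNack, after recording the failure).
    void confirmed(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is settled
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "message content" + seq);
        }
        tracker.confirmed(3, true); // settles 1, 2 and 3 in one callback
        System.out.println("still pending: " + tracker.pending());
    }
}
```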
6.4 Return mechanism

    package com.couture.confirm;

    import com.couture.utils.RabbitMQUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;

    public class ReturnTest {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.confirmSelect();

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Reached the exchange");
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Did not reach the exchange");
                }
            });

            // Called when a mandatory message cannot be routed to any queue
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Did not reach a queue");
                }
            });

            // mandatory=true asks the broker to return unroutable messages
            channel.basicPublish("myExchange", "my", true, null, "message content".getBytes());
            Thread.sleep(1000); // give the callbacks time to fire before closing

            RabbitMQUtils.close(channel, connection);
        }
    }
6.5 Spring Boot implementation
1. Add the configuration to application.properties
Values of spring.rabbitmq.publisher-confirm-type:
- NONE: publisher confirms are disabled; this is the default
- CORRELATED: the callback method is triggered once the message has been published to the exchange
- SIMPLE: has two effects
- Like CORRELATED, it triggers the callback method
- After publishing, you can call waitForConfirms or waitForConfirmsOrDie through rabbitTemplate to wait for the broker's result and decide the next step from it. Note that waitForConfirmsOrDie does not return a value: if any message is negatively acknowledged it throws an exception and the channel is closed, after which no more messages can be sent to the broker on that channel
    spring.rabbitmq.publisher-confirm-type=simple
    spring.rabbitmq.publisher-returns=true

    package com.couture.direct;

    import org.springframework.amqp.core.ReturnedMessage;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;

    import javax.annotation.PostConstruct;

    @Configuration
    public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        // Register this class as both the confirm and the return callback
        @PostConstruct
        public void initMethod() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnsCallback(this);
        }

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            if (ack) {
                System.out.println("Reached the exchange");
            } else {
                System.out.println("Did not reach the exchange");
            }
        }

        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("Did not reach a queue");
        }
    }
7. Dead-letter queue
7.1 Scenario
7.2 Test
    package com.couture.dead;

    import com.couture.utils.RabbitMQUtils;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.util.HashMap;
    import java.util.Map;

    public class DeadQueueTest {

        private static final String dead_letter_exchange = "dead_letter_exchange";
        private static final String dead_letter_routing_key = "dead_letter_routing_key";
        private static final String dead_letter_queue = "dead_letter_queue";

        private static final String people_exchange = "people_exchange";
        private static final String people_routing_key = "people_routing_key";
        private static final String people_queue = "people_queue";

        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMQUtils.getConnection();
            Channel channel = connection.createChannel();

            // Dead-letter exchange and queue
            channel.exchangeDeclare(dead_letter_exchange, "direct");
            channel.queueDeclare(dead_letter_queue, true, false, false, null);
            channel.queueBind(dead_letter_queue, dead_letter_exchange, dead_letter_routing_key);

            // Point the business queue at the dead-letter exchange
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", dead_letter_exchange);
            arguments.put("x-dead-letter-routing-key", dead_letter_routing_key);

            // Business exchange and queue
            channel.exchangeDeclare(people_exchange, "direct");
            channel.queueDeclare(people_queue, true, false, false, arguments);
            channel.queueBind(people_queue, people_exchange, people_routing_key);

            // The message expires after 15 seconds (value in milliseconds)
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration("15000").build();

            channel.basicPublish(people_exchange, people_routing_key, properties, "dead_message".getBytes());

            RabbitMQUtils.close(channel, connection);
        }
    }
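
In the test above, nothing consumes from people_queue, so the message waits until its 15000 ms expiration passes and is then republished through the dead-letter exchange into dead_letter_queue. The flow can be pictured with the simplified model below, which uses an explicit clock value instead of real time; `DeadLetterSketch` and its members are hypothetical names, and the real broker's expiry handling has more detail than this.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class DeadLetterSketch {
    // A queued message together with the instant at which it expires.
    static class Msg {
        final String body;
        final long expiresAtMillis;

        Msg(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    final List<Msg> peopleQueue = new ArrayList<>();
    final List<String> deadLetterQueue = new ArrayList<>();

    // Publishes a message with a per-message TTL.
    void publish(String body, long nowMillis, long ttlMillis) {
        peopleQueue.add(new Msg(body, nowMillis + ttlMillis));
    }

    // Moves every message whose TTL has elapsed into the dead-letter queue.
    void expire(long nowMillis) {
        Iterator<Msg> it = peopleQueue.iterator();
        while (it.hasNext()) {
            Msg m = it.next();
            if (nowMillis >= m.expiresAtMillis) {
                it.remove();
                deadLetterQueue.add(m.body);
            }
        }
    }

    public static void main(String[] args) {
        DeadLetterSketch broker = new DeadLetterSketch();
        broker.publish("dead_message", 0, 15000);
        broker.expire(15000);
        System.out.println(broker.deadLetterQueue);
    }
}
```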
8. Avoiding duplicate message consumption
8.1 Idempotency

8.2 Solution
8.3 Testing in Spring Boot
8.3.1 Import the dependencies
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
8.3.2 Configure application.properties
    spring.rabbitmq.host=
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.redis.host=
    spring.redis.port=6379
8.3.3 Demonstration using the simple pattern
    package com.couture.simple;

    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class SimpleQueueConfig {

        @Bean
        public Queue simpleQueue() {
            return new Queue("simpleQueue");
        }

        // Copies the CorrelationData id into the message's correlationId property
        @Bean
        public MessagePostProcessor correlationIdProcessor() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message, Correlation correlation) {
                    MessageProperties messageProperties = message.getMessageProperties();
                    if (correlation instanceof CorrelationData) {
                        String correlationId = ((CorrelationData) correlation).getId();
                        messageProperties.setCorrelationId(correlationId);
                    }
                    return message;
                }

                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    return message;
                }
            };
            return messagePostProcessor;
        }
    }

    package com.couture.simple;

    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class SimpleProducer {

        @Autowired
        private MessagePostProcessor messagePostProcessor;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            // The no-argument constructor generates a unique id for the message
            CorrelationData correlationData = new CorrelationData();
            System.out.println(correlationData.getId());
            rabbitTemplate.convertAndSend("", "simpleQueue", "simple queue", messagePostProcessor, correlationData);
        }
    }
    package com.couture.simple;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Component;

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    @Component
    @RabbitListener(queues = "simpleQueue")
    public class AckCustomer {

        @Autowired
        private StringRedisTemplate redisTemplate;

        @RabbitHandler
        public void getMessage(String msg, Channel channel, Message message) throws IOException {
            String messageId = message.getMessageProperties().getCorrelationId();
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            System.out.println(messageId);
            // setIfAbsent returns true only for the first delivery of this id; "0" marks it as in progress
            Boolean first = redisTemplate.opsForValue().setIfAbsent(messageId, "0", 600, TimeUnit.SECONDS);
            if (Boolean.TRUE.equals(first)) {
                System.out.println("Message received: " + msg);
                // "1" marks the message as fully processed
                redisTemplate.opsForValue().set(messageId, "1", 600, TimeUnit.SECONDS);
                channel.basicAck(deliveryTag, false);
                return;
            }

            if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
                // Duplicate of a message that was already processed: acknowledge without reprocessing
                channel.basicAck(deliveryTag, false);
            } else {
                // Still marked "0": another delivery of this message is being processed
                System.out.println("Message is being processed....");
            }
        }
    }
    package com.couture;

    import com.couture.simple.SimpleProducer;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class Springboot14RabbitmqApplicationTests {

        @Test
        void contextLoads() {
        }

        @Autowired
        private SimpleProducer simpleProducer;

        @Test
        public void test() {
            simpleProducer.send();
        }
    }
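
The idempotency check used by the consumer in this section can be tried without RabbitMQ or Redis. In the sketch below a `ConcurrentHashMap` stands in for Redis, with `putIfAbsent` playing the role of `setIfAbsent`; `IdempotentConsumerSketch` and `onMessage` are hypothetical names, and key expiry is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class IdempotentConsumerSketch {
    // message id -> "0" (in progress) or "1" (done); stands in for Redis
    private final ConcurrentMap<String, String> state = new ConcurrentHashMap<>();
    int processedCount = 0;

    // Returns true when the delivery should be acknowledged.
    boolean onMessage(String messageId, String body) {
        String previous = state.putIfAbsent(messageId, "0");
        if (previous == null) {
            // First delivery of this id: run the business logic exactly once
            processedCount++;
            state.put(messageId, "1");
            return true;
        }
        // "1": already handled, so the duplicate is acknowledged without reprocessing.
        // "0": another delivery is still in flight, so this one is not acknowledged yet.
        return "1".equals(previous);
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.onMessage("id-1", "simple queue");
        consumer.onMessage("id-1", "simple queue"); // redelivery of the same message
        System.out.println("processed " + consumer.processedCount + " time(s)");
    }
}
```

The atomic "set only if absent" step is what makes the check safe when the same message arrives on two consumers at once; a separate read followed by a write would leave a window in which both could process it.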