怎么保证消息一定能发送到 RabbitMQ?


面试考察点

  1. Confirm 模式:这是最基础的。知不知道 confirmSelect 怎么用,异步确认的机制是什么。

  2. 消息路由失败的处理:消息到了 Broker 但没找到队列怎么办。mandatory 参数和 ReturnListener 要会说。

  3. 兜底方案:单靠 RabbitMQ 自身机制够不够,知不知道本地消息表这种最终一致性方案。

核心答案

消息发送到 RabbitMQ 的可靠性,分两步保障:

第一步,确认消息到了 Broker。用 Confirm 模式(confirmSelect),Producer 发完消息后 Broker 异步回调 ACK,表示收到了。比事务模式性能高得多。

第二步,确认消息进了队列。消息到了 Broker 不等于进了队列,如果 Routing Key 没匹配上任何队列,消息就丢了。开 mandatory = true,路由失败的消息会通过 ReturnListener 回退给 Producer。

两个都开了,才能算 "消息一定到了队列里"。

深度解析

消息发送过程中可能丢在哪

消息从 Producer 出来到最终落地,有三个可能丢的环节:

  • 网络传输阶段:Producer 发消息的过程中网络断了,或者 Broker 挂了,消息根本没到 Broker。用 Confirm 模式解决,Broker 收到后会回调 ACK,没收到就不回调,Producer 知道要重发。
  • 路由阶段:消息到了 Exchange,但 Routing Key 没匹配到任何绑定队列,消息直接被 Broker 丢弃。开 mandatory = true,路由失败的消息会通过 ReturnListener 回退给 Producer。
  • 持久化阶段:消息进了 Queue 但还没落盘,Broker 就宕机了,重启后消息丢失。需要把 Exchange、Queue 都声明为 durable,消息发送时设置 deliveryMode = 2(持久化)。

Confirm 模式怎么配

Channel channel = connection.createChannel();

// 开启 Confirm 模式
channel.confirmSelect();

// 注册确认回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) {
        // Broker 确认收到消息
        log.info("消息确认成功,deliveryTag = {}", deliveryTag);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) {
        // Broker 未确认,需要重发
        log.warn("消息确认失败,deliveryTag = {}", deliveryTag);
        retrySend(deliveryTag);
    }
});

confirmSelect 一开,之后这个 Channel 上发的每条消息都会被分配一个递增的 deliveryTag。Broker 收到后异步回调 handleAck。如果 Broker 因为内部错误没法处理,回调 handleNack,你在里面做重发。

Spring Boot 项目更简单,配置文件加一行:

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 开启异步 Confirm

然后实现 RabbitTemplate.ConfirmCallback 接口就行。

mandatory 处理路由失败

Confirm 只管 "消息到了 Broker",不管 "消息有没有路由到队列"。

比如你往一个 direct Exchange 发消息,Routing Key 是 order.cancel,但没有任何 Queue 用 order.cancel 绑定这个 Exchange。消息到了 Exchange,找不到队列,默认情况下 Broker 会静默丢弃。你连个错误都收不到。

mandatory = true 可以解决这个问题:

Channel channel = connection.createChannel();

// 开启 mandatory
channel.basicPublish("exchange.name", "order.cancel",
        true,  // mandatory = true
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        "消息内容".getBytes());

// 注册 ReturnListener,处理路由失败的消息
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText,
            String exchange, String routingKey,
            AMQP.BasicProperties properties, byte[] body) {
        log.warn("消息路由失败:exchange={}, routingKey={}, body={}",
                exchange, routingKey, new String(body));
        // 重新发送或存库兜底
    }
});

路由失败的消息不会静默丢弃,而是通过 ReturnListener 退回给你。你在回调里做重发或者存本地消息表都行。

Spring Boot 配置:

spring:
  rabbitmq:
    publisher-returns: true  # 开启 Return 退回模式

然后实现 RabbitTemplate.ReturnsCallback 接口。

备用交换机(Alternate Exchange)

除了 mandatory,还有一种思路:给 Exchange 配一个备用交换机(AE)。消息路由失败后自动转到 AE,由 AE 路由到一个专门的 "兜底队列",后续人工处理。

// 声明备用交换机和队列
channel.exchangeDeclare("ae.exchange", "fanout", true);
channel.queueDeclare("ae.queue", true, false, false, null);
channel.queueBind("ae.queue", "ae.exchange", "");

// 声明业务 Exchange 时指定备用交换机
Map<String, Object> args = new HashMap<>();
args.put("alternate-exchange", "ae.exchange");
channel.exchangeDeclare("business.exchange", "direct", true, false, args);

mandatory 和 AE 可以同时用。如果两个都配了,优先触发 mandatory 退回;如果 mandatory = false,消息才会走 AE。

本地消息表:终极兜底

说实话,Confirm + mandatory + 持久化已经能覆盖绝大多数场景了。但如果你要求 "绝对不能丢",比如金融交易类业务,光靠 RabbitMQ 自身机制还是不够。网络故障、Producer 自身宕机,都可能导致消息发不出去。

生产环境的常见做法是本地消息表:

  1. 业务操作和消息记录在同一个数据库事务里一起提交。业务表写入订单,消息表写入一条 "待发送" 状态的消息记录。
  2. 后台定时任务扫描消息表中 "待发送" 的记录,发到 RabbitMQ。发送成功就改成 "已发送"。
  3. 如果发送失败,定时任务下次还会扫到这条记录,重试发送。

这就是所谓的 "最终一致性"。消息不会丢,只是可能会延迟。

常见误区

  • 误区一:以为开了 Confirm 消息就不会丢。Confirm 只管到 Broker,路由失败的消息还是会丢。得配合 mandatory 或 AE。
  • 误区二:以为消息持久化了就万事大吉。持久化是异步的,消息刚进队列还没落盘 Broker 就挂了,照样丢。如果对可靠性要求极高,可以在发送后调用 channel.waitForConfirms() 等待 Broker 确认落盘,但这是同步的,会牺牲性能。
  • 误区三:觉得本地消息表太重不想用。如果你的业务能接受 "消息偶尔丢一两条",Confirm + mandatory 确实够了。但金融、支付场景,本地消息表是标配。

面试高频追问

  1. Confirm 模式和事务模式哪个好? 生产环境用 Confirm,不要用事务。事务是同步阻塞的,每发一条消息都得等 Broker 响应,吞吐量很低。Confirm 是异步的,发完不用等,Broker 通过回调通知结果。

  2. 消息全链路可靠性怎么做? 三段都得管:Producer 到 Broker 用 Confirm + mandatory;Broker 自身做持久化(Exchange、Queue、Message 都 durable);Consumer 端关自动 ACK,手动确认。

  3. 本地消息表和 RocketMQ 事务消息有什么区别? 思路一样,都是保证业务操作和消息发送的原子性。区别在于本地消息表是业务侧自己实现的,不依赖 MQ 的事务功能;RocketMQ 的事务消息是 MQ 原生支持的,不需要额外建表。RabbitMQ 没有原生事务消息功能,只能用本地消息表。

常见面试变体

  • "RabbitMQ 如何保证消息不丢?"
  • "消息发送到 RabbitMQ 失败怎么办?"
  • "RabbitMQ 的 Confirm 模式了解吗?"

记忆口诀

发送可靠两道关:Confirm 确认到 Broker,mandatory 兜底路由失败。生产环境三件套:Confirm + mandatory + 持久化。终极方案:本地消息表。

总结

消息发送可靠性靠 Confirm 模式确认消息到了 Broker,mandatory 参数兜底路由失败的消息,再加上 Exchange、Queue、Message 三层持久化。能覆盖绝大多数场景。金融级业务再加本地消息表做最终一致性。面试的时候把 "消息可能丢在哪" 和 "每个环节怎么防" 说清楚,这条线就通了。