Tavio's blog Tavio's blog
首页
  • JVM底层原理
  • 邪恶多线程
  • MyBatis底层原理
  • Spring底层原理
  • MySQL的优化之路
  • ClickHouse的高性能
  • Redis的快速查询
  • RabbitMQ的生产
  • Kafka的高吞吐量
  • ES的入门到入坑
  • MySQL自增ID主键空洞
  • 前端实现长整型排序
  • MySQL无感换表
  • Redis延时双删
  • 高并发秒杀优惠卷
  • AOP无侵入式告警
  • 长短链接跳转
  • 订单超时取消
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Tavio Zhang

努力学习的小码喽
首页
  • JVM底层原理
  • 邪恶多线程
  • MyBatis底层原理
  • Spring底层原理
  • MySQL的优化之路
  • ClickHouse的高性能
  • Redis的快速查询
  • RabbitMQ的生产
  • Kafka的高吞吐量
  • ES的入门到入坑
  • MySQL自增ID主键空洞
  • 前端实现长整型排序
  • MySQL无感换表
  • Redis延时双删
  • 高并发秒杀优惠卷
  • AOP无侵入式告警
  • 长短链接跳转
  • 订单超时取消
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • RabbitMQ概念扫盲
  • RabbitMQ集群
  • RabbitMQ生产者可靠性
  • RabbitMQ消息持久化
  • RabbitMQ消费者可靠性
  • Rabbit延时消息
    • 一、核心概念:死信与死信交换机
      • 1.1 什么是死信?
      • 1.2 什么是死信交换机?
    • 二、方案一:消息TTL + 死信队列
      • 2.1 核心原理
      • 2.2 完整实现步骤
      • 步骤1:声明组件(以Spring AMQP为例)
      • 步骤2:发送延时消息
      • 步骤3:消费死信队列消息
      • 2.3 存在的问题:队列阻塞
    • 三、方案二:延时消息插件
      • 3.1 插件介绍
      • 3.2 核心原理
      • 3.3 完整实现步骤
      • 步骤1:声明组件(以Spring AMQP为例)
      • 步骤2:发送延时消息
      • 步骤3:消费目标队列消息
      • 3.4 优缺点分析
    • 四、方案对比与选型建议
    • 五、总结
  • 《RabbitMQ》笔记
Tavio
2024-06-30
目录

Rabbit延时消息

在分布式系统开发中,我们经常会遇到需要「延时处理」的业务场景:

  • 电商订单创建后,若15分钟内未支付则自动取消
  • 外卖订单超时未接单,自动推送给其他骑手
  • 会员到期前3天,发送续费提醒
  • 任务失败后,间隔10分钟、30分钟、1小时自动重试

传统实现方案多采用「定时任务轮询数据库」,但这种方式存在明显缺陷:

  • 资源浪费:即使只需10分钟延时,也可能需要每分钟轮询一次,产生大量无效查询
  • 精度不足:轮询间隔决定了最小延时单位,无法实现秒级精度
  • 分布式复杂:多实例部署时需引入分布式锁避免重复执行,增加系统复杂度

RabbitMQ提供了更优雅的延时消息解决方案,具备「解耦、高可靠、高精度」的特点。

# 一、核心概念:死信与死信交换机

在了解RabbitMQ延时消息前,需先掌握「死信」和「死信交换机」的核心概念。

# 1.1 什么是死信?

死信(Dead Letter)指无法被正常消费的消息。当消息满足以下三种情况之一时,会被RabbitMQ标记为死信:

  1. 消息被消费者拒绝
    消费者通过basicReject或basicNack方法拒绝消费消息,且未设置requeue=false(不重新入队)。
    例:订单处理时发现参数错误,拒绝消费并标记为死信,后续人工排查

  2. 消息TTL过期
    TTL(Time To Live)即消息存活时间,若消息在队列中超过设定时间仍未被消费,则变为死信。
    例:设置TTL=15分钟的订单消息,15分钟未支付则触发取消逻辑

  3. 队列达到最大长度
    队列配置了x-max-length(最大消息数),当消息堆积超过上限时,新消息会挤掉旧消息(或直接成为死信,取决于配置)。
    例:秒杀场景下,队列最多存储1000条消息,超出的消息直接进入死信队列等待降级处理

# 1.2 什么是死信交换机?

死信交换机(Dead Letter Exchange,DLX)是一种特殊的交换机,专门用于接收死信消息,并将其路由到指定的「死信队列」中。

  • 本质:一种兜底机制,避免死信消息丢失,同时隔离异常数据便于排查
  • 工作流程:普通队列 → 产生死信 → 死信交换机 → 死信队列 → 死信消费者

# 二、方案一:消息TTL + 死信队列

利用「消息TTL过期后成为死信」的特性,结合死信交换机实现延时消息,是RabbitMQ最经典的延时方案。

# 2.1 核心原理

  1. 声明一个「延时队列」(无消费者监听),并配置其绑定的死信交换机和路由键
  2. 生产者发送消息到延时队列,并设置消息TTL(延时时间)
  3. 消息在延时队列中等待TTL过期,自动成为死信
  4. 死信被投递到配置的死信交换机,再路由到死信队列
  5. 消费者监听死信队列,触发延时任务

关键:延时队列仅用于存储消息等待过期,真正执行逻辑的是死信队列的消费者

# 2.2 完整实现步骤

# 步骤1:声明组件(以Spring AMQP为例)

需声明4个核心组件:普通交换机、延时队列、死信交换机、死信队列。

@Configuration
public class TtlDlxConfig {

    // 1. 普通交换机(用于发送消息到延时队列)
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal.exchange");
    }

    // 2. 延时队列(无消费者,仅用于存储消息等待TTL过期)
    @Bean
    public Queue delayQueue() {
        Map<String, Object> args = new HashMap<>();
        // 配置死信交换机
        args.put("x-dead-letter-exchange", "dlx.exchange");
        // 配置死信路由键(需与死信队列绑定键一致)
        args.put("x-dead-letter-routing-key", "dlx.key");
        // 可选:设置队列最大长度(超出则新消息成为死信)
        // args.put("x-max-length", 1000);
        return QueueBuilder.durable("delay.queue")
                .withArguments(args)
                .build();
    }

    // 3. 绑定:普通交换机 → 延时队列
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(normalExchange())
                .with("delay.key");
    }

    // 4. 死信交换机
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange");
    }

    // 5. 死信队列(消费者监听此队列)
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable("dlx.queue").build();
    }

    // 6. 绑定:死信交换机 → 死信队列
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with("dlx.key"); // 与延时队列配置的死信路由键一致
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# 步骤2:发送延时消息

通过setExpiration设置消息TTL(单位:毫秒):

@Service
public class DelayMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送延时消息
     * @param content 消息内容
     * @param delayMillis 延时时间(毫秒)
     */
    public void sendDelayMsg(String content, long delayMillis) {
        Message message = MessageBuilder
                .withBody(content.getBytes(StandardCharsets.UTF_8))
                .setExpiration(String.valueOf(delayMillis)) // 设置TTL
                .build();
        // 发送到普通交换机,路由到延时队列
        rabbitTemplate.convertAndSend("normal.exchange", "delay.key", message);
        log.info("延时消息发送成功,内容:{},延时:{}ms", content, delayMillis);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 步骤3:消费死信队列消息

@Component
public class DlxConsumer {

    @RabbitListener(queues = "dlx.queue")
    public void handleDlxMsg(String content) {
        log.info("收到延时消息,执行逻辑:{}", content);
        // 此处执行延时任务(如取消订单、发送提醒等)
    }
}
1
2
3
4
5
6
7
8
9

# 2.3 存在的问题:队列阻塞

核心缺陷:延时队列的FIFO(先进先出)特性会导致消息阻塞。

  • 场景举例:
    第一条消息设置TTL=30秒,第二条消息设置TTL=1秒。
    由于队列按顺序存储,第一条消息未过期前,第二条消息即使已过期也无法被处理,必须等待第一条消息过期后才会被检测并投递到死信队列。

  • 结论:
    此方案仅适用于「所有消息延时时间相同」的场景(如固定15分钟后取消订单),若需处理不同延时时间的消息,会导致精度问题。

# 三、方案二:延时消息插件

为解决TTL队列的阻塞问题,RabbitMQ提供了第三方插件rabbitmq_delayed_message_exchange,实现更灵活的延时消息。

# 3.1 插件介绍

  • 本质:一种特殊的交换机,接收消息后不会立即转发,而是根据消息的x-delay头信息,在指定时间后再转发到目标队列。
  • 优势:消息按延时时间排序,到点即转发,无阻塞问题,支持不同延时时间的消息。
  • 安装方法:
    1. 下载插件(需与RabbitMQ版本匹配):官网下载 (opens new window)
    2. 放入RabbitMQ的plugins目录
    3. 启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    4. 重启RabbitMQ生效

# 3.2 核心原理

  1. 声明一个「延时交换机」(类型为x-delayed-message)
  2. 生产者发送消息时,通过x-delay头设置延时时间(毫秒)
  3. 延时交换机接收消息后,暂存消息并倒计时
  4. 延时时间到后,交换机将消息转发到绑定的目标队列
  5. 消费者监听目标队列,触发延时任务

# 3.3 完整实现步骤

# 步骤1:声明组件(以Spring AMQP为例)

@Configuration
public class DelayedPluginConfig {

    // 1. 延时交换机(类型为x-delayed-message)
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct"); // 指定交换机的路由类型(如direct、topic)
        // 构造函数参数:交换机名称、类型、持久化、自动删除、参数
        return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
    }

    // 2. 目标队列(消费者监听此队列)
    @Bean
    public Queue targetQueue() {
        return QueueBuilder.durable("target.queue").build();
    }

    // 3. 绑定:延时交换机 → 目标队列
    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(targetQueue())
                .to(delayedExchange())
                .with("delayed.key")
                .noargs();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 步骤2:发送延时消息

通过MessagePostProcessor设置x-delay头:

@Service
public class DelayedPluginService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送延时消息(基于插件)
     * @param content 消息内容
     * @param delayMillis 延时时间(毫秒)
     */
    public void sendDelayedMsg(String content, long delayMillis) {
        rabbitTemplate.convertAndSend(
                "delayed.exchange", 
                "delayed.key", 
                content, 
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) {
                        // 设置延时时间(x-delay头)
                        message.getMessageProperties().setDelay((int) delayMillis);
                        return message;
                    }
                }
        );
        log.info("插件延时消息发送成功,内容:{},延时:{}ms", content, delayMillis);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 步骤3:消费目标队列消息

@Component
public class TargetConsumer {

    @RabbitListener(queues = "target.queue")
    public void handleDelayedMsg(String content) {
        log.info("收到插件延时消息,执行逻辑:{}", content);
        // 此处执行延时任务
    }
}
1
2
3
4
5
6
7
8
9

# 3.4 优缺点分析

  • 优点:

    1. 支持不同延时时间的消息,无队列阻塞问题
    2. 延时精度高(依赖RabbitMQ节点时间,误差较小)
    3. 实现简单,无需配置死信相关组件
  • 缺点:

    1. 消息持久化风险:延时期间消息存储在交换机中,未写入磁盘。若RabbitMQ节点宕机,未转发的消息会丢失(即使消息标记为持久化)。
    2. 依赖第三方插件:需额外安装和维护插件,增加部署复杂度。

# 四、方案对比与选型建议

特性 消息TTL + 死信队列 延时消息插件
适用场景 固定延时时间(如统一15分钟过期) 动态延时时间(如10s、30s、1h)
阻塞问题 存在(FIFO特性导致) 无(按延时时间排序)
消息可靠性 高(消息存储在队列,支持持久化) 中(延时期间存在内存,宕机丢失)
部署复杂度 低(无需额外插件) 中(需安装插件)
延时精度 低(受前序消息影响) 高(独立计时)

选型建议:

  • 若业务场景中所有消息延时时间固定(如订单统一15分钟过期),优先选择「消息TTL+死信队列」,可靠性更高。
  • 若需要处理动态延时时间(如不同任务不同重试间隔),且能接受轻微的可靠性风险,选择「延时消息插件」。
  • 对消息可靠性要求极高的场景(如金融交易),可结合两者设计兜底方案(如插件+定时任务补偿)。

# 五、总结

RabbitMQ通过「消息TTL+死信队列」和「延时消息插件」两种方案实现延时消息,解决了传统定时任务的资源浪费和精度问题。实际开发中需根据业务场景的「延时多样性」「可靠性要求」「部署复杂度」选择合适方案,同时注意规避队列阻塞、消息丢失等风险,确保延时任务稳定执行。

编辑 (opens new window)
#延时消息#死信队列
上次更新: 2026/01/21, 19:29:14
RabbitMQ消费者可靠性

← RabbitMQ消费者可靠性

最近更新
01
订单超时取消
01-21
02
双 Token 登录
01-21
03
长短链接跳转
01-21
更多文章>
Theme by Vdoing
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式