谈谈 Redis 5.0 中的 Stream 消息队列?

谈谈 Redis 5.0 中的 Stream 消息队列?谈谈 Redis 5.0 中的 Stream 消息队列?

Redis Stream 是 Redis 5.0 引入的最重要的数据结构之一,它标志着 Redis 从一个纯粹的内存数据结构服务器向一个完整的消息中间件和流处理平台迈出了关键一步。

一、 Stream 是什么?为什么需要它?

在 Stream 出现之前,Redis 已经有了两种消息传递机制:

  1. List:可以实现简单的队列,但功能单一,缺乏消费者组等高级特性。
  2. Pub/Sub:纯粹的广播,消息无法持久化,无积压能力。

Stream 的诞生,就是为了填补 “可靠消息队列” 的空白。 它是一个更强大的、支持持久化的、类似 Kafka 的日志数据结构。

核心定位:一个支持多消费者组的、可持久化的、可回溯的消息流。

二、 核心概念与数据结构

理解 Stream 的关键在于理解它的几个核心概念,它们共同构成了一个完整的消息生态系统。

  1. 消息(Message)

    • 它是 Stream 中的基本单元。

    • 每个消息由一个唯一的 ID 和一组键值对组成。

    • ID:默认格式为 <毫秒时间戳>-<序列号>(例如 1640995200000-0)。可以自定义,但必须保证递增。

    • 内容:类似于一个小的 Hash,包含多个 Field-Value 对,这使得一条消息可以承载结构化数据。

      XADD mystream * sensor_id 1234 temperature 23.5 humidity 80
      # 消息ID: 1640995200000-0
      # 内容: {sensor_id: 1234, temperature: 23.5, humidity: 80}
      
  2. 消费者组(Consumer Group) 这是 Stream 最强大的特性,它解决了 Pub/Sub 中 “负载均衡” 和 “消息可靠性” 的核心痛点。

    • 一个 Stream 可以有多个消费者组。
    • 对于一个消费者组而言,每条消息只会被组内的一个消费者消费。这实现了组内的负载均衡。
    • 组之间是独立的,同一个消息可以被多个不同的组消费。这实现了 “广播” 效果。
  3. 消费者(Consumer)

    • 消费者组内的一个工作单元。
    • 每个消费者独立地消费消息并确认。
  4. 最后递送 ID(Last Delivered ID)

    • 每个消费者组都会维护一个 last_delivered_id,指向即将被递送给消费者的下一条消息。可以理解为消费组的消费位点。
    • 当消费者使用 > 符号读取时,就是从这个 ID 之后开始读取。
  5. 待处理消息列表(Pending Entries List, PEL)

    • 消息被发送给消费者后,在未被确认(ACK)之前,会进入该消费者的 PEL。
    • 这是实现 “至少一次” 投递语义的关键。如果消费者崩溃,其他消费者可以接管并重新处理这些 PEL 中的消息。

三、 核心命令与工作流程

接下来,让我们通过命令来串联起整个工作流程。

1. 生产消息

使用 XADD 命令向流中追加消息。

XADD orders * order_id 1001 user_id "john" amount 199.99

2. 创建消费者组

使用 XGROUP CREATE 为流创建一个消费者组。

XGROUP CREATE orders order-processors $ MKSTREAM
  • orders:流名称。
  • order-processors:消费者组名称。
  • $:表示从流的尾部开始消费。使用 0 则表示从流的开头开始消费。
  • MKSTREAM:如果流不存在,则自动创建。

3. 消费消息

消费者使用 XREADGROUP 命令从组中读取消息。

XREADGROUP GROUP order-processors consumer-1 COUNT 1 BLOCK 5000 STREAMS orders >
  • GROUP order-processors consumer-1:指定消费者组和消费者名称。
  • COUNT 1:一次读取一条消息。
  • BLOCK 5000:如果没有消息,阻塞等待 5 秒。
  • STREAMS orders >:从 orders 流中读取,> 表示只接收从未被当前消费者组消费过的新消息。

4. 确认消息

消费者处理完消息后,必须使用 XACK 进行确认,否则该消息会一直停留在 PEL 中。

XACK orders order-processors 1640995200000-0

5. 检查与认领待处理消息

如果某个消费者崩溃,它的 PEL 中会有未确认的消息。管理员或其他消费者可以使用 XPENDING 查看,并使用 XCLAIM 将这些消息认领过来自己处理。

# 查看 pending 消息
XPENDING orders order-processors

# 认领一条超过 60000 毫秒未被确认的消息
XCLAIM orders order-processors consumer-2 60000 1640995200000-0

四、 优势、局限与生产实践

核心优势(相比 List/Pub/Sub)

  1. 可靠性:消息持久化,支持 ACK 机制,确保消息至少被处理一次。
  2. 负载均衡:通过消费者组,实现在多个消费者实例间分担负载。
  3. 消息回溯:可以读取任意历史位置的消息,支持重放。
  4. 阻塞操作:原生支持阻塞式读取,避免无效轮询。
  5. 流剪裁:可以通过 XTRIMMAXLEN 选项限制流的长度,防止内存无限增长。

局限性考量

  1. 非专业 MQ 的全功能:缺乏 RabbitMQ 那样复杂的交换机和路由规则,也没有 Kafka 那样完善的分区再平衡和副本机制。
  2. 内存成本:所有数据都在内存中,成本高于基于磁盘的 Kafka。
  3. 复杂性:概念和 API 比 List/Pub/Sub 复杂得多。

生产环境选型建议

  • 选择 Redis Stream 当:
    • 你的系统已经重度依赖 Redis,希望引入轻量级的可靠消息队列,避免引入新的中间件。
    • 消息吞吐量不是极端巨大(例如每天千亿级),且允许消息因内存限制而被剪裁。
    • 需要快速的、低延迟的消息处理。
    • 场景类似于:事件溯源、任务队列、微服务间的通信、实时数据流(如传感器数据)。
  • 选择 Kafka/Pulsar 当:
    • 需要处理海量数据(TB/PB 级),并且要求高吞吐量。
    • 要求消息长期持久化(数月甚至数年)。
    • 需要更高级的流处理功能(如 Kafka Streams)。
    • 生态系统需要与其他大数据组件(如 Flink, Spark)无缝集成。
  • 选择 RabbitMQ 当:
    • 需要复杂的消息路由、优先级、延迟队列、死信交换等高级特性。

总结

Redis Stream 通过将 “消息流”、“消费者组”、“待处理列表” 和 “确认机制” 这些概念原生地集成到 Redis 内核中,提供了一个在性能和功能上极为均衡的解决方案。它完美地填补了 Redis Pub/Sub 的不可靠性和引入 Kafka 等重型中间件之间的空白。

对于许多中小型系统、实时性要求高的场景,或者已经深度使用 Redis 的架构而言,Redis Stream 是实现内部服务解耦、构建事件驱动架构的“杀手锏”。它让 Redis 从一个缓存/数据库,进化为了一个真正的应用内消息中枢。