Redis Stream


Redis Stream 技术文档

1、Redis Stream 简介

Redis Stream 是 Redis 5.0 引入的一种新的数据类型,它用于可持久化的实时数据流处理,提供了一套简单而强大的消息分发模型。类似于Kafka的消息队列,Redis Stream 也支持多者进行数据消费,可以适用于诸如日志处理、即时消息、事件源、实时分析等数据采集场景。

redis> XADD mystream * field1 value1 field2 value2
>> "36116035032231361-0"
redis> XADD mystream * field1 value3
>> "36116036694022528-0"
redis> XADD mystream * field2 value4
>> "36116037260014209-0"
redis> XRANGE mystream - +
1) 1) "36116035032231361-0"
   2) 1) "field2"
      2) "value2"
      3) "field1"
      4) "value1"
2) 1) "36116036694022528-0"
   2) 1) "field1"
      2) "value3"
3) 1) "36116037260014209-0"
   2) 1) "field2"
      2) "value4"

2、Redis Stream 结构

每个 Redis Stream 均由一个唯一的名称表示。Stream 是一个有序列表,它由一个或多个包含键值对的消息组成,每个消息从左到右在列表中进行排列。当 Stream 长度超过限制时,旧数据会根据策略自动删除。

消息可以使用任何可打印的字符串作为标识符,也可以自己使用 XADD 命令来设置ID,Redis 会通过时间戳和随机标识符自动创建唯一ID标识。每个消息都由零个或多个键值对组成,每个键和值都是一个字符串,一个消息可以创建多个字段,但所有字段应具有相同的数量,不同长度的消息会对消费端造成偏移量不一致的问题。

3、Redis Stream 命令

命令 描述
XADD 添加一条消息到流
XRANGE 返回给定区间内的消息,可以限制消息数量以及逆序输出
XREVRANGE 返回逆序排列的区间内的消息,基本与XRANGE相同
XLEN 获取 Stream 长度
XREAD 读取来自一个或多个 Stream 的新消息。
XACK 消费者确认一条或多条消息
XDEL 删除一条或多条数据
XGROUP 创建和管理 Stream 的消费者组,由多个消费者共同消费消息。

4、使用 Redis Stream 构建简单消息系统

通过 Redis Stream ,我们可以快速构建一个消息系统,以便管理日志、分发任务等。下面是一个简单的Python demo,它将使用Redis Stream来发送和接收消息。

首先,我们需要安装Python的redis库(pip install redis)。接着,可以直接使用 Redis 的 STREAM 命令来构建消息系统,代码如下:

import redis

conn = redis.Redis()

def client_post_message(n):
    for i in range(n):
        message = input("Message:")
        conn.execute_command("XADD", "mystream", "*", "message", message)

def client_receive_message():
    while True:
        last_id = "0-0" if last_id is None else last_id
        response = conn.execute_command("XREAD", "COUNT", 1, "BLOCK", 0, "STREAMS", "mystream", last_id)
        streams = response[0][1]
        for stream in streams:
            msg = stream[1][1].decode("utf-8")
            print(f"Received message: {msg}")
            last_id = stream[0]

if __name__ == '__main__':
    import sys
    last_id = None
    if len(sys.argv) > 1:
        client_post_message(int(sys.argv[1]))
    else:
        client_receive_message()

上面的 Python 代码中,将 Redis 连接到本地 Redis 实例来发送和接收数据。客户端按需选择post 消息或者recv 消息。接到的消息会被输出到控制台。

5、总结

Redis Stream 支持多消费者的实时消息分发,其内部实现为基于链表的消息队列,并且应在存储中尽可能长时间保留数据,从而支持最终一致性模型。Redis Stream 不仅适用于大规模数据处理,也可以用于小规模的消息通讯。即使在网络延迟的情况下,Redis Stream 仍能够保证消息的可靠传递,是一个性能优异、易于扩展的优秀实时数据处理工具。