Streams(流)
RabbitMQ Streams 是 3.9 版本引入的新特性,提供了类似 Apache Kafka 的持久化日志流功能。
🎯 本章目标
- 理解 Streams 的核心概念和优势
- 掌握 Streams 的配置和使用方法
- 了解 Streams 与传统队列的区别
📖 什么是 Streams
Streams vs 传统队列
传统队列(消费后删除):
┌─────────────────────────────────────────┐
│ Queue: [M1] [M2] [M3] [M4] [M5] │
└─────────────────────────────────────────┘
│
▼ 消费 M1
┌─────────────────────────────────────────┐
│ Queue: [M2] [M3] [M4] [M5] │ ← M1 被删除
└─────────────────────────────────────────┘
Streams(持久化日志,消费后保留):
┌─────────────────────────────────────────┐
│ Stream: [M1] [M2] [M3] [M4] [M5] │
└─────────────────────────────────────────┘
↑ ↑
Consumer A Consumer B ← 多消费者各自维护 offset
│ │
│ └─── offset=4
└─────────────── offset=2
核心特点
| 特性 | 传统队列 | Streams |
|---|---|---|
| 消息保留 | 消费后删除 | 持久保留 |
| 多消费者 | 竞争消费 | 独立消费 |
| 消息重放 | ❌ | ✅ |
| 时间旅行 | ❌ | ✅ |
| 高吞吐量 | 一般 | 更高 |
| 顺序保证 | 单队列保证 | 保证 |
适用场景
- 事件溯源:重放历史事件
- 日志收集:持久化日志数据
- 数据同步:多系统数据同步
- 审计追踪:保留完整操作记录
- 大数据分析:历史数据分析
🔧 启用 Streams
检查版本
# 需要 RabbitMQ 3.9+
rabbitmq-diagnostics server_version
启用插件
# Streams 功能默认启用,确保以下插件启用
rabbitmq-plugins enable rabbitmq_stream
rabbitmq-plugins enable rabbitmq_stream_management # 管理界面支持
💻 原生 Java 客户端
添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.15.0</version>
</dependency>
创建 Stream
package com.example.rabbitmq.stream;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.StreamCreator;
public class StreamSetup {
public static void main(String[] args) {
// 创建 Stream 环境
Environment environment = Environment.builder()
.host("localhost")
.port(5552) // Stream 协议端口
.username("admin")
.password("admin123")
.build();
// 创建 Stream
environment.streamCreator()
.stream("my-stream")
.maxLengthBytes(1_000_000_000) // 最大 1GB
.maxAge(java.time.Duration.ofDays(7)) // 保留 7 天
.create();
System.out.println("✅ Stream 创建成功!");
environment.close();
}
}
生产者
package com.example.rabbitmq.stream;
import com.rabbitmq.stream.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class StreamProducer {
public static void main(String[] args) throws Exception {
Environment environment = Environment.builder()
.host("localhost")
.port(5552)
.username("admin")
.password("admin123")
.build();
String streamName = "my-stream";
// 创建生产者
Producer producer = environment.producerBuilder()
.stream(streamName)
.build();
// 发送消息
for (int i = 1; i <= 100; i++) {
String content = "Message-" + i;
producer.send(
producer.messageBuilder()
.addData(content.getBytes(StandardCharsets.UTF_8))
.build(),
confirmationStatus -> {
if (confirmationStatus.isConfirmed()) {
log.debug("✅ 消息确认");
} else {
log.error("❌ 消息发送失败");
}
}
);
log.info("📤 发送: {}", content);
}
// 等待确认
Thread.sleep(1000);
producer.close();
environment.close();
log.info("🎉 发送完成!");
}
}
消费者
package com.example.rabbitmq.stream;
import com.rabbitmq.stream.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class StreamConsumer {
public static void main(String[] args) {
Environment environment = Environment.builder()
.host("localhost")
.port(5552)
.username("admin")
.password("admin123")
.build();
String streamName = "my-stream";
// 创建消费者
Consumer consumer = environment.consumerBuilder()
.stream(streamName)
.name("my-consumer") // 消费者名称(用于 offset 跟踪)
.offset(OffsetSpecification.first()) // 从头开始消费
.messageHandler((context, message) -> {
String content = new String(
message.getBodyAsBinary(),
StandardCharsets.UTF_8
);
log.info("📥 收到消息 [offset={}]: {}",
context.offset(), content);
// 手动存储 offset(可选)
context.storeOffset();
})
.build();
log.info("🎧 消费者启动,等待消息...");
// 保持运行
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.close();
environment.close();
log.info("👋 消费者关闭");
}));
}
}
Offset 策略
// 从头开始
OffsetSpecification.first()
// 从尾部开始(只消费新消息)
OffsetSpecification.last()
// 从下一条开始
OffsetSpecification.next()
// 从指定 offset 开始
OffsetSpecification.offset(12345)
// 从指定时间开始
OffsetSpecification.timestamp(
System.currentTimeMillis() - 3600_000 // 1小时前
)
🌱 Spring Boot 集成
添加依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
</dependency>
配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
stream:
host: localhost
port: 5552 # Stream 协议端口
username: admin
password: admin123
配置类
package com.example.rabbitmq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
@Configuration
public class StreamConfig {
@Bean
public Environment streamEnvironment() {
return Environment.builder()
.host("localhost")
.port(5552)
.username("admin")
.password("admin123")
.build();
}
@Bean
public StreamRabbitListenerContainerFactory streamListenerContainerFactory(
Environment environment) {
StreamRabbitListenerContainerFactory factory =
new StreamRabbitListenerContainerFactory(environment);
// 配置消费者定制器
factory.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first())
.name(name);
});
return factory;
}
}
Stream 生产者
package com.example.rabbitmq.producer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@Slf4j
@Service
public class StreamProducerService {
private final Environment environment;
private Producer producer;
private static final String STREAM_NAME = "orders.stream";
public StreamProducerService(Environment environment) {
this.environment = environment;
}
@PostConstruct
public void init() {
// 创建 Stream(如果不存在)
try {
environment.streamCreator()
.stream(STREAM_NAME)
.maxLengthBytes(1_000_000_000)
.create();
log.info("✅ Stream 创建成功: {}", STREAM_NAME);
} catch (Exception e) {
log.info("Stream 已存在: {}", STREAM_NAME);
}
// 创建生产者
producer = environment.producerBuilder()
.stream(STREAM_NAME)
.build();
}
public void sendMessage(String message) {
producer.send(
producer.messageBuilder()
.addData(message.getBytes(StandardCharsets.UTF_8))
.build(),
confirmationStatus -> {
if (confirmationStatus.isConfirmed()) {
log.debug("✅ 消息已确认");
} else {
log.error("❌ 消息发送失败");
}
}
);
log.info("📤 发送到 Stream: {}", message);
}
@PreDestroy
public void cleanup() {
if (producer != null) {
producer.close();
}
}
}
Stream 消费者
package com.example.rabbitmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.rabbit.stream.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.stream.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class StreamConsumerService {
/**
* 使用 Stream 监听器
*/
@RabbitListener(
queues = "orders.stream",
containerFactory = "streamListenerContainerFactory"
)
public void handleStreamMessage(Message message) {
String content = new String(
message.getBodyAsBinary(),
StandardCharsets.UTF_8
);
log.info("📥 从 Stream 收到: {}", content);
}
}
🔄 超级流(Super Streams)
超级流是分区的 Stream,类似于 Kafka 的分区。
创建超级流
Environment environment = Environment.builder()
.host("localhost")
.port(5552)
.username("admin")
.password("admin123")
.build();
// 创建 3 分区的超级流
environment.streamCreator()
.superStream("orders-super-stream")
.partitions(3)
.create();
超级流生产者
// 创建超级流生产者
Producer producer = environment.producerBuilder()
.superStream("orders-super-stream")
.routing(message -> {
// 根据消息内容决定分区
String routingKey = new String(message.getBodyAsBinary());
return routingKey.hashCode() % 3 + ""; // 返回分区名
})
.producerBuilder()
.build();
超级流消费者
// 创建超级流消费者(消费所有分区)
Consumer consumer = environment.consumerBuilder()
.superStream("orders-super-stream")
.name("my-super-consumer")
.offset(OffsetSpecification.first())
.messageHandler((context, message) -> {
String content = new String(message.getBodyAsBinary());
log.info("📥 分区[{}] 消息: {}", context.stream(), content);
})
.build();
📊 性能优化
批量发送
Producer producer = environment.producerBuilder()
.stream("my-stream")
.batchSize(100) // 批量大小
.build();
子条目批处理
// 使用子条目批处理提高吞吐量
Producer producer = environment.producerBuilder()
.stream("my-stream")
.subEntrySize(10) // 每个条目包含 10 条消息
.build();
MessageBuilder messageBuilder = producer.messageBuilder();
for (int i = 0; i < 100; i++) {
messageBuilder.addData(("Message-" + i).getBytes());
}
producer.send(messageBuilder.build(), confirmationStatus -> {
// 批量确认
});
压缩
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.compression.Compression;
Producer producer = environment.producerBuilder()
.stream("my-stream")
.compression(Compression.GZIP) // 启用 GZIP 压缩
.build();
⚠️ 注意事项
1. 端口配置
# Stream 协议使用独立端口
# AMQP: 5672
# Stream: 5552 (默认)
# Docker 需要映射该端口
docker run -p 5672:5672 -p 5552:5552 -p 15672:15672 rabbitmq:3.12-management
2. 消息保留策略
// 设置保留策略(二选一)
environment.streamCreator()
.stream("my-stream")
.maxLengthBytes(10_000_000_000L) // 最大 10GB
.maxAge(Duration.ofDays(30)) // 或最多保留 30 天
.create();
3. 与传统队列的选择
使用 Stream:
✅ 需要消息重放
✅ 需要多消费者独立消费
✅ 需要高吞吐量
✅ 需要时间旅行
使用传统队列:
✅ 简单的生产消费模式
✅ 需要复杂的路由规则
✅ 需要死信队列
✅ 需要优先级队列
📝 本章小结
| 要点 | 说明 |
|---|---|
| 核心特性 | 持久化日志流,消息保留 |
| 消息重放 | 支持从任意 offset 消费 |
| 多消费者 | 每个消费者独立 offset |
| 高吞吐量 | 比传统队列更高 |
| 端口 | 5552(Stream 协议) |
| 版本要求 | RabbitMQ 3.9+ |
下一步
了解 Streams 后,让我们学习 安全配置 保护 RabbitMQ!
