Streams(流)

RabbitMQ Streams 是 3.9 版本引入的新特性,提供了类似 Apache Kafka 的持久化日志流功能。

🎯 本章目标

  • 理解 Streams 的核心概念和优势
  • 掌握 Streams 的配置和使用方法
  • 了解 Streams 与传统队列的区别

📖 什么是 Streams

Streams vs 传统队列

传统队列(消费后删除):
┌─────────────────────────────────────────┐
│  Queue: [M1] [M2] [M3] [M4] [M5]         │
└─────────────────────────────────────────┘
         │
         ▼ 消费 M1
┌─────────────────────────────────────────┐
│  Queue: [M2] [M3] [M4] [M5]              │  ← M1 被删除
└─────────────────────────────────────────┘

Streams(持久化日志,消费后保留):
┌─────────────────────────────────────────┐
│  Stream: [M1] [M2] [M3] [M4] [M5]        │
└─────────────────────────────────────────┘
              ↑           ↑
         Consumer A   Consumer B          ← 多消费者各自维护 offset
              │           │
              │           └─── offset=4
              └─────────────── offset=2

核心特点

特性传统队列Streams
消息保留消费后删除持久保留
多消费者竞争消费独立消费
消息重放
时间旅行
高吞吐量一般更高
顺序保证单队列保证保证

适用场景

  • 事件溯源:重放历史事件
  • 日志收集:持久化日志数据
  • 数据同步:多系统数据同步
  • 审计追踪:保留完整操作记录
  • 大数据分析:历史数据分析

🔧 启用 Streams

检查版本

# 需要 RabbitMQ 3.9+
rabbitmq-diagnostics server_version

启用插件

# Streams 功能默认启用,确保以下插件启用
rabbitmq-plugins enable rabbitmq_stream
rabbitmq-plugins enable rabbitmq_stream_management  # 管理界面支持

💻 原生 Java 客户端

添加依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>stream-client</artifactId>
    <version>0.15.0</version>
</dependency>

创建 Stream

package com.example.rabbitmq.stream;

import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.StreamCreator;

public class StreamSetup {
    
    public static void main(String[] args) {
        // 创建 Stream 环境
        Environment environment = Environment.builder()
                .host("localhost")
                .port(5552)  // Stream 协议端口
                .username("admin")
                .password("admin123")
                .build();
        
        // 创建 Stream
        environment.streamCreator()
                .stream("my-stream")
                .maxLengthBytes(1_000_000_000)  // 最大 1GB
                .maxAge(java.time.Duration.ofDays(7))  // 保留 7 天
                .create();
        
        System.out.println("✅ Stream 创建成功!");
        
        environment.close();
    }
}

生产者

package com.example.rabbitmq.stream;

import com.rabbitmq.stream.*;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class StreamProducer {
    
    public static void main(String[] args) throws Exception {
        Environment environment = Environment.builder()
                .host("localhost")
                .port(5552)
                .username("admin")
                .password("admin123")
                .build();
        
        String streamName = "my-stream";
        
        // 创建生产者
        Producer producer = environment.producerBuilder()
                .stream(streamName)
                .build();
        
        // 发送消息
        for (int i = 1; i <= 100; i++) {
            String content = "Message-" + i;
            
            producer.send(
                    producer.messageBuilder()
                            .addData(content.getBytes(StandardCharsets.UTF_8))
                            .build(),
                    confirmationStatus -> {
                        if (confirmationStatus.isConfirmed()) {
                            log.debug("✅ 消息确认");
                        } else {
                            log.error("❌ 消息发送失败");
                        }
                    }
            );
            
            log.info("📤 发送: {}", content);
        }
        
        // 等待确认
        Thread.sleep(1000);
        
        producer.close();
        environment.close();
        
        log.info("🎉 发送完成!");
    }
}

消费者

package com.example.rabbitmq.stream;

import com.rabbitmq.stream.*;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class StreamConsumer {
    
    public static void main(String[] args) {
        Environment environment = Environment.builder()
                .host("localhost")
                .port(5552)
                .username("admin")
                .password("admin123")
                .build();
        
        String streamName = "my-stream";
        
        // 创建消费者
        Consumer consumer = environment.consumerBuilder()
                .stream(streamName)
                .name("my-consumer")  // 消费者名称(用于 offset 跟踪)
                .offset(OffsetSpecification.first())  // 从头开始消费
                .messageHandler((context, message) -> {
                    String content = new String(
                            message.getBodyAsBinary(), 
                            StandardCharsets.UTF_8
                    );
                    
                    log.info("📥 收到消息 [offset={}]: {}", 
                            context.offset(), content);
                    
                    // 手动存储 offset(可选)
                    context.storeOffset();
                })
                .build();
        
        log.info("🎧 消费者启动,等待消息...");
        
        // 保持运行
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.close();
            environment.close();
            log.info("👋 消费者关闭");
        }));
    }
}

Offset 策略

// 从头开始
OffsetSpecification.first()

// 从尾部开始(只消费新消息)
OffsetSpecification.last()

// 从下一条开始
OffsetSpecification.next()

// 从指定 offset 开始
OffsetSpecification.offset(12345)

// 从指定时间开始
OffsetSpecification.timestamp(
    System.currentTimeMillis() - 3600_000  // 1小时前
)

🌱 Spring Boot 集成

添加依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-stream</artifactId>
</dependency>

配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin123
    stream:
      host: localhost
      port: 5552  # Stream 协议端口
      username: admin
      password: admin123

配置类

package com.example.rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;

@Configuration
public class StreamConfig {
    
    @Bean
    public Environment streamEnvironment() {
        return Environment.builder()
                .host("localhost")
                .port(5552)
                .username("admin")
                .password("admin123")
                .build();
    }
    
    @Bean
    public StreamRabbitListenerContainerFactory streamListenerContainerFactory(
            Environment environment) {
        
        StreamRabbitListenerContainerFactory factory = 
                new StreamRabbitListenerContainerFactory(environment);
        
        // 配置消费者定制器
        factory.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first())
                   .name(name);
        });
        
        return factory;
    }
}

Stream 生产者

package com.example.rabbitmq.producer;

import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;

@Slf4j
@Service
public class StreamProducerService {
    
    private final Environment environment;
    private Producer producer;
    
    private static final String STREAM_NAME = "orders.stream";
    
    public StreamProducerService(Environment environment) {
        this.environment = environment;
    }
    
    @PostConstruct
    public void init() {
        // 创建 Stream(如果不存在)
        try {
            environment.streamCreator()
                    .stream(STREAM_NAME)
                    .maxLengthBytes(1_000_000_000)
                    .create();
            log.info("✅ Stream 创建成功: {}", STREAM_NAME);
        } catch (Exception e) {
            log.info("Stream 已存在: {}", STREAM_NAME);
        }
        
        // 创建生产者
        producer = environment.producerBuilder()
                .stream(STREAM_NAME)
                .build();
    }
    
    public void sendMessage(String message) {
        producer.send(
                producer.messageBuilder()
                        .addData(message.getBytes(StandardCharsets.UTF_8))
                        .build(),
                confirmationStatus -> {
                    if (confirmationStatus.isConfirmed()) {
                        log.debug("✅ 消息已确认");
                    } else {
                        log.error("❌ 消息发送失败");
                    }
                }
        );
        
        log.info("📤 发送到 Stream: {}", message);
    }
    
    @PreDestroy
    public void cleanup() {
        if (producer != null) {
            producer.close();
        }
    }
}

Stream 消费者

package com.example.rabbitmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.rabbit.stream.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.stream.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class StreamConsumerService {
    
    /**
     * 使用 Stream 监听器
     */
    @RabbitListener(
            queues = "orders.stream",
            containerFactory = "streamListenerContainerFactory"
    )
    public void handleStreamMessage(Message message) {
        String content = new String(
                message.getBodyAsBinary(), 
                StandardCharsets.UTF_8
        );
        
        log.info("📥 从 Stream 收到: {}", content);
    }
}

🔄 超级流(Super Streams)

超级流是分区的 Stream,类似于 Kafka 的分区。

创建超级流

Environment environment = Environment.builder()
        .host("localhost")
        .port(5552)
        .username("admin")
        .password("admin123")
        .build();

// 创建 3 分区的超级流
environment.streamCreator()
        .superStream("orders-super-stream")
        .partitions(3)
        .create();

超级流生产者

// 创建超级流生产者
Producer producer = environment.producerBuilder()
        .superStream("orders-super-stream")
        .routing(message -> {
            // 根据消息内容决定分区
            String routingKey = new String(message.getBodyAsBinary());
            return routingKey.hashCode() % 3 + "";  // 返回分区名
        })
        .producerBuilder()
        .build();

超级流消费者

// 创建超级流消费者(消费所有分区)
Consumer consumer = environment.consumerBuilder()
        .superStream("orders-super-stream")
        .name("my-super-consumer")
        .offset(OffsetSpecification.first())
        .messageHandler((context, message) -> {
            String content = new String(message.getBodyAsBinary());
            log.info("📥 分区[{}] 消息: {}", context.stream(), content);
        })
        .build();

📊 性能优化

批量发送

Producer producer = environment.producerBuilder()
        .stream("my-stream")
        .batchSize(100)  // 批量大小
        .build();

子条目批处理

// 使用子条目批处理提高吞吐量
Producer producer = environment.producerBuilder()
        .stream("my-stream")
        .subEntrySize(10)  // 每个条目包含 10 条消息
        .build();

MessageBuilder messageBuilder = producer.messageBuilder();

for (int i = 0; i < 100; i++) {
    messageBuilder.addData(("Message-" + i).getBytes());
}

producer.send(messageBuilder.build(), confirmationStatus -> {
    // 批量确认
});

压缩

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.compression.Compression;

Producer producer = environment.producerBuilder()
        .stream("my-stream")
        .compression(Compression.GZIP)  // 启用 GZIP 压缩
        .build();

⚠️ 注意事项

1. 端口配置

# Stream 协议使用独立端口
# AMQP: 5672
# Stream: 5552 (默认)

# Docker 需要映射该端口
docker run -p 5672:5672 -p 5552:5552 -p 15672:15672 rabbitmq:3.12-management

2. 消息保留策略

// 设置保留策略(二选一)
environment.streamCreator()
        .stream("my-stream")
        .maxLengthBytes(10_000_000_000L)  // 最大 10GB
        .maxAge(Duration.ofDays(30))      // 或最多保留 30 天
        .create();

3. 与传统队列的选择

使用 Stream:
✅ 需要消息重放
✅ 需要多消费者独立消费
✅ 需要高吞吐量
✅ 需要时间旅行

使用传统队列:
✅ 简单的生产消费模式
✅ 需要复杂的路由规则
✅ 需要死信队列
✅ 需要优先级队列

📝 本章小结

要点说明
核心特性持久化日志流,消息保留
消息重放支持从任意 offset 消费
多消费者每个消费者独立 offset
高吞吐量比传统队列更高
端口5552(Stream 协议)
版本要求RabbitMQ 3.9+

下一步

了解 Streams 后,让我们学习 安全配置 保护 RabbitMQ!