惰性队列
惰性队列(Lazy Queue)是 RabbitMQ 3.6+ 引入的特性,专门用于处理大量消息堆积的场景。
🎯 本章目标
- 理解惰性队列的原理和优势
- 掌握惰性队列的配置方法
- 了解惰性队列的适用场景
📖 什么是惰性队列
普通队列 vs 惰性队列
普通队列(内存优先):
┌─────────────────────────────────────────┐
│ 内存 │
│ ┌─────┬─────┬─────┬─────┬─────┬─────┐ │
│ │ M1 │ M2 │ M3 │ M4 │ M5 │ ... │ │ ← 消息存储在内存
│ └─────┴─────┴─────┴─────┴─────┴─────┘ │
└─────────────────────────────────────────┘
│ 内存不足时
▼
┌─────────────────────────────────────────┐
│ 磁盘 │ ← 才会写入磁盘
└─────────────────────────────────────────┘
惰性队列(磁盘优先):
┌─────────────────────────────────────────┐
│ 磁盘 │
│ ┌─────┬─────┬─────┬─────┬─────┬─────┐ │
│ │ M1 │ M2 │ M3 │ M4 │ M5 │ ... │ │ ← 消息直接写入磁盘
│ └─────┴─────┴─────┴─────┴─────┴─────┘ │
└─────────────────────────────────────────┘
│ 消费时
▼
┌─────────────────────────────────────────┐
│ 内存 │ ← 按需加载到内存
│ ┌─────┐ │
│ │ M1 │ │
│ └─────┘ │
└─────────────────────────────────────────┘
核心特点
| 特性 | 普通队列 | 惰性队列 |
|---|---|---|
| 消息存储位置 | 内存优先 | 磁盘优先 |
| 内存占用 | 高 | 低 |
| 消息堆积能力 | 有限 | 大量 |
| 消费延迟 | 低 | 略高 |
| 适用场景 | 实时消费 | 消息堆积 |
💻 配置惰性队列
方式一:声明时指定
原生 Java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class LazyQueueExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明惰性队列
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-mode", "lazy"); // 设置为惰性模式
channel.queueDeclare(
"lazy.queue", // 队列名称
true, // 持久化
false, // 非排他
false, // 不自动删除
arguments // 参数
);
System.out.println("惰性队列创建成功!");
}
}
}
Spring Boot:
package com.example.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class LazyQueueConfig {
/**
* 方式一:使用 QueueBuilder
*/
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 设置为惰性队列
.build();
}
/**
* 方式二:使用 arguments
*/
@Bean
public Queue lazyQueue2() {
return QueueBuilder.durable("lazy.queue2")
.withArgument("x-queue-mode", "lazy")
.build();
}
}
方式二:通过策略(Policy)设置
# 为所有队列设置惰性模式
rabbitmqctl set_policy lazy-mode ".*" '{"queue-mode":"lazy"}' \
--priority 0 \
--apply-to queues
# 为特定前缀的队列设置
rabbitmqctl set_policy lazy-logs "^logs\." '{"queue-mode":"lazy"}' \
--priority 1 \
--apply-to queues
方式三:运行时修改
# 将普通队列转换为惰性队列
rabbitmqctl set_policy convert-to-lazy "^my-queue$" '{"queue-mode":"lazy"}' \
--priority 10 \
--apply-to queues
# 注意:转换过程中会有短暂的性能下降
🌱 完整 Spring Boot 示例
配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class LazyQueueExampleConfig {
public static final String LAZY_EXCHANGE = "lazy.exchange";
public static final String LAZY_QUEUE = "lazy.queue";
public static final String LAZY_ROUTING_KEY = "lazy.key";
@Bean
public DirectExchange lazyExchange() {
return new DirectExchange(LAZY_EXCHANGE);
}
/**
* 惰性队列 - 适合消息堆积场景
*/
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable(LAZY_QUEUE)
.lazy() // 惰性模式
.withArgument("x-max-length", 10000000) // 最大消息数
.build();
}
@Bean
public Binding lazyBinding() {
return BindingBuilder.bind(lazyQueue())
.to(lazyExchange())
.with(LAZY_ROUTING_KEY);
}
}
生产者
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.LazyQueueExampleConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class LazyQueueProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 批量发送消息(测试消息堆积)
*/
public void sendBatch(int count) {
log.info("开始发送 {} 条消息...", count);
long start = System.currentTimeMillis();
for (int i = 1; i <= count; i++) {
String message = "Message-" + i;
rabbitTemplate.convertAndSend(
LazyQueueExampleConfig.LAZY_EXCHANGE,
LazyQueueExampleConfig.LAZY_ROUTING_KEY,
message
);
if (i % 10000 == 0) {
log.info("已发送 {} 条消息", i);
}
}
long cost = System.currentTimeMillis() - start;
log.info("发送完成!总耗时: {}ms, 平均: {}/秒", cost, count * 1000 / cost);
}
}
消费者
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.LazyQueueExampleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class LazyQueueConsumer {
private final AtomicInteger count = new AtomicInteger(0);
@RabbitListener(queues = LazyQueueExampleConfig.LAZY_QUEUE)
public void handleMessage(String message) {
int current = count.incrementAndGet();
if (current % 10000 == 0) {
log.info("已消费 {} 条消息", current);
}
// 模拟业务处理
// processMessage(message);
}
}
📊 性能对比测试
测试代码
package com.example.rabbitmq.test;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class QueuePerformanceTest implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitAdmin rabbitAdmin;
@Override
public void run(String... args) {
// 创建普通队列
Queue normalQueue = QueueBuilder.durable("test.normal.queue").build();
rabbitAdmin.declareQueue(normalQueue);
// 创建惰性队列
Queue lazyQueue = QueueBuilder.durable("test.lazy.queue").lazy().build();
rabbitAdmin.declareQueue(lazyQueue);
int messageCount = 100000;
// 测试普通队列
long normalTime = testQueue("test.normal.queue", messageCount);
log.info("普通队列发送 {} 条消息耗时: {}ms", messageCount, normalTime);
// 测试惰性队列
long lazyTime = testQueue("test.lazy.queue", messageCount);
log.info("惰性队列发送 {} 条消息耗时: {}ms", messageCount, lazyTime);
}
private long testQueue(String queueName, int count) {
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
rabbitTemplate.convertAndSend("", queueName, "Message-" + i);
}
return System.currentTimeMillis() - start;
}
}
测试结果示例
| 指标 | 普通队列 | 惰性队列 |
|---|---|---|
| 发送 10 万条消息 | 5000ms | 8000ms |
| 内存占用 | ~500MB | ~50MB |
| 消费延迟(首条) | 1ms | 5ms |
| 适合堆积消息数 | 百万级 | 亿级 |
🎯 适用场景
✅ 适合使用惰性队列
消息大量堆积
- 消费者处理能力有限
- 定时批量消费场景
内存敏感
- 服务器内存有限
- 多队列共存场景
日志/审计队列
- 可接受一定延迟
- 消息量大
不可预测的流量峰值
- 秒杀活动
- 突发流量
❌ 不适合使用惰性队列
实时性要求高
- 毫秒级延迟要求
消息快速消费
- 消费速度 >= 生产速度
内存充足
- 服务器内存足够
⚠️ 注意事项
1. 消费延迟
// 惰性队列消费时需要从磁盘加载,会有额外延迟
// 首条消息延迟较高,后续消息会预加载
// 可以通过增加预取数量缓解
spring:
rabbitmq:
listener:
simple:
prefetch: 250 # 默认 250,可适当增加
2. 队列转换
// 普通队列转惰性队列
// 会触发消息从内存写入磁盘,期间性能下降
// 建议:在低峰期进行转换
3. 磁盘空间
// 惰性队列依赖磁盘存储
// 需要确保磁盘空间充足
// 监控磁盘使用
rabbitmqctl status | grep disk
4. RabbitMQ 3.12+ 变化
// 从 RabbitMQ 3.12 开始,所有经典队列默认行为接近惰性队列
// 建议:新项目直接使用仲裁队列(Quorum Queue)
@Bean
public Queue quorumQueue() {
return QueueBuilder.durable("quorum.queue")
.quorum() // 仲裁队列,默认就是惰性行为
.build();
}
🔄 与其他特性结合
惰性队列 + 死信队列
@Bean
public Queue lazyQueueWithDlx() {
return QueueBuilder.durable("lazy.with.dlx")
.lazy()
.deadLetterExchange("dlx.exchange")
.deadLetterRoutingKey("dlx.key")
.build();
}
惰性队列 + 消息 TTL
@Bean
public Queue lazyQueueWithTtl() {
return QueueBuilder.durable("lazy.with.ttl")
.lazy()
.ttl(60000) // 消息 60 秒过期
.build();
}
📝 本章小结
| 要点 | 说明 |
|---|---|
| 核心原理 | 消息直接写入磁盘,按需加载到内存 |
| 主要优势 | 低内存占用,支持大量消息堆积 |
| 主要劣势 | 消费延迟略高 |
| 配置方式 | x-queue-mode=lazy 或策略 |
| 适用场景 | 消息堆积、内存敏感、日志队列 |
| 版本建议 | 3.12+ 考虑使用仲裁队列 |
下一步
了解了惰性队列后,让我们学习如何进行 消息追踪 来排查问题!
