惰性队列

惰性队列(Lazy Queue)是 RabbitMQ 3.6+ 引入的特性,专门用于处理大量消息堆积的场景。

🎯 本章目标

  • 理解惰性队列的原理和优势
  • 掌握惰性队列的配置方法
  • 了解惰性队列的适用场景

📖 什么是惰性队列

普通队列 vs 惰性队列

普通队列(内存优先):
┌─────────────────────────────────────────┐
│                 内存                      │
│  ┌─────┬─────┬─────┬─────┬─────┬─────┐  │
│  │ M1  │ M2  │ M3  │ M4  │ M5  │ ... │  │  ← 消息存储在内存
│  └─────┴─────┴─────┴─────┴─────┴─────┘  │
└─────────────────────────────────────────┘
         │ 内存不足时
         ▼
┌─────────────────────────────────────────┐
│                 磁盘                      │  ← 才会写入磁盘
└─────────────────────────────────────────┘

惰性队列(磁盘优先):
┌─────────────────────────────────────────┐
│                 磁盘                      │
│  ┌─────┬─────┬─────┬─────┬─────┬─────┐  │
│  │ M1  │ M2  │ M3  │ M4  │ M5  │ ... │  │  ← 消息直接写入磁盘
│  └─────┴─────┴─────┴─────┴─────┴─────┘  │
└─────────────────────────────────────────┘
         │ 消费时
         ▼
┌─────────────────────────────────────────┐
│                 内存                      │  ← 按需加载到内存
│  ┌─────┐                                 │
│  │ M1  │                                 │
│  └─────┘                                 │
└─────────────────────────────────────────┘

核心特点

特性普通队列惰性队列
消息存储位置内存优先磁盘优先
内存占用
消息堆积能力有限大量
消费延迟略高
适用场景实时消费消息堆积

💻 配置惰性队列

方式一:声明时指定

原生 Java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class LazyQueueExample {
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明惰性队列
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-queue-mode", "lazy");  // 设置为惰性模式
            
            channel.queueDeclare(
                "lazy.queue",    // 队列名称
                true,            // 持久化
                false,           // 非排他
                false,           // 不自动删除
                arguments        // 参数
            );
            
            System.out.println("惰性队列创建成功!");
        }
    }
}

Spring Boot:

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LazyQueueConfig {
    
    /**
     * 方式一:使用 QueueBuilder
     */
    @Bean
    public Queue lazyQueue() {
        return QueueBuilder.durable("lazy.queue")
                .lazy()  // 设置为惰性队列
                .build();
    }
    
    /**
     * 方式二:使用 arguments
     */
    @Bean
    public Queue lazyQueue2() {
        return QueueBuilder.durable("lazy.queue2")
                .withArgument("x-queue-mode", "lazy")
                .build();
    }
}

方式二:通过策略(Policy)设置

# 为所有队列设置惰性模式
rabbitmqctl set_policy lazy-mode ".*" '{"queue-mode":"lazy"}' \
  --priority 0 \
  --apply-to queues

# 为特定前缀的队列设置
rabbitmqctl set_policy lazy-logs "^logs\." '{"queue-mode":"lazy"}' \
  --priority 1 \
  --apply-to queues

方式三:运行时修改

# 将普通队列转换为惰性队列
rabbitmqctl set_policy convert-to-lazy "^my-queue$" '{"queue-mode":"lazy"}' \
  --priority 10 \
  --apply-to queues

# 注意:转换过程中会有短暂的性能下降

🌱 完整 Spring Boot 示例

配置类

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LazyQueueExampleConfig {
    
    public static final String LAZY_EXCHANGE = "lazy.exchange";
    public static final String LAZY_QUEUE = "lazy.queue";
    public static final String LAZY_ROUTING_KEY = "lazy.key";
    
    @Bean
    public DirectExchange lazyExchange() {
        return new DirectExchange(LAZY_EXCHANGE);
    }
    
    /**
     * 惰性队列 - 适合消息堆积场景
     */
    @Bean
    public Queue lazyQueue() {
        return QueueBuilder.durable(LAZY_QUEUE)
                .lazy()                          // 惰性模式
                .withArgument("x-max-length", 10000000)  // 最大消息数
                .build();
    }
    
    @Bean
    public Binding lazyBinding() {
        return BindingBuilder.bind(lazyQueue())
                .to(lazyExchange())
                .with(LAZY_ROUTING_KEY);
    }
}

生产者

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.LazyQueueExampleConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class LazyQueueProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 批量发送消息(测试消息堆积)
     */
    public void sendBatch(int count) {
        log.info("开始发送 {} 条消息...", count);
        
        long start = System.currentTimeMillis();
        
        for (int i = 1; i <= count; i++) {
            String message = "Message-" + i;
            rabbitTemplate.convertAndSend(
                    LazyQueueExampleConfig.LAZY_EXCHANGE,
                    LazyQueueExampleConfig.LAZY_ROUTING_KEY,
                    message
            );
            
            if (i % 10000 == 0) {
                log.info("已发送 {} 条消息", i);
            }
        }
        
        long cost = System.currentTimeMillis() - start;
        log.info("发送完成!总耗时: {}ms, 平均: {}/秒", cost, count * 1000 / cost);
    }
}

消费者

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.LazyQueueExampleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class LazyQueueConsumer {
    
    private final AtomicInteger count = new AtomicInteger(0);
    
    @RabbitListener(queues = LazyQueueExampleConfig.LAZY_QUEUE)
    public void handleMessage(String message) {
        int current = count.incrementAndGet();
        
        if (current % 10000 == 0) {
            log.info("已消费 {} 条消息", current);
        }
        
        // 模拟业务处理
        // processMessage(message);
    }
}

📊 性能对比测试

测试代码

package com.example.rabbitmq.test;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class QueuePerformanceTest implements CommandLineRunner {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private RabbitAdmin rabbitAdmin;
    
    @Override
    public void run(String... args) {
        // 创建普通队列
        Queue normalQueue = QueueBuilder.durable("test.normal.queue").build();
        rabbitAdmin.declareQueue(normalQueue);
        
        // 创建惰性队列
        Queue lazyQueue = QueueBuilder.durable("test.lazy.queue").lazy().build();
        rabbitAdmin.declareQueue(lazyQueue);
        
        int messageCount = 100000;
        
        // 测试普通队列
        long normalTime = testQueue("test.normal.queue", messageCount);
        log.info("普通队列发送 {} 条消息耗时: {}ms", messageCount, normalTime);
        
        // 测试惰性队列
        long lazyTime = testQueue("test.lazy.queue", messageCount);
        log.info("惰性队列发送 {} 条消息耗时: {}ms", messageCount, lazyTime);
    }
    
    private long testQueue(String queueName, int count) {
        long start = System.currentTimeMillis();
        
        for (int i = 0; i < count; i++) {
            rabbitTemplate.convertAndSend("", queueName, "Message-" + i);
        }
        
        return System.currentTimeMillis() - start;
    }
}

测试结果示例

指标普通队列惰性队列
发送 10 万条消息5000ms8000ms
内存占用~500MB~50MB
消费延迟(首条)1ms5ms
适合堆积消息数百万级亿级

🎯 适用场景

✅ 适合使用惰性队列

  1. 消息大量堆积

    • 消费者处理能力有限
    • 定时批量消费场景
  2. 内存敏感

    • 服务器内存有限
    • 多队列共存场景
  3. 日志/审计队列

    • 可接受一定延迟
    • 消息量大
  4. 不可预测的流量峰值

    • 秒杀活动
    • 突发流量

❌ 不适合使用惰性队列

  1. 实时性要求高

    • 毫秒级延迟要求
  2. 消息快速消费

    • 消费速度 >= 生产速度
  3. 内存充足

    • 服务器内存足够

⚠️ 注意事项

1. 消费延迟

// 惰性队列消费时需要从磁盘加载,会有额外延迟
// 首条消息延迟较高,后续消息会预加载

// 可以通过增加预取数量缓解
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 250  # 默认 250,可适当增加

2. 队列转换

// 普通队列转惰性队列
// 会触发消息从内存写入磁盘,期间性能下降

// 建议:在低峰期进行转换

3. 磁盘空间

// 惰性队列依赖磁盘存储
// 需要确保磁盘空间充足

// 监控磁盘使用
rabbitmqctl status | grep disk

4. RabbitMQ 3.12+ 变化

// 从 RabbitMQ 3.12 开始,所有经典队列默认行为接近惰性队列
// 建议:新项目直接使用仲裁队列(Quorum Queue)

@Bean
public Queue quorumQueue() {
    return QueueBuilder.durable("quorum.queue")
            .quorum()  // 仲裁队列,默认就是惰性行为
            .build();
}

🔄 与其他特性结合

惰性队列 + 死信队列

@Bean
public Queue lazyQueueWithDlx() {
    return QueueBuilder.durable("lazy.with.dlx")
            .lazy()
            .deadLetterExchange("dlx.exchange")
            .deadLetterRoutingKey("dlx.key")
            .build();
}

惰性队列 + 消息 TTL

@Bean
public Queue lazyQueueWithTtl() {
    return QueueBuilder.durable("lazy.with.ttl")
            .lazy()
            .ttl(60000)  // 消息 60 秒过期
            .build();
}

📝 本章小结

要点说明
核心原理消息直接写入磁盘,按需加载到内存
主要优势低内存占用,支持大量消息堆积
主要劣势消费延迟略高
配置方式x-queue-mode=lazy 或策略
适用场景消息堆积、内存敏感、日志队列
版本建议3.12+ 考虑使用仲裁队列

下一步

了解了惰性队列后,让我们学习如何进行 消息追踪 来排查问题!