高可用方案

集群默认不会同步队列数据,需要配置高可用策略来保证数据安全。

🎯 本章目标

  • 理解 RabbitMQ 高可用原理
  • 掌握镜像队列配置
  • 了解仲裁队列(Quorum Queues)

📦 高可用方案对比

方案版本原理推荐度
镜像队列3.8 前主流主从复制⭐⭐⭐
仲裁队列3.8+ 推荐Raft 协议⭐⭐⭐⭐⭐

🔄 镜像队列

原理

┌─────────────────────────────────────────────────────────┐
│                     RabbitMQ Cluster                     │
│                                                          │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   │
│   │  rabbitmq1  │   │  rabbitmq2  │   │  rabbitmq3  │   │
│   │             │   │             │   │             │   │
│   │ ┌─────────┐ │   │ ┌─────────┐ │   │ ┌─────────┐ │   │
│   │ │ Queue   │ │──→│ │ Mirror  │ │──→│ │ Mirror  │ │   │
│   │ │(Master) │ │   │ │ (Slave) │ │   │ │ (Slave) │ │   │
│   │ └─────────┘ │   │ └─────────┘ │   │ └─────────┘ │   │
│   └─────────────┘   └─────────────┘   └─────────────┘   │
│                                                          │
└─────────────────────────────────────────────────────────┘

配置策略

# 设置所有队列镜像到所有节点
rabbitmqctl set_policy ha-all ".*" \
  '{"ha-mode":"all"}' \
  --priority 0 \
  --apply-to queues

# 设置指定队列镜像到2个节点
rabbitmqctl set_policy ha-two "^order\." \
  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
  --priority 1 \
  --apply-to queues

# 设置队列镜像到指定节点
rabbitmqctl set_policy ha-nodes "^payment\." \
  '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq1","rabbit@rabbitmq2"]}' \
  --priority 1 \
  --apply-to queues

策略参数

参数说明
ha-modeall/exactly/nodes
ha-params镜像数量或节点列表
ha-sync-modemanual/automatic
ha-promote-on-shutdownwhen-synced/always
ha-promote-on-failurewhen-synced/always

通过管理界面配置

  1. 访问管理界面 → Admin → Policies
  2. 添加策略:
    • Name: ha-all
    • Pattern: .*
    • Apply to: Queues
    • Definition: ha-mode = all

🎯 仲裁队列(推荐)

优势

特性镜像队列仲裁队列
一致性最终一致强一致
性能较低较高
数据安全可能丢失更安全
配置复杂度

声明仲裁队列

Java 代码方式:

@Configuration
public class QuorumQueueConfig {
    
    @Bean
    public Queue quorumQueue() {
        return QueueBuilder.durable("quorum.queue")
                .quorum()  // 声明为仲裁队列
                .build();
    }
    
    // 或者使用参数
    @Bean
    public Queue quorumQueue2() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-type", "quorum");
        
        return new Queue("quorum.queue2", true, false, false, args);
    }
}

命令行方式:

rabbitmqadmin declare queue name=quorum.queue durable=true \
  arguments='{"x-queue-type":"quorum"}'

仲裁队列配置

@Bean
public Queue quorumQueueWithConfig() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-queue-type", "quorum");
    
    // 投递限制(防止消息堆积)
    args.put("x-delivery-limit", 5);
    
    // 初始组大小
    args.put("x-quorum-initial-group-size", 3);
    
    return new Queue("quorum.configured.queue", true, false, false, args);
}

仲裁队列限制

注意

仲裁队列有以下限制:

  • 不支持非持久化消息
  • 不支持排他队列
  • 不支持消息 TTL(单条消息)
  • 不支持队列长度限制的 drop-head 策略

💻 Spring Boot 配置

spring:
  rabbitmq:
    addresses: rabbitmq1:5672,rabbitmq2:5672,rabbitmq3:5672
    username: admin
    password: admin123
    
    # 连接重试
    connection-timeout: 30000
    
    # 发布确认
    publisher-confirm-type: correlated
    publisher-returns: true
    
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 10
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3

🔍 故障转移测试

@Slf4j
@Component
public class FailoverTest {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 测试故障转移
     */
    public void testFailover() {
        int successCount = 0;
        int failCount = 0;
        
        for (int i = 0; i < 100; i++) {
            try {
                rabbitTemplate.convertAndSend("test.queue", "Message " + i);
                successCount++;
                log.info("✅ 消息 {} 发送成功", i);
            } catch (Exception e) {
                failCount++;
                log.error("❌ 消息 {} 发送失败: {}", i, e.getMessage());
            }
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        log.info("测试完成: 成功={}, 失败={}", successCount, failCount);
    }
}

📝 本章小结

方案适用场景
镜像队列兼容旧版本
仲裁队列新项目首选

最佳实践:

  1. ✅ 新项目使用仲裁队列
  2. ✅ 配置消息持久化
  3. ✅ 配置发布确认
  4. ✅ 客户端配置多个节点地址

下一步

高可用配置完成后,学习如何 优化性能