高可用方案
集群默认不会同步队列数据,需要配置高可用策略来保证数据安全。
🎯 本章目标
- 理解 RabbitMQ 高可用原理
- 掌握镜像队列配置
- 了解仲裁队列(Quorum Queues)
📦 高可用方案对比
| 方案 | 版本 | 原理 | 推荐度 |
|---|---|---|---|
| 镜像队列 | 3.8 前主流 | 主从复制 | ⭐⭐⭐ |
| 仲裁队列 | 3.8+ 推荐 | Raft 协议 | ⭐⭐⭐⭐⭐ |
🔄 镜像队列
原理
┌─────────────────────────────────────────────────────────┐
│ RabbitMQ Cluster │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ rabbitmq1 │ │ rabbitmq2 │ │ rabbitmq3 │ │
│ │ │ │ │ │ │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ Queue │ │──→│ │ Mirror │ │──→│ │ Mirror │ │ │
│ │ │(Master) │ │ │ │ (Slave) │ │ │ │ (Slave) │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
配置策略
# 设置所有队列镜像到所有节点
rabbitmqctl set_policy ha-all ".*" \
'{"ha-mode":"all"}' \
--priority 0 \
--apply-to queues
# 设置指定队列镜像到2个节点
rabbitmqctl set_policy ha-two "^order\." \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
--priority 1 \
--apply-to queues
# 设置队列镜像到指定节点
rabbitmqctl set_policy ha-nodes "^payment\." \
'{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq1","rabbit@rabbitmq2"]}' \
--priority 1 \
--apply-to queues
策略参数
| 参数 | 说明 |
|---|---|
| ha-mode | all/exactly/nodes |
| ha-params | 镜像数量或节点列表 |
| ha-sync-mode | manual/automatic |
| ha-promote-on-shutdown | when-synced/always |
| ha-promote-on-failure | when-synced/always |
通过管理界面配置
- 访问管理界面 → Admin → Policies
- 添加策略:
- Name: ha-all
- Pattern: .*
- Apply to: Queues
- Definition: ha-mode = all
🎯 仲裁队列(推荐)
优势
| 特性 | 镜像队列 | 仲裁队列 |
|---|---|---|
| 一致性 | 最终一致 | 强一致 |
| 性能 | 较低 | 较高 |
| 数据安全 | 可能丢失 | 更安全 |
| 配置复杂度 | 高 | 低 |
声明仲裁队列
Java 代码方式:
@Configuration
public class QuorumQueueConfig {
@Bean
public Queue quorumQueue() {
return QueueBuilder.durable("quorum.queue")
.quorum() // 声明为仲裁队列
.build();
}
// 或者使用参数
@Bean
public Queue quorumQueue2() {
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
return new Queue("quorum.queue2", true, false, false, args);
}
}
命令行方式:
rabbitmqadmin declare queue name=quorum.queue durable=true \
arguments='{"x-queue-type":"quorum"}'
仲裁队列配置
@Bean
public Queue quorumQueueWithConfig() {
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
// 投递限制(防止消息堆积)
args.put("x-delivery-limit", 5);
// 初始组大小
args.put("x-quorum-initial-group-size", 3);
return new Queue("quorum.configured.queue", true, false, false, args);
}
仲裁队列限制
注意
仲裁队列有以下限制:
- 不支持非持久化消息
- 不支持排他队列
- 不支持消息 TTL(单条消息)
- 不支持队列长度限制的 drop-head 策略
💻 Spring Boot 配置
spring:
rabbitmq:
addresses: rabbitmq1:5672,rabbitmq2:5672,rabbitmq3:5672
username: admin
password: admin123
# 连接重试
connection-timeout: 30000
# 发布确认
publisher-confirm-type: correlated
publisher-returns: true
# 消费者配置
listener:
simple:
acknowledge-mode: manual
prefetch: 10
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
🔍 故障转移测试
@Slf4j
@Component
public class FailoverTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试故障转移
*/
public void testFailover() {
int successCount = 0;
int failCount = 0;
for (int i = 0; i < 100; i++) {
try {
rabbitTemplate.convertAndSend("test.queue", "Message " + i);
successCount++;
log.info("✅ 消息 {} 发送成功", i);
} catch (Exception e) {
failCount++;
log.error("❌ 消息 {} 发送失败: {}", i, e.getMessage());
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("测试完成: 成功={}, 失败={}", successCount, failCount);
}
}
📝 本章小结
| 方案 | 适用场景 |
|---|---|
| 镜像队列 | 兼容旧版本 |
| 仲裁队列 | 新项目首选 |
最佳实践:
- ✅ 新项目使用仲裁队列
- ✅ 配置消息持久化
- ✅ 配置发布确认
- ✅ 客户端配置多个节点地址
下一步
高可用配置完成后,学习如何 优化性能!
