Federation 和 Shovel 是 RabbitMQ 提供的跨数据中心消息复制解决方案。
- 理解 Federation 和 Shovel 的区别
- 掌握跨数据中心消息同步配置
- 了解适用场景和最佳实践
| 特性 | Federation | Shovel |
|---|
| 方向 | 拉取(下游从上游拉) | 推送(从源推到目标) |
| 配置位置 | 下游节点 | 任意节点 |
| 连接方式 | 松耦合 | 紧耦合 |
| 断线处理 | 自动重连 | 自动重连 |
| 适用场景 | 跨区域订阅 | 点对点复制 |
Federation(拉取模式):
┌──────────────┐ ┌──────────────┐
│ 上游节点 │ ←─── 拉取 ─── │ 下游节点 │
│ (upstream) │ │ (downstream) │
│ │ │ │
│ [exchange-A] │ ──────────→ │ [exchange-A] │
└──────────────┘ └──────────────┘
Shovel(推送模式):
┌──────────────┐ ┌──────────────┐
│ 源节点 │ ─── 推送 ───→ │ 目标节点 │
│ (source) │ │(destination) │
│ │ │ │
│ [queue-A] │ ──────────→ │ [queue-B] │
└──────────────┘ └──────────────┘
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
rabbitmqctl set_parameter federation-upstream my-upstream \
'{"uri":"amqp://admin:password@upstream.example.com","expires":3600000}'
| 参数 | 说明 | 默认值 |
|---|
| uri | 上游节点 URI | 必填 |
| expires | 消息过期时间(毫秒) | 无 |
| message-ttl | 消息 TTL | 无 |
| max-hops | 最大跳数(防止循环) | 1 |
| prefetch-count | 预取数量 | 1000 |
| reconnect-delay | 重连延迟(秒) | 5 |
| ack-mode | 确认模式 | on-confirm |
| trust-user-id | 信任用户ID | false |
rabbitmqctl set_policy federate-exchanges "^fed\." \
'{"federation-upstream-set":"all"}' \
--priority 1 \
--apply-to exchanges
rabbitmqctl set_policy federate-queues "^fed\." \
'{"federation-upstream-set":"all"}' \
--priority 1 \
--apply-to queues
- 访问
Admin → Federation Upstreams - 添加上游:
- Name:
my-upstream - URI:
amqp://admin:password@upstream.example.com
- 添加策略
Admin → Policies: - Name:
fed-exchanges - Pattern:
^fed\. - Apply to:
Exchanges - Definition:
federation-upstream-set = all
rabbitmqadmin declare exchange name=fed.orders type=fanout
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
rabbitmqctl set_parameter federation-upstream upstream-orders \
'{"uri":"amqp://admin:password@upstream.example.com"}'
rabbitmqadmin declare exchange name=fed.orders type=fanout
rabbitmqctl set_policy federate-orders "^fed\.orders$" \
'{"federation-upstream":"upstream-orders"}' \
--apply-to exchanges
rabbitmqadmin declare queue name=local.orders
rabbitmqadmin declare binding source=fed.orders destination=local.orders
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
rabbitmqctl set_parameter shovel my-shovel \
'{
"src-uri": "amqp://admin:password@source.example.com",
"src-queue": "source.queue",
"dest-uri": "amqp://admin:password@dest.example.com",
"dest-queue": "dest.queue",
"prefetch-count": 1000,
"reconnect-delay": 5,
"ack-mode": "on-confirm",
"src-delete-after": "never"
}'
| 参数 | 说明 | 默认值 |
|---|
| src-uri | 源节点 URI | 必填 |
| src-queue | 源队列名 | 与 src-exchange 二选一 |
| src-exchange | 源交换机 | 与 src-queue 二选一 |
| src-exchange-key | 路由键 | # |
| dest-uri | 目标节点 URI | 必填 |
| dest-queue | 目标队列 | - |
| dest-exchange | 目标交换机 | - |
| dest-exchange-key | 目标路由键 | 源路由键 |
| prefetch-count | 预取数量 | 1000 |
| reconnect-delay | 重连延迟(秒) | 5 |
| ack-mode | 确认模式 | on-confirm |
| src-delete-after | 源队列删除策略 | never |
| 模式 | 说明 | 安全性 |
|---|
| on-confirm | 目标确认后确认源 | 最高(推荐) |
| on-publish | 发送到目标后确认源 | 中等 |
| no-ack | 不确认 | 最低 |
# 配置静态 Shovel
shovel.my-static-shovel.src.uris = amqp://admin:password@source.example.com
shovel.my-static-shovel.src.queue = source.queue
shovel.my-static-shovel.dest.uris = amqp://admin:password@dest.example.com
shovel.my-static-shovel.dest.queue = dest.queue
shovel.my-static-shovel.ack-mode = on-confirm
shovel.my-static-shovel.reconnect-delay = 5
rabbitmqadmin declare queue name=orders.beijing
rabbitmqadmin declare queue name=orders.shanghai
rabbitmqctl set_parameter shovel beijing-to-shanghai \
'{
"src-uri": "amqp://admin:password@beijing.example.com",
"src-queue": "orders.beijing",
"dest-uri": "amqp://admin:password@shanghai.example.com",
"dest-queue": "orders.shanghai",
"ack-mode": "on-confirm"
}'
rabbitmqctl shovel_status
rabbitmqctl clear_parameter shovel beijing-to-shanghai
┌─────────────────┐
│ 主数据中心 │
│ (Primary) │
│ │
│ ┌───────────┐ │
│ │ RabbitMQ │ │
│ │ Cluster │ │
│ └─────┬─────┘ │
└────────┼────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 北京机房 │ │ 上海机房 │ │ 广州机房 │
│ │ │ │ │ │
│ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │
│ │ RabbitMQ │ │ │ │ RabbitMQ │ │ │ │ RabbitMQ │ │
│ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │
│ │ │ │ │ │ │ │ │
│ Federation │ │ Federation │ │ Federation │
└─────────────────┘ └─────────────────┘ └─────────────────┘
┌─────────────────┐ ┌─────────────────┐
│ 机房 A │ │ 机房 B │
│ │ │ │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ RabbitMQ │◄─┼─── Federation ───►│ │ RabbitMQ │ │
│ │ Cluster │──┼─── Shovel ────────┼──│ Cluster │ │
│ └───────────┘ │ │ └───────────┘ │
│ │ │ │ │ │
│ ┌─────┴─────┐ │ │ ┌─────┴─────┐ │
│ │ App A │ │ │ │ App B │ │
│ └───────────┘ │ │ └───────────┘ │
└─────────────────┘ └─────────────────┘
rabbitmqctl federation_status
rabbitmqctl shovel_status
rabbitmqctl eval 'rabbit_federation_status:status().'
rabbitmqctl restart_federation_link <link-name>
rabbitmqctl clear_parameter shovel my-shovel
rabbitmqctl set_parameter shovel my-shovel '...'
@Slf4j
@Component
public class ReplicationMonitor {
@Scheduled(fixedRate = 60000)
public void checkReplicationStatus() {
String url = "http://localhost:15672/api/federation-links";
}
}
rabbitmqctl set_parameter federation-upstream my-upstream \
'{"uri":"amqp://...","max-hops":1}'
- Federation/Shovel 依赖网络连接
- 高延迟网络会影响同步速度
- 建议在同城或低延迟网络使用
- Federation 不保证跨节点消息顺序
- 如需顺序保证,使用单一生产者或序列号
rabbitmqctl set_parameter federation-upstream secure-upstream \
'{"uri":"amqps://admin:password@upstream.example.com:5671"}'
| 特性 | Federation | Shovel |
|---|
| 模式 | 拉取 | 推送 |
| 粒度 | 交换机/队列 | 队列/交换机 |
| 配置 | 下游 | 任意 |
| 场景 | 订阅分发 | 点对点同步 |
| 场景 | 推荐方案 |
|---|
| 分支订阅总部消息 | Federation |
| 跨机房消息同步 | Shovel |
| 双活互备 | Federation + Shovel |
| 消息迁移 | Shovel |
下一步
跨数据中心配置完成后,让我们学习 备份与恢复!