Federation & Shovel

Federation 和 Shovel 是 RabbitMQ 提供的跨数据中心消息复制解决方案。

🎯 本章目标

  • 理解 Federation 和 Shovel 的区别
  • 掌握跨数据中心消息同步配置
  • 了解适用场景和最佳实践

📊 方案对比

特性FederationShovel
方向拉取(下游从上游拉)推送(从源推到目标)
配置位置下游节点任意节点
连接方式松耦合紧耦合
断线处理自动重连自动重连
适用场景跨区域订阅点对点复制
Federation(拉取模式):
┌──────────────┐              ┌──────────────┐
│   上游节点    │ ←─── 拉取 ─── │   下游节点    │
│  (upstream)  │              │ (downstream) │
│              │              │              │
│ [exchange-A] │ ──────────→  │ [exchange-A] │
└──────────────┘              └──────────────┘

Shovel(推送模式):
┌──────────────┐              ┌──────────────┐
│   源节点      │ ─── 推送 ───→ │   目标节点    │
│   (source)   │              │(destination) │
│              │              │              │
│  [queue-A]   │ ──────────→  │  [queue-B]   │
└──────────────┘              └──────────────┘

🌐 Federation(联邦)

启用插件

rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

配置上游(Upstream)

# 在下游节点配置上游连接
rabbitmqctl set_parameter federation-upstream my-upstream \
  '{"uri":"amqp://admin:password@upstream.example.com","expires":3600000}'

参数说明

参数说明默认值
uri上游节点 URI必填
expires消息过期时间(毫秒)
message-ttl消息 TTL
max-hops最大跳数(防止循环)1
prefetch-count预取数量1000
reconnect-delay重连延迟(秒)5
ack-mode确认模式on-confirm
trust-user-id信任用户IDfalse

配置联邦策略

交换机联邦

# 联邦所有以 fed. 开头的交换机
rabbitmqctl set_policy federate-exchanges "^fed\." \
  '{"federation-upstream-set":"all"}' \
  --priority 1 \
  --apply-to exchanges

队列联邦

# 联邦所有以 fed. 开头的队列
rabbitmqctl set_policy federate-queues "^fed\." \
  '{"federation-upstream-set":"all"}' \
  --priority 1 \
  --apply-to queues

通过管理界面配置

  1. 访问 AdminFederation Upstreams
  2. 添加上游:
    • Name: my-upstream
    • URI: amqp://admin:password@upstream.example.com
  3. 添加策略 AdminPolicies
    • Name: fed-exchanges
    • Pattern: ^fed\.
    • Apply to: Exchanges
    • Definition: federation-upstream-set = all

完整示例

# === 上游节点 (upstream.example.com) ===
# 创建交换机
rabbitmqadmin declare exchange name=fed.orders type=fanout

# === 下游节点 (downstream.example.com) ===
# 1. 启用插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management

# 2. 配置上游
rabbitmqctl set_parameter federation-upstream upstream-orders \
  '{"uri":"amqp://admin:password@upstream.example.com"}'

# 3. 创建同名交换机
rabbitmqadmin declare exchange name=fed.orders type=fanout

# 4. 配置联邦策略
rabbitmqctl set_policy federate-orders "^fed\.orders$" \
  '{"federation-upstream":"upstream-orders"}' \
  --apply-to exchanges

# 5. 创建本地队列并绑定
rabbitmqadmin declare queue name=local.orders
rabbitmqadmin declare binding source=fed.orders destination=local.orders

# 现在发送到上游 fed.orders 的消息会自动同步到下游

🔄 Shovel(铲子)

启用插件

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

动态 Shovel 配置

# 将源队列的消息铲到目标队列
rabbitmqctl set_parameter shovel my-shovel \
'{
  "src-uri": "amqp://admin:password@source.example.com",
  "src-queue": "source.queue",
  "dest-uri": "amqp://admin:password@dest.example.com",
  "dest-queue": "dest.queue",
  "prefetch-count": 1000,
  "reconnect-delay": 5,
  "ack-mode": "on-confirm",
  "src-delete-after": "never"
}'

参数说明

参数说明默认值
src-uri源节点 URI必填
src-queue源队列名与 src-exchange 二选一
src-exchange源交换机与 src-queue 二选一
src-exchange-key路由键#
dest-uri目标节点 URI必填
dest-queue目标队列-
dest-exchange目标交换机-
dest-exchange-key目标路由键源路由键
prefetch-count预取数量1000
reconnect-delay重连延迟(秒)5
ack-mode确认模式on-confirm
src-delete-after源队列删除策略never

ack-mode 说明

模式说明安全性
on-confirm目标确认后确认源最高(推荐)
on-publish发送到目标后确认源中等
no-ack不确认最低

静态 Shovel 配置

%% rabbitmq.conf

# 配置静态 Shovel
shovel.my-static-shovel.src.uris = amqp://admin:password@source.example.com
shovel.my-static-shovel.src.queue = source.queue
shovel.my-static-shovel.dest.uris = amqp://admin:password@dest.example.com
shovel.my-static-shovel.dest.queue = dest.queue
shovel.my-static-shovel.ack-mode = on-confirm
shovel.my-static-shovel.reconnect-delay = 5

完整示例

# === 跨机房消息同步 ===

# 场景:北京机房的订单消息同步到上海机房

# 北京机房 (beijing.example.com)
rabbitmqadmin declare queue name=orders.beijing

# 上海机房 (shanghai.example.com)  
rabbitmqadmin declare queue name=orders.shanghai

# 在任意节点配置 Shovel
rabbitmqctl set_parameter shovel beijing-to-shanghai \
'{
  "src-uri": "amqp://admin:password@beijing.example.com",
  "src-queue": "orders.beijing",
  "dest-uri": "amqp://admin:password@shanghai.example.com",
  "dest-queue": "orders.shanghai",
  "ack-mode": "on-confirm"
}'

# 查看 Shovel 状态
rabbitmqctl shovel_status

# 删除 Shovel
rabbitmqctl clear_parameter shovel beijing-to-shanghai

🏗️ 架构设计

多数据中心架构

                   ┌─────────────────┐
                   │   主数据中心     │
                   │    (Primary)    │
                   │                 │
                   │  ┌───────────┐  │
                   │  │ RabbitMQ  │  │
                   │  │  Cluster  │  │
                   │  └─────┬─────┘  │
                   └────────┼────────┘
                            │
         ┌──────────────────┼──────────────────┐
         │                  │                  │
         ▼                  ▼                  ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   北京机房       │ │   上海机房       │ │   广州机房       │
│                 │ │                 │ │                 │
│  ┌───────────┐  │ │  ┌───────────┐  │ │  ┌───────────┐  │
│  │ RabbitMQ  │  │ │  │ RabbitMQ  │  │ │  │ RabbitMQ  │  │
│  └───────────┘  │ │  └───────────┘  │ │  └───────────┘  │
│        │        │ │        │        │ │        │        │
│     Federation  │ │     Federation  │ │     Federation  │
└─────────────────┘ └─────────────────┘ └─────────────────┘

双活架构

┌─────────────────┐                   ┌─────────────────┐
│    机房 A        │                   │    机房 B        │
│                 │                   │                 │
│  ┌───────────┐  │                   │  ┌───────────┐  │
│  │ RabbitMQ  │◄─┼─── Federation ───►│  │ RabbitMQ  │  │
│  │  Cluster  │──┼─── Shovel ────────┼──│  Cluster  │  │
│  └───────────┘  │                   │  └───────────┘  │
│        │        │                   │        │        │
│  ┌─────┴─────┐  │                   │  ┌─────┴─────┐  │
│  │   App A   │  │                   │  │   App B   │  │
│  └───────────┘  │                   │  └───────────┘  │
└─────────────────┘                   └─────────────────┘

🔧 监控与运维

查看状态

# Federation 状态
rabbitmqctl federation_status

# Shovel 状态
rabbitmqctl shovel_status

# 详细信息
rabbitmqctl eval 'rabbit_federation_status:status().'

故障处理

# 重启 Federation 链接
rabbitmqctl restart_federation_link <link-name>

# 重启 Shovel
rabbitmqctl clear_parameter shovel my-shovel
rabbitmqctl set_parameter shovel my-shovel '...'

监控指标

// 监控 Federation/Shovel 状态
@Slf4j
@Component
public class ReplicationMonitor {
    
    @Scheduled(fixedRate = 60000)
    public void checkReplicationStatus() {
        // 调用管理 API 检查状态
        String url = "http://localhost:15672/api/federation-links";
        // 或 http://localhost:15672/api/shovels
        
        // 检查状态并告警
    }
}

⚠️ 注意事项

1. 避免消息循环

# 使用 max-hops 参数
rabbitmqctl set_parameter federation-upstream my-upstream \
  '{"uri":"amqp://...","max-hops":1}'

2. 网络延迟

- Federation/Shovel 依赖网络连接
- 高延迟网络会影响同步速度
- 建议在同城或低延迟网络使用

3. 消息顺序

- Federation 不保证跨节点消息顺序
- 如需顺序保证,使用单一生产者或序列号

4. 安全配置

# 使用 TLS 连接
rabbitmqctl set_parameter federation-upstream secure-upstream \
  '{"uri":"amqps://admin:password@upstream.example.com:5671"}'

📝 本章小结

特性FederationShovel
模式拉取推送
粒度交换机/队列队列/交换机
配置下游任意
场景订阅分发点对点同步

选型建议

场景推荐方案
分支订阅总部消息Federation
跨机房消息同步Shovel
双活互备Federation + Shovel
消息迁移Shovel

下一步

跨数据中心配置完成后,让我们学习 备份与恢复