最佳实践
本章总结 RabbitMQ 在生产环境中的最佳实践。
🎯 本章目标
- 掌握生产环境配置规范
- 了解常见问题和解决方案
- 学习运维经验
✅ 生产者最佳实践
1. 使用发布确认
// 开启发布确认
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
2. 设置消息 ID
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
msg.getMessageProperties().setTimestamp(new Date());
return msg;
});
3. 消息持久化
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});
4. 失败重试
@Slf4j
@Service
public class ReliableProducer {
private static final int MAX_RETRY = 3;
public boolean sendWithRetry(String message) {
for (int i = 0; i < MAX_RETRY; i++) {
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
} catch (Exception e) {
log.warn("发送失败,第 {} 次重试", i + 1);
sleep(1000 * (i + 1));
}
}
return false;
}
}
✅ 消费者最佳实践
1. 手动确认
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
2. 合理的 Prefetch
spring:
rabbitmq:
listener:
simple:
prefetch: 10 # 根据处理速度调整
3. 幂等处理
@RabbitListener(queues = "queue")
public void handle(Message message, Channel channel) {
String messageId = message.getMessageProperties().getMessageId();
if (isProcessed(messageId)) {
channel.basicAck(deliveryTag, false);
return;
}
try {
processMessage(message);
markAsProcessed(messageId);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
4. 异常处理
@RabbitListener(queues = "queue")
public void handle(Message message, Channel channel) {
try {
processMessage(message);
channel.basicAck(deliveryTag, false);
} catch (BusinessException e) {
// 业务异常,不重试
log.error("业务处理失败", e);
channel.basicNack(deliveryTag, false, false);
} catch (Exception e) {
// 系统异常,重试
log.error("系统异常,稍后重试", e);
channel.basicNack(deliveryTag, false, true);
}
}
✅ 队列配置最佳实践
1. 使用仲裁队列
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.quorum()
.build();
}
2. 配置死信队列
@Bean
public Queue businessQueue() {
return QueueBuilder.durable("business.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "dlx.key")
.build();
}
3. 设置队列长度
@Bean
public Queue limitedQueue() {
return QueueBuilder.durable("limited.queue")
.withArgument("x-max-length", 100000)
.withArgument("x-overflow", "reject-publish")
.build();
}
✅ 连接配置最佳实践
spring:
rabbitmq:
# 集群地址
addresses: node1:5672,node2:5672,node3:5672
# 连接超时
connection-timeout: 30000
# 心跳
requested-heartbeat: 60
# 缓存
cache:
channel:
size: 25
checkout-timeout: 5000
⚠️ 常见问题
1. 消息丢失
原因:
- 生产者发送失败
- 消息未持久化
- 消费者处理失败
解决:
✅ 开启发布确认
✅ 消息持久化
✅ 手动确认
✅ 死信队列
2. 消息重复
原因:
- 网络超时
- 消费者确认失败
解决:
✅ 消息幂等处理
✅ 唯一消息 ID
✅ 业务去重
3. 消息堆积
原因:
- 消费者处理慢
- 消费者挂掉
解决:
✅ 增加消费者
✅ 优化处理逻辑
✅ 调整 prefetch
✅ 监控告警
4. 连接断开
原因:
- 网络不稳定
- 服务器重启
解决:
✅ 配置自动重连
✅ 多节点地址
✅ 心跳检测
📋 生产环境检查清单
部署前
- [ ] 集群已搭建(至少3节点)
- [ ] 高可用已配置(仲裁队列/镜像队列)
- [ ] 持久化已开启
- [ ] 死信队列已配置
- [ ] 监控告警已就绪
代码检查
- [ ] 发布确认已开启
- [ ] 手动确认已配置
- [ ] 幂等性已实现
- [ ] 异常处理完善
- [ ] 日志记录完整
运维准备
- [ ] 监控 Dashboard 已配置
- [ ] 告警规则已设置
- [ ] 日志收集已配置
- [ ] 备份恢复已测试
- [ ] 故障演练已完成
📝 本章小结
| 方面 | 最佳实践 |
|---|---|
| 生产者 | 发布确认、消息ID、持久化、重试 |
| 消费者 | 手动确认、幂等处理、异常处理 |
| 队列 | 仲裁队列、死信队列、长度限制 |
| 连接 | 集群地址、超时配置、心跳 |
恭喜!
你已经完成了 RabbitMQ 高级篇的学习!现在你已经具备了在生产环境中使用 RabbitMQ 的能力。
🎓 学习总结
通过本教程,你已经学习了:
基础篇:
- RabbitMQ 基本概念和安装
- 五种消息模式
- Spring Boot 集成
进阶篇:
- 消息确认机制
- 死信队列和延时队列
- 消息幂等性
高级篇:
- 集群搭建和高可用
- 性能优化
- 监控告警和最佳实践
祝你在 RabbitMQ 的学习和使用中一切顺利!🎉
