消息确认机制
消息确认机制是保证消息不丢失的核心手段。
🎯 本章目标
- 理解消息丢失的场景
- 掌握生产者确认机制
- 掌握消费者确认机制
- 实现消息的可靠投递
⚠️ 消息丢失场景
┌──────────┐ ① ┌──────────┐ ② ┌─────────┐ ③ ┌──────────┐
│ Producer │ ──────→ │ Exchange │ ──────→ │ Queue │ ──────→ │ Consumer │
└──────────┘ └──────────┘ └─────────┘ └──────────┘
↓ ↓ ↓ ↓
网络异常 路由失败 服务器宕机 处理失败
| 阶段 | 丢失原因 | 解决方案 |
|---|---|---|
| ① | 网络抖动、服务不可用 | 发布确认 (Publisher Confirm) |
| ② | 路由键错误、队列不存在 | 消息回退 (Publisher Return) |
| ③ | 服务重启、内存溢出 | 消息持久化 + 镜像队列 |
| ④ | 消费者崩溃、处理异常 | 消费者确认 (Consumer Ack) |
📤 生产者确认
1. 配置开启确认
spring:
rabbitmq:
# 开启发布确认
publisher-confirm-type: correlated
# 开启消息回退
publisher-returns: true
confirm-type 说明:
| 值 | 说明 |
|---|---|
| none | 禁用(默认) |
| simple | 同步等待确认 |
| correlated | 异步回调确认(推荐) |
2. 配置 RabbitTemplate
package com.example.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitConfirmConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "confirm.key";
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE).build();
}
@Bean
public Binding confirmBinding() {
return BindingBuilder.bind(confirmQueue())
.to(confirmExchange())
.with(CONFIRM_ROUTING_KEY);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置消息到达交换机的确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("✅ 消息成功到达交换机, ID: {}", id);
} else {
log.error("❌ 消息未到达交换机, ID: {}, 原因: {}", id, cause);
// 这里可以进行消息补偿,如重发或记录到数据库
handleConfirmFail(correlationData, cause);
}
});
// 设置消息路由失败的回调
rabbitTemplate.setReturnsCallback(returned -> {
log.error("❌ 消息路由失败:");
log.error(" 消息: {}", new String(returned.getMessage().getBody()));
log.error(" 交换机: {}", returned.getExchange());
log.error(" 路由键: {}", returned.getRoutingKey());
log.error(" 原因: {}", returned.getReplyText());
// 处理路由失败的消息
handleRouteFail(returned);
});
// 必须设置为 true,否则消息路由失败不会触发回调
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
private void handleConfirmFail(CorrelationData correlationData, String cause) {
// 1. 记录到数据库
// 2. 发送告警
// 3. 定时任务重试
log.warn("消息发送失败,进入补偿流程");
}
private void handleRouteFail(ReturnedMessage returned) {
// 1. 记录到数据库
// 2. 人工处理
log.warn("消息路由失败,进入补偿流程");
}
}
3. 发送消息(带确认)
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.RabbitConfirmConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class ConfirmProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 发送消息(带确认)
*/
public void sendWithConfirm(String message) {
// 创建关联数据,用于在回调中识别消息
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 可以存储额外信息,便于后续补偿
correlationData.setReturned(null);
log.info("发送消息, ID: {}, 内容: {}", correlationData.getId(), message);
rabbitTemplate.convertAndSend(
RabbitConfirmConfig.CONFIRM_EXCHANGE,
RabbitConfirmConfig.CONFIRM_ROUTING_KEY,
message,
correlationData
);
}
/**
* 发送消息到错误的路由键(测试路由失败)
*/
public void sendWithWrongRoutingKey(String message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("发送消息(错误路由键), ID: {}", correlationData.getId());
rabbitTemplate.convertAndSend(
RabbitConfirmConfig.CONFIRM_EXCHANGE,
"wrong.routing.key", // 错误的路由键
message,
correlationData
);
}
}
📥 消费者确认
确认模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
| AUTO | 处理成功自动确认,异常自动拒绝 | 简单场景 |
| MANUAL | 手动调用确认/拒绝 | 生产环境推荐 |
| NONE | 不确认,消息送达即删除 | 不推荐 |
手动确认示例
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.RabbitConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class ConfirmConsumer {
// 记录重试次数
private static final int MAX_RETRY = 3;
@RabbitListener(queues = RabbitConfirmConfig.CONFIRM_QUEUE)
public void handleMessage(String message, Channel channel, Message amqpMessage)
throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
// 获取重试次数
Integer retryCount = (Integer) amqpMessage.getMessageProperties()
.getHeaders().getOrDefault("x-retry-count", 0);
try {
log.info("📥 收到消息: {}", message);
// 模拟业务处理
processMessage(message);
// 处理成功,确认消息
channel.basicAck(deliveryTag, false);
log.info("✅ 消息处理成功,已确认");
} catch (Exception e) {
log.error("❌ 消息处理失败: {}", e.getMessage());
if (retryCount < MAX_RETRY) {
// 未超过最大重试次数,重新入队
log.warn("🔄 第 {} 次重试", retryCount + 1);
channel.basicNack(deliveryTag, false, true);
} else {
// 超过最大重试次数,拒绝消息(进入死信队列)
log.error("🗑️ 超过最大重试次数,拒绝消息");
channel.basicNack(deliveryTag, false, false);
}
}
}
private void processMessage(String message) throws Exception {
// 模拟处理异常
if (message.contains("error")) {
throw new RuntimeException("处理失败");
}
Thread.sleep(100);
}
}
确认方法详解
// 确认消息
channel.basicAck(
deliveryTag, // 消息标签
false // multiple: false=只确认当前消息, true=确认当前及之前所有消息
);
// 拒绝消息
channel.basicNack(
deliveryTag, // 消息标签
false, // multiple: 是否批量拒绝
true // requeue: true=重新入队, false=丢弃或进入死信队列
);
// 拒绝单条消息(basicNack 的简化版)
channel.basicReject(
deliveryTag, // 消息标签
true // requeue: true=重新入队, false=丢弃
);
💾 消息持久化
确保消息在服务器重启后不丢失:
1. 交换机持久化
// durable = true
@Bean
public DirectExchange durableExchange() {
return new DirectExchange("durable.exchange", true, false);
}
2. 队列持久化
// durable = true
@Bean
public Queue durableQueue() {
return QueueBuilder.durable("durable.queue").build();
}
3. 消息持久化
// 设置消息持久化
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});
🔄 完整可靠投递方案
package com.example.rabbitmq.reliable;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* 可靠消息发送服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class ReliableMessageService {
private final RabbitTemplate rabbitTemplate;
// 消息状态存储(生产环境使用数据库)
private final Map<String, MessageRecord> messageStore = new ConcurrentHashMap<>();
/**
* 发送可靠消息
*/
public String sendReliableMessage(String exchange, String routingKey, Object message) {
String messageId = UUID.randomUUID().toString();
// 1. 保存消息记录
MessageRecord record = new MessageRecord();
record.setMessageId(messageId);
record.setExchange(exchange);
record.setRoutingKey(routingKey);
record.setContent(message.toString());
record.setStatus(MessageStatus.SENDING);
record.setCreateTime(LocalDateTime.now());
record.setRetryCount(0);
messageStore.put(messageId, record);
log.info("📝 保存消息记录: {}", messageId);
// 2. 发送消息
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setMessageId(messageId);
return msg;
}, correlationData);
return messageId;
}
/**
* 处理发送确认
*/
public void handleConfirm(String messageId, boolean ack) {
MessageRecord record = messageStore.get(messageId);
if (record == null) return;
if (ack) {
record.setStatus(MessageStatus.SUCCESS);
log.info("✅ 消息发送成功: {}", messageId);
} else {
record.setStatus(MessageStatus.FAILED);
log.error("❌ 消息发送失败: {}", messageId);
}
}
/**
* 重发失败消息(定时任务调用)
*/
public void resendFailedMessages() {
messageStore.values().stream()
.filter(r -> r.getStatus() == MessageStatus.FAILED)
.filter(r -> r.getRetryCount() < 3)
.forEach(record -> {
record.setRetryCount(record.getRetryCount() + 1);
record.setStatus(MessageStatus.SENDING);
CorrelationData correlationData = new CorrelationData(record.getMessageId());
rabbitTemplate.convertAndSend(
record.getExchange(),
record.getRoutingKey(),
record.getContent(),
correlationData
);
log.info("🔄 重发消息: {}, 第 {} 次", record.getMessageId(), record.getRetryCount());
});
}
@Data
static class MessageRecord {
private String messageId;
private String exchange;
private String routingKey;
private String content;
private MessageStatus status;
private LocalDateTime createTime;
private int retryCount;
}
enum MessageStatus {
SENDING, SUCCESS, FAILED
}
}
📝 本章小结
| 机制 | 作用 | 配置 |
|---|---|---|
| Publisher Confirm | 确认消息到达交换机 | publisher-confirm-type |
| Publisher Return | 处理路由失败的消息 | publisher-returns |
| Consumer Ack | 确认消息处理完成 | acknowledge-mode: manual |
| 消息持久化 | 防止服务器重启丢失 | durable + deliveryMode |
下一步
消息确认保证了消息不丢失,但处理失败的消息怎么办?请学习 死信队列!
