消息确认机制

消息确认机制是保证消息不丢失的核心手段。

🎯 本章目标

  • 理解消息丢失的场景
  • 掌握生产者确认机制
  • 掌握消费者确认机制
  • 实现消息的可靠投递

⚠️ 消息丢失场景

┌──────────┐    ①    ┌──────────┐    ②    ┌─────────┐    ③    ┌──────────┐
│ Producer │ ──────→ │ Exchange │ ──────→ │  Queue  │ ──────→ │ Consumer │
└──────────┘         └──────────┘         └─────────┘         └──────────┘
      ↓                    ↓                   ↓                    ↓
   网络异常             路由失败            服务器宕机           处理失败
阶段丢失原因解决方案
网络抖动、服务不可用发布确认 (Publisher Confirm)
路由键错误、队列不存在消息回退 (Publisher Return)
服务重启、内存溢出消息持久化 + 镜像队列
消费者崩溃、处理异常消费者确认 (Consumer Ack)

📤 生产者确认

1. 配置开启确认

spring:
  rabbitmq:
    # 开启发布确认
    publisher-confirm-type: correlated
    # 开启消息回退
    publisher-returns: true

confirm-type 说明:

说明
none禁用(默认)
simple同步等待确认
correlated异步回调确认(推荐)

2. 配置 RabbitTemplate

package com.example.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitConfirmConfig {
    
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_QUEUE = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "confirm.key";
    
    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE);
    }
    
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }
    
    @Bean
    public Binding confirmBinding() {
        return BindingBuilder.bind(confirmQueue())
                .to(confirmExchange())
                .with(CONFIRM_ROUTING_KEY);
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        
        // 设置消息到达交换机的确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String id = correlationData != null ? correlationData.getId() : "";
            
            if (ack) {
                log.info("✅ 消息成功到达交换机, ID: {}", id);
            } else {
                log.error("❌ 消息未到达交换机, ID: {}, 原因: {}", id, cause);
                // 这里可以进行消息补偿,如重发或记录到数据库
                handleConfirmFail(correlationData, cause);
            }
        });
        
        // 设置消息路由失败的回调
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("❌ 消息路由失败:");
            log.error("   消息: {}", new String(returned.getMessage().getBody()));
            log.error("   交换机: {}", returned.getExchange());
            log.error("   路由键: {}", returned.getRoutingKey());
            log.error("   原因: {}", returned.getReplyText());
            
            // 处理路由失败的消息
            handleRouteFail(returned);
        });
        
        // 必须设置为 true,否则消息路由失败不会触发回调
        rabbitTemplate.setMandatory(true);
        
        return rabbitTemplate;
    }
    
    private void handleConfirmFail(CorrelationData correlationData, String cause) {
        // 1. 记录到数据库
        // 2. 发送告警
        // 3. 定时任务重试
        log.warn("消息发送失败,进入补偿流程");
    }
    
    private void handleRouteFail(ReturnedMessage returned) {
        // 1. 记录到数据库
        // 2. 人工处理
        log.warn("消息路由失败,进入补偿流程");
    }
}

3. 发送消息(带确认)

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.RabbitConfirmConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class ConfirmProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 发送消息(带确认)
     */
    public void sendWithConfirm(String message) {
        // 创建关联数据,用于在回调中识别消息
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        // 可以存储额外信息,便于后续补偿
        correlationData.setReturned(null);
        
        log.info("发送消息, ID: {}, 内容: {}", correlationData.getId(), message);
        
        rabbitTemplate.convertAndSend(
            RabbitConfirmConfig.CONFIRM_EXCHANGE,
            RabbitConfirmConfig.CONFIRM_ROUTING_KEY,
            message,
            correlationData
        );
    }
    
    /**
     * 发送消息到错误的路由键(测试路由失败)
     */
    public void sendWithWrongRoutingKey(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        log.info("发送消息(错误路由键), ID: {}", correlationData.getId());
        
        rabbitTemplate.convertAndSend(
            RabbitConfirmConfig.CONFIRM_EXCHANGE,
            "wrong.routing.key",  // 错误的路由键
            message,
            correlationData
        );
    }
}

📥 消费者确认

确认模式

模式说明适用场景
AUTO处理成功自动确认,异常自动拒绝简单场景
MANUAL手动调用确认/拒绝生产环境推荐
NONE不确认,消息送达即删除不推荐

手动确认示例

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.RabbitConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class ConfirmConsumer {
    
    // 记录重试次数
    private static final int MAX_RETRY = 3;
    
    @RabbitListener(queues = RabbitConfirmConfig.CONFIRM_QUEUE)
    public void handleMessage(String message, Channel channel, Message amqpMessage) 
            throws IOException {
        
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        
        // 获取重试次数
        Integer retryCount = (Integer) amqpMessage.getMessageProperties()
                .getHeaders().getOrDefault("x-retry-count", 0);
        
        try {
            log.info("📥 收到消息: {}", message);
            
            // 模拟业务处理
            processMessage(message);
            
            // 处理成功,确认消息
            channel.basicAck(deliveryTag, false);
            log.info("✅ 消息处理成功,已确认");
            
        } catch (Exception e) {
            log.error("❌ 消息处理失败: {}", e.getMessage());
            
            if (retryCount < MAX_RETRY) {
                // 未超过最大重试次数,重新入队
                log.warn("🔄 第 {} 次重试", retryCount + 1);
                channel.basicNack(deliveryTag, false, true);
            } else {
                // 超过最大重试次数,拒绝消息(进入死信队列)
                log.error("🗑️ 超过最大重试次数,拒绝消息");
                channel.basicNack(deliveryTag, false, false);
            }
        }
    }
    
    private void processMessage(String message) throws Exception {
        // 模拟处理异常
        if (message.contains("error")) {
            throw new RuntimeException("处理失败");
        }
        Thread.sleep(100);
    }
}

确认方法详解

// 确认消息
channel.basicAck(
    deliveryTag,  // 消息标签
    false         // multiple: false=只确认当前消息, true=确认当前及之前所有消息
);

// 拒绝消息
channel.basicNack(
    deliveryTag,  // 消息标签
    false,        // multiple: 是否批量拒绝
    true          // requeue: true=重新入队, false=丢弃或进入死信队列
);

// 拒绝单条消息(basicNack 的简化版)
channel.basicReject(
    deliveryTag,  // 消息标签
    true          // requeue: true=重新入队, false=丢弃
);

💾 消息持久化

确保消息在服务器重启后不丢失:

1. 交换机持久化

// durable = true
@Bean
public DirectExchange durableExchange() {
    return new DirectExchange("durable.exchange", true, false);
}

2. 队列持久化

// durable = true
@Bean
public Queue durableQueue() {
    return QueueBuilder.durable("durable.queue").build();
}

3. 消息持久化

// 设置消息持久化
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return msg;
});

🔄 完整可靠投递方案

package com.example.rabbitmq.reliable;

import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 可靠消息发送服务
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ReliableMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    // 消息状态存储(生产环境使用数据库)
    private final Map<String, MessageRecord> messageStore = new ConcurrentHashMap<>();
    
    /**
     * 发送可靠消息
     */
    public String sendReliableMessage(String exchange, String routingKey, Object message) {
        String messageId = UUID.randomUUID().toString();
        
        // 1. 保存消息记录
        MessageRecord record = new MessageRecord();
        record.setMessageId(messageId);
        record.setExchange(exchange);
        record.setRoutingKey(routingKey);
        record.setContent(message.toString());
        record.setStatus(MessageStatus.SENDING);
        record.setCreateTime(LocalDateTime.now());
        record.setRetryCount(0);
        
        messageStore.put(messageId, record);
        log.info("📝 保存消息记录: {}", messageId);
        
        // 2. 发送消息
        CorrelationData correlationData = new CorrelationData(messageId);
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setMessageId(messageId);
            return msg;
        }, correlationData);
        
        return messageId;
    }
    
    /**
     * 处理发送确认
     */
    public void handleConfirm(String messageId, boolean ack) {
        MessageRecord record = messageStore.get(messageId);
        if (record == null) return;
        
        if (ack) {
            record.setStatus(MessageStatus.SUCCESS);
            log.info("✅ 消息发送成功: {}", messageId);
        } else {
            record.setStatus(MessageStatus.FAILED);
            log.error("❌ 消息发送失败: {}", messageId);
        }
    }
    
    /**
     * 重发失败消息(定时任务调用)
     */
    public void resendFailedMessages() {
        messageStore.values().stream()
            .filter(r -> r.getStatus() == MessageStatus.FAILED)
            .filter(r -> r.getRetryCount() < 3)
            .forEach(record -> {
                record.setRetryCount(record.getRetryCount() + 1);
                record.setStatus(MessageStatus.SENDING);
                
                CorrelationData correlationData = new CorrelationData(record.getMessageId());
                rabbitTemplate.convertAndSend(
                    record.getExchange(),
                    record.getRoutingKey(),
                    record.getContent(),
                    correlationData
                );
                
                log.info("🔄 重发消息: {}, 第 {} 次", record.getMessageId(), record.getRetryCount());
            });
    }
    
    @Data
    static class MessageRecord {
        private String messageId;
        private String exchange;
        private String routingKey;
        private String content;
        private MessageStatus status;
        private LocalDateTime createTime;
        private int retryCount;
    }
    
    enum MessageStatus {
        SENDING, SUCCESS, FAILED
    }
}

📝 本章小结

机制作用配置
Publisher Confirm确认消息到达交换机publisher-confirm-type
Publisher Return处理路由失败的消息publisher-returns
Consumer Ack确认消息处理完成acknowledge-mode: manual
消息持久化防止服务器重启丢失durable + deliveryMode

下一步

消息确认保证了消息不丢失,但处理失败的消息怎么办?请学习 死信队列