消息幂等性

消息幂等性是保证消息不被重复处理的关键机制。

🎯 本章目标

  • 理解消息重复消费的原因
  • 掌握常用的幂等性解决方案
  • 实现生产级的幂等性处理

⚠️ 重复消费场景

1. 消费确认失败

消费者 → 处理成功 → 发送 ACK → 网络异常 → ACK 丢失
                                          ↓
                              RabbitMQ 重新投递
                                          ↓
                              消费者再次处理(重复!)

2. 生产者重试

生产者 → 发送消息 → 网络超时 → 生产者以为失败 → 重新发送
                        ↓
              消息实际已发送成功
                        ↓
                队列中有两条相同消息

3. 消费者重启

消费者 → 处理消息 → 突然宕机 → 消息未确认
                                ↓
                   消费者重启后重新获取消息

🔧 解决方案

方案一:唯一消息ID + 数据库

package com.example.rabbitmq.idempotency;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

/**
 * 基于数据库的幂等性处理
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DatabaseIdempotencyService {
    
    private final MessageLogRepository messageLogRepository;
    
    /**
     * 检查消息是否已处理
     */
    public boolean isProcessed(String messageId) {
        return messageLogRepository.existsByMessageId(messageId);
    }
    
    /**
     * 标记消息已处理
     */
    @Transactional
    public void markAsProcessed(String messageId, String content) {
        MessageLog log = new MessageLog();
        log.setMessageId(messageId);
        log.setContent(content);
        log.setProcessTime(LocalDateTime.now());
        log.setStatus("SUCCESS");
        
        messageLogRepository.save(log);
    }
    
    /**
     * 幂等消费模板方法
     */
    @Transactional
    public boolean consumeIdempotently(String messageId, String content, 
            MessageProcessor processor) {
        
        // 1. 检查是否已处理
        if (isProcessed(messageId)) {
            log.info("消息已处理,跳过: {}", messageId);
            return true;
        }
        
        try {
            // 2. 处理业务逻辑
            processor.process(content);
            
            // 3. 标记为已处理
            markAsProcessed(messageId, content);
            
            log.info("消息处理成功: {}", messageId);
            return true;
            
        } catch (Exception e) {
            log.error("消息处理失败: {}", messageId, e);
            throw e;
        }
    }
    
    @FunctionalInterface
    public interface MessageProcessor {
        void process(String content) throws Exception;
    }
}

// 实体类
@Data
@Entity
@Table(name = "message_log", indexes = {
    @Index(name = "idx_message_id", columnList = "messageId", unique = true)
})
public class MessageLog {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(unique = true, nullable = false, length = 64)
    private String messageId;
    
    @Column(length = 2000)
    private String content;
    
    private LocalDateTime processTime;
    
    @Column(length = 20)
    private String status;
}

消费者使用:

@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentConsumer {
    
    private final DatabaseIdempotencyService idempotencyService;
    private final OrderService orderService;
    
    @RabbitListener(queues = "order.queue")
    public void handleOrder(String orderJson, Channel channel, Message message) 
            throws IOException {
        
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        if (messageId == null) {
            log.warn("消息缺少ID,生成临时ID");
            messageId = UUID.randomUUID().toString();
        }
        
        try {
            boolean success = idempotencyService.consumeIdempotently(
                messageId, 
                orderJson,
                content -> {
                    Order order = parseOrder(content);
                    orderService.processOrder(order);
                }
            );
            
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("消息处理异常", e);
            channel.basicNack(deliveryTag, false, false);
        }
    }
}

方案二:Redis 分布式锁

package com.example.rabbitmq.idempotency;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * 基于 Redis 的幂等性处理
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class RedisIdempotencyService {
    
    private final StringRedisTemplate redisTemplate;
    
    private static final String KEY_PREFIX = "mq:idempotent:";
    private static final long EXPIRE_HOURS = 24;  // 24小时过期
    
    /**
     * 尝试获取幂等锁
     * @return true=首次处理,false=重复消息
     */
    public boolean tryLock(String messageId) {
        String key = KEY_PREFIX + messageId;
        
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, "1", EXPIRE_HOURS, TimeUnit.HOURS);
        
        return Boolean.TRUE.equals(success);
    }
    
    /**
     * 释放幂等锁(处理失败时调用)
     */
    public void unlock(String messageId) {
        String key = KEY_PREFIX + messageId;
        redisTemplate.delete(key);
    }
    
    /**
     * 设置处理结果
     */
    public void setResult(String messageId, String result) {
        String key = KEY_PREFIX + messageId;
        redisTemplate.opsForValue().set(key, result, EXPIRE_HOURS, TimeUnit.HOURS);
    }
    
    /**
     * 获取处理结果
     */
    public String getResult(String messageId) {
        String key = KEY_PREFIX + messageId;
        return redisTemplate.opsForValue().get(key);
    }
}

消费者使用:

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisIdempotentConsumer {
    
    private final RedisIdempotencyService idempotencyService;
    private final PaymentService paymentService;
    
    @RabbitListener(queues = "payment.queue")
    public void handlePayment(String paymentJson, Channel channel, Message message) 
            throws IOException {
        
        String messageId = message.getMessageProperties().getMessageId();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        // 1. 尝试获取幂等锁
        if (!idempotencyService.tryLock(messageId)) {
            log.info("重复消息,跳过处理: {}", messageId);
            channel.basicAck(deliveryTag, false);
            return;
        }
        
        try {
            // 2. 处理业务
            Payment payment = parsePayment(paymentJson);
            paymentService.processPayment(payment);
            
            // 3. 设置成功结果
            idempotencyService.setResult(messageId, "SUCCESS");
            
            channel.basicAck(deliveryTag, false);
            log.info("支付处理成功: {}", messageId);
            
        } catch (Exception e) {
            log.error("支付处理失败: {}", messageId, e);
            
            // 4. 处理失败,释放锁(允许重试)
            idempotencyService.unlock(messageId);
            
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

方案三:业务状态机

package com.example.rabbitmq.idempotency;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * 基于业务状态机的幂等性处理
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class StateMachineService {
    
    private final OrderRepository orderRepository;
    
    /**
     * 支付订单(幂等)
     */
    @Transactional
    public boolean payOrder(String orderId, String paymentId) {
        // 使用乐观锁更新
        int updated = orderRepository.updateStatusWithCondition(
            orderId,
            OrderStatus.UNPAID,      // 当前状态必须是未支付
            OrderStatus.PAID,         // 更新为已支付
            paymentId
        );
        
        if (updated == 0) {
            // 更新失败,检查原因
            Order order = orderRepository.findById(orderId);
            
            if (order == null) {
                log.error("订单不存在: {}", orderId);
                return false;
            }
            
            if (order.getStatus() == OrderStatus.PAID) {
                log.info("订单已支付,幂等返回: {}", orderId);
                return true;  // 幂等成功
            }
            
            log.error("订单状态异常: {} - {}", orderId, order.getStatus());
            return false;
        }
        
        log.info("订单支付成功: {}", orderId);
        return true;
    }
}

// Repository
public interface OrderRepository extends JpaRepository<Order, String> {
    
    @Modifying
    @Query("UPDATE Order o SET o.status = :newStatus, o.paymentId = :paymentId, " +
           "o.payTime = CURRENT_TIMESTAMP WHERE o.orderId = :orderId AND o.status = :currentStatus")
    int updateStatusWithCondition(
        @Param("orderId") String orderId,
        @Param("currentStatus") OrderStatus currentStatus,
        @Param("newStatus") OrderStatus newStatus,
        @Param("paymentId") String paymentId
    );
}

方案四:唯一索引

/**
 * 利用数据库唯一索引保证幂等
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class UniqueIndexService {
    
    private final PointRecordRepository pointRecordRepository;
    
    /**
     * 增加积分(幂等)
     * 使用 (userId, businessId, businessType) 唯一索引
     */
    @Transactional
    public boolean addPoints(Long userId, String businessId, String businessType, int points) {
        try {
            PointRecord record = new PointRecord();
            record.setUserId(userId);
            record.setBusinessId(businessId);  // 业务唯一标识
            record.setBusinessType(businessType);
            record.setPoints(points);
            record.setCreateTime(LocalDateTime.now());
            
            pointRecordRepository.save(record);
            
            // 更新用户总积分
            userPointRepository.addPoints(userId, points);
            
            log.info("积分增加成功: userId={}, points={}", userId, points);
            return true;
            
        } catch (DataIntegrityViolationException e) {
            // 唯一索引冲突,说明已经处理过
            log.info("积分已增加,幂等返回: userId={}, businessId={}", userId, businessId);
            return true;
        }
    }
}

// 实体类
@Data
@Entity
@Table(name = "point_record", uniqueConstraints = {
    @UniqueConstraint(columnNames = {"userId", "businessId", "businessType"})
})
public class PointRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private Long userId;
    private String businessId;
    private String businessType;
    private Integer points;
    private LocalDateTime createTime;
}

📋 完整实现

生产者(设置消息ID)

@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 发送消息(带唯一ID)
     */
    public void sendMessage(String exchange, String routingKey, Object message) {
        String messageId = generateMessageId();
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setMessageId(messageId);
            msg.getMessageProperties().setTimestamp(new Date());
            return msg;
        });
        
        log.info("发送消息: id={}", messageId);
    }
    
    /**
     * 生成全局唯一消息ID
     */
    private String generateMessageId() {
        // 方式1: UUID
        // return UUID.randomUUID().toString();
        
        // 方式2: 雪花算法
        // return snowflake.nextIdStr();
        
        // 方式3: 业务ID + 时间戳
        return String.format("%s_%d", "MSG", System.nanoTime());
    }
}

消费者(通用幂等处理)

@Slf4j
@Component
@RequiredArgsConstructor
public class GenericIdempotentConsumer {
    
    private final RedisIdempotencyService redisIdempotency;
    private final DatabaseIdempotencyService dbIdempotency;
    
    /**
     * 通用幂等消费方法
     */
    public void consumeWithIdempotency(
            String messageId, 
            String content,
            Channel channel, 
            long deliveryTag,
            ThrowingConsumer<String> processor) throws IOException {
        
        // 1. Redis 快速检查
        if (!redisIdempotency.tryLock(messageId)) {
            log.info("Redis 检测到重复消息: {}", messageId);
            channel.basicAck(deliveryTag, false);
            return;
        }
        
        try {
            // 2. 数据库二次检查(可选,用于持久化)
            if (dbIdempotency.isProcessed(messageId)) {
                log.info("数据库检测到重复消息: {}", messageId);
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            // 3. 执行业务逻辑
            processor.accept(content);
            
            // 4. 记录处理成功
            dbIdempotency.markAsProcessed(messageId, content);
            redisIdempotency.setResult(messageId, "SUCCESS");
            
            channel.basicAck(deliveryTag, false);
            log.info("消息处理成功: {}", messageId);
            
        } catch (Exception e) {
            log.error("消息处理失败: {}", messageId, e);
            
            // 释放 Redis 锁,允许重试
            redisIdempotency.unlock(messageId);
            
            channel.basicNack(deliveryTag, false, true);
        }
    }
    
    @FunctionalInterface
    public interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }
}

📊 方案对比

方案优点缺点适用场景
数据库持久化可靠性能较低低并发场景
Redis性能高可能丢失高并发场景
状态机业务语义清晰需要设计状态有明确状态流转
唯一索引简单可靠依赖数据库有唯一业务标识

📝 本章小结

要点说明
消息ID每条消息必须有全局唯一ID
幂等检查处理前检查是否已处理
幂等记录处理后记录已处理状态
失败回滚处理失败要释放幂等锁

下一步

幂等性保证了消息不重复处理,接下来学习 优先级队列