消息幂等性
消息幂等性是保证消息不被重复处理的关键机制。
🎯 本章目标
- 理解消息重复消费的原因
- 掌握常用的幂等性解决方案
- 实现生产级的幂等性处理
⚠️ 重复消费场景
1. 消费确认失败
消费者 → 处理成功 → 发送 ACK → 网络异常 → ACK 丢失
↓
RabbitMQ 重新投递
↓
消费者再次处理(重复!)
2. 生产者重试
生产者 → 发送消息 → 网络超时 → 生产者以为失败 → 重新发送
↓
消息实际已发送成功
↓
队列中有两条相同消息
3. 消费者重启
消费者 → 处理消息 → 突然宕机 → 消息未确认
↓
消费者重启后重新获取消息
🔧 解决方案
方案一:唯一消息ID + 数据库
package com.example.rabbitmq.idempotency;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
/**
* 基于数据库的幂等性处理
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DatabaseIdempotencyService {
private final MessageLogRepository messageLogRepository;
/**
* 检查消息是否已处理
*/
public boolean isProcessed(String messageId) {
return messageLogRepository.existsByMessageId(messageId);
}
/**
* 标记消息已处理
*/
@Transactional
public void markAsProcessed(String messageId, String content) {
MessageLog log = new MessageLog();
log.setMessageId(messageId);
log.setContent(content);
log.setProcessTime(LocalDateTime.now());
log.setStatus("SUCCESS");
messageLogRepository.save(log);
}
/**
* 幂等消费模板方法
*/
@Transactional
public boolean consumeIdempotently(String messageId, String content,
MessageProcessor processor) {
// 1. 检查是否已处理
if (isProcessed(messageId)) {
log.info("消息已处理,跳过: {}", messageId);
return true;
}
try {
// 2. 处理业务逻辑
processor.process(content);
// 3. 标记为已处理
markAsProcessed(messageId, content);
log.info("消息处理成功: {}", messageId);
return true;
} catch (Exception e) {
log.error("消息处理失败: {}", messageId, e);
throw e;
}
}
@FunctionalInterface
public interface MessageProcessor {
void process(String content) throws Exception;
}
}
// 实体类
@Data
@Entity
@Table(name = "message_log", indexes = {
@Index(name = "idx_message_id", columnList = "messageId", unique = true)
})
public class MessageLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true, nullable = false, length = 64)
private String messageId;
@Column(length = 2000)
private String content;
private LocalDateTime processTime;
@Column(length = 20)
private String status;
}
消费者使用:
@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentConsumer {
private final DatabaseIdempotencyService idempotencyService;
private final OrderService orderService;
@RabbitListener(queues = "order.queue")
public void handleOrder(String orderJson, Channel channel, Message message)
throws IOException {
String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (messageId == null) {
log.warn("消息缺少ID,生成临时ID");
messageId = UUID.randomUUID().toString();
}
try {
boolean success = idempotencyService.consumeIdempotently(
messageId,
orderJson,
content -> {
Order order = parseOrder(content);
orderService.processOrder(order);
}
);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息处理异常", e);
channel.basicNack(deliveryTag, false, false);
}
}
}
方案二:Redis 分布式锁
package com.example.rabbitmq.idempotency;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
* 基于 Redis 的幂等性处理
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class RedisIdempotencyService {
private final StringRedisTemplate redisTemplate;
private static final String KEY_PREFIX = "mq:idempotent:";
private static final long EXPIRE_HOURS = 24; // 24小时过期
/**
* 尝试获取幂等锁
* @return true=首次处理,false=重复消息
*/
public boolean tryLock(String messageId) {
String key = KEY_PREFIX + messageId;
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", EXPIRE_HOURS, TimeUnit.HOURS);
return Boolean.TRUE.equals(success);
}
/**
* 释放幂等锁(处理失败时调用)
*/
public void unlock(String messageId) {
String key = KEY_PREFIX + messageId;
redisTemplate.delete(key);
}
/**
* 设置处理结果
*/
public void setResult(String messageId, String result) {
String key = KEY_PREFIX + messageId;
redisTemplate.opsForValue().set(key, result, EXPIRE_HOURS, TimeUnit.HOURS);
}
/**
* 获取处理结果
*/
public String getResult(String messageId) {
String key = KEY_PREFIX + messageId;
return redisTemplate.opsForValue().get(key);
}
}
消费者使用:
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisIdempotentConsumer {
private final RedisIdempotencyService idempotencyService;
private final PaymentService paymentService;
@RabbitListener(queues = "payment.queue")
public void handlePayment(String paymentJson, Channel channel, Message message)
throws IOException {
String messageId = message.getMessageProperties().getMessageId();
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 1. 尝试获取幂等锁
if (!idempotencyService.tryLock(messageId)) {
log.info("重复消息,跳过处理: {}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
try {
// 2. 处理业务
Payment payment = parsePayment(paymentJson);
paymentService.processPayment(payment);
// 3. 设置成功结果
idempotencyService.setResult(messageId, "SUCCESS");
channel.basicAck(deliveryTag, false);
log.info("支付处理成功: {}", messageId);
} catch (Exception e) {
log.error("支付处理失败: {}", messageId, e);
// 4. 处理失败,释放锁(允许重试)
idempotencyService.unlock(messageId);
channel.basicNack(deliveryTag, false, true);
}
}
}
方案三:业务状态机
package com.example.rabbitmq.idempotency;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 基于业务状态机的幂等性处理
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class StateMachineService {
private final OrderRepository orderRepository;
/**
* 支付订单(幂等)
*/
@Transactional
public boolean payOrder(String orderId, String paymentId) {
// 使用乐观锁更新
int updated = orderRepository.updateStatusWithCondition(
orderId,
OrderStatus.UNPAID, // 当前状态必须是未支付
OrderStatus.PAID, // 更新为已支付
paymentId
);
if (updated == 0) {
// 更新失败,检查原因
Order order = orderRepository.findById(orderId);
if (order == null) {
log.error("订单不存在: {}", orderId);
return false;
}
if (order.getStatus() == OrderStatus.PAID) {
log.info("订单已支付,幂等返回: {}", orderId);
return true; // 幂等成功
}
log.error("订单状态异常: {} - {}", orderId, order.getStatus());
return false;
}
log.info("订单支付成功: {}", orderId);
return true;
}
}
// Repository
public interface OrderRepository extends JpaRepository<Order, String> {
@Modifying
@Query("UPDATE Order o SET o.status = :newStatus, o.paymentId = :paymentId, " +
"o.payTime = CURRENT_TIMESTAMP WHERE o.orderId = :orderId AND o.status = :currentStatus")
int updateStatusWithCondition(
@Param("orderId") String orderId,
@Param("currentStatus") OrderStatus currentStatus,
@Param("newStatus") OrderStatus newStatus,
@Param("paymentId") String paymentId
);
}
方案四:唯一索引
/**
* 利用数据库唯一索引保证幂等
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class UniqueIndexService {
private final PointRecordRepository pointRecordRepository;
/**
* 增加积分(幂等)
* 使用 (userId, businessId, businessType) 唯一索引
*/
@Transactional
public boolean addPoints(Long userId, String businessId, String businessType, int points) {
try {
PointRecord record = new PointRecord();
record.setUserId(userId);
record.setBusinessId(businessId); // 业务唯一标识
record.setBusinessType(businessType);
record.setPoints(points);
record.setCreateTime(LocalDateTime.now());
pointRecordRepository.save(record);
// 更新用户总积分
userPointRepository.addPoints(userId, points);
log.info("积分增加成功: userId={}, points={}", userId, points);
return true;
} catch (DataIntegrityViolationException e) {
// 唯一索引冲突,说明已经处理过
log.info("积分已增加,幂等返回: userId={}, businessId={}", userId, businessId);
return true;
}
}
}
// 实体类
@Data
@Entity
@Table(name = "point_record", uniqueConstraints = {
@UniqueConstraint(columnNames = {"userId", "businessId", "businessType"})
})
public class PointRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long userId;
private String businessId;
private String businessType;
private Integer points;
private LocalDateTime createTime;
}
📋 完整实现
生产者(设置消息ID)
@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 发送消息(带唯一ID)
*/
public void sendMessage(String exchange, String routingKey, Object message) {
String messageId = generateMessageId();
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setMessageId(messageId);
msg.getMessageProperties().setTimestamp(new Date());
return msg;
});
log.info("发送消息: id={}", messageId);
}
/**
* 生成全局唯一消息ID
*/
private String generateMessageId() {
// 方式1: UUID
// return UUID.randomUUID().toString();
// 方式2: 雪花算法
// return snowflake.nextIdStr();
// 方式3: 业务ID + 时间戳
return String.format("%s_%d", "MSG", System.nanoTime());
}
}
消费者(通用幂等处理)
@Slf4j
@Component
@RequiredArgsConstructor
public class GenericIdempotentConsumer {
private final RedisIdempotencyService redisIdempotency;
private final DatabaseIdempotencyService dbIdempotency;
/**
* 通用幂等消费方法
*/
public void consumeWithIdempotency(
String messageId,
String content,
Channel channel,
long deliveryTag,
ThrowingConsumer<String> processor) throws IOException {
// 1. Redis 快速检查
if (!redisIdempotency.tryLock(messageId)) {
log.info("Redis 检测到重复消息: {}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
try {
// 2. 数据库二次检查(可选,用于持久化)
if (dbIdempotency.isProcessed(messageId)) {
log.info("数据库检测到重复消息: {}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 3. 执行业务逻辑
processor.accept(content);
// 4. 记录处理成功
dbIdempotency.markAsProcessed(messageId, content);
redisIdempotency.setResult(messageId, "SUCCESS");
channel.basicAck(deliveryTag, false);
log.info("消息处理成功: {}", messageId);
} catch (Exception e) {
log.error("消息处理失败: {}", messageId, e);
// 释放 Redis 锁,允许重试
redisIdempotency.unlock(messageId);
channel.basicNack(deliveryTag, false, true);
}
}
@FunctionalInterface
public interface ThrowingConsumer<T> {
void accept(T t) throws Exception;
}
}
📊 方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 数据库 | 持久化可靠 | 性能较低 | 低并发场景 |
| Redis | 性能高 | 可能丢失 | 高并发场景 |
| 状态机 | 业务语义清晰 | 需要设计状态 | 有明确状态流转 |
| 唯一索引 | 简单可靠 | 依赖数据库 | 有唯一业务标识 |
📝 本章小结
| 要点 | 说明 |
|---|---|
| 消息ID | 每条消息必须有全局唯一ID |
| 幂等检查 | 处理前检查是否已处理 |
| 幂等记录 | 处理后记录已处理状态 |
| 失败回滚 | 处理失败要释放幂等锁 |
下一步
幂等性保证了消息不重复处理,接下来学习 优先级队列!
