死信队列
死信队列(Dead Letter Queue, DLQ)用于处理无法正常消费的消息。
🎯 本章目标
- 理解什么是死信
- 掌握死信队列的配置
- 实现消息失败处理机制
💀 什么是死信
死信(Dead Letter) 是指无法被正常消费的消息。消息变成死信的情况:
| 场景 | 说明 |
|---|---|
| 消息被拒绝 | basicNack/basicReject 且 requeue=false |
| 消息过期 | TTL 到期 |
| 队列满了 | 达到 max-length 限制 |
🔄 死信队列流程
┌──────────┐ ┌──────────────┐ ┌──────────────┐
│ Producer │───→│ 正常交换机 │───→│ 正常队列 │
└──────────┘ └──────────────┘ └──────────────┘
│
消息变成死信
↓
┌──────────────┐ ┌──────────────┐
│ 死信交换机 │───→│ 死信队列 │
└──────────────┘ └──────────────┘
│
↓
┌──────────────┐
│ 死信消费者 │
└──────────────┘
🛠️ 代码实现
配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 死信队列配置
*/
@Configuration
public class DeadLetterConfig {
// ===== 正常业务队列 =====
public static final String BUSINESS_EXCHANGE = "business.exchange";
public static final String BUSINESS_QUEUE = "business.queue";
public static final String BUSINESS_ROUTING_KEY = "business.key";
// ===== 死信队列 =====
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.key";
// ===== 死信交换机和队列 =====
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(DEAD_LETTER_ROUTING_KEY);
}
// ===== 正常业务交换机和队列 =====
@Bean
public DirectExchange businessExchange() {
return new DirectExchange(BUSINESS_EXCHANGE);
}
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
// 配置死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 配置死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 可选:消息TTL(毫秒)
// args.put("x-message-ttl", 10000);
// 可选:队列最大长度
// args.put("x-max-length", 100);
return QueueBuilder.durable(BUSINESS_QUEUE)
.withArguments(args)
.build();
}
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with(BUSINESS_ROUTING_KEY);
}
}
生产者
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.DeadLetterConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class DeadLetterProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
public void sendMessage(String message) {
log.info("📤 发送消息: {}", message);
rabbitTemplate.convertAndSend(
DeadLetterConfig.BUSINESS_EXCHANGE,
DeadLetterConfig.BUSINESS_ROUTING_KEY,
message
);
}
/**
* 发送带TTL的消息
*/
public void sendMessageWithTTL(String message, long ttl) {
log.info("📤 发送消息 (TTL={}ms): {}", ttl, message);
rabbitTemplate.convertAndSend(
DeadLetterConfig.BUSINESS_EXCHANGE,
DeadLetterConfig.BUSINESS_ROUTING_KEY,
message,
msg -> {
msg.getMessageProperties().setExpiration(String.valueOf(ttl));
return msg;
}
);
}
}
业务消费者
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class BusinessConsumer {
private static final int MAX_RETRY = 3;
@RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE)
public void handleMessage(String message, Channel channel, Message amqpMessage)
throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
// 获取重试次数
Map<String, Object> headers = amqpMessage.getMessageProperties().getHeaders();
Long retryCount = (Long) headers.getOrDefault("x-retry-count", 0L);
try {
log.info("📥 处理消息: {}", message);
// 模拟业务处理
processMessage(message);
// 处理成功
channel.basicAck(deliveryTag, false);
log.info("✅ 消息处理成功");
} catch (Exception e) {
log.error("❌ 消息处理失败: {}", e.getMessage());
if (retryCount < MAX_RETRY) {
// 重新入队重试
log.warn("🔄 重试 {}/{}", retryCount + 1, MAX_RETRY);
channel.basicNack(deliveryTag, false, true);
} else {
// 超过重试次数,进入死信队列
log.error("💀 进入死信队列");
channel.basicNack(deliveryTag, false, false);
}
}
}
private void processMessage(String message) throws Exception {
// 模拟处理失败
if (message.toLowerCase().contains("error")) {
throw new RuntimeException("业务处理异常");
}
Thread.sleep(100);
}
}
死信消费者
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
@Slf4j
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE)
public void handleDeadLetter(String message, Channel channel, Message amqpMessage)
throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
// 获取死信原因
Map<String, Object> headers = amqpMessage.getMessageProperties().getHeaders();
String deathReason = getDeathReason(headers);
log.warn("💀 收到死信消息:");
log.warn(" 内容: {}", message);
log.warn(" 原因: {}", deathReason);
log.warn(" 时间: {}", LocalDateTime.now());
try {
// 处理死信
handleDeadMessage(message, deathReason);
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("死信处理失败: {}", e.getMessage());
// 死信队列的消息不再重试,直接确认
channel.basicAck(deliveryTag, false);
}
}
/**
* 获取死信原因
*/
private String getDeathReason(Map<String, Object> headers) {
if (headers == null) return "unknown";
List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
if (xDeath != null && !xDeath.isEmpty()) {
return (String) xDeath.get(0).get("reason");
}
return "unknown";
}
/**
* 处理死信消息
*/
private void handleDeadMessage(String message, String reason) {
// 1. 记录到数据库
log.info("📝 记录死信到数据库");
// 2. 发送告警通知
log.info("📧 发送告警通知");
// 3. 根据不同原因进行处理
switch (reason) {
case "rejected":
log.info("💡 消息被拒绝,可能需要人工处理");
break;
case "expired":
log.info("💡 消息过期,检查消费者性能");
break;
case "maxlen":
log.info("💡 队列满了,考虑扩容");
break;
default:
log.info("💡 未知原因: {}", reason);
}
}
}
🔍 实战案例:订单超时取消
package com.example.rabbitmq.order;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 订单超时取消配置
*/
@Configuration
public class OrderTimeoutConfig {
// 订单创建队列(设置30分钟TTL)
public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";
public static final String ORDER_DELAY_QUEUE = "order.delay.queue";
public static final String ORDER_DELAY_KEY = "order.delay";
// 订单取消队列(死信队列)
public static final String ORDER_CANCEL_EXCHANGE = "order.cancel.exchange";
public static final String ORDER_CANCEL_QUEUE = "order.cancel.queue";
public static final String ORDER_CANCEL_KEY = "order.cancel";
@Bean
public DirectExchange orderCancelExchange() {
return new DirectExchange(ORDER_CANCEL_EXCHANGE);
}
@Bean
public Queue orderCancelQueue() {
return QueueBuilder.durable(ORDER_CANCEL_QUEUE).build();
}
@Bean
public Binding orderCancelBinding() {
return BindingBuilder.bind(orderCancelQueue())
.to(orderCancelExchange())
.with(ORDER_CANCEL_KEY);
}
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange(ORDER_DELAY_EXCHANGE);
}
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>();
// 死信交换机
args.put("x-dead-letter-exchange", ORDER_CANCEL_EXCHANGE);
args.put("x-dead-letter-routing-key", ORDER_CANCEL_KEY);
// 30分钟后过期
args.put("x-message-ttl", 30 * 60 * 1000);
return QueueBuilder.durable(ORDER_DELAY_QUEUE)
.withArguments(args)
.build();
}
@Bean
public Binding orderDelayBinding() {
return BindingBuilder.bind(orderDelayQueue())
.to(orderDelayExchange())
.with(ORDER_DELAY_KEY);
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
private final RabbitTemplate rabbitTemplate;
/**
* 创建订单
*/
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 发送延时消息(30分钟后检查)
rabbitTemplate.convertAndSend(
OrderTimeoutConfig.ORDER_DELAY_EXCHANGE,
OrderTimeoutConfig.ORDER_DELAY_KEY,
order.getId()
);
log.info("订单创建成功: {}, 30分钟后检查支付状态", order.getId());
}
}
@Slf4j
@Component
public class OrderCancelConsumer {
@RabbitListener(queues = OrderTimeoutConfig.ORDER_CANCEL_QUEUE)
public void handleOrderCancel(Long orderId, Channel channel, Message message)
throws IOException {
try {
log.info("检查订单支付状态: {}", orderId);
Order order = orderRepository.findById(orderId);
if (order != null && order.getStatus() == OrderStatus.UNPAID) {
// 取消未支付订单
order.setStatus(OrderStatus.CANCELLED);
orderRepository.save(order);
// 恢复库存
inventoryService.restore(order);
log.info("订单超时取消: {}", orderId);
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("订单取消处理失败", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
📝 本章小结
| 概念 | 说明 |
|---|---|
| 死信 | 无法正常消费的消息 |
| 死信交换机 | 接收死信的交换机 |
| 死信队列 | 存储死信的队列 |
| x-dead-letter-exchange | 配置死信交换机 |
| x-dead-letter-routing-key | 配置死信路由键 |
下一步
死信队列可以实现简单的延时功能,但有局限性。接下来学习更灵活的 延时队列!
