死信队列

死信队列(Dead Letter Queue, DLQ)用于处理无法正常消费的消息。

🎯 本章目标

  • 理解什么是死信
  • 掌握死信队列的配置
  • 实现消息失败处理机制

💀 什么是死信

死信(Dead Letter) 是指无法被正常消费的消息。消息变成死信的情况:

| 场景 | 说明 |
| --- | --- |
| 消息被拒绝 | basicNack/basicReject 且 requeue=false |
| 消息过期 | TTL 到期 |
| 队列满了 | 达到 max-length 限制 |

🔄 死信队列流程

┌──────────┐    ┌──────────────┐    ┌──────────────┐
│ Producer │───→│ 正常交换机    │───→│   正常队列    │
└──────────┘    └──────────────┘    └──────────────┘
                                           │
                                    消息变成死信
                                           ↓
                                    ┌──────────────┐    ┌──────────────┐
                                    │ 死信交换机    │───→│   死信队列    │
                                    └──────────────┘    └──────────────┘
                                                              │
                                                              ↓
                                                        ┌──────────────┐
                                                        │ 死信消费者    │
                                                        └──────────────┘

🛠️ 代码实现

配置类

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the RabbitMQ topology used by the dead-letter example: a business
 * exchange/queue pair plus the dead-letter exchange/queue pair that the
 * business queue forwards its dead letters to.
 */
@Configuration
public class DeadLetterConfig {

    // ===== Names of the business (normal) exchange, queue and routing key =====
    public static final String BUSINESS_EXCHANGE = "business.exchange";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String BUSINESS_ROUTING_KEY = "business.key";

    // ===== Names of the dead-letter exchange, queue and routing key =====
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.key";

    // ===== Dead-letter exchange and queue =====

    /** Direct exchange that dead letters from the business queue are published to. */
    @Bean
    public DirectExchange deadLetterExchange() {
        DirectExchange exchange = new DirectExchange(DEAD_LETTER_EXCHANGE);
        return exchange;
    }

    /** Durable queue that stores the dead letters. */
    @Bean
    public Queue deadLetterQueue() {
        QueueBuilder builder = QueueBuilder.durable(DEAD_LETTER_QUEUE);
        return builder.build();
    }

    /** Binds the dead-letter queue to the dead-letter exchange under the dead-letter routing key. */
    @Bean
    public Binding deadLetterBinding() {
        Queue queue = deadLetterQueue();
        DirectExchange exchange = deadLetterExchange();
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }

    // ===== Business exchange and queue =====

    /** Direct exchange that producers publish business messages to. */
    @Bean
    public DirectExchange businessExchange() {
        DirectExchange exchange = new DirectExchange(BUSINESS_EXCHANGE);
        return exchange;
    }

    /**
     * Durable business queue whose dead letters are re-published to
     * {@link #DEAD_LETTER_EXCHANGE} with {@link #DEAD_LETTER_ROUTING_KEY}.
     *
     * <p>Optional extras that can be added the same way: {@code "x-message-ttl"}
     * (message TTL in milliseconds, e.g. 10000) and {@code "x-max-length"}
     * (maximum queue length, e.g. 100).
     */
    @Bean
    public Queue businessQueue() {
        return QueueBuilder.durable(BUSINESS_QUEUE)
                // Where dead letters go ...
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                // ... and which routing key they are re-published with.
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)
                .build();
    }

    /** Binds the business queue to the business exchange under the business routing key. */
    @Bean
    public Binding businessBinding() {
        Queue queue = businessQueue();
        DirectExchange exchange = businessExchange();
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }
}

生产者

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.DeadLetterConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Publishes example messages to the business exchange declared in
 * {@link DeadLetterConfig}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DeadLetterProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Sends a message to the business exchange using the business routing key.
     *
     * @param message payload to publish
     */
    public void sendMessage(String message) {
        log.info("📤 发送消息: {}", message);

        final String exchange = DeadLetterConfig.BUSINESS_EXCHANGE;
        final String routingKey = DeadLetterConfig.BUSINESS_ROUTING_KEY;
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

    /**
     * Sends a message that carries its own expiration.
     *
     * @param message payload to publish
     * @param ttl     value written to the message's expiration property (logged as milliseconds)
     */
    public void sendMessageWithTTL(String message, long ttl) {
        log.info("📤 发送消息 (TTL={}ms): {}", ttl, message);

        final String exchange = DeadLetterConfig.BUSINESS_EXCHANGE;
        final String routingKey = DeadLetterConfig.BUSINESS_ROUTING_KEY;
        // The expiration property is a string, so the TTL is rendered once up front.
        final String expiration = String.valueOf(ttl);

        rabbitTemplate.convertAndSend(exchange, routingKey, message, outgoing -> {
            outgoing.getMessageProperties().setExpiration(expiration);
            return outgoing;
        });
    }
}

业务消费者

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class BusinessConsumer {
    
    private static final int MAX_RETRY = 3;
    
    @RabbitListener(queues = DeadLetterConfig.BUSINESS_QUEUE)
    public void handleMessage(String message, Channel channel, Message amqpMessage) 
            throws IOException {
        
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        
        // 获取重试次数
        Map<String, Object> headers = amqpMessage.getMessageProperties().getHeaders();
        Long retryCount = (Long) headers.getOrDefault("x-retry-count", 0L);
        
        try {
            log.info("📥 处理消息: {}", message);
            
            // 模拟业务处理
            processMessage(message);
            
            // 处理成功
            channel.basicAck(deliveryTag, false);
            log.info("✅ 消息处理成功");
            
        } catch (Exception e) {
            log.error("❌ 消息处理失败: {}", e.getMessage());
            
            if (retryCount < MAX_RETRY) {
                // 重新入队重试
                log.warn("🔄 重试 {}/{}", retryCount + 1, MAX_RETRY);
                channel.basicNack(deliveryTag, false, true);
            } else {
                // 超过重试次数,进入死信队列
                log.error("💀 进入死信队列");
                channel.basicNack(deliveryTag, false, false);
            }
        }
    }
    
    private void processMessage(String message) throws Exception {
        // 模拟处理失败
        if (message.toLowerCase().contains("error")) {
            throw new RuntimeException("业务处理异常");
        }
        Thread.sleep(100);
    }
}

死信消费者

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.DeadLetterConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class DeadLetterConsumer {
    
    @RabbitListener(queues = DeadLetterConfig.DEAD_LETTER_QUEUE)
    public void handleDeadLetter(String message, Channel channel, Message amqpMessage) 
            throws IOException {
        
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        
        // 获取死信原因
        Map<String, Object> headers = amqpMessage.getMessageProperties().getHeaders();
        String deathReason = getDeathReason(headers);
        
        log.warn("💀 收到死信消息:");
        log.warn("   内容: {}", message);
        log.warn("   原因: {}", deathReason);
        log.warn("   时间: {}", LocalDateTime.now());
        
        try {
            // 处理死信
            handleDeadMessage(message, deathReason);
            
            // 确认消息
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("死信处理失败: {}", e.getMessage());
            // 死信队列的消息不再重试,直接确认
            channel.basicAck(deliveryTag, false);
        }
    }
    
    /**
     * 获取死信原因
     */
    private String getDeathReason(Map<String, Object> headers) {
        if (headers == null) return "unknown";
        
        List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
        if (xDeath != null && !xDeath.isEmpty()) {
            return (String) xDeath.get(0).get("reason");
        }
        return "unknown";
    }
    
    /**
     * 处理死信消息
     */
    private void handleDeadMessage(String message, String reason) {
        // 1. 记录到数据库
        log.info("📝 记录死信到数据库");
        
        // 2. 发送告警通知
        log.info("📧 发送告警通知");
        
        // 3. 根据不同原因进行处理
        switch (reason) {
            case "rejected":
                log.info("💡 消息被拒绝,可能需要人工处理");
                break;
            case "expired":
                log.info("💡 消息过期,检查消费者性能");
                break;
            case "maxlen":
                log.info("💡 队列满了,考虑扩容");
                break;
            default:
                log.info("💡 未知原因: {}", reason);
        }
    }
}

🔍 实战案例:订单超时取消

package com.example.rabbitmq.order;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Topology for cancelling orders that time out: order ids wait in a delay
 * queue with a 30-minute message TTL, and expired messages are dead-lettered
 * to the cancel exchange/queue.
 */
@Configuration
public class OrderTimeoutConfig {

    // Delay side: messages sit here until the queue TTL expires.
    public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";
    public static final String ORDER_DELAY_QUEUE = "order.delay.queue";
    public static final String ORDER_DELAY_KEY = "order.delay";

    // Cancel side: acts as the dead-letter target of the delay queue.
    public static final String ORDER_CANCEL_EXCHANGE = "order.cancel.exchange";
    public static final String ORDER_CANCEL_QUEUE = "order.cancel.queue";
    public static final String ORDER_CANCEL_KEY = "order.cancel";

    /** Message TTL of the delay queue: 30 minutes, in milliseconds. */
    private static final int ORDER_TTL_MILLIS = 30 * 60 * 1000;

    /** Direct exchange that receives the expired (dead-lettered) order messages. */
    @Bean
    public DirectExchange orderCancelExchange() {
        DirectExchange exchange = new DirectExchange(ORDER_CANCEL_EXCHANGE);
        return exchange;
    }

    /** Durable queue read by the order-cancel consumer. */
    @Bean
    public Queue orderCancelQueue() {
        QueueBuilder builder = QueueBuilder.durable(ORDER_CANCEL_QUEUE);
        return builder.build();
    }

    /** Binds the cancel queue to the cancel exchange under {@link #ORDER_CANCEL_KEY}. */
    @Bean
    public Binding orderCancelBinding() {
        Queue queue = orderCancelQueue();
        DirectExchange exchange = orderCancelExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_CANCEL_KEY);
    }

    /** Direct exchange that newly created orders are published to. */
    @Bean
    public DirectExchange orderDelayExchange() {
        DirectExchange exchange = new DirectExchange(ORDER_DELAY_EXCHANGE);
        return exchange;
    }

    /**
     * Durable delay queue: messages expire after {@link #ORDER_TTL_MILLIS} and
     * are then re-published to the cancel exchange with the cancel routing key.
     */
    @Bean
    public Queue orderDelayQueue() {
        return QueueBuilder.durable(ORDER_DELAY_QUEUE)
                // Dead-letter target for expired messages.
                .withArgument("x-dead-letter-exchange", ORDER_CANCEL_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", ORDER_CANCEL_KEY)
                // Expire after 30 minutes.
                .withArgument("x-message-ttl", ORDER_TTL_MILLIS)
                .build();
    }

    /** Binds the delay queue to the delay exchange under {@link #ORDER_DELAY_KEY}. */
    @Bean
    public Binding orderDelayBinding() {
        Queue queue = orderDelayQueue();
        DirectExchange exchange = orderDelayExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DELAY_KEY);
    }
}
/**
 * Creates orders and schedules the payment-timeout check for each of them.
 *
 * <p>NOTE(review): {@code orderRepository} is used below but not declared in
 * this snippet; presumably it is an injected field alongside
 * {@code rabbitTemplate} — confirm and declare it.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Saves the order, then publishes its id to the order delay exchange so
     * that the payment status is checked once the delay queue's TTL expires.
     *
     * @param order order to persist and schedule
     */
    public void createOrder(Order order) {
        // Step 1: persist the order.
        orderRepository.save(order);

        // Step 2: enqueue the delayed check (30 minutes, per the delay queue's TTL).
        publishTimeoutCheck(order);

        log.info("订单创建成功: {}, 30分钟后检查支付状态", order.getId());
    }

    /** Publishes the order id to the delay exchange with the delay routing key. */
    private void publishTimeoutCheck(Order order) {
        rabbitTemplate.convertAndSend(
            OrderTimeoutConfig.ORDER_DELAY_EXCHANGE,
            OrderTimeoutConfig.ORDER_DELAY_KEY,
            order.getId()
        );
    }
}

/**
 * Consumes the order cancel queue (the dead-letter target of the order delay
 * queue) and cancels orders that are still unpaid when their message arrives.
 *
 * <p>NOTE(review): {@code orderRepository} and {@code inventoryService} are
 * used below but not declared in this snippet; presumably they are injected
 * fields — confirm and declare them.
 */
@Slf4j
@Component
public class OrderCancelConsumer {

    /**
     * Handles one timeout check.
     *
     * <p>On failure the message is requeued once (first delivery) and rejected
     * without requeue if it fails again on redelivery. The cancel queue has no
     * dead-letter exchange of its own, so rejecting on the first failure would
     * silently drop the check after a single transient error.
     *
     * @param orderId id of the order to check
     * @param channel channel the delivery arrived on; used for ack/nack
     * @param message raw message, used for the delivery tag and redelivered flag
     * @throws IOException if the ack/nack cannot be sent
     */
    @RabbitListener(queues = OrderTimeoutConfig.ORDER_CANCEL_QUEUE)
    public void handleOrderCancel(Long orderId, Channel channel, Message message)
            throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            log.info("检查订单支付状态: {}", orderId);
            cancelIfUnpaid(orderId);
        } catch (Exception e) {
            log.error("订单取消处理失败", e);

            // Retry exactly once: requeue on first delivery, discard on redelivery
            // to avoid an endless redelivery loop for a poison message.
            boolean firstDelivery =
                    !Boolean.TRUE.equals(message.getMessageProperties().getRedelivered());
            channel.basicNack(deliveryTag, false, firstDelivery);
            return;
        }

        // Ack outside the try block so that a failing ack is not answered with
        // a nack for the same delivery tag.
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Cancels the order and restores its inventory if it exists and is still
     * unpaid; does nothing otherwise. The status check also keeps a retried
     * message from restoring inventory for an order already cancelled.
     */
    private void cancelIfUnpaid(Long orderId) {
        Order order = orderRepository.findById(orderId);
        if (order == null || order.getStatus() != OrderStatus.UNPAID) {
            return;
        }

        // Cancel the unpaid order.
        order.setStatus(OrderStatus.CANCELLED);
        orderRepository.save(order);

        // Give the reserved stock back.
        inventoryService.restore(order);

        log.info("订单超时取消: {}", orderId);
    }
}

📝 本章小结

| 概念 | 说明 |
| --- | --- |
| 死信 | 无法正常消费的消息 |
| 死信交换机 | 接收死信的交换机 |
| 死信队列 | 存储死信的队列 |
| x-dead-letter-exchange | 配置死信交换机 |
| x-dead-letter-routing-key | 配置死信路由键 |

下一步

死信队列可以实现简单的延时功能,但有局限性(例如消息只有到达队首时才会被判定过期,TTL 较短的消息可能被排在前面的长 TTL 消息阻塞)。接下来学习更灵活的延时队列。