延时队列

延时队列用于在指定时间后处理消息,是实现定时任务的利器。

🎯 本章目标

  • 理解延时队列的应用场景
  • 掌握基于 TTL + 死信的延时实现
  • 掌握基于插件的延时实现

📋 应用场景

| 场景 | 延时时间 |
| --- | --- |
| 订单超时取消 | 30 分钟 |
| 会议开始提醒 | 15 分钟前 |
| 短信验证码过期 | 5 分钟 |
| 预约服务提醒 | 1 小时前 |
| 优惠券即将过期 | 1 天前 |

🔧 方案一:TTL + 死信队列

原理

消息 → 延时队列(TTL=30min) → 30分钟后过期 → 死信队列 → 消费者处理

配置

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchanges, queues and bindings for the "TTL + dead-letter" delay scheme.
 *
 * <p>Every delay queue is declared with a fixed {@code x-message-ttl} plus
 * {@code x-dead-letter-exchange} / {@code x-dead-letter-routing-key} arguments that point at
 * {@link #PROCESS_EXCHANGE} / {@link #PROCESS_KEY}, which is bound to {@link #PROCESS_QUEUE}.
 */
@Configuration
public class TtlDelayConfig {

    // Delay side: one exchange, one queue per supported delay.
    public static final String DELAY_EXCHANGE = "ttl.delay.exchange";
    public static final String DELAY_QUEUE_30S = "ttl.delay.queue.30s";
    public static final String DELAY_QUEUE_1M = "ttl.delay.queue.1m";
    public static final String DELAY_QUEUE_5M = "ttl.delay.queue.5m";

    // Processing side: the dead-letter target that consumers listen on.
    public static final String PROCESS_EXCHANGE = "ttl.process.exchange";
    public static final String PROCESS_QUEUE = "ttl.process.queue";
    public static final String PROCESS_KEY = "process";

    // ===== Processing (dead-letter) exchange, queue and binding =====

    /** Direct exchange that the delay queues dead-letter into. */
    @Bean
    public DirectExchange processExchange() {
        return new DirectExchange(PROCESS_EXCHANGE);
    }

    /** Durable queue the consumer listens on. */
    @Bean
    public Queue processQueue() {
        return QueueBuilder.durable(PROCESS_QUEUE).build();
    }

    /** Binds the processing queue to the processing exchange with {@link #PROCESS_KEY}. */
    @Bean
    public Binding processBinding() {
        Queue target = processQueue();
        DirectExchange exchange = processExchange();
        return BindingBuilder.bind(target).to(exchange).with(PROCESS_KEY);
    }

    // ===== Delay exchange and queues (one TTL per queue) =====

    /** Direct exchange producers publish delayed messages to. */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    /** Delay queue with a 30-second TTL. */
    @Bean
    public Queue delayQueue30s() {
        return createDelayQueue(DELAY_QUEUE_30S, 30_000L);
    }

    /** Delay queue with a 1-minute TTL. */
    @Bean
    public Queue delayQueue1m() {
        return createDelayQueue(DELAY_QUEUE_1M, 60_000L);
    }

    /** Delay queue with a 5-minute TTL. */
    @Bean
    public Queue delayQueue5m() {
        return createDelayQueue(DELAY_QUEUE_5M, 300_000L);
    }

    /**
     * Builds a durable queue whose messages carry the given TTL and are dead-lettered
     * to the processing exchange.
     *
     * @param queueName name of the queue to declare
     * @param ttlMillis value for the {@code x-message-ttl} argument, in milliseconds
     * @return the queue definition
     */
    private Queue createDelayQueue(String queueName, long ttlMillis) {
        return QueueBuilder.durable(queueName)
                .withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", PROCESS_KEY)
                .withArgument("x-message-ttl", ttlMillis)
                .build();
    }

    /** Routes key {@code delay.30s} to the 30-second delay queue. */
    @Bean
    public Binding delayBinding30s() {
        Queue queue = delayQueue30s();
        return BindingBuilder.bind(queue).to(delayExchange()).with("delay.30s");
    }

    /** Routes key {@code delay.1m} to the 1-minute delay queue. */
    @Bean
    public Binding delayBinding1m() {
        Queue queue = delayQueue1m();
        return BindingBuilder.bind(queue).to(delayExchange()).with("delay.1m");
    }

    /** Routes key {@code delay.5m} to the 5-minute delay queue. */
    @Bean
    public Binding delayBinding5m() {
        Queue queue = delayQueue5m();
        return BindingBuilder.bind(queue).to(delayExchange()).with("delay.5m");
    }
}

使用

/**
 * Publishes messages to the TTL delay exchange; the routing key selects which
 * fixed-TTL delay queue the message is routed to.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class TtlDelayProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Sends a message with routing key {@code delay.30s}.
     *
     * @param message payload to publish
     */
    public void sendDelay30s(String message) {
        log.info("发送30秒延时消息: {}", message);
        publish("delay.30s", message);
    }

    /**
     * Sends a message with routing key {@code delay.1m}.
     *
     * @param message payload to publish
     */
    public void sendDelay1m(String message) {
        log.info("发送1分钟延时消息: {}", message);
        publish("delay.1m", message);
    }

    /** Publishes the payload to the delay exchange under the given routing key. */
    private void publish(String routingKey, String payload) {
        rabbitTemplate.convertAndSend(TtlDelayConfig.DELAY_EXCHANGE, routingKey, payload);
    }
}

/**
 * Consumes messages from {@link TtlDelayConfig#PROCESS_QUEUE}, the queue that the
 * TTL delay queues dead-letter into.
 */
@Slf4j
@Component
public class TtlDelayConsumer {

    /**
     * Logs the message together with its send and receive times, then acknowledges it.
     *
     * @param message     converted message body
     * @param channel     channel the message was delivered on; used for the manual ack
     * @param amqpMessage raw message, used for its properties (delivery tag, timestamp)
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = TtlDelayConfig.PROCESS_QUEUE)
    public void handleDelayMessage(String message, Channel channel, Message amqpMessage)
            throws IOException {

        log.info("📥 收到延时消息: {}", message);
        log.info("   发送时间: {}", getOriginalTimestamp(amqpMessage));
        log.info("   接收时间: {}", LocalDateTime.now());

        // Business logic goes here, before the acknowledgement.
        channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Returns the AMQP {@code timestamp} property of the message.
     *
     * <p>The property is only populated when the publisher sets it, so this may
     * return {@code null}; the caller only logs the value.
     *
     * @param amqpMessage the delivered message
     * @return the publisher-supplied timestamp, or {@code null} if none was set
     */
    private java.util.Date getOriginalTimestamp(Message amqpMessage) {
        return amqpMessage.getMessageProperties().getTimestamp();
    }
}

TTL 方案的局限

问题:需要为每个延时时间创建单独的队列

如果业务需要不同的延时时间(1分钟、5分钟、30分钟、1小时...),就需要创建很多队列。

问题:消息按顺序过期

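RabbitMQ 只检查队首消息是否过期。如果通过消息属性 expiration 给同一个队列中的消息设置不同的 TTL,当先入队的消息 TTL 较长时,后入队的消息即使 TTL 较短,也要等前面的消息过期出队后才能进入死信队列。上面的配置使用的是队列级别的 x-message-ttl(同一队列内所有消息 TTL 相同),不会出现这个问题,但代价就是每种延时时间都要单独建一个队列。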

🔧 方案二:延时插件(推荐)

安装插件

# Download the plugin archive (.ez) from the project's GitHub releases.
# NOTE(review): version 3.12.0 is hard-coded; presumably it must match the installed
# RabbitMQ version — confirm against the plugin's release notes.
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

# Copy the archive into the RabbitMQ plugins directory
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/plugins/

# Enable the plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# Docker: run the same enable command inside the container named "rabbitmq"
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

配置

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchange, queue and binding used with the delayed-message exchange plugin.
 */
@Configuration
public class DelayPluginConfig {

    public static final String DELAY_EXCHANGE = "plugin.delay.exchange";
    public static final String DELAY_QUEUE = "plugin.delay.queue";
    public static final String DELAY_ROUTING_KEY = "plugin.delay.key";

    /**
     * Declares a durable, non-auto-delete exchange of type {@code x-delayed-message}.
     *
     * <p>The {@code x-delayed-type} argument names the routing behaviour to apply
     * (direct / fanout / topic); {@code direct} is used here.
     *
     * @return the delayed exchange definition
     */
    @Bean
    public CustomExchange delayPluginExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;

        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "direct");

        return new CustomExchange(
            DELAY_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArgs);
    }

    /** Durable queue that receives messages from the delayed exchange. */
    @Bean
    public Queue delayPluginQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }

    /** Binds the queue to the delayed exchange with {@link #DELAY_ROUTING_KEY} and no arguments. */
    @Bean
    public Binding delayPluginBinding() {
        Queue queue = delayPluginQueue();
        CustomExchange exchange = delayPluginExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(DELAY_ROUTING_KEY)
                .noargs();
    }
}

生产者

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.DelayPluginConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Publishes messages to the delayed-message exchange with a per-message delay.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DelayPluginProducer {

    private static final long MILLIS_PER_SECOND = 1_000L;
    private static final long MILLIS_PER_MINUTE = 60_000L;

    private final RabbitTemplate rabbitTemplate;

    /**
     * Sends a message that should be delivered after the given delay.
     *
     * <p>The delay is passed to {@code MessageProperties.setDelay}, which takes an
     * {@code Integer}. A plain {@code (int)} cast would silently wrap larger values
     * into a wrong (possibly negative) delay, so out-of-range values are rejected.
     *
     * @param message message body
     * @param delayMs delay in milliseconds, {@code 0..Integer.MAX_VALUE}
     * @throws IllegalArgumentException if {@code delayMs} is negative or exceeds
     *                                  {@code Integer.MAX_VALUE}
     */
    public void sendDelayMessage(String message, long delayMs) {
        if (delayMs < 0 || delayMs > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "delayMs must be between 0 and " + Integer.MAX_VALUE + ", but was " + delayMs);
        }
        final int delay = (int) delayMs;  // safe: range checked above

        log.info("📤 发送延时消息: {}, 延时: {}ms", message, delayMs);

        rabbitTemplate.convertAndSend(
            DelayPluginConfig.DELAY_EXCHANGE,
            DelayPluginConfig.DELAY_ROUTING_KEY,
            message,
            msg -> {
                // Delay in milliseconds for this message.
                msg.getMessageProperties().setDelay(delay);
                return msg;
            }
        );
    }

    /**
     * Sends a message delayed by the given number of seconds.
     *
     * @param message message body
     * @param seconds delay in seconds
     * @throws IllegalArgumentException if the resulting delay is out of range
     */
    public void sendDelaySeconds(String message, int seconds) {
        sendDelayMessage(message, seconds * MILLIS_PER_SECOND);
    }

    /**
     * Sends a message delayed by the given number of minutes.
     *
     * @param message message body
     * @param minutes delay in minutes
     * @throws IllegalArgumentException if the resulting delay is out of range
     */
    public void sendDelayMinutes(String message, int minutes) {
        // Multiply in long from the start; "minutes * 60" in int could overflow first.
        sendDelayMessage(message, minutes * MILLIS_PER_MINUTE);
    }
}

消费者

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.DelayPluginConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalDateTime;

/**
 * Consumes messages from {@link DelayPluginConfig#DELAY_QUEUE} with manual acknowledgement.
 */
@Slf4j
@Component
public class DelayPluginConsumer {

    /**
     * Logs the delivered message, runs the delayed task and acknowledges the message;
     * on any failure the message is negatively acknowledged without requeue.
     *
     * @param message     converted message body
     * @param channel     channel the message was delivered on; used for ack / nack
     * @param amqpMessage raw message, used for its properties
     * @throws IOException if the ack or nack cannot be sent
     */
    @RabbitListener(queues = DelayPluginConfig.DELAY_QUEUE)
    public void handleDelayMessage(String message, Channel channel, Message amqpMessage)
            throws IOException {

        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        // On inbound messages Spring AMQP exposes the x-delay header through
        // getReceivedDelay(); getDelay() only reflects a value set on an outbound message.
        Integer receivedDelay = amqpMessage.getMessageProperties().getReceivedDelay();

        log.info("📥 收到延时消息:");
        log.info("   内容: {}", message);
        log.info("   延时: {}ms", receivedDelay);
        log.info("   时间: {}", LocalDateTime.now());

        try {
            // Run the business logic before acknowledging.
            processDelayedTask(message);

            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {
            log.error("延时消息处理失败", e);
            // requeue=false: the failed message is not put back on the queue.
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /** Placeholder for the delayed task; currently only logs the message. */
    private void processDelayedTask(String message) {
        log.info("✅ 处理延时任务: {}", message);
    }
}

🔍 实战案例

订单超时取消

/**
 * Creates orders and schedules a delayed payment check for each of them.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderDelayService {

    private final RabbitTemplate rabbitTemplate;
    private final OrderRepository orderRepository;

    // Minutes an order may stay unpaid before the timeout check fires.
    private static final int ORDER_TIMEOUT_MINUTES = 30;

    /**
     * Persists a new unpaid order and publishes its delayed timeout-check message.
     *
     * @param request source of the user id and amount
     * @return the saved order
     */
    public Order createOrder(CreateOrderRequest request) {
        // Step 1: build and store the order.
        Order newOrder = new Order();
        newOrder.setOrderId(generateOrderId());
        newOrder.setUserId(request.getUserId());
        newOrder.setAmount(request.getAmount());
        newOrder.setStatus(OrderStatus.UNPAID);
        newOrder.setCreateTime(LocalDateTime.now());
        orderRepository.save(newOrder);

        // Step 2: schedule the payment check.
        String orderId = newOrder.getOrderId();
        sendOrderTimeoutCheck(orderId);

        log.info("订单创建成功: {}", orderId);
        return newOrder;
    }

    /**
     * Publishes a {@code CHECK_PAYMENT} message for the order to the delayed exchange,
     * with the delay set to {@link #ORDER_TIMEOUT_MINUTES} minutes.
     *
     * @param orderId id of the order to check
     */
    private void sendOrderTimeoutCheck(String orderId) {
        final int delayMillis = ORDER_TIMEOUT_MINUTES * 60 * 1000;

        Map<String, Object> payload = new HashMap<>();
        payload.put("orderId", orderId);
        payload.put("action", "CHECK_PAYMENT");
        payload.put("createTime", LocalDateTime.now().toString());

        rabbitTemplate.convertAndSend(
            DelayPluginConfig.DELAY_EXCHANGE,
            DelayPluginConfig.DELAY_ROUTING_KEY,
            payload,
            outbound -> {
                outbound.getMessageProperties().setDelay(delayMillis);
                return outbound;
            }
        );

        log.info("发送订单超时检查消息: {}, {}分钟后执行", orderId, ORDER_TIMEOUT_MINUTES);
    }
}

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderTimeoutConsumer {
    
    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;
    
    @RabbitListener(queues = DelayPluginConfig.DELAY_QUEUE)
    public void handleOrderTimeout(Map<String, Object> message, Channel channel, 
            Message amqpMessage) throws IOException {
        
        String orderId = (String) message.get("orderId");
        log.info("检查订单支付状态: {}", orderId);
        
        try {
            Order order = orderRepository.findById(orderId);
            
            if (order == null) {
                log.warn("订单不存在: {}", orderId);
                channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            
            if (order.getStatus() == OrderStatus.UNPAID) {
                // 取消订单
                order.setStatus(OrderStatus.CANCELLED);
                order.setCancelReason("支付超时");
                order.setCancelTime(LocalDateTime.now());
                orderRepository.save(order);
                
                // 恢复库存
                inventoryService.restore(order.getItems());
                
                log.info("订单超时取消: {}", orderId);
            } else {
                log.info("订单状态正常: {} - {}", orderId, order.getStatus());
            }
            
            channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            log.error("订单超时处理失败: {}", orderId, e);
            channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

活动开始提醒

/**
 * Schedules activity reminders by publishing delayed messages.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ActivityReminderService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Schedules a reminder to fire {@code reminderMinutes} before the activity starts.
     *
     * <p>If the reminder time is not in the future, a warning is logged and nothing is
     * sent. The delay is passed to {@code MessageProperties.setDelay}, which takes an
     * {@code Integer}; a delay above {@code Integer.MAX_VALUE} ms (about 24.8 days)
     * would wrap under a plain {@code (int)} cast, so it is rejected instead.
     *
     * @param activityId      activity id
     * @param userId          user id
     * @param startTime       activity start time
     * @param reminderMinutes how many minutes before the start to remind
     * @throws IllegalArgumentException if the reminder is further away than
     *                                  {@code Integer.MAX_VALUE} milliseconds
     */
    public void setReminder(String activityId, Long userId, LocalDateTime startTime,
            int reminderMinutes) {

        // Delay = time from now until the reminder moment.
        LocalDateTime reminderTime = startTime.minusMinutes(reminderMinutes);
        long delayMs = Duration.between(LocalDateTime.now(), reminderTime).toMillis();

        if (delayMs <= 0) {
            log.warn("提醒时间已过,无法设置提醒");
            return;
        }
        if (delayMs > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "Reminder delay of " + delayMs + " ms exceeds the supported maximum of "
                    + Integer.MAX_VALUE + " ms");
        }
        final int delay = (int) delayMs;  // safe: range checked above

        Map<String, Object> message = new HashMap<>();
        message.put("activityId", activityId);
        message.put("userId", userId);
        message.put("startTime", startTime.toString());
        message.put("type", "ACTIVITY_REMINDER");

        rabbitTemplate.convertAndSend(
            DelayPluginConfig.DELAY_EXCHANGE,
            DelayPluginConfig.DELAY_ROUTING_KEY,
            message,
            msg -> {
                msg.getMessageProperties().setDelay(delay);
                return msg;
            }
        );

        log.info("设置活动提醒: 活动{}, 用户{}, {}分钟后提醒", activityId, userId, delayMs / 60000);
    }
}

📊 两种方案对比

| 特性 | TTL + 死信 | 延时插件 |
| --- | --- | --- |
| 延时精度 | 秒级 | 毫秒级 |
| 任意延时 | ❌ 需要预设队列 | ✅ 支持 |
| 消息顺序 | ❌ 存在问题 | ✅ 按延时时间排序 |
| 安装依赖 | ✅ 无需安装 | ❌ 需要安装插件 |
| 性能 | 一般 | 较好 |

📝 本章小结

| 方案 | 优点 | 缺点 | 适用场景 |
| --- | --- | --- | --- |
| TTL + 死信 | 无需插件 | 需要预设多个队列 | 固定延时时间 |
| 延时插件 | 灵活精确 | 需要安装插件 | 任意延时时间 |

下一步

延时队列解决了定时任务问题,接下来学习如何解决消息重复消费带来的幂等性问题。