延时队列
延时队列用于在指定时间后处理消息,是实现定时任务的利器。
🎯 本章目标
- 理解延时队列的应用场景
- 掌握基于 TTL + 死信的延时实现
- 掌握基于插件的延时实现
📋 应用场景
| 场景 | 延时时间 |
|---|---|
| 订单超时取消 | 30 分钟 |
| 会议开始提醒 | 15 分钟前 |
| 短信验证码过期 | 5 分钟 |
| 预约服务提醒 | 1 小时前 |
| 优惠券即将过期 | 1 天前 |
🔧 方案一:TTL + 死信队列
原理
消息 → 延时队列(TTL=30min) → 30分钟后过期 → 死信队列 → 消费者处理
配置
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology for the TTL + dead-letter delay scheme.
 *
 * <p>Three durable delay queues (30 s, 1 min, 5 min) are bound to {@link #DELAY_EXCHANGE}.
 * Each of them is declared with an {@code x-message-ttl} and dead-letters to
 * {@link #PROCESS_EXCHANGE} with routing key {@link #PROCESS_KEY}, which is bound to
 * {@link #PROCESS_QUEUE}.
 */
@Configuration
public class TtlDelayConfig {

    // Delay side: one exchange plus one queue per supported TTL
    public static final String DELAY_EXCHANGE = "ttl.delay.exchange";
    public static final String DELAY_QUEUE_30S = "ttl.delay.queue.30s";
    public static final String DELAY_QUEUE_1M = "ttl.delay.queue.1m";
    public static final String DELAY_QUEUE_5M = "ttl.delay.queue.5m";

    // Processing side: the dead-letter target of every delay queue
    public static final String PROCESS_EXCHANGE = "ttl.process.exchange";
    public static final String PROCESS_QUEUE = "ttl.process.queue";
    public static final String PROCESS_KEY = "process";

    // Queue-level TTLs, in milliseconds
    private static final long TTL_30S_MS = 30_000L;
    private static final long TTL_1M_MS = 60_000L;
    private static final long TTL_5M_MS = 300_000L;

    // ===== Processing (dead-letter) exchange, queue and binding =====

    @Bean
    public DirectExchange processExchange() {
        return new DirectExchange(PROCESS_EXCHANGE);
    }

    @Bean
    public Queue processQueue() {
        return QueueBuilder.durable(PROCESS_QUEUE).build();
    }

    @Bean
    public Binding processBinding() {
        Queue target = processQueue();
        DirectExchange source = processExchange();
        return BindingBuilder.bind(target).to(source).with(PROCESS_KEY);
    }

    // ===== Delay exchange and its TTL queues =====

    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }

    @Bean
    public Queue delayQueue30s() {
        return createDelayQueue(DELAY_QUEUE_30S, TTL_30S_MS);
    }

    @Bean
    public Queue delayQueue1m() {
        return createDelayQueue(DELAY_QUEUE_1M, TTL_1M_MS);
    }

    @Bean
    public Queue delayQueue5m() {
        return createDelayQueue(DELAY_QUEUE_5M, TTL_5M_MS);
    }

    @Bean
    public Binding delayBinding30s() {
        return bindToDelayExchange(delayQueue30s(), "delay.30s");
    }

    @Bean
    public Binding delayBinding1m() {
        return bindToDelayExchange(delayQueue1m(), "delay.1m");
    }

    @Bean
    public Binding delayBinding5m() {
        return bindToDelayExchange(delayQueue5m(), "delay.5m");
    }

    /**
     * Builds a durable queue whose messages carry the given TTL and are dead-lettered
     * to the processing exchange.
     *
     * @param name queue name
     * @param ttl  value for {@code x-message-ttl}, in milliseconds
     */
    private Queue createDelayQueue(String name, long ttl) {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-message-ttl", ttl);
        queueArgs.put("x-dead-letter-exchange", PROCESS_EXCHANGE);
        queueArgs.put("x-dead-letter-routing-key", PROCESS_KEY);
        return QueueBuilder.durable(name)
                .withArguments(queueArgs)
                .build();
    }

    /** Binds a delay queue to the delay exchange under the given routing key. */
    private Binding bindToDelayExchange(Queue queue, String routingKey) {
        return BindingBuilder.bind(queue).to(delayExchange()).with(routingKey);
    }
}
使用
/**
 * Publishes messages to the TTL delay exchange; the routing key selects which
 * fixed-TTL queue the message goes to.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class TtlDelayProducer {

    private final RabbitTemplate rabbitTemplate;

    /** Sends {@code message} with routing key {@code delay.30s}. */
    public void sendDelay30s(String message) {
        log.info("发送30秒延时消息: {}", message);
        publish("delay.30s", message);
    }

    /** Sends {@code message} with routing key {@code delay.1m}. */
    public void sendDelay1m(String message) {
        log.info("发送1分钟延时消息: {}", message);
        publish("delay.1m", message);
    }

    /** Publishes the payload to the delay exchange under the given routing key. */
    private void publish(String routingKey, String payload) {
        rabbitTemplate.convertAndSend(TtlDelayConfig.DELAY_EXCHANGE, routingKey, payload);
    }
}
/**
 * Consumes messages from the processing queue of the TTL delay topology and
 * acknowledges them manually.
 */
@Slf4j
@Component
public class TtlDelayConsumer {

    /**
     * Logs the received message together with its send and receive times, then acks it.
     *
     * @param message     converted message body
     * @param channel     channel the message was delivered on, used for the manual ack
     * @param amqpMessage raw AMQP message, used for its properties
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = TtlDelayConfig.PROCESS_QUEUE)
    public void handleDelayMessage(String message, Channel channel, Message amqpMessage)
            throws IOException {
        log.info("📥 收到延时消息: {}", message);
        log.info(" 发送时间: {}", getOriginalTimestamp(amqpMessage));
        log.info(" 接收时间: {}", LocalDateTime.now());

        // Business logic goes here

        channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
    }

    /**
     * Returns the AMQP {@code timestamp} property of the message.
     *
     * <p>The result is {@code null} when the publisher did not set a timestamp.
     * NOTE(review): the producer in this chapter does not set one explicitly — set it
     * on the publishing side if the send time should appear in the log.
     */
    private java.util.Date getOriginalTimestamp(Message amqpMessage) {
        return amqpMessage.getMessageProperties().getTimestamp();
    }
}
TTL 方案的局限
❌ 问题:需要为每个延时时间创建单独的队列
如果业务需要不同的延时时间(1分钟、5分钟、30分钟、1小时...),就需要创建很多队列。
❌ 问题:消息按顺序过期
如果不为每个延时时间单独建队列,而是在同一个队列中给每条消息单独设置 TTL(expiration 属性),RabbitMQ 只会检查队首消息是否过期:如果先入队的消息 TTL 较长,后入队的消息即使 TTL 较短,也要等前面的消息过期后才能出队。
🔧 方案二:延时插件(推荐)
安装插件
# Download the plugin archive (choose the release that matches your RabbitMQ version)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
# Copy it into the broker's plugin directory
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/plugins/
# Enable the plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Docker: the archive must be copied into the container before it can be enabled
# (/opt/rabbitmq/plugins is the plugin directory of the official image; adjust for other images)
docker cp rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/opt/rabbitmq/plugins/
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
配置
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ topology for the delayed-message-exchange plugin: one
 * {@code x-delayed-message} exchange, one durable queue and the binding between them.
 */
@Configuration
public class DelayPluginConfig {

    public static final String DELAY_EXCHANGE = "plugin.delay.exchange";
    public static final String DELAY_QUEUE = "plugin.delay.queue";
    public static final String DELAY_ROUTING_KEY = "plugin.delay.key";

    /**
     * Declares the delayed exchange.
     *
     * <p>The exchange type is {@code x-delayed-message}; the {@code x-delayed-type}
     * argument names the underlying routing behaviour (direct/fanout/topic), here
     * {@code direct}.
     */
    @Bean
    public CustomExchange delayPluginExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;

        Map<String, Object> exchangeArgs = new HashMap<>();
        exchangeArgs.put("x-delayed-type", "direct");

        return new CustomExchange(
                DELAY_EXCHANGE, "x-delayed-message", durable, autoDelete, exchangeArgs);
    }

    /** Declares the durable queue that receives messages once their delay has elapsed. */
    @Bean
    public Queue delayPluginQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }

    /** Binds the queue to the delayed exchange under {@link #DELAY_ROUTING_KEY}. */
    @Bean
    public Binding delayPluginBinding() {
        Queue queue = delayPluginQueue();
        CustomExchange exchange = delayPluginExchange();
        return BindingBuilder.bind(queue)
                .to(exchange)
                .with(DELAY_ROUTING_KEY)
                .noargs();
    }
}
生产者
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.DelayPluginConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
 * Publishes messages to the delayed exchange with a per-message delay.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DelayPluginProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Sends a message that is delivered after the given delay.
     *
     * @param message message content
     * @param delayMs delay in milliseconds; must be within {@code [0, Integer.MAX_VALUE]}
     *                because the delay is passed to {@code setDelay} as an {@code int}
     * @throws IllegalArgumentException if {@code delayMs} is outside that range
     */
    public void sendDelayMessage(String message, long delayMs) {
        // Reject out-of-range values up front: a plain (int) cast would silently
        // truncate and schedule the message at an unrelated time.
        if (delayMs < 0 || delayMs > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "delayMs must be between 0 and " + Integer.MAX_VALUE + ", but was " + delayMs);
        }
        final int delay = (int) delayMs;

        log.info("📤 发送延时消息: {}, 延时: {}ms", message, delayMs);
        rabbitTemplate.convertAndSend(
                DelayPluginConfig.DELAY_EXCHANGE,
                DelayPluginConfig.DELAY_ROUTING_KEY,
                message,
                msg -> {
                    // Per-message delay in milliseconds
                    msg.getMessageProperties().setDelay(delay);
                    return msg;
                }
        );
    }

    /**
     * Sends a message delayed by the given number of seconds.
     *
     * @throws IllegalArgumentException if the resulting delay is out of range
     */
    public void sendDelaySeconds(String message, int seconds) {
        sendDelayMessage(message, seconds * 1000L);
    }

    /**
     * Sends a message delayed by the given number of minutes.
     *
     * @throws IllegalArgumentException if the resulting delay is out of range
     */
    public void sendDelayMinutes(String message, int minutes) {
        // Multiply in long arithmetic from the start so large minute counts cannot overflow int.
        sendDelayMessage(message, minutes * 60_000L);
    }
}
消费者
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.DelayPluginConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
/**
 * Consumes messages from the plugin delay queue with manual acknowledgement.
 */
@Slf4j
@Component
public class DelayPluginConsumer {

    /**
     * Logs the delayed message, runs the delayed task and acks on success; on failure
     * the message is nacked without requeue.
     *
     * @param message     converted message body
     * @param channel     channel the message was delivered on, used for ack/nack
     * @param amqpMessage raw AMQP message, used for its properties
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = DelayPluginConfig.DELAY_QUEUE)
    public void handleDelayMessage(String message, Channel channel, Message amqpMessage)
            throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        // On inbound messages the delay header is exposed as the *received* delay;
        // getDelay() is the outbound property and is not populated on the consumer side.
        Integer delay = amqpMessage.getMessageProperties().getReceivedDelay();

        log.info("📥 收到延时消息:");
        log.info(" 内容: {}", message);
        log.info(" 延时: {}ms", delay);
        log.info(" 时间: {}", LocalDateTime.now());

        try {
            processDelayedTask(message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("延时消息处理失败", e);
            // requeue=false: the failed message is not put back on this queue
            channel.basicNack(deliveryTag, false, false);
        }
    }

    /** Placeholder for the business logic of the delayed task; currently only logs. */
    private void processDelayedTask(String message) {
        log.info("✅ 处理延时任务: {}", message);
    }
}
🔍 实战案例
订单超时取消
/**
 * Creates orders and schedules a delayed payment check for each of them.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderDelayService {

    // Payment timeout, in minutes
    private static final int ORDER_TIMEOUT_MINUTES = 30;

    private final RabbitTemplate rabbitTemplate;
    private final OrderRepository orderRepository;

    /**
     * Persists a new unpaid order and then publishes the delayed timeout check for it.
     *
     * @param request source of the user id and amount
     * @return the saved order
     */
    public Order createOrder(CreateOrderRequest request) {
        // Step 1: build and store the order
        Order created = new Order();
        created.setOrderId(generateOrderId());
        created.setUserId(request.getUserId());
        created.setAmount(request.getAmount());
        created.setStatus(OrderStatus.UNPAID);
        created.setCreateTime(LocalDateTime.now());
        orderRepository.save(created);

        // Step 2: schedule the payment check
        sendOrderTimeoutCheck(created.getOrderId());

        log.info("订单创建成功: {}", created.getOrderId());
        return created;
    }

    /**
     * Publishes a {@code CHECK_PAYMENT} message for the order to the delayed exchange,
     * delayed by {@link #ORDER_TIMEOUT_MINUTES}.
     */
    private void sendOrderTimeoutCheck(String orderId) {
        final int delayMillis = ORDER_TIMEOUT_MINUTES * 60 * 1000;

        Map<String, Object> payload = new HashMap<>();
        payload.put("orderId", orderId);
        payload.put("action", "CHECK_PAYMENT");
        payload.put("createTime", LocalDateTime.now().toString());

        rabbitTemplate.convertAndSend(
                DelayPluginConfig.DELAY_EXCHANGE,
                DelayPluginConfig.DELAY_ROUTING_KEY,
                payload,
                outgoing -> {
                    outgoing.getMessageProperties().setDelay(delayMillis);
                    return outgoing;
                }
        );

        log.info("发送订单超时检查消息: {}, {}分钟后执行", orderId, ORDER_TIMEOUT_MINUTES);
    }
}
/**
 * Handles delayed order-timeout checks: cancels orders that are still unpaid and
 * restores their inventory.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderTimeoutConsumer {

    private final OrderRepository orderRepository;
    private final InventoryService inventoryService;

    /**
     * Looks up the order named in the message and cancels it if it is still unpaid.
     * The message is acked when the order is missing or has been handled, and nacked
     * with requeue when handling throws.
     *
     * @param message     payload map; the {@code orderId} entry is read
     * @param channel     channel the message was delivered on, used for ack/nack
     * @param amqpMessage raw AMQP message, used for the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = DelayPluginConfig.DELAY_QUEUE)
    public void handleOrderTimeout(Map<String, Object> message, Channel channel,
                                   Message amqpMessage) throws IOException {
        String orderId = (String) message.get("orderId");
        log.info("检查订单支付状态: {}", orderId);

        final long tag = amqpMessage.getMessageProperties().getDeliveryTag();
        try {
            Order order = orderRepository.findById(orderId);
            if (order == null) {
                log.warn("订单不存在: {}", orderId);
                channel.basicAck(tag, false);
                return;
            }

            if (order.getStatus() != OrderStatus.UNPAID) {
                log.info("订单状态正常: {} - {}", orderId, order.getStatus());
            } else {
                cancelForTimeout(order, orderId);
            }
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("订单超时处理失败: {}", orderId, e);
            // NOTE(review): requeue=true redelivers the message; a failure that never
            // clears would be retried repeatedly — confirm this is intended.
            channel.basicNack(tag, false, true);
        }
    }

    /** Marks the order as cancelled for payment timeout, saves it and restores its inventory. */
    private void cancelForTimeout(Order order, String orderId) {
        order.setStatus(OrderStatus.CANCELLED);
        order.setCancelReason("支付超时");
        order.setCancelTime(LocalDateTime.now());
        orderRepository.save(order);

        inventoryService.restore(order.getItems());
        log.info("订单超时取消: {}", orderId);
    }
}
活动开始提醒
/**
 * Schedules activity reminders by publishing delayed messages.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ActivityReminderService {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Schedules a reminder that fires {@code reminderMinutes} before the activity starts.
     *
     * <p>No message is sent, and a warning is logged, when the reminder time has already
     * passed or when it is further away than {@code Integer.MAX_VALUE} milliseconds
     * (the delay is passed to {@code setDelay} as an {@code int}).
     *
     * @param activityId      activity id
     * @param userId          user id
     * @param startTime       activity start time
     * @param reminderMinutes how many minutes before the start to remind
     */
    public void setReminder(String activityId, Long userId, LocalDateTime startTime,
                            int reminderMinutes) {
        // Delay = time from now until the reminder instant
        LocalDateTime reminderTime = startTime.minusMinutes(reminderMinutes);
        long delayMs = Duration.between(LocalDateTime.now(), reminderTime).toMillis();

        if (delayMs <= 0) {
            log.warn("提醒时间已过,无法设置提醒");
            return;
        }
        if (delayMs > Integer.MAX_VALUE) {
            // A plain (int) cast would truncate and fire the reminder at an unrelated time.
            log.warn("提醒时间超出最大可支持延时({}ms),无法设置提醒: 活动{}, 用户{}",
                    Integer.MAX_VALUE, activityId, userId);
            return;
        }
        final int delay = (int) delayMs;

        Map<String, Object> message = new HashMap<>();
        message.put("activityId", activityId);
        message.put("userId", userId);
        message.put("startTime", startTime.toString());
        message.put("type", "ACTIVITY_REMINDER");

        rabbitTemplate.convertAndSend(
                DelayPluginConfig.DELAY_EXCHANGE,
                DelayPluginConfig.DELAY_ROUTING_KEY,
                message,
                msg -> {
                    msg.getMessageProperties().setDelay(delay);
                    return msg;
                }
        );

        log.info("设置活动提醒: 活动{}, 用户{}, {}分钟后提醒", activityId, userId, delayMs / 60000);
    }
}
📊 两种方案对比
| 特性 | TTL + 死信 | 延时插件 |
|---|---|---|
| 延时精度 | 秒级 | 毫秒级 |
| 任意延时 | ❌ 需要预设队列 | ✅ 支持 |
| 消息顺序 | ❌ 存在问题 | ✅ 按延时时间排序 |
| 安装依赖 | ✅ 无需安装 | ❌ 需要安装插件 |
| 性能 | 一般 | 较好 |
📝 本章小结
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| TTL + 死信 | 无需插件 | 需要预设多个队列 | 固定延时时间 |
| 延时插件 | 灵活精确 | 需要安装插件 | 任意延时时间 |
下一步
延时队列解决了定时任务问题,接下来学习如何解决消息重复消费带来的幂等性问题!
