工作队列模式
工作队列(Work Queues)模式也称为任务队列,用于在多个消费者之间分发耗时任务。
🎯 本章目标
- 理解工作队列模式的应用场景
- 实现多消费者负载均衡
- 掌握消息确认机制
- 了解消息预取和公平分发
📦 模式介绍
┌──────────┐
┌───→│Consumer 1│
┌──────────┐ │ └──────────┘
│ Producer │──────→│ Queue │
└──────────┘ │ ┌──────────┐
└───→│Consumer 2│
└──────────┘
应用场景:
- 订单处理
- 邮件发送
- 数据导入/导出
- 图片/视频处理
- 任何需要异步处理的耗时任务
特点:
- 多个消费者共同消费一个队列
- 每条消息只会被一个消费者处理
- 消息默认按轮询方式分发
🛠️ 基础实现
任务生产者
package com.example.rabbitmq.workqueues;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/**
* 工作队列 - 任务生产者
*
* 模拟发送需要处理的任务
*/
public class TaskProducer {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 声明持久化队列
// durable=true 表示队列持久化,服务器重启后队列仍存在
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 模拟发送多个任务
String[] tasks = {
"发送邮件..",
"处理订单....",
"生成报表......",
"图片压缩..",
"视频转码........",
"数据同步....",
"日志分析......",
"文件备份..",
"消息推送....",
"数据导出......"
};
for (int i = 0; i < tasks.length; i++) {
String task = "任务" + (i + 1) + ": " + tasks[i];
// 发送持久化消息
// MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化
channel.basicPublish(
"",
QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
task.getBytes("UTF-8")
);
System.out.println("📤 发送任务: " + task);
}
System.out.println("\n✅ 所有任务发送完成!");
}
}
}
任务消费者
package com.example.rabbitmq.workqueues;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 工作队列 - 任务消费者
*
* 处理队列中的任务,根据任务中的点号数量模拟处理时间
*/
public class TaskConsumer {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
// 消费者编号(通过命令行参数传入)
String consumerName = args.length > 0 ? args[0] : "Consumer-1";
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("🔧 [" + consumerName + "] 等待任务...");
// 消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), "UTF-8");
System.out.println("📥 [" + consumerName + "] 收到任务: " + task);
try {
// 模拟任务处理(根据点号数量决定处理时间)
doWork(task);
System.out.println("✅ [" + consumerName + "] 任务完成: " + task);
} catch (InterruptedException e) {
System.out.println("❌ [" + consumerName + "] 任务失败: " + task);
} finally {
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 关闭自动确认,使用手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
}
/**
* 模拟任务处理
* 每个点号代表1秒的处理时间
*/
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(500); // 每个点休眠500毫秒
}
}
}
}
🔄 轮询分发
默认情况下,RabbitMQ 采用轮询(Round-Robin)方式分发消息:
消息1 → Consumer-1
消息2 → Consumer-2
消息3 → Consumer-1
消息4 → Consumer-2
...
测试步骤:
- 启动两个消费者:
# 终端1
java TaskConsumer Consumer-1
# 终端2
java TaskConsumer Consumer-2
启动生产者发送任务
观察输出,消息会均匀分配给两个消费者
✅ 消息确认(Message Acknowledgment)
为什么需要消息确认?
默认的自动确认模式存在问题:
生产者 → 队列 → 消费者(收到消息后自动确认)
↓
消费者崩溃
↓
消息丢失!
使用手动确认可以避免消息丢失:
生产者 → 队列 → 消费者(处理完成后手动确认)
↓
确认成功 → 队列删除消息
确认失败/超时 → 消息重新入队
手动确认代码
// 关闭自动确认
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String message = new String(delivery.getBody(), "UTF-8");
// 处理消息
processMessage(message);
// 处理成功,确认消息
// 参数1: deliveryTag - 消息标签
// 参数2: multiple - 是否批量确认(false 只确认当前消息)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息
// 参数3: requeue - 是否重新入队(true 重新入队,false 丢弃)
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
确认方法对比
| 方法 | 说明 |
|---|---|
basicAck | 确认消息,消息会从队列中删除 |
basicNack | 拒绝消息,可以选择重新入队 |
basicReject | 拒绝单条消息,可以选择重新入队 |
完整的确认示例
package com.example.rabbitmq.workqueues;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 带消息确认的消费者
*/
public class AckConsumer {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
String consumerName = args.length > 0 ? args[0] : "AckConsumer";
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("🔧 [" + consumerName + "] 等待任务(带确认)...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
System.out.println("📥 收到任务[" + deliveryTag + "]: " + task);
try {
// 模拟处理
processTask(task);
// 处理成功,确认消息
channel.basicAck(deliveryTag, false);
System.out.println("✅ 确认任务[" + deliveryTag + "]: " + task);
} catch (Exception e) {
System.out.println("❌ 任务失败[" + deliveryTag + "]: " + e.getMessage());
// 处理失败,判断是否需要重试
if (shouldRetry(e)) {
// 重新入队,等待重新处理
channel.basicNack(deliveryTag, false, true);
System.out.println("🔄 重新入队[" + deliveryTag + "]");
} else {
// 不重试,拒绝消息(可以配置死信队列接收)
channel.basicNack(deliveryTag, false, false);
System.out.println("🗑️ 丢弃消息[" + deliveryTag + "]");
}
}
};
// 手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private static void processTask(String task) throws Exception {
// 模拟处理,某些任务可能失败
if (task.contains("失败")) {
throw new RuntimeException("任务处理失败");
}
Thread.sleep(1000);
}
private static boolean shouldRetry(Exception e) {
// 判断是否需要重试(这里简化处理)
return e.getMessage() != null && !e.getMessage().contains("不可重试");
}
}
📊 公平分发(Fair Dispatch)
问题:轮询分发的缺陷
轮询分发不考虑消费者的处理能力:
Consumer-1(快):处理消息 1, 3, 5, 7, 9(已完成)
Consumer-2(慢):处理消息 2, 4(正在处理),6, 8, 10(堆积)
结果:Consumer-1 空闲,Consumer-2 超负荷
解决方案:Prefetch 预取
设置每个消费者一次最多获取的消息数量:
// 每次只处理一条消息,处理完成后再获取下一条
int prefetchCount = 1;
channel.basicQos(prefetchCount);
设置后的效果:
Consumer-1(快):处理 1 → 完成 → 处理 3 → 完成 → 处理 5...
Consumer-2(慢):处理 2 → 完成 → 处理 4...
结果:能者多劳,公平分配
公平分发完整示例
package com.example.rabbitmq.workqueues;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 公平分发消费者
*
* 使用 QoS 限制,实现能者多劳
*/
public class FairConsumer {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
String consumerName = args.length > 0 ? args[0] : "FairConsumer";
int processTime = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 🔑 关键:设置 QoS,每次只获取 1 条消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
System.out.println("🔧 [" + consumerName + "] 等待任务(公平分发,处理时间:" + processTime + "ms)...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), "UTF-8");
System.out.println("📥 [" + consumerName + "] 收到: " + task);
try {
// 模拟处理(不同消费者处理速度不同)
Thread.sleep(processTime);
System.out.println("✅ [" + consumerName + "] 完成: " + task);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
测试公平分发
# 启动慢消费者(处理时间 3000ms)
java FairConsumer SlowConsumer 3000
# 启动快消费者(处理时间 500ms)
java FairConsumer FastConsumer 500
# 发送 10 个任务
java TaskProducer
预期结果:
- FastConsumer 会处理更多任务
- SlowConsumer 处理较少任务
- 实现了"能者多劳"的效果
💾 消息持久化
为什么需要持久化?
如果 RabbitMQ 服务器重启,默认情况下:
- 非持久化队列会丢失
- 非持久化消息会丢失
实现持久化
1. 队列持久化:
// durable = true 表示队列持久化
channel.queueDeclare("task_queue", true, false, false, null);
2. 消息持久化:
// 使用 PERSISTENT_TEXT_PLAIN 发送持久化消息
channel.basicPublish(
"",
"task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化
message.getBytes()
);
或者使用 BasicProperties:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2 表示持久化,1 表示非持久化
.build();
channel.basicPublish("", "task_queue", properties, message.getBytes());
注意
消息持久化并不能 100% 保证消息不丢失。消息可能只存在于缓存中还未刷盘时服务器就崩溃了。如果需要更强的保证,请使用发布确认机制。
📋 完整示例
生产者(带持久化)
package com.example.rabbitmq.workqueues;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 工作队列生产者 - 完整版
*/
public class WorkQueueProducer {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 声明持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
AtomicInteger counter = new AtomicInteger(0);
// 持续发送任务
while (true) {
int taskId = counter.incrementAndGet();
String task = String.format("任务#%d [%s]", taskId, generateDots());
// 发送持久化消息
channel.basicPublish(
"",
QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
task.getBytes("UTF-8")
);
System.out.println("📤 发送: " + task);
// 每秒发送一个任务
Thread.sleep(1000);
// 发送 20 个任务后停止
if (taskId >= 20) {
break;
}
}
System.out.println("✅ 所有任务发送完成!");
}
}
/**
* 随机生成点号(模拟不同处理时长的任务)
*/
private static String generateDots() {
int count = (int) (Math.random() * 5) + 1;
return ".".repeat(count);
}
}
消费者(带确认和公平分发)
package com.example.rabbitmq.workqueues;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 工作队列消费者 - 完整版
*/
public class WorkQueueConsumer {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
// 消费者名称
String workerName = args.length > 0 ? args[0] : "Worker-" + System.currentTimeMillis() % 1000;
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 设置 QoS,公平分发
channel.basicQos(1);
System.out.println("🔧 [" + workerName + "] 已就绪,等待任务...\n");
// 统计信息
final int[] stats = {0, 0}; // [成功数, 失败数]
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), "UTF-8");
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
System.out.println("📥 [" + workerName + "] 开始处理: " + task);
long startTime = System.currentTimeMillis();
try {
// 模拟处理任务
doWork(task);
long elapsed = System.currentTimeMillis() - startTime;
System.out.println("✅ [" + workerName + "] 完成: " + task + " (耗时: " + elapsed + "ms)");
// 确认消息
channel.basicAck(deliveryTag, false);
stats[0]++;
} catch (Exception e) {
long elapsed = System.currentTimeMillis() - startTime;
System.out.println("❌ [" + workerName + "] 失败: " + task + " (耗时: " + elapsed + "ms)");
// 拒绝消息,不重新入队
channel.basicNack(deliveryTag, false, false);
stats[1]++;
}
System.out.println("📊 统计: 成功=" + stats[0] + ", 失败=" + stats[1] + "\n");
};
// 手动确认模式
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("\n🛑 [" + workerName + "] 停止工作");
System.out.println("📊 最终统计: 成功=" + stats[0] + ", 失败=" + stats[1]);
}));
}
/**
* 模拟任务处理
*/
private static void doWork(String task) throws InterruptedException {
// 根据点号数量决定处理时间
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(500);
}
}
// 模拟随机失败(10% 概率)
if (Math.random() < 0.1) {
throw new RuntimeException("随机失败");
}
}
}
📝 本章小结
本章学习了工作队列模式:
| 知识点 | 说明 |
|---|---|
| 工作队列 | 多消费者共享一个队列,实现负载均衡 |
| 轮询分发 | 默认方式,消息均匀分配给消费者 |
| 消息确认 | 手动确认保证消息不丢失 |
| 公平分发 | 使用 QoS 实现能者多劳 |
| 持久化 | 队列和消息持久化保证重启不丢失 |
最佳实践:
- ✅ 使用手动确认而非自动确认
- ✅ 设置
prefetchCount=1实现公平分发 - ✅ 对重要消息启用持久化
- ✅ 处理失败时合理选择重试或拒绝
下一步
工作队列实现了任务分发,但消息只能被一个消费者处理。如果需要将消息广播给多个消费者,请学习 发布/订阅模式!
