工作队列模式

工作队列(Work Queues)模式也称为任务队列,用于在多个消费者之间分发耗时任务。

🎯 本章目标

  • 理解工作队列模式的应用场景
  • 实现多消费者负载均衡
  • 掌握消息确认机制
  • 了解消息预取和公平分发

📦 模式介绍

                         ┌──────────┐
                    ┌───→│Consumer 1│
┌──────────┐       │    └──────────┘
│ Producer │──────→│ Queue │
└──────────┘       │    ┌──────────┐
                    └───→│Consumer 2│
                         └──────────┘

应用场景:

  • 订单处理
  • 邮件发送
  • 数据导入/导出
  • 图片/视频处理
  • 任何需要异步处理的耗时任务

特点:

  • 多个消费者共同消费一个队列
  • 每条消息只会被一个消费者处理
  • 消息默认按轮询方式分发

🛠️ 基础实现

任务生产者

package com.example.rabbitmq.workqueues;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

/**
 * 工作队列 - 任务生产者
 * 
 * 模拟发送需要处理的任务
 */
public class TaskProducer {
    
    private static final String QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明持久化队列
            // durable=true 表示队列持久化,服务器重启后队列仍存在
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 模拟发送多个任务
            String[] tasks = {
                "发送邮件..",
                "处理订单....",
                "生成报表......",
                "图片压缩..",
                "视频转码........",
                "数据同步....",
                "日志分析......",
                "文件备份..",
                "消息推送....",
                "数据导出......"
            };
            
            for (int i = 0; i < tasks.length; i++) {
                String task = "任务" + (i + 1) + ": " + tasks[i];
                
                // 发送持久化消息
                // MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化
                channel.basicPublish(
                    "", 
                    QUEUE_NAME, 
                    MessageProperties.PERSISTENT_TEXT_PLAIN, 
                    task.getBytes("UTF-8")
                );
                
                System.out.println("📤 发送任务: " + task);
            }
            
            System.out.println("\n✅ 所有任务发送完成!");
        }
    }
}

任务消费者

package com.example.rabbitmq.workqueues;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 工作队列 - 任务消费者
 * 
 * 处理队列中的任务,根据任务中的点号数量模拟处理时间
 */
public class TaskConsumer {
    
    private static final String QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws Exception {
        // 消费者编号(通过命令行参数传入)
        String consumerName = args.length > 0 ? args[0] : "Consumer-1";
        
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        System.out.println("🔧 [" + consumerName + "] 等待任务...");
        
        // 消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String task = new String(delivery.getBody(), "UTF-8");
            
            System.out.println("📥 [" + consumerName + "] 收到任务: " + task);
            
            try {
                // 模拟任务处理(根据点号数量决定处理时间)
                doWork(task);
                System.out.println("✅ [" + consumerName + "] 任务完成: " + task);
            } catch (InterruptedException e) {
                System.out.println("❌ [" + consumerName + "] 任务失败: " + task);
            } finally {
                // 手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        
        // 关闭自动确认,使用手动确认
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
    }
    
    /**
     * 模拟任务处理
     * 每个点号代表1秒的处理时间
     */
    private static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(500);  // 每个点休眠500毫秒
            }
        }
    }
}

🔄 轮询分发

默认情况下,RabbitMQ 采用轮询(Round-Robin)方式分发消息:

消息1 → Consumer-1
消息2 → Consumer-2
消息3 → Consumer-1
消息4 → Consumer-2
...

测试步骤:

  1. 启动两个消费者:
# 终端1
java TaskConsumer Consumer-1

# 终端2
java TaskConsumer Consumer-2
  1. 启动生产者发送任务

  2. 观察输出,消息会均匀分配给两个消费者

✅ 消息确认(Message Acknowledgment)

为什么需要消息确认?

默认的自动确认模式存在问题:

生产者 → 队列 → 消费者(收到消息后自动确认)
                    ↓
              消费者崩溃
                    ↓
              消息丢失!

使用手动确认可以避免消息丢失:

生产者 → 队列 → 消费者(处理完成后手动确认)
                    ↓
              确认成功 → 队列删除消息
              确认失败/超时 → 消息重新入队

手动确认代码

// 关闭自动确认
boolean autoAck = false;

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        
        // 处理消息
        processMessage(message);
        
        // 处理成功,确认消息
        // 参数1: deliveryTag - 消息标签
        // 参数2: multiple - 是否批量确认(false 只确认当前消息)
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        
    } catch (Exception e) {
        // 处理失败,拒绝消息
        // 参数3: requeue - 是否重新入队(true 重新入队,false 丢弃)
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
};

channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});

确认方法对比

方法说明
basicAck确认消息,消息会从队列中删除
basicNack拒绝消息,可以选择重新入队
basicReject拒绝单条消息,可以选择重新入队

完整的确认示例

package com.example.rabbitmq.workqueues;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 带消息确认的消费者
 */
public class AckConsumer {
    
    private static final String QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws Exception {
        String consumerName = args.length > 0 ? args[0] : "AckConsumer";
        
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        System.out.println("🔧 [" + consumerName + "] 等待任务(带确认)...");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String task = new String(delivery.getBody(), "UTF-8");
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            
            System.out.println("📥 收到任务[" + deliveryTag + "]: " + task);
            
            try {
                // 模拟处理
                processTask(task);
                
                // 处理成功,确认消息
                channel.basicAck(deliveryTag, false);
                System.out.println("✅ 确认任务[" + deliveryTag + "]: " + task);
                
            } catch (Exception e) {
                System.out.println("❌ 任务失败[" + deliveryTag + "]: " + e.getMessage());
                
                // 处理失败,判断是否需要重试
                if (shouldRetry(e)) {
                    // 重新入队,等待重新处理
                    channel.basicNack(deliveryTag, false, true);
                    System.out.println("🔄 重新入队[" + deliveryTag + "]");
                } else {
                    // 不重试,拒绝消息(可以配置死信队列接收)
                    channel.basicNack(deliveryTag, false, false);
                    System.out.println("🗑️ 丢弃消息[" + deliveryTag + "]");
                }
            }
        };
        
        // 手动确认模式
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }
    
    private static void processTask(String task) throws Exception {
        // 模拟处理,某些任务可能失败
        if (task.contains("失败")) {
            throw new RuntimeException("任务处理失败");
        }
        Thread.sleep(1000);
    }
    
    private static boolean shouldRetry(Exception e) {
        // 判断是否需要重试(这里简化处理)
        return e.getMessage() != null && !e.getMessage().contains("不可重试");
    }
}

📊 公平分发(Fair Dispatch)

问题:轮询分发的缺陷

轮询分发不考虑消费者的处理能力:

Consumer-1(快):处理消息 1, 3, 5, 7, 9(已完成)
Consumer-2(慢):处理消息 2, 4(正在处理),6, 8, 10(堆积)

结果:Consumer-1 空闲,Consumer-2 超负荷

解决方案:Prefetch 预取

设置每个消费者一次最多获取的消息数量:

// 每次只处理一条消息,处理完成后再获取下一条
int prefetchCount = 1;
channel.basicQos(prefetchCount);

设置后的效果:

Consumer-1(快):处理 1 → 完成 → 处理 3 → 完成 → 处理 5...
Consumer-2(慢):处理 2 → 完成 → 处理 4...

结果:能者多劳,公平分配

公平分发完整示例

package com.example.rabbitmq.workqueues;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 公平分发消费者
 * 
 * 使用 QoS 限制,实现能者多劳
 */
public class FairConsumer {
    
    private static final String QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws Exception {
        String consumerName = args.length > 0 ? args[0] : "FairConsumer";
        int processTime = args.length > 1 ? Integer.parseInt(args[1]) : 1000;
        
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 🔑 关键:设置 QoS,每次只获取 1 条消息
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        
        System.out.println("🔧 [" + consumerName + "] 等待任务(公平分发,处理时间:" + processTime + "ms)...");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String task = new String(delivery.getBody(), "UTF-8");
            
            System.out.println("📥 [" + consumerName + "] 收到: " + task);
            
            try {
                // 模拟处理(不同消费者处理速度不同)
                Thread.sleep(processTime);
                System.out.println("✅ [" + consumerName + "] 完成: " + task);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 手动确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        
        // 手动确认模式
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }
}

测试公平分发

# 启动慢消费者(处理时间 3000ms)
java FairConsumer SlowConsumer 3000

# 启动快消费者(处理时间 500ms)
java FairConsumer FastConsumer 500

# 发送 10 个任务
java TaskProducer

预期结果:

  • FastConsumer 会处理更多任务
  • SlowConsumer 处理较少任务
  • 实现了"能者多劳"的效果

💾 消息持久化

为什么需要持久化?

如果 RabbitMQ 服务器重启,默认情况下:

  • 非持久化队列会丢失
  • 非持久化消息会丢失

实现持久化

1. 队列持久化:

// durable = true 表示队列持久化
channel.queueDeclare("task_queue", true, false, false, null);

2. 消息持久化:

// 使用 PERSISTENT_TEXT_PLAIN 发送持久化消息
channel.basicPublish(
    "", 
    "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,  // 持久化
    message.getBytes()
);

或者使用 BasicProperties

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)  // 2 表示持久化,1 表示非持久化
    .build();

channel.basicPublish("", "task_queue", properties, message.getBytes());

注意

消息持久化并不能 100% 保证消息不丢失。消息可能只存在于缓存中还未刷盘时服务器就崩溃了。如果需要更强的保证,请使用发布确认机制。

📋 完整示例

生产者(带持久化)

package com.example.rabbitmq.workqueues;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 工作队列生产者 - 完整版
 */
public class WorkQueueProducer {
    
    private static final String QUEUE_NAME = "work_queue";
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明持久化队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            AtomicInteger counter = new AtomicInteger(0);
            
            // 持续发送任务
            while (true) {
                int taskId = counter.incrementAndGet();
                String task = String.format("任务#%d [%s]", taskId, generateDots());
                
                // 发送持久化消息
                channel.basicPublish(
                    "", 
                    QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    task.getBytes("UTF-8")
                );
                
                System.out.println("📤 发送: " + task);
                
                // 每秒发送一个任务
                Thread.sleep(1000);
                
                // 发送 20 个任务后停止
                if (taskId >= 20) {
                    break;
                }
            }
            
            System.out.println("✅ 所有任务发送完成!");
        }
    }
    
    /**
     * 随机生成点号(模拟不同处理时长的任务)
     */
    private static String generateDots() {
        int count = (int) (Math.random() * 5) + 1;
        return ".".repeat(count);
    }
}

消费者(带确认和公平分发)

package com.example.rabbitmq.workqueues;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 工作队列消费者 - 完整版
 */
public class WorkQueueConsumer {
    
    private static final String QUEUE_NAME = "work_queue";
    
    public static void main(String[] args) throws Exception {
        // 消费者名称
        String workerName = args.length > 0 ? args[0] : "Worker-" + System.currentTimeMillis() % 1000;
        
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 设置 QoS,公平分发
        channel.basicQos(1);
        
        System.out.println("🔧 [" + workerName + "] 已就绪,等待任务...\n");
        
        // 统计信息
        final int[] stats = {0, 0};  // [成功数, 失败数]
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String task = new String(delivery.getBody(), "UTF-8");
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            
            System.out.println("📥 [" + workerName + "] 开始处理: " + task);
            long startTime = System.currentTimeMillis();
            
            try {
                // 模拟处理任务
                doWork(task);
                
                long elapsed = System.currentTimeMillis() - startTime;
                System.out.println("✅ [" + workerName + "] 完成: " + task + " (耗时: " + elapsed + "ms)");
                
                // 确认消息
                channel.basicAck(deliveryTag, false);
                stats[0]++;
                
            } catch (Exception e) {
                long elapsed = System.currentTimeMillis() - startTime;
                System.out.println("❌ [" + workerName + "] 失败: " + task + " (耗时: " + elapsed + "ms)");
                
                // 拒绝消息,不重新入队
                channel.basicNack(deliveryTag, false, false);
                stats[1]++;
            }
            
            System.out.println("📊 统计: 成功=" + stats[0] + ", 失败=" + stats[1] + "\n");
        };
        
        // 手动确认模式
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        
        // 添加关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("\n🛑 [" + workerName + "] 停止工作");
            System.out.println("📊 最终统计: 成功=" + stats[0] + ", 失败=" + stats[1]);
        }));
    }
    
    /**
     * 模拟任务处理
     */
    private static void doWork(String task) throws InterruptedException {
        // 根据点号数量决定处理时间
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(500);
            }
        }
        
        // 模拟随机失败(10% 概率)
        if (Math.random() < 0.1) {
            throw new RuntimeException("随机失败");
        }
    }
}

📝 本章小结

本章学习了工作队列模式:

知识点说明
工作队列多消费者共享一个队列,实现负载均衡
轮询分发默认方式,消息均匀分配给消费者
消息确认手动确认保证消息不丢失
公平分发使用 QoS 实现能者多劳
持久化队列和消息持久化保证重启不丢失

最佳实践:

  1. ✅ 使用手动确认而非自动确认
  2. ✅ 设置 prefetchCount=1 实现公平分发
  3. ✅ 对重要消息启用持久化
  4. ✅ 处理失败时合理选择重试或拒绝

下一步

工作队列实现了任务分发,但消息只能被一个消费者处理。如果需要将消息广播给多个消费者,请学习 发布/订阅模式