路由模式

路由(Routing)模式可以根据路由键将消息选择性地发送到指定队列。

🎯 本章目标

  • 理解 Direct 交换机的工作原理
  • 掌握路由键的使用方法
  • 实现消息的选择性分发

📦 模式介绍

                              ┌─────────┐    ┌──────────┐
                         ┌───→│ Queue 1 │───→│Consumer 1│
                         │    └─────────┘    └──────────┘
┌──────────┐    ┌────────┤    Binding: error
│ Producer │───→│Exchange│
└──────────┘    │(direct)│    ┌─────────┐    ┌──────────┐
                └────────┤───→│ Queue 2 │───→│Consumer 2│
                         │    └─────────┘    └──────────┘
                         │    Bindings: info, warning, error

特点:

  • 消息根据路由键进行精确匹配
  • 一个队列可以绑定多个路由键
  • 同一个路由键可以绑定到多个队列

🔄 Direct 交换机

Direct 交换机根据消息的路由键与绑定的路由键进行精确匹配,将消息路由到对应的队列。

// 声明 Direct 交换机
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);

// 绑定队列(可以使用不同的路由键)
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");

// 发送消息时指定路由键
channel.basicPublish("direct_logs", "error", null, message.getBytes());

🛠️ 代码实现

日志生产者

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 路由模式 - 日志生产者
 * 
 * 发送不同级别的日志,使用路由键区分
 */
public class LogProducer {
    
    private static final String EXCHANGE_NAME = "direct_logs";
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 1. 声明 Direct 类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            System.out.println("✅ 交换机声明成功: " + EXCHANGE_NAME);
            
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
            
            // 2. 发送不同级别的日志
            String[][] logs = {
                {"info", "用户登录成功"},
                {"info", "查询订单列表"},
                {"debug", "SQL执行: SELECT * FROM users"},
                {"warning", "内存使用率达到 75%"},
                {"info", "文件上传完成"},
                {"error", "数据库连接失败!"},
                {"warning", "接口响应时间过长"},
                {"error", "支付回调验签失败!"},
                {"debug", "请求参数: {userId: 123}"},
                {"info", "用户退出登录"}
            };
            
            for (String[] log : logs) {
                String level = log[0];
                String message = log[1];
                String timestamp = LocalDateTime.now().format(formatter);
                
                String logMessage = String.format("[%s] %s", timestamp, message);
                
                // 3. 发送消息,使用日志级别作为路由键
                channel.basicPublish(EXCHANGE_NAME, level, null, logMessage.getBytes("UTF-8"));
                
                System.out.println("📤 发送 [" + level.toUpperCase() + "] 日志: " + logMessage);
                
                Thread.sleep(500);
            }
            
            System.out.println("\n✅ 所有日志发送完成!");
        }
    }
}

错误日志消费者

只接收 ERROR 级别的日志:

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 路由模式 - 错误日志消费者
 * 
 * 只接收 error 级别的日志
 */
public class ErrorLogConsumer {
    
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String QUEUE_NAME = "error_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 1. 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        
        // 2. 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 3. 绑定队列到交换机,只绑定 error 路由键
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        
        System.out.println("🔴 错误日志消费者已启动");
        System.out.println("📋 订阅: error");
        System.out.println("⏳ 等待消息...\n");
        
        // 4. 消费消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("🔴 [ERROR] " + message);
            System.out.println("   → 已记录到错误日志文件");
            System.out.println("   → 已发送告警通知\n");
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

普通日志消费者

接收 INFO、WARNING、ERROR 级别的日志:

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 路由模式 - 普通日志消费者
 * 
 * 接收 info、warning、error 级别的日志
 */
public class GeneralLogConsumer {
    
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String QUEUE_NAME = "general_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 1. 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        
        // 2. 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 3. 绑定多个路由键到同一个队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        
        System.out.println("📋 普通日志消费者已启动");
        System.out.println("📋 订阅: info, warning, error");
        System.out.println("⏳ 等待消息...\n");
        
        // 4. 消费消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            String icon = switch (routingKey) {
                case "info" -> "ℹ️";
                case "warning" -> "⚠️";
                case "error" -> "🔴";
                default -> "📝";
            };
            
            System.out.println(icon + " [" + routingKey.toUpperCase() + "] " + message);
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

调试日志消费者

接收所有级别的日志(包括 DEBUG):

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 路由模式 - 调试日志消费者
 * 
 * 接收所有级别的日志(debug, info, warning, error)
 */
public class DebugLogConsumer {
    
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String QUEUE_NAME = "debug_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 1. 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        
        // 2. 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 3. 绑定所有日志级别
        String[] levels = {"debug", "info", "warning", "error"};
        for (String level : levels) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, level);
        }
        
        System.out.println("🔍 调试日志消费者已启动");
        System.out.println("📋 订阅: debug, info, warning, error");
        System.out.println("⏳ 等待消息...\n");
        
        // 4. 消费消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("🔍 [" + routingKey.toUpperCase() + "] " + message);
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

🔍 实战案例

案例:订单状态通知

不同的订单状态需要通知不同的服务:

订单状态变更
    ↓
发送到 order.status 交换机
    ↓
┌───────────────────────────────────────────────────────┐
│                                                       │
│  routing_key=created  → 库存服务(扣减库存)           │
│  routing_key=paid     → 物流服务(创建发货单)         │
│  routing_key=paid     → 财务服务(记录收入)           │
│  routing_key=cancelled→ 库存服务(恢复库存)           │
│  routing_key=completed→ 积分服务(赠送积分)           │
│                                                       │
└───────────────────────────────────────────────────────┘

订单状态发布者:

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;

/**
 * 订单状态变更发布者
 */
public class OrderStatusPublisher {
    
    private static final String EXCHANGE_NAME = "order.status";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
            
            // 模拟订单状态变更
            Object[][] statusChanges = {
                {1001L, "created", "订单创建"},
                {1001L, "paid", "订单支付"},
                {1002L, "created", "订单创建"},
                {1002L, "cancelled", "订单取消"},
                {1001L, "shipped", "订单发货"},
                {1003L, "created", "订单创建"},
                {1003L, "paid", "订单支付"},
                {1001L, "completed", "订单完成"},
                {1003L, "shipped", "订单发货"},
                {1003L, "completed", "订单完成"}
            };
            
            for (Object[] change : statusChanges) {
                Long orderId = (Long) change[0];
                String status = (String) change[1];
                String description = (String) change[2];
                
                Map<String, Object> event = new HashMap<>();
                event.put("orderId", orderId);
                event.put("status", status);
                event.put("description", description);
                event.put("timestamp", System.currentTimeMillis());
                
                String eventJson = objectMapper.writeValueAsString(event);
                
                // 使用订单状态作为路由键
                channel.basicPublish(EXCHANGE_NAME, status, null, eventJson.getBytes("UTF-8"));
                
                System.out.println("📤 订单 " + orderId + " 状态变更: " + status + " (" + description + ")");
                
                Thread.sleep(1000);
            }
        }
    }
}

库存服务(订阅 created 和 cancelled):

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * 库存服务 - 处理订单创建和取消
 */
public class InventoryService {
    
    private static final String EXCHANGE_NAME = "order.status";
    private static final String QUEUE_NAME = "inventory.service.queue";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 订阅 created 和 cancelled 状态
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "created");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "cancelled");
        
        System.out.println("📦 库存服务已启动");
        System.out.println("📋 订阅: created, cancelled\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String eventJson = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            try {
                Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
                Long orderId = ((Number) event.get("orderId")).longValue();
                
                if ("created".equals(routingKey)) {
                    System.out.println("📦 订单 " + orderId + " 创建 → 扣减库存");
                } else if ("cancelled".equals(routingKey)) {
                    System.out.println("📦 订单 " + orderId + " 取消 → 恢复库存");
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

物流服务(订阅 paid):

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * 物流服务 - 处理已支付订单
 */
public class ShippingService {
    
    private static final String EXCHANGE_NAME = "order.status";
    private static final String QUEUE_NAME = "shipping.service.queue";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 只订阅 paid 状态
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "paid");
        
        System.out.println("🚚 物流服务已启动");
        System.out.println("📋 订阅: paid\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String eventJson = new String(delivery.getBody(), "UTF-8");
            
            try {
                Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
                Long orderId = ((Number) event.get("orderId")).longValue();
                
                System.out.println("🚚 订单 " + orderId + " 已支付 → 创建发货单");
                System.out.println("   → 分配仓库");
                System.out.println("   → 生成运单号\n");
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

积分服务(订阅 completed):

package com.example.rabbitmq.routing;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * 积分服务 - 处理已完成订单
 */
public class PointService {
    
    private static final String EXCHANGE_NAME = "order.status";
    private static final String QUEUE_NAME = "point.service.queue";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 只订阅 completed 状态
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "completed");
        
        System.out.println("🎁 积分服务已启动");
        System.out.println("📋 订阅: completed\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String eventJson = new String(delivery.getBody(), "UTF-8");
            
            try {
                Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
                Long orderId = ((Number) event.get("orderId")).longValue();
                
                int points = (int) (Math.random() * 100) + 50;
                System.out.println("🎁 订单 " + orderId + " 完成 → 赠送 " + points + " 积分\n");
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

📊 路由键匹配规则

Direct 交换机的路由键匹配是精确匹配

发送时路由键绑定路由键是否匹配
errorerror✅ 匹配
errorError❌ 不匹配(区分大小写)
error.logerror❌ 不匹配
errorerror.*❌ 不匹配(不支持通配符)

需要模糊匹配?

如果需要使用通配符进行模糊匹配,请使用 Topic 交换机

📝 本章小结

本章学习了路由模式:

知识点说明
Direct 交换机根据路由键精确匹配
路由键发送消息时指定,用于路由决策
绑定队列与交换机的关联关系
多绑定一个队列可以绑定多个路由键

适用场景:

  • ✅ 日志级别分类
  • ✅ 订单状态分发
  • ✅ 需要精确匹配的场景

限制:

  • ❌ 不支持通配符匹配
  • ❌ 路由键必须完全一致

下一步

如果需要更灵活的路由规则(支持通配符),请学习 主题模式