路由模式
路由(Routing)模式可以根据路由键将消息选择性地发送到指定队列。
🎯 本章目标
- 理解 Direct 交换机的工作原理
- 掌握路由键的使用方法
- 实现消息的选择性分发
📦 模式介绍
┌─────────┐ ┌──────────┐
┌───→│ Queue 1 │───→│Consumer 1│
│ └─────────┘ └──────────┘
┌──────────┐ ┌────────┤ Binding: error
│ Producer │───→│Exchange│
└──────────┘ │(direct)│ ┌─────────┐ ┌──────────┐
└────────┤───→│ Queue 2 │───→│Consumer 2│
│ └─────────┘ └──────────┘
│ Bindings: info, warning, error
特点:
- 消息根据路由键进行精确匹配
- 一个队列可以绑定多个路由键
- 同一个路由键可以绑定到多个队列
🔄 Direct 交换机
Direct 交换机根据消息的路由键与绑定的路由键进行精确匹配,将消息路由到对应的队列。
// 声明 Direct 交换机
channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
// 绑定队列(可以使用不同的路由键)
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");
// 发送消息时指定路由键
channel.basicPublish("direct_logs", "error", null, message.getBytes());
🛠️ 代码实现
日志生产者
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 路由模式 - 日志生产者
*
* 发送不同级别的日志,使用路由键区分
*/
public class LogProducer {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 1. 声明 Direct 类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
System.out.println("✅ 交换机声明成功: " + EXCHANGE_NAME);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
// 2. 发送不同级别的日志
String[][] logs = {
{"info", "用户登录成功"},
{"info", "查询订单列表"},
{"debug", "SQL执行: SELECT * FROM users"},
{"warning", "内存使用率达到 75%"},
{"info", "文件上传完成"},
{"error", "数据库连接失败!"},
{"warning", "接口响应时间过长"},
{"error", "支付回调验签失败!"},
{"debug", "请求参数: {userId: 123}"},
{"info", "用户退出登录"}
};
for (String[] log : logs) {
String level = log[0];
String message = log[1];
String timestamp = LocalDateTime.now().format(formatter);
String logMessage = String.format("[%s] %s", timestamp, message);
// 3. 发送消息,使用日志级别作为路由键
channel.basicPublish(EXCHANGE_NAME, level, null, logMessage.getBytes("UTF-8"));
System.out.println("📤 发送 [" + level.toUpperCase() + "] 日志: " + logMessage);
Thread.sleep(500);
}
System.out.println("\n✅ 所有日志发送完成!");
}
}
}
错误日志消费者
只接收 ERROR 级别的日志:
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 路由模式 - 错误日志消费者
*
* 只接收 error 级别的日志
*/
public class ErrorLogConsumer {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "error_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 1. 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 2. 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 3. 绑定队列到交换机,只绑定 error 路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
System.out.println("🔴 错误日志消费者已启动");
System.out.println("📋 订阅: error");
System.out.println("⏳ 等待消息...\n");
// 4. 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("🔴 [ERROR] " + message);
System.out.println(" → 已记录到错误日志文件");
System.out.println(" → 已发送告警通知\n");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
普通日志消费者
接收 INFO、WARNING、ERROR 级别的日志:
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 路由模式 - 普通日志消费者
*
* 接收 info、warning、error 级别的日志
*/
public class GeneralLogConsumer {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "general_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 1. 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 2. 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 3. 绑定多个路由键到同一个队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
System.out.println("📋 普通日志消费者已启动");
System.out.println("📋 订阅: info, warning, error");
System.out.println("⏳ 等待消息...\n");
// 4. 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
String icon = switch (routingKey) {
case "info" -> "ℹ️";
case "warning" -> "⚠️";
case "error" -> "🔴";
default -> "📝";
};
System.out.println(icon + " [" + routingKey.toUpperCase() + "] " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
调试日志消费者
接收所有级别的日志(包括 DEBUG):
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 路由模式 - 调试日志消费者
*
* 接收所有级别的日志(debug, info, warning, error)
*/
public class DebugLogConsumer {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String QUEUE_NAME = "debug_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 1. 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 2. 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 3. 绑定所有日志级别
String[] levels = {"debug", "info", "warning", "error"};
for (String level : levels) {
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, level);
}
System.out.println("🔍 调试日志消费者已启动");
System.out.println("📋 订阅: debug, info, warning, error");
System.out.println("⏳ 等待消息...\n");
// 4. 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("🔍 [" + routingKey.toUpperCase() + "] " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
🔍 实战案例
案例:订单状态通知
不同的订单状态需要通知不同的服务:
订单状态变更
↓
发送到 order.status 交换机
↓
┌───────────────────────────────────────────────────────┐
│ │
│ routing_key=created → 库存服务(扣减库存) │
│ routing_key=paid → 物流服务(创建发货单) │
│ routing_key=paid → 财务服务(记录收入) │
│ routing_key=cancelled→ 库存服务(恢复库存) │
│ routing_key=completed→ 积分服务(赠送积分) │
│ │
└───────────────────────────────────────────────────────┘
订单状态发布者:
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
/**
* 订单状态变更发布者
*/
public class OrderStatusPublisher {
private static final String EXCHANGE_NAME = "order.status";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
// 模拟订单状态变更
Object[][] statusChanges = {
{1001L, "created", "订单创建"},
{1001L, "paid", "订单支付"},
{1002L, "created", "订单创建"},
{1002L, "cancelled", "订单取消"},
{1001L, "shipped", "订单发货"},
{1003L, "created", "订单创建"},
{1003L, "paid", "订单支付"},
{1001L, "completed", "订单完成"},
{1003L, "shipped", "订单发货"},
{1003L, "completed", "订单完成"}
};
for (Object[] change : statusChanges) {
Long orderId = (Long) change[0];
String status = (String) change[1];
String description = (String) change[2];
Map<String, Object> event = new HashMap<>();
event.put("orderId", orderId);
event.put("status", status);
event.put("description", description);
event.put("timestamp", System.currentTimeMillis());
String eventJson = objectMapper.writeValueAsString(event);
// 使用订单状态作为路由键
channel.basicPublish(EXCHANGE_NAME, status, null, eventJson.getBytes("UTF-8"));
System.out.println("📤 订单 " + orderId + " 状态变更: " + status + " (" + description + ")");
Thread.sleep(1000);
}
}
}
}
库存服务(订阅 created 和 cancelled):
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
/**
* 库存服务 - 处理订单创建和取消
*/
public class InventoryService {
private static final String EXCHANGE_NAME = "order.status";
private static final String QUEUE_NAME = "inventory.service.queue";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 订阅 created 和 cancelled 状态
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "created");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "cancelled");
System.out.println("📦 库存服务已启动");
System.out.println("📋 订阅: created, cancelled\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String eventJson = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
try {
Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
Long orderId = ((Number) event.get("orderId")).longValue();
if ("created".equals(routingKey)) {
System.out.println("📦 订单 " + orderId + " 创建 → 扣减库存");
} else if ("cancelled".equals(routingKey)) {
System.out.println("📦 订单 " + orderId + " 取消 → 恢复库存");
}
} catch (Exception e) {
e.printStackTrace();
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
物流服务(订阅 paid):
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
/**
* 物流服务 - 处理已支付订单
*/
public class ShippingService {
private static final String EXCHANGE_NAME = "order.status";
private static final String QUEUE_NAME = "shipping.service.queue";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 只订阅 paid 状态
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "paid");
System.out.println("🚚 物流服务已启动");
System.out.println("📋 订阅: paid\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String eventJson = new String(delivery.getBody(), "UTF-8");
try {
Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
Long orderId = ((Number) event.get("orderId")).longValue();
System.out.println("🚚 订单 " + orderId + " 已支付 → 创建发货单");
System.out.println(" → 分配仓库");
System.out.println(" → 生成运单号\n");
} catch (Exception e) {
e.printStackTrace();
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
积分服务(订阅 completed):
package com.example.rabbitmq.routing;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
/**
* 积分服务 - 处理已完成订单
*/
public class PointService {
private static final String EXCHANGE_NAME = "order.status";
private static final String QUEUE_NAME = "point.service.queue";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 只订阅 completed 状态
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "completed");
System.out.println("🎁 积分服务已启动");
System.out.println("📋 订阅: completed\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String eventJson = new String(delivery.getBody(), "UTF-8");
try {
Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
Long orderId = ((Number) event.get("orderId")).longValue();
int points = (int) (Math.random() * 100) + 50;
System.out.println("🎁 订单 " + orderId + " 完成 → 赠送 " + points + " 积分\n");
} catch (Exception e) {
e.printStackTrace();
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
📊 路由键匹配规则
Direct 交换机的路由键匹配是精确匹配:
| 发送时路由键 | 绑定路由键 | 是否匹配 |
|---|---|---|
| error | error | ✅ 匹配 |
| error | Error | ❌ 不匹配(区分大小写) |
| error.log | error | ❌ 不匹配 |
| error | error.* | ❌ 不匹配(不支持通配符) |
需要模糊匹配?
如果需要使用通配符进行模糊匹配,请使用 Topic 交换机。
📝 本章小结
本章学习了路由模式:
| 知识点 | 说明 |
|---|---|
| Direct 交换机 | 根据路由键精确匹配 |
| 路由键 | 发送消息时指定,用于路由决策 |
| 绑定 | 队列与交换机的关联关系 |
| 多绑定 | 一个队列可以绑定多个路由键 |
适用场景:
- ✅ 日志级别分类
- ✅ 订单状态分发
- ✅ 需要精确匹配的场景
限制:
- ❌ 不支持通配符匹配
- ❌ 路由键必须完全一致
下一步
如果需要更灵活的路由规则(支持通配符),请学习 主题模式!
