主题模式
主题(Topics)模式是最灵活的消息路由方式,支持使用通配符进行模糊匹配。
🎯 本章目标
- 理解 Topic 交换机的工作原理
- 掌握通配符的使用规则
- 实现灵活的消息路由
📦 模式介绍
┌─────────┐ ┌──────────┐
┌───→│ Queue 1 │───→│Consumer 1│
│ └─────────┘ └──────────┘
┌──────────┐ ┌────────┤ Binding: *.orange.*
│ Producer │───→│Exchange│
└──────────┘ │(topic) │ ┌─────────┐ ┌──────────┐
└────────┤───→│ Queue 2 │───→│Consumer 2│
│ └─────────┘ └──────────┘
│ Bindings: *.*.rabbit, lazy.#
Topic 交换机是 Direct 交换机的增强版,支持通配符匹配:
| 通配符 | 说明 | 示例 |
|---|---|---|
* | 匹配一个单词 | *.orange.* 匹配 quick.orange.rabbit |
# | 匹配零个或多个单词 | lazy.# 匹配 lazy.a.b.c |
🔤 路由键规则
路由键必须是由 . 分隔的单词列表:
stock.usd.nyse ✅ 有效
quick.orange.rabbit ✅ 有效
a.b.c.d.e.f ✅ 有效
stockusdnyse ✅ 有效(一个单词)
stock_usd_nyse ✅ 有效(下划线是单词的一部分)
通配符匹配示例:
| 绑定键 | 路由键 | 是否匹配 |
|---|---|---|
*.orange.* | quick.orange.rabbit | ✅ |
*.orange.* | lazy.orange.elephant | ✅ |
*.orange.* | quick.orange.male.rabbit | ❌ |
lazy.# | lazy.orange.elephant | ✅ |
lazy.# | lazy.a.b.c.d.e.f | ✅ |
lazy.# | lazy | ✅ |
# | 任意路由键 | ✅ |
* | 单个单词 | ✅ |
🛠️ 代码实现
消息生产者
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Topic 模式 - 消息生产者
*
* 发送带有不同路由键的消息
*/
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 1. 声明 Topic 类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
System.out.println("✅ 交换机声明成功: " + EXCHANGE_NAME + " (topic)\n");
// 2. 发送不同路由键的消息
// 路由键格式: <facility>.<severity>.<source>
String[][] messages = {
{"kern.critical.server1", "内核崩溃"},
{"kern.warning.server1", "内核警告"},
{"auth.info.server1", "用户登录成功"},
{"auth.error.server1", "登录密码错误"},
{"cron.info.server2", "定时任务执行"},
{"cron.warning.server2", "定时任务超时"},
{"mail.info.server2", "邮件发送成功"},
{"mail.error.server3", "邮件发送失败"},
{"kern.info.server3", "内核启动完成"},
{"auth.critical.server3", "暴力破解检测"}
};
for (String[] msg : messages) {
String routingKey = msg[0];
String message = msg[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println("📤 发送消息: [" + routingKey + "] " + message);
Thread.sleep(500);
}
System.out.println("\n✅ 所有消息发送完成!");
}
}
}
内核日志消费者
只接收内核相关的日志:
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* Topic 模式 - 内核日志消费者
*
* 接收所有内核日志: kern.*.*
*/
public class KernelLogConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "kernel_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定: kern.*.* 匹配所有内核日志
String bindingKey = "kern.*.*";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
System.out.println("🖥️ 内核日志消费者已启动");
System.out.println("📋 绑定键: " + bindingKey);
System.out.println("⏳ 等待消息...\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("🖥️ [KERNEL] " + routingKey + " → " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
严重错误消费者
接收所有 critical 级别的日志:
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* Topic 模式 - 严重错误消费者
*
* 接收所有严重错误: *.critical.*
*/
public class CriticalLogConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "critical_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定: *.critical.* 匹配所有严重错误
String bindingKey = "*.critical.*";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
System.out.println("🚨 严重错误消费者已启动");
System.out.println("📋 绑定键: " + bindingKey);
System.out.println("⏳ 等待消息...\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("🚨 [CRITICAL] " + routingKey + " → " + message);
System.out.println(" → 立即发送告警!");
System.out.println(" → 通知运维人员\n");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
Server1 日志消费者
接收来自 server1 的所有日志:
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* Topic 模式 - Server1 日志消费者
*
* 接收 server1 的所有日志: #.server1
*/
public class Server1LogConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "server1_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定: #.server1 匹配所有来自 server1 的日志
String bindingKey = "#.server1";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
System.out.println("🖥️ Server1 日志消费者已启动");
System.out.println("📋 绑定键: " + bindingKey);
System.out.println("⏳ 等待消息...\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("🖥️ [SERVER1] " + routingKey + " → " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
全量日志消费者
接收所有日志(用于归档):
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* Topic 模式 - 全量日志消费者
*
* 接收所有日志: #
*/
public class AllLogConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "all_logs_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定: # 匹配所有消息
String bindingKey = "#";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
System.out.println("📚 全量日志消费者已启动");
System.out.println("📋 绑定键: " + bindingKey);
System.out.println("⏳ 等待消息...\n");
int[] count = {0};
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
count[0]++;
System.out.println("📚 [" + count[0] + "] " + routingKey + " → " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
🔍 实战案例
案例:电商系统事件路由
电商系统中有多种事件需要路由到不同的服务:
路由键格式: <服务>.<事件类型>.<地区>
示例:
- order.created.cn → 中国区订单创建
- order.paid.us → 美国区订单支付
- user.registered.cn → 中国区用户注册
- product.updated.all → 所有地区商品更新
事件发布者:
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
/**
* 电商事件发布者
*/
public class EcommerceEventPublisher {
private static final String EXCHANGE_NAME = "ecommerce.events";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 模拟各种事件
Object[][] events = {
{"order.created.cn", "订单创建", Map.of("orderId", 1001, "amount", 299.0)},
{"order.paid.cn", "订单支付", Map.of("orderId", 1001, "payMethod", "alipay")},
{"user.registered.cn", "用户注册", Map.of("userId", 2001, "username", "zhangsan")},
{"order.created.us", "订单创建", Map.of("orderId", 1002, "amount", 99.0)},
{"product.updated.all", "商品更新", Map.of("productId", 3001, "name", "iPhone")},
{"order.shipped.cn", "订单发货", Map.of("orderId", 1001, "trackingNo", "SF123456")},
{"user.login.cn", "用户登录", Map.of("userId", 2001, "ip", "192.168.1.1")},
{"order.paid.us", "订单支付", Map.of("orderId", 1002, "payMethod", "paypal")},
{"inventory.low.cn", "库存预警", Map.of("productId", 3002, "stock", 5)},
{"order.completed.cn", "订单完成", Map.of("orderId", 1001)}
};
for (Object[] event : events) {
String routingKey = (String) event[0];
String eventType = (String) event[1];
Map<String, Object> data = (Map<String, Object>) event[2];
Map<String, Object> eventData = new HashMap<>();
eventData.put("type", eventType);
eventData.put("data", data);
eventData.put("timestamp", System.currentTimeMillis());
String eventJson = objectMapper.writeValueAsString(eventData);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, eventJson.getBytes("UTF-8"));
System.out.println("📤 [" + routingKey + "] " + eventType);
Thread.sleep(800);
}
}
}
}
中国区订单服务:
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 中国区订单服务
* 订阅: order.*.cn
*/
public class ChinaOrderService {
private static final String EXCHANGE_NAME = "ecommerce.events";
private static final String QUEUE_NAME = "china.order.service.queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 只处理中国区的订单事件
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.*.cn");
System.out.println("🇨🇳 中国区订单服务已启动");
System.out.println("📋 绑定键: order.*.cn\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("🇨🇳 [中国订单] " + routingKey);
System.out.println(" → " + message + "\n");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
全局用户服务:
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 全局用户服务
* 订阅: user.#
*/
public class GlobalUserService {
private static final String EXCHANGE_NAME = "ecommerce.events";
private static final String QUEUE_NAME = "global.user.service.queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 处理所有用户相关事件
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "user.#");
System.out.println("👤 全局用户服务已启动");
System.out.println("📋 绑定键: user.#\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("👤 [用户事件] " + routingKey);
System.out.println(" → " + message + "\n");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
数据分析服务(接收所有事件):
package com.example.rabbitmq.topics;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 数据分析服务
* 订阅: # (所有事件)
*/
public class AnalyticsService {
private static final String EXCHANGE_NAME = "ecommerce.events";
private static final String QUEUE_NAME = "analytics.service.queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 接收所有事件用于分析
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");
System.out.println("📊 数据分析服务已启动");
System.out.println("📋 绑定键: #\n");
int[] count = {0};
Map<String, Integer> stats = new java.util.concurrent.ConcurrentHashMap<>();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String routingKey = delivery.getEnvelope().getRoutingKey();
count[0]++;
String category = routingKey.split("\\.")[0];
stats.merge(category, 1, Integer::sum);
System.out.println("📊 [分析] 事件 #" + count[0] + ": " + routingKey);
System.out.println(" 当前统计: " + stats);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
📊 三种交换机对比
| 交换机类型 | 路由规则 | 通配符支持 | 使用场景 |
|---|---|---|---|
| Fanout | 广播 | ❌ | 日志广播、消息通知 |
| Direct | 精确匹配 | ❌ | 简单的消息分发 |
| Topic | 模糊匹配 | ✅ * # | 复杂的消息路由 |
Topic 特殊情况:
#绑定键 → 行为类似 Fanout(接收所有消息)- 不使用通配符 → 行为类似 Direct(精确匹配)
📝 本章小结
本章学习了主题模式:
| 知识点 | 说明 |
|---|---|
| Topic 交换机 | 支持通配符的消息路由 |
* 通配符 | 匹配一个单词 |
# 通配符 | 匹配零个或多个单词 |
| 路由键格式 | 由 . 分隔的单词列表 |
最佳实践:
- 设计有意义的路由键格式
- 从左到右按重要性排列
- 使用
#接收全量数据 - 使用
*进行单级匹配
下一步
掌握了五种消息模式后,让我们学习如何在 Spring Boot 中集成 RabbitMQ!
