主题模式

主题(Topics)模式是最灵活的消息路由方式,支持使用通配符进行模糊匹配。

🎯 本章目标

  • 理解 Topic 交换机的工作原理
  • 掌握通配符的使用规则
  • 实现灵活的消息路由

📦 模式介绍

                              ┌─────────┐    ┌──────────┐
                         ┌───→│ Queue 1 │───→│Consumer 1│
                         │    └─────────┘    └──────────┘
┌──────────┐    ┌────────┤    Binding: *.orange.*
│ Producer │───→│Exchange│
└──────────┘    │(topic) │    ┌─────────┐    ┌──────────┐
                └────────┤───→│ Queue 2 │───→│Consumer 2│
                         │    └─────────┘    └──────────┘
                         │    Bindings: *.*.rabbit, lazy.#

Topic 交换机是 Direct 交换机的增强版,支持通配符匹配:

通配符说明示例
*匹配一个单词*.orange.* 匹配 quick.orange.rabbit
#匹配零个或多个单词lazy.# 匹配 lazy.a.b.c

🔤 路由键规则

路由键必须是由 . 分隔的单词列表:

stock.usd.nyse     ✅ 有效
quick.orange.rabbit ✅ 有效
a.b.c.d.e.f        ✅ 有效
stockusdnyse       ✅ 有效(一个单词)
stock_usd_nyse     ✅ 有效(下划线是单词的一部分)

通配符匹配示例:

绑定键路由键是否匹配
*.orange.*quick.orange.rabbit
*.orange.*lazy.orange.elephant
*.orange.*quick.orange.male.rabbit
lazy.#lazy.orange.elephant
lazy.#lazy.a.b.c.d.e.f
lazy.#lazy
#任意路由键
*单个单词

🛠️ 代码实现

消息生产者

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Topic 模式 - 消息生产者
 * 
 * 发送带有不同路由键的消息
 */
public class TopicProducer {
    
    private static final String EXCHANGE_NAME = "topic_logs";
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 1. 声明 Topic 类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
            System.out.println("✅ 交换机声明成功: " + EXCHANGE_NAME + " (topic)\n");
            
            // 2. 发送不同路由键的消息
            // 路由键格式: <facility>.<severity>.<source>
            String[][] messages = {
                {"kern.critical.server1", "内核崩溃"},
                {"kern.warning.server1", "内核警告"},
                {"auth.info.server1", "用户登录成功"},
                {"auth.error.server1", "登录密码错误"},
                {"cron.info.server2", "定时任务执行"},
                {"cron.warning.server2", "定时任务超时"},
                {"mail.info.server2", "邮件发送成功"},
                {"mail.error.server3", "邮件发送失败"},
                {"kern.info.server3", "内核启动完成"},
                {"auth.critical.server3", "暴力破解检测"}
            };
            
            for (String[] msg : messages) {
                String routingKey = msg[0];
                String message = msg[1];
                
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                
                System.out.println("📤 发送消息: [" + routingKey + "] " + message);
                
                Thread.sleep(500);
            }
            
            System.out.println("\n✅ 所有消息发送完成!");
        }
    }
}

内核日志消费者

只接收内核相关的日志:

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * Topic 模式 - 内核日志消费者
 * 
 * 接收所有内核日志: kern.*.*
 */
public class KernelLogConsumer {
    
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "kernel_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 绑定: kern.*.* 匹配所有内核日志
        String bindingKey = "kern.*.*";
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
        
        System.out.println("🖥️ 内核日志消费者已启动");
        System.out.println("📋 绑定键: " + bindingKey);
        System.out.println("⏳ 等待消息...\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("🖥️ [KERNEL] " + routingKey + " → " + message);
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

严重错误消费者

接收所有 critical 级别的日志:

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * Topic 模式 - 严重错误消费者
 * 
 * 接收所有严重错误: *.critical.*
 */
public class CriticalLogConsumer {
    
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "critical_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 绑定: *.critical.* 匹配所有严重错误
        String bindingKey = "*.critical.*";
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
        
        System.out.println("🚨 严重错误消费者已启动");
        System.out.println("📋 绑定键: " + bindingKey);
        System.out.println("⏳ 等待消息...\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("🚨 [CRITICAL] " + routingKey + " → " + message);
            System.out.println("   → 立即发送告警!");
            System.out.println("   → 通知运维人员\n");
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

Server1 日志消费者

接收来自 server1 的所有日志:

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * Topic 模式 - Server1 日志消费者
 * 
 * 接收 server1 的所有日志: #.server1
 */
public class Server1LogConsumer {
    
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "server1_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 绑定: #.server1 匹配所有来自 server1 的日志
        String bindingKey = "#.server1";
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
        
        System.out.println("🖥️ Server1 日志消费者已启动");
        System.out.println("📋 绑定键: " + bindingKey);
        System.out.println("⏳ 等待消息...\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("🖥️ [SERVER1] " + routingKey + " → " + message);
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

全量日志消费者

接收所有日志(用于归档):

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * Topic 模式 - 全量日志消费者
 * 
 * 接收所有日志: #
 */
public class AllLogConsumer {
    
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "all_logs_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 绑定: # 匹配所有消息
        String bindingKey = "#";
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, bindingKey);
        
        System.out.println("📚 全量日志消费者已启动");
        System.out.println("📋 绑定键: " + bindingKey);
        System.out.println("⏳ 等待消息...\n");
        
        int[] count = {0};
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            count[0]++;
            System.out.println("📚 [" + count[0] + "] " + routingKey + " → " + message);
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

🔍 实战案例

案例:电商系统事件路由

电商系统中有多种事件需要路由到不同的服务:

路由键格式: <服务>.<事件类型>.<地区>

示例:
- order.created.cn      → 中国区订单创建
- order.paid.us         → 美国区订单支付
- user.registered.cn    → 中国区用户注册
- product.updated.all   → 所有地区商品更新

事件发布者:

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;

/**
 * 电商事件发布者
 */
public class EcommerceEventPublisher {
    
    private static final String EXCHANGE_NAME = "ecommerce.events";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
            
            // 模拟各种事件
            Object[][] events = {
                {"order.created.cn", "订单创建", Map.of("orderId", 1001, "amount", 299.0)},
                {"order.paid.cn", "订单支付", Map.of("orderId", 1001, "payMethod", "alipay")},
                {"user.registered.cn", "用户注册", Map.of("userId", 2001, "username", "zhangsan")},
                {"order.created.us", "订单创建", Map.of("orderId", 1002, "amount", 99.0)},
                {"product.updated.all", "商品更新", Map.of("productId", 3001, "name", "iPhone")},
                {"order.shipped.cn", "订单发货", Map.of("orderId", 1001, "trackingNo", "SF123456")},
                {"user.login.cn", "用户登录", Map.of("userId", 2001, "ip", "192.168.1.1")},
                {"order.paid.us", "订单支付", Map.of("orderId", 1002, "payMethod", "paypal")},
                {"inventory.low.cn", "库存预警", Map.of("productId", 3002, "stock", 5)},
                {"order.completed.cn", "订单完成", Map.of("orderId", 1001)}
            };
            
            for (Object[] event : events) {
                String routingKey = (String) event[0];
                String eventType = (String) event[1];
                Map<String, Object> data = (Map<String, Object>) event[2];
                
                Map<String, Object> eventData = new HashMap<>();
                eventData.put("type", eventType);
                eventData.put("data", data);
                eventData.put("timestamp", System.currentTimeMillis());
                
                String eventJson = objectMapper.writeValueAsString(eventData);
                
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, eventJson.getBytes("UTF-8"));
                
                System.out.println("📤 [" + routingKey + "] " + eventType);
                
                Thread.sleep(800);
            }
        }
    }
}

中国区订单服务:

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 中国区订单服务
 * 订阅: order.*.cn
 */
public class ChinaOrderService {
    
    private static final String EXCHANGE_NAME = "ecommerce.events";
    private static final String QUEUE_NAME = "china.order.service.queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 只处理中国区的订单事件
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.*.cn");
        
        System.out.println("🇨🇳 中国区订单服务已启动");
        System.out.println("📋 绑定键: order.*.cn\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("🇨🇳 [中国订单] " + routingKey);
            System.out.println("   → " + message + "\n");
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

全局用户服务:

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 全局用户服务
 * 订阅: user.#
 */
public class GlobalUserService {
    
    private static final String EXCHANGE_NAME = "ecommerce.events";
    private static final String QUEUE_NAME = "global.user.service.queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 处理所有用户相关事件
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "user.#");
        
        System.out.println("👤 全局用户服务已启动");
        System.out.println("📋 绑定键: user.#\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            System.out.println("👤 [用户事件] " + routingKey);
            System.out.println("   → " + message + "\n");
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

数据分析服务(接收所有事件):

package com.example.rabbitmq.topics;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 数据分析服务
 * 订阅: # (所有事件)
 */
public class AnalyticsService {
    
    private static final String EXCHANGE_NAME = "ecommerce.events";
    private static final String QUEUE_NAME = "analytics.service.queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 接收所有事件用于分析
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "#");
        
        System.out.println("📊 数据分析服务已启动");
        System.out.println("📋 绑定键: #\n");
        
        int[] count = {0};
        Map<String, Integer> stats = new java.util.concurrent.ConcurrentHashMap<>();
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            
            count[0]++;
            String category = routingKey.split("\\.")[0];
            stats.merge(category, 1, Integer::sum);
            
            System.out.println("📊 [分析] 事件 #" + count[0] + ": " + routingKey);
            System.out.println("   当前统计: " + stats);
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

📊 三种交换机对比

交换机类型路由规则通配符支持使用场景
Fanout广播日志广播、消息通知
Direct精确匹配简单的消息分发
Topic模糊匹配* #复杂的消息路由

Topic 特殊情况:

  • # 绑定键 → 行为类似 Fanout(接收所有消息)
  • 不使用通配符 → 行为类似 Direct(精确匹配)

📝 本章小结

本章学习了主题模式:

知识点说明
Topic 交换机支持通配符的消息路由
* 通配符匹配一个单词
# 通配符匹配零个或多个单词
路由键格式. 分隔的单词列表

最佳实践:

  1. 设计有意义的路由键格式
  2. 从左到右按重要性排列
  3. 使用 # 接收全量数据
  4. 使用 * 进行单级匹配

下一步

掌握了五种消息模式后,让我们学习如何在 Spring Boot 中集成 RabbitMQ