发布/订阅模式

发布/订阅(Publish/Subscribe)模式可以将一条消息同时发送给多个消费者。

🎯 本章目标

  • 理解发布/订阅模式的工作原理
  • 掌握 Fanout 交换机的使用
  • 实现消息广播功能

📦 模式介绍

                              ┌─────────┐    ┌──────────┐
                         ┌───→│ Queue 1 │───→│Consumer 1│
┌──────────┐    ┌────────┐    └─────────┘    └──────────┘
│ Producer │───→│Exchange│
└──────────┘    │(fanout)│    ┌─────────┐    ┌──────────┐
                └────────┘───→│ Queue 2 │───→│Consumer 2│
                              └─────────┘    └──────────┘

与工作队列的区别:

模式消息分发适用场景
工作队列一条消息只被一个消费者处理任务分发
发布订阅一条消息被所有消费者处理消息广播

应用场景:

  • 日志系统:多个服务需要接收同样的日志
  • 消息通知:一个事件通知多个系统
  • 缓存更新:数据变更通知多个缓存节点
  • 实时推送:将消息推送给多个客户端

🔄 Fanout 交换机

Fanout 交换机会将收到的消息广播到所有绑定的队列,忽略路由键。

// 声明 Fanout 交换机
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);

// 发送消息(路由键被忽略,可以为空)
channel.basicPublish("logs", "", null, message.getBytes());

🛠️ 代码实现

日志发布者

package com.example.rabbitmq.pubsub;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 发布/订阅模式 - 日志发布者
 * 
 * 使用 Fanout 交换机广播日志消息
 */
public class LogProducer {
    
    // 交换机名称
    private static final String EXCHANGE_NAME = "logs";
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 1. 声明 Fanout 类型的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            System.out.println("✅ 交换机声明成功: " + EXCHANGE_NAME);
            
            // 2. 模拟发送日志
            String[] logLevels = {"INFO", "DEBUG", "WARN", "ERROR"};
            String[] messages = {
                "用户登录成功",
                "数据库连接建立",
                "内存使用率超过80%",
                "支付接口调用失败",
                "订单创建成功",
                "缓存更新完成"
            };
            
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            
            for (int i = 0; i < 10; i++) {
                // 随机选择日志级别和消息
                String level = logLevels[(int) (Math.random() * logLevels.length)];
                String msg = messages[(int) (Math.random() * messages.length)];
                String timestamp = LocalDateTime.now().format(formatter);
                
                String logMessage = String.format("[%s] [%s] %s", timestamp, level, msg);
                
                // 3. 发布消息到交换机
                // 注意:Fanout 交换机忽略路由键,这里传空字符串
                channel.basicPublish(EXCHANGE_NAME, "", null, logMessage.getBytes("UTF-8"));
                
                System.out.println("📤 发布日志: " + logMessage);
                
                Thread.sleep(1000);
            }
            
            System.out.println("\n✅ 所有日志发布完成!");
        }
    }
}

日志订阅者

package com.example.rabbitmq.pubsub;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * 发布/订阅模式 - 日志订阅者
 * 
 * 订阅并接收所有日志消息
 */
public class LogConsumer {
    
    private static final String EXCHANGE_NAME = "logs";
    
    public static void main(String[] args) throws Exception {
        // 订阅者名称
        String subscriberName = args.length > 0 ? args[0] : "Subscriber-" + System.currentTimeMillis() % 1000;
        
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 1. 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        // 2. 创建临时队列(随机名称,断开连接后自动删除)
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("✅ 创建临时队列: " + queueName);
        
        // 3. 将队列绑定到交换机
        // Fanout 交换机不需要路由键
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("✅ 队列已绑定到交换机: " + EXCHANGE_NAME);
        
        System.out.println("🔔 [" + subscriberName + "] 等待日志消息...\n");
        
        // 4. 定义消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("📨 [" + subscriberName + "] 收到日志: " + message);
            
            // 这里可以处理日志,例如:
            // - 写入文件
            // - 存入数据库
            // - 发送告警
            processLog(subscriberName, message);
        };
        
        // 5. 开始消费
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
    
    /**
     * 处理日志(模拟)
     */
    private static void processLog(String subscriber, String log) {
        // 模拟不同订阅者的处理逻辑
        if (subscriber.contains("File")) {
            System.out.println("   → 写入日志文件");
        } else if (subscriber.contains("DB")) {
            System.out.println("   → 存入数据库");
        } else if (subscriber.contains("Alert")) {
            if (log.contains("ERROR")) {
                System.out.println("   → 🚨 发送告警通知!");
            }
        }
    }
}

临时队列说明

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

queueDeclare() 不带参数时会创建一个:

  • 随机名称的队列(如 amq.gen-JzTY20BRgKO-HjmUJj0wLg
  • 非持久化的队列
  • 排他的队列(仅当前连接可用)
  • 自动删除的队列(连接断开后自动删除)

这种队列适合发布/订阅场景,因为:

  • 每个消费者需要自己的队列
  • 消费者断开后不需要保留消息

🔍 实战案例

案例:系统事件广播

模拟一个系统事件广播场景:用户注册后,需要通知多个服务。

事件发布者:

package com.example.rabbitmq.pubsub;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.*;

/**
 * 用户注册事件发布者
 */
public class UserEventPublisher {
    
    private static final String EXCHANGE_NAME = "user.events";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        try (Connection connection = RabbitMQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
            
            // 模拟用户注册
            for (int i = 1; i <= 5; i++) {
                Map<String, Object> userEvent = new HashMap<>();
                userEvent.put("eventType", "USER_REGISTERED");
                userEvent.put("userId", 10000 + i);
                userEvent.put("username", "user" + i);
                userEvent.put("email", "user" + i + "@example.com");
                userEvent.put("phone", "1380000000" + i);
                userEvent.put("timestamp", System.currentTimeMillis());
                
                String eventJson = objectMapper.writeValueAsString(userEvent);
                
                channel.basicPublish(EXCHANGE_NAME, "", null, eventJson.getBytes("UTF-8"));
                
                System.out.println("📤 发布用户注册事件: " + eventJson);
                
                Thread.sleep(2000);
            }
        }
    }
}

邮件服务订阅者:

package com.example.rabbitmq.pubsub;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * 邮件服务 - 订阅用户注册事件,发送欢迎邮件
 */
public class EmailServiceSubscriber {
    
    private static final String EXCHANGE_NAME = "user.events";
    private static final String QUEUE_NAME = "email.service.queue";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明交换机和队列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        System.out.println("📧 邮件服务已启动,等待用户注册事件...\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String eventJson = new String(delivery.getBody(), "UTF-8");
            
            try {
                Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
                
                String email = (String) event.get("email");
                String username = (String) event.get("username");
                
                // 模拟发送邮件
                System.out.println("📧 发送欢迎邮件到: " + email);
                System.out.println("   主题: 欢迎加入我们!");
                System.out.println("   内容: 尊敬的 " + username + ",感谢您的注册...\n");
                
                Thread.sleep(500);
                
            } catch (Exception e) {
                System.err.println("处理事件失败: " + e.getMessage());
            }
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

短信服务订阅者:

package com.example.rabbitmq.pubsub;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * 短信服务 - 订阅用户注册事件,发送验证短信
 */
public class SmsServiceSubscriber {
    
    private static final String EXCHANGE_NAME = "user.events";
    private static final String QUEUE_NAME = "sms.service.queue";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明交换机和队列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        System.out.println("📱 短信服务已启动,等待用户注册事件...\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String eventJson = new String(delivery.getBody(), "UTF-8");
            
            try {
                Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
                
                String phone = (String) event.get("phone");
                String username = (String) event.get("username");
                
                // 模拟发送短信
                System.out.println("📱 发送短信到: " + phone);
                System.out.println("   内容: 尊敬的" + username + ",您已成功注册,验证码:" + generateCode() + "\n");
                
                Thread.sleep(300);
                
            } catch (Exception e) {
                System.err.println("处理事件失败: " + e.getMessage());
            }
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
    
    private static String generateCode() {
        return String.valueOf((int) (Math.random() * 900000) + 100000);
    }
}

积分服务订阅者:

package com.example.rabbitmq.pubsub;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

/**
 * 积分服务 - 订阅用户注册事件,赠送积分
 */
public class PointServiceSubscriber {
    
    private static final String EXCHANGE_NAME = "user.events";
    private static final String QUEUE_NAME = "point.service.queue";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明交换机和队列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        System.out.println("🎁 积分服务已启动,等待用户注册事件...\n");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String eventJson = new String(delivery.getBody(), "UTF-8");
            
            try {
                Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
                
                Integer userId = (Integer) event.get("userId");
                String username = (String) event.get("username");
                int bonusPoints = 100;
                
                // 模拟赠送积分
                System.out.println("🎁 为用户 " + username + "(ID:" + userId + ") 赠送 " + bonusPoints + " 积分");
                System.out.println("   备注: 新用户注册奖励\n");
                
                Thread.sleep(200);
                
            } catch (Exception e) {
                System.err.println("处理事件失败: " + e.getMessage());
            }
        };
        
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}

运行测试

  1. 依次启动三个订阅者服务
  2. 启动事件发布者
  3. 观察三个服务都能收到用户注册事件
用户注册 → 发布事件到 user.events 交换机
                    ↓
        ┌──────────────────────┐
        ↓          ↓           ↓
    邮件服务    短信服务     积分服务
   发送邮件    发送短信     赠送积分

📊 Fanout vs Direct vs Topic

交换机类型路由规则使用场景
Fanout广播到所有绑定队列日志广播、事件通知
Direct精确匹配路由键按条件分发
Topic模糊匹配路由键灵活的消息路由

📝 本章小结

本章学习了发布/订阅模式:

知识点说明
Fanout 交换机广播消息到所有绑定队列
临时队列自动命名、自动删除的队列
队列绑定将队列绑定到交换机
消息广播一条消息被多个消费者处理

适用场景:

  • ✅ 日志系统
  • ✅ 事件通知
  • ✅ 缓存同步
  • ✅ 消息广播

不适用场景:

  • ❌ 需要选择性接收消息
  • ❌ 需要按条件路由

下一步

如果需要选择性地接收消息,请学习 路由模式