发布/订阅模式
发布/订阅(Publish/Subscribe)模式可以将一条消息同时发送给多个消费者。
🎯 本章目标
- 理解发布/订阅模式的工作原理
- 掌握 Fanout 交换机的使用
- 实现消息广播功能
📦 模式介绍
┌─────────┐ ┌──────────┐
┌───→│ Queue 1 │───→│Consumer 1│
┌──────────┐ ┌────────┐ └─────────┘ └──────────┘
│ Producer │───→│Exchange│
└──────────┘ │(fanout)│ ┌─────────┐ ┌──────────┐
└────────┘───→│ Queue 2 │───→│Consumer 2│
└─────────┘ └──────────┘
与工作队列的区别:
| 模式 | 消息分发 | 适用场景 |
|---|---|---|
| 工作队列 | 一条消息只被一个消费者处理 | 任务分发 |
| 发布订阅 | 一条消息被所有消费者处理 | 消息广播 |
应用场景:
- 日志系统:多个服务需要接收同样的日志
- 消息通知:一个事件通知多个系统
- 缓存更新:数据变更通知多个缓存节点
- 实时推送:将消息推送给多个客户端
🔄 Fanout 交换机
Fanout 交换机会将收到的消息广播到所有绑定的队列,忽略路由键。
// 声明 Fanout 交换机
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 发送消息(路由键被忽略,可以为空)
channel.basicPublish("logs", "", null, message.getBytes());
🛠️ 代码实现
日志发布者
package com.example.rabbitmq.pubsub;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 发布/订阅模式 - 日志发布者
*
* 使用 Fanout 交换机广播日志消息
*/
public class LogProducer {
// 交换机名称
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 1. 声明 Fanout 类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
System.out.println("✅ 交换机声明成功: " + EXCHANGE_NAME);
// 2. 模拟发送日志
String[] logLevels = {"INFO", "DEBUG", "WARN", "ERROR"};
String[] messages = {
"用户登录成功",
"数据库连接建立",
"内存使用率超过80%",
"支付接口调用失败",
"订单创建成功",
"缓存更新完成"
};
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10; i++) {
// 随机选择日志级别和消息
String level = logLevels[(int) (Math.random() * logLevels.length)];
String msg = messages[(int) (Math.random() * messages.length)];
String timestamp = LocalDateTime.now().format(formatter);
String logMessage = String.format("[%s] [%s] %s", timestamp, level, msg);
// 3. 发布消息到交换机
// 注意:Fanout 交换机忽略路由键,这里传空字符串
channel.basicPublish(EXCHANGE_NAME, "", null, logMessage.getBytes("UTF-8"));
System.out.println("📤 发布日志: " + logMessage);
Thread.sleep(1000);
}
System.out.println("\n✅ 所有日志发布完成!");
}
}
}
日志订阅者
package com.example.rabbitmq.pubsub;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* 发布/订阅模式 - 日志订阅者
*
* 订阅并接收所有日志消息
*/
public class LogConsumer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
// 订阅者名称
String subscriberName = args.length > 0 ? args[0] : "Subscriber-" + System.currentTimeMillis() % 1000;
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 1. 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 2. 创建临时队列(随机名称,断开连接后自动删除)
String queueName = channel.queueDeclare().getQueue();
System.out.println("✅ 创建临时队列: " + queueName);
// 3. 将队列绑定到交换机
// Fanout 交换机不需要路由键
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("✅ 队列已绑定到交换机: " + EXCHANGE_NAME);
System.out.println("🔔 [" + subscriberName + "] 等待日志消息...\n");
// 4. 定义消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("📨 [" + subscriberName + "] 收到日志: " + message);
// 这里可以处理日志,例如:
// - 写入文件
// - 存入数据库
// - 发送告警
processLog(subscriberName, message);
};
// 5. 开始消费
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
/**
* 处理日志(模拟)
*/
private static void processLog(String subscriber, String log) {
// 模拟不同订阅者的处理逻辑
if (subscriber.contains("File")) {
System.out.println(" → 写入日志文件");
} else if (subscriber.contains("DB")) {
System.out.println(" → 存入数据库");
} else if (subscriber.contains("Alert")) {
if (log.contains("ERROR")) {
System.out.println(" → 🚨 发送告警通知!");
}
}
}
}
临时队列说明
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
queueDeclare() 不带参数时会创建一个:
- 随机名称的队列(如
amq.gen-JzTY20BRgKO-HjmUJj0wLg) - 非持久化的队列
- 排他的队列(仅当前连接可用)
- 自动删除的队列(连接断开后自动删除)
这种队列适合发布/订阅场景,因为:
- 每个消费者需要自己的队列
- 消费者断开后不需要保留消息
🔍 实战案例
案例:系统事件广播
模拟一个系统事件广播场景:用户注册后,需要通知多个服务。
事件发布者:
package com.example.rabbitmq.pubsub;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
/**
* 用户注册事件发布者
*/
public class UserEventPublisher {
private static final String EXCHANGE_NAME = "user.events";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
try (Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
// 模拟用户注册
for (int i = 1; i <= 5; i++) {
Map<String, Object> userEvent = new HashMap<>();
userEvent.put("eventType", "USER_REGISTERED");
userEvent.put("userId", 10000 + i);
userEvent.put("username", "user" + i);
userEvent.put("email", "user" + i + "@example.com");
userEvent.put("phone", "1380000000" + i);
userEvent.put("timestamp", System.currentTimeMillis());
String eventJson = objectMapper.writeValueAsString(userEvent);
channel.basicPublish(EXCHANGE_NAME, "", null, eventJson.getBytes("UTF-8"));
System.out.println("📤 发布用户注册事件: " + eventJson);
Thread.sleep(2000);
}
}
}
}
邮件服务订阅者:
package com.example.rabbitmq.pubsub;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
/**
* 邮件服务 - 订阅用户注册事件,发送欢迎邮件
*/
public class EmailServiceSubscriber {
private static final String EXCHANGE_NAME = "user.events";
private static final String QUEUE_NAME = "email.service.queue";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("📧 邮件服务已启动,等待用户注册事件...\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String eventJson = new String(delivery.getBody(), "UTF-8");
try {
Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
String email = (String) event.get("email");
String username = (String) event.get("username");
// 模拟发送邮件
System.out.println("📧 发送欢迎邮件到: " + email);
System.out.println(" 主题: 欢迎加入我们!");
System.out.println(" 内容: 尊敬的 " + username + ",感谢您的注册...\n");
Thread.sleep(500);
} catch (Exception e) {
System.err.println("处理事件失败: " + e.getMessage());
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
短信服务订阅者:
package com.example.rabbitmq.pubsub;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
/**
* 短信服务 - 订阅用户注册事件,发送验证短信
*/
public class SmsServiceSubscriber {
private static final String EXCHANGE_NAME = "user.events";
private static final String QUEUE_NAME = "sms.service.queue";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("📱 短信服务已启动,等待用户注册事件...\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String eventJson = new String(delivery.getBody(), "UTF-8");
try {
Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
String phone = (String) event.get("phone");
String username = (String) event.get("username");
// 模拟发送短信
System.out.println("📱 发送短信到: " + phone);
System.out.println(" 内容: 尊敬的" + username + ",您已成功注册,验证码:" + generateCode() + "\n");
Thread.sleep(300);
} catch (Exception e) {
System.err.println("处理事件失败: " + e.getMessage());
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
private static String generateCode() {
return String.valueOf((int) (Math.random() * 900000) + 100000);
}
}
积分服务订阅者:
package com.example.rabbitmq.pubsub;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
/**
* 积分服务 - 订阅用户注册事件,赠送积分
*/
public class PointServiceSubscriber {
private static final String EXCHANGE_NAME = "user.events";
private static final String QUEUE_NAME = "point.service.queue";
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("🎁 积分服务已启动,等待用户注册事件...\n");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String eventJson = new String(delivery.getBody(), "UTF-8");
try {
Map<String, Object> event = objectMapper.readValue(eventJson, Map.class);
Integer userId = (Integer) event.get("userId");
String username = (String) event.get("username");
int bonusPoints = 100;
// 模拟赠送积分
System.out.println("🎁 为用户 " + username + "(ID:" + userId + ") 赠送 " + bonusPoints + " 积分");
System.out.println(" 备注: 新用户注册奖励\n");
Thread.sleep(200);
} catch (Exception e) {
System.err.println("处理事件失败: " + e.getMessage());
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}
运行测试
- 依次启动三个订阅者服务
- 启动事件发布者
- 观察三个服务都能收到用户注册事件
用户注册 → 发布事件到 user.events 交换机
↓
┌──────────────────────┐
↓ ↓ ↓
邮件服务 短信服务 积分服务
发送邮件 发送短信 赠送积分
📊 Fanout vs Direct vs Topic
| 交换机类型 | 路由规则 | 使用场景 |
|---|---|---|
| Fanout | 广播到所有绑定队列 | 日志广播、事件通知 |
| Direct | 精确匹配路由键 | 按条件分发 |
| Topic | 模糊匹配路由键 | 灵活的消息路由 |
📝 本章小结
本章学习了发布/订阅模式:
| 知识点 | 说明 |
|---|---|
| Fanout 交换机 | 广播消息到所有绑定队列 |
| 临时队列 | 自动命名、自动删除的队列 |
| 队列绑定 | 将队列绑定到交换机 |
| 消息广播 | 一条消息被多个消费者处理 |
适用场景:
- ✅ 日志系统
- ✅ 事件通知
- ✅ 缓存同步
- ✅ 消息广播
不适用场景:
- ❌ 需要选择性接收消息
- ❌ 需要按条件路由
下一步
如果需要选择性地接收消息,请学习 路由模式!
