Headers 交换机
Headers 交换机是 RabbitMQ 四种交换机类型之一,它根据消息头属性进行路由,而不是路由键。
🎯 本章目标
- 理解 Headers 交换机的工作原理
- 掌握 Headers 交换机的配置方法
- 了解适用场景
📖 什么是 Headers 交换机
四种交换机类型回顾
| 类型 | 路由依据 | 匹配方式 |
|---|---|---|
| Direct | Routing Key | 精确匹配 |
| Fanout | 无 | 广播所有 |
| Topic | Routing Key | 模式匹配(通配符) |
| Headers | 消息头属性 | 属性匹配 |
Headers 交换机特点
消息属性(Headers):
{
"format": "pdf",
"type": "report",
"x-match": "all"
}
│
▼
┌─────────────────┐
│ Headers Exchange│
└────────┬────────┘
│
┌────┴────┬─────────────┐
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Queue A│ │Queue B│ │Queue C│
│format │ │format │ │ type │
│= pdf │ │= xlsx │ │=report│
└───────┘ └───────┘ └───────┘
x-match 参数
| 值 | 说明 |
|---|---|
| all | 所有属性都必须匹配(AND) |
| any | 任意一个属性匹配即可(OR) |
💻 原生 Java 实现
Maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者
package com.example.rabbitmq.headers;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class HeadersProducer {
private static final String EXCHANGE_NAME = "headers.exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明 Headers 交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
// 发送 PDF 报告
sendMessage(channel, "PDF 年度报告", Map.of(
"format", "pdf",
"type", "report"
));
// 发送 Excel 报告
sendMessage(channel, "Excel 月度报告", Map.of(
"format", "xlsx",
"type", "report"
));
// 发送 PDF 通知
sendMessage(channel, "PDF 系统通知", Map.of(
"format", "pdf",
"type", "notification"
));
// 发送 Word 文档
sendMessage(channel, "Word 会议纪要", Map.of(
"format", "docx",
"type", "document"
));
System.out.println("所有消息发送完成!");
}
}
private static void sendMessage(Channel channel, String message, Map<String, Object> headers)
throws Exception {
// 构建消息属性(包含 headers)
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(headers)
.deliveryMode(2) // 持久化
.build();
// 发送消息(routing key 可以为空或任意值,Headers 交换机会忽略它)
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
System.out.println("📤 发送消息: " + message);
System.out.println(" Headers: " + headers);
}
}
消费者
package com.example.rabbitmq.headers;
import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class HeadersConsumer {
private static final String EXCHANGE_NAME = "headers.exchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
// 创建队列1:匹配所有 PDF 文件(x-match = all)
String queue1 = "headers.pdf.queue";
channel.queueDeclare(queue1, true, false, false, null);
Map<String, Object> bindArgs1 = new HashMap<>();
bindArgs1.put("x-match", "all"); // 所有条件都要匹配
bindArgs1.put("format", "pdf");
channel.queueBind(queue1, EXCHANGE_NAME, "", bindArgs1);
// 创建队列2:匹配所有报告(任意格式)
String queue2 = "headers.report.queue";
channel.queueDeclare(queue2, true, false, false, null);
Map<String, Object> bindArgs2 = new HashMap<>();
bindArgs2.put("x-match", "all");
bindArgs2.put("type", "report");
channel.queueBind(queue2, EXCHANGE_NAME, "", bindArgs2);
// 创建队列3:匹配 PDF 或报告(x-match = any)
String queue3 = "headers.any.queue";
channel.queueDeclare(queue3, true, false, false, null);
Map<String, Object> bindArgs3 = new HashMap<>();
bindArgs3.put("x-match", "any"); // 任意条件匹配
bindArgs3.put("format", "pdf");
bindArgs3.put("type", "report");
channel.queueBind(queue3, EXCHANGE_NAME, "", bindArgs3);
System.out.println("等待接收消息...\n");
// 消费队列1(PDF 文件)
consumeQueue(channel, queue1, "PDF 处理器");
// 消费队列2(报告文件)
consumeQueue(channel, queue2, "报告处理器");
// 消费队列3(PDF 或报告)
consumeQueue(channel, queue3, "综合处理器");
}
private static void consumeQueue(Channel channel, String queue, String consumerName)
throws Exception {
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.out.println("【" + consumerName + "】收到消息: " + message);
System.out.println(" Headers: " + headers);
System.out.println();
};
channel.basicConsume(queue, true, deliverCallback, consumerTag -> {});
}
}
🌱 Spring Boot 实现
配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class HeadersExchangeConfig {
public static final String HEADERS_EXCHANGE = "headers.exchange";
public static final String PDF_QUEUE = "headers.pdf.queue";
public static final String REPORT_QUEUE = "headers.report.queue";
public static final String ANY_QUEUE = "headers.any.queue";
/**
* 声明 Headers 交换机
*/
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADERS_EXCHANGE);
}
/**
* PDF 文件队列
*/
@Bean
public Queue pdfQueue() {
return new Queue(PDF_QUEUE, true);
}
/**
* 报告队列
*/
@Bean
public Queue reportQueue() {
return new Queue(REPORT_QUEUE, true);
}
/**
* 综合队列(匹配 PDF 或报告)
*/
@Bean
public Queue anyQueue() {
return new Queue(ANY_QUEUE, true);
}
/**
* 绑定 PDF 队列(x-match = all)
*/
@Bean
public Binding pdfBinding() {
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
return BindingBuilder.bind(pdfQueue())
.to(headersExchange())
.whereAll(headers) // x-match = all
.match();
}
/**
* 绑定报告队列(x-match = all)
*/
@Bean
public Binding reportBinding() {
Map<String, Object> headers = new HashMap<>();
headers.put("type", "report");
return BindingBuilder.bind(reportQueue())
.to(headersExchange())
.whereAll(headers)
.match();
}
/**
* 绑定综合队列(x-match = any)
*/
@Bean
public Binding anyBinding() {
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");
headers.put("type", "report");
return BindingBuilder.bind(anyQueue())
.to(headersExchange())
.whereAny(headers) // x-match = any
.match();
}
}
生产者服务
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.HeadersExchangeConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor
public class HeadersProducerService {
private final RabbitTemplate rabbitTemplate;
/**
* 发送带 Headers 的消息
*/
public void sendMessage(String content, Map<String, Object> headers) {
MessageProperties props = new MessageProperties();
headers.forEach(props::setHeader);
Message message = MessageBuilder
.withBody(content.getBytes())
.andProperties(props)
.build();
rabbitTemplate.send(
HeadersExchangeConfig.HEADERS_EXCHANGE,
"", // Headers 交换机忽略 routing key
message
);
log.info("📤 发送消息: {}", content);
log.info(" Headers: {}", headers);
}
/**
* 发送 PDF 报告
*/
public void sendPdfReport(String content) {
sendMessage(content, Map.of(
"format", "pdf",
"type", "report"
));
}
/**
* 发送 Excel 报告
*/
public void sendExcelReport(String content) {
sendMessage(content, Map.of(
"format", "xlsx",
"type", "report"
));
}
/**
* 发送 PDF 通知
*/
public void sendPdfNotification(String content) {
sendMessage(content, Map.of(
"format", "pdf",
"type", "notification"
));
}
}
消费者服务
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.HeadersExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class HeadersConsumerService {
/**
* 处理 PDF 文件
*/
@RabbitListener(queues = HeadersExchangeConfig.PDF_QUEUE)
public void handlePdf(Message message) {
String content = new String(message.getBody());
log.info("📄 [PDF 处理器] 收到: {}", content);
log.info(" Headers: {}", message.getMessageProperties().getHeaders());
}
/**
* 处理报告文件
*/
@RabbitListener(queues = HeadersExchangeConfig.REPORT_QUEUE)
public void handleReport(Message message) {
String content = new String(message.getBody());
log.info("📊 [报告处理器] 收到: {}", content);
log.info(" Headers: {}", message.getMessageProperties().getHeaders());
}
/**
* 综合处理(PDF 或报告)
*/
@RabbitListener(queues = HeadersExchangeConfig.ANY_QUEUE)
public void handleAny(Message message) {
String content = new String(message.getBody());
log.info("📁 [综合处理器] 收到: {}", content);
log.info(" Headers: {}", message.getMessageProperties().getHeaders());
}
}
📊 匹配示例
假设我们发送以下消息:
| 消息 | format | type | 结果 |
|---|---|---|---|
| PDF 年度报告 | report | 3 个队列都收到 | |
| Excel 月度报告 | xlsx | report | 报告队列 + 综合队列 |
| PDF 通知 | notification | PDF 队列 + 综合队列 | |
| Word 文档 | docx | document | 无队列接收 |
消息1: {format: pdf, type: report}
├── PDF 队列: ✅ (format=pdf 匹配)
├── 报告队列: ✅ (type=report 匹配)
└── 综合队列: ✅ (format=pdf 或 type=report 匹配)
消息2: {format: xlsx, type: report}
├── PDF 队列: ❌ (format≠pdf)
├── 报告队列: ✅ (type=report 匹配)
└── 综合队列: ✅ (type=report 匹配)
消息3: {format: pdf, type: notification}
├── PDF 队列: ✅ (format=pdf 匹配)
├── 报告队列: ❌ (type≠report)
└── 综合队列: ✅ (format=pdf 匹配)
消息4: {format: docx, type: document}
├── PDF 队列: ❌
├── 报告队列: ❌
└── 综合队列: ❌
🎯 适用场景
| 场景 | 适合 | 说明 |
|---|---|---|
| 多条件路由 | ✅ | 需要根据多个属性组合路由 |
| 属性值匹配 | ✅ | 精确匹配属性值 |
| 复杂路由规则 | ✅ | AND/OR 逻辑组合 |
| 简单路由 | ❌ | 使用 Direct 更简单高效 |
| 模式匹配 | ❌ | 使用 Topic 更合适 |
实际应用场景
文件处理系统
- 根据文件格式(pdf/xlsx/docx)和类型(报告/通知)分发
日志分发
- 根据日志级别(error/warn/info)和模块(order/payment)路由
多租户系统
- 根据租户ID和业务类型路由消息
⚠️ 注意事项
1. 性能考虑
// Headers 交换机性能略低于 Direct 和 Fanout
// 因为需要遍历 headers 进行匹配
// 如果只需要单属性匹配,建议使用 Direct 交换机
// routing_key = "pdf" 比 headers = {"format": "pdf"} 更高效
2. 属性命名
// x- 开头的属性是 RabbitMQ 保留的
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "all"); // ✅ 这是特殊属性
headers.put("format", "pdf"); // ✅ 业务属性
headers.put("x-custom", "xxx"); // ⚠️ 避免使用 x- 前缀
3. 属性值类型
// Headers 支持多种值类型
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf"); // String
headers.put("priority", 1); // Integer
headers.put("urgent", true); // Boolean
headers.put("timestamp", new Date()); // Date
📝 本章小结
| 要点 | 说明 |
|---|---|
| 路由依据 | 消息头属性(Headers) |
| x-match=all | 所有属性必须匹配 |
| x-match=any | 任一属性匹配即可 |
| Routing Key | 被忽略,可为空 |
| 性能 | 略低于 Direct/Fanout |
| 适用场景 | 多条件复杂路由 |
下一步
基础篇学习完成!接下来进入 进阶篇 学习更多高级特性!
