Headers 交换机

Headers 交换机是 RabbitMQ 四种交换机类型之一,它根据消息头属性进行路由,而不是路由键。

🎯 本章目标

  • 理解 Headers 交换机的工作原理
  • 掌握 Headers 交换机的配置方法
  • 了解适用场景

📖 什么是 Headers 交换机

四种交换机类型回顾

类型路由依据匹配方式
DirectRouting Key精确匹配
Fanout广播所有
TopicRouting Key模式匹配(通配符)
Headers消息头属性属性匹配

Headers 交换机特点

消息属性(Headers):
{
  "format": "pdf",
  "type": "report",
  "x-match": "all"
}
        │
        ▼
┌─────────────────┐
│ Headers Exchange│
└────────┬────────┘
         │
    ┌────┴────┬─────────────┐
    ▼         ▼             ▼
┌───────┐ ┌───────┐    ┌───────┐
│Queue A│ │Queue B│    │Queue C│
│format │ │format │    │  type │
│= pdf  │ │= xlsx │    │=report│
└───────┘ └───────┘    └───────┘

x-match 参数

说明
all所有属性都必须匹配(AND)
any任意一个属性匹配即可(OR)

💻 原生 Java 实现

Maven 依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

生产者

package com.example.rabbitmq.headers;

import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;

public class HeadersProducer {
    
    private static final String EXCHANGE_NAME = "headers.exchange";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明 Headers 交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
            
            // 发送 PDF 报告
            sendMessage(channel, "PDF 年度报告", Map.of(
                "format", "pdf",
                "type", "report"
            ));
            
            // 发送 Excel 报告
            sendMessage(channel, "Excel 月度报告", Map.of(
                "format", "xlsx",
                "type", "report"
            ));
            
            // 发送 PDF 通知
            sendMessage(channel, "PDF 系统通知", Map.of(
                "format", "pdf",
                "type", "notification"
            ));
            
            // 发送 Word 文档
            sendMessage(channel, "Word 会议纪要", Map.of(
                "format", "docx",
                "type", "document"
            ));
            
            System.out.println("所有消息发送完成!");
        }
    }
    
    private static void sendMessage(Channel channel, String message, Map<String, Object> headers) 
            throws Exception {
        
        // 构建消息属性(包含 headers)
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .headers(headers)
                .deliveryMode(2) // 持久化
                .build();
        
        // 发送消息(routing key 可以为空或任意值,Headers 交换机会忽略它)
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
        
        System.out.println("📤 发送消息: " + message);
        System.out.println("   Headers: " + headers);
    }
}

消费者

package com.example.rabbitmq.headers;

import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;

public class HeadersConsumer {
    
    private static final String EXCHANGE_NAME = "headers.exchange";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true);
        
        // 创建队列1:匹配所有 PDF 文件(x-match = all)
        String queue1 = "headers.pdf.queue";
        channel.queueDeclare(queue1, true, false, false, null);
        
        Map<String, Object> bindArgs1 = new HashMap<>();
        bindArgs1.put("x-match", "all");  // 所有条件都要匹配
        bindArgs1.put("format", "pdf");
        channel.queueBind(queue1, EXCHANGE_NAME, "", bindArgs1);
        
        // 创建队列2:匹配所有报告(任意格式)
        String queue2 = "headers.report.queue";
        channel.queueDeclare(queue2, true, false, false, null);
        
        Map<String, Object> bindArgs2 = new HashMap<>();
        bindArgs2.put("x-match", "all");
        bindArgs2.put("type", "report");
        channel.queueBind(queue2, EXCHANGE_NAME, "", bindArgs2);
        
        // 创建队列3:匹配 PDF 或报告(x-match = any)
        String queue3 = "headers.any.queue";
        channel.queueDeclare(queue3, true, false, false, null);
        
        Map<String, Object> bindArgs3 = new HashMap<>();
        bindArgs3.put("x-match", "any");  // 任意条件匹配
        bindArgs3.put("format", "pdf");
        bindArgs3.put("type", "report");
        channel.queueBind(queue3, EXCHANGE_NAME, "", bindArgs3);
        
        System.out.println("等待接收消息...\n");
        
        // 消费队列1(PDF 文件)
        consumeQueue(channel, queue1, "PDF 处理器");
        
        // 消费队列2(报告文件)
        consumeQueue(channel, queue2, "报告处理器");
        
        // 消费队列3(PDF 或报告)
        consumeQueue(channel, queue3, "综合处理器");
    }
    
    private static void consumeQueue(Channel channel, String queue, String consumerName) 
            throws Exception {
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            
            System.out.println("【" + consumerName + "】收到消息: " + message);
            System.out.println("  Headers: " + headers);
            System.out.println();
        };
        
        channel.basicConsume(queue, true, deliverCallback, consumerTag -> {});
    }
}

🌱 Spring Boot 实现

配置类

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class HeadersExchangeConfig {
    
    public static final String HEADERS_EXCHANGE = "headers.exchange";
    public static final String PDF_QUEUE = "headers.pdf.queue";
    public static final String REPORT_QUEUE = "headers.report.queue";
    public static final String ANY_QUEUE = "headers.any.queue";
    
    /**
     * 声明 Headers 交换机
     */
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }
    
    /**
     * PDF 文件队列
     */
    @Bean
    public Queue pdfQueue() {
        return new Queue(PDF_QUEUE, true);
    }
    
    /**
     * 报告队列
     */
    @Bean
    public Queue reportQueue() {
        return new Queue(REPORT_QUEUE, true);
    }
    
    /**
     * 综合队列(匹配 PDF 或报告)
     */
    @Bean
    public Queue anyQueue() {
        return new Queue(ANY_QUEUE, true);
    }
    
    /**
     * 绑定 PDF 队列(x-match = all)
     */
    @Bean
    public Binding pdfBinding() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        
        return BindingBuilder.bind(pdfQueue())
                .to(headersExchange())
                .whereAll(headers)  // x-match = all
                .match();
    }
    
    /**
     * 绑定报告队列(x-match = all)
     */
    @Bean
    public Binding reportBinding() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("type", "report");
        
        return BindingBuilder.bind(reportQueue())
                .to(headersExchange())
                .whereAll(headers)
                .match();
    }
    
    /**
     * 绑定综合队列(x-match = any)
     */
    @Bean
    public Binding anyBinding() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "report");
        
        return BindingBuilder.bind(anyQueue())
                .to(headersExchange())
                .whereAny(headers)  // x-match = any
                .match();
    }
}

生产者服务

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.HeadersExchangeConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;

@Slf4j
@Service
@RequiredArgsConstructor
public class HeadersProducerService {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 发送带 Headers 的消息
     */
    public void sendMessage(String content, Map<String, Object> headers) {
        MessageProperties props = new MessageProperties();
        headers.forEach(props::setHeader);
        
        Message message = MessageBuilder
                .withBody(content.getBytes())
                .andProperties(props)
                .build();
        
        rabbitTemplate.send(
                HeadersExchangeConfig.HEADERS_EXCHANGE,
                "",  // Headers 交换机忽略 routing key
                message
        );
        
        log.info("📤 发送消息: {}", content);
        log.info("   Headers: {}", headers);
    }
    
    /**
     * 发送 PDF 报告
     */
    public void sendPdfReport(String content) {
        sendMessage(content, Map.of(
                "format", "pdf",
                "type", "report"
        ));
    }
    
    /**
     * 发送 Excel 报告
     */
    public void sendExcelReport(String content) {
        sendMessage(content, Map.of(
                "format", "xlsx",
                "type", "report"
        ));
    }
    
    /**
     * 发送 PDF 通知
     */
    public void sendPdfNotification(String content) {
        sendMessage(content, Map.of(
                "format", "pdf",
                "type", "notification"
        ));
    }
}

消费者服务

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.HeadersExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class HeadersConsumerService {
    
    /**
     * 处理 PDF 文件
     */
    @RabbitListener(queues = HeadersExchangeConfig.PDF_QUEUE)
    public void handlePdf(Message message) {
        String content = new String(message.getBody());
        log.info("📄 [PDF 处理器] 收到: {}", content);
        log.info("   Headers: {}", message.getMessageProperties().getHeaders());
    }
    
    /**
     * 处理报告文件
     */
    @RabbitListener(queues = HeadersExchangeConfig.REPORT_QUEUE)
    public void handleReport(Message message) {
        String content = new String(message.getBody());
        log.info("📊 [报告处理器] 收到: {}", content);
        log.info("   Headers: {}", message.getMessageProperties().getHeaders());
    }
    
    /**
     * 综合处理(PDF 或报告)
     */
    @RabbitListener(queues = HeadersExchangeConfig.ANY_QUEUE)
    public void handleAny(Message message) {
        String content = new String(message.getBody());
        log.info("📁 [综合处理器] 收到: {}", content);
        log.info("   Headers: {}", message.getMessageProperties().getHeaders());
    }
}

📊 匹配示例

假设我们发送以下消息:

消息formattype结果
PDF 年度报告pdfreport3 个队列都收到
Excel 月度报告xlsxreport报告队列 + 综合队列
PDF 通知pdfnotificationPDF 队列 + 综合队列
Word 文档docxdocument无队列接收
消息1: {format: pdf, type: report}
├── PDF 队列: ✅ (format=pdf 匹配)
├── 报告队列: ✅ (type=report 匹配)
└── 综合队列: ✅ (format=pdf 或 type=report 匹配)

消息2: {format: xlsx, type: report}
├── PDF 队列: ❌ (format≠pdf)
├── 报告队列: ✅ (type=report 匹配)
└── 综合队列: ✅ (type=report 匹配)

消息3: {format: pdf, type: notification}
├── PDF 队列: ✅ (format=pdf 匹配)
├── 报告队列: ❌ (type≠report)
└── 综合队列: ✅ (format=pdf 匹配)

消息4: {format: docx, type: document}
├── PDF 队列: ❌
├── 报告队列: ❌
└── 综合队列: ❌

🎯 适用场景

场景适合说明
多条件路由需要根据多个属性组合路由
属性值匹配精确匹配属性值
复杂路由规则AND/OR 逻辑组合
简单路由使用 Direct 更简单高效
模式匹配使用 Topic 更合适

实际应用场景

  1. 文件处理系统

    • 根据文件格式(pdf/xlsx/docx)和类型(报告/通知)分发
  2. 日志分发

    • 根据日志级别(error/warn/info)和模块(order/payment)路由
  3. 多租户系统

    • 根据租户ID和业务类型路由消息

⚠️ 注意事项

1. 性能考虑

// Headers 交换机性能略低于 Direct 和 Fanout
// 因为需要遍历 headers 进行匹配

// 如果只需要单属性匹配,建议使用 Direct 交换机
// routing_key = "pdf" 比 headers = {"format": "pdf"} 更高效

2. 属性命名

// x- 开头的属性是 RabbitMQ 保留的
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "all");  // ✅ 这是特殊属性
headers.put("format", "pdf");   // ✅ 业务属性
headers.put("x-custom", "xxx"); // ⚠️ 避免使用 x- 前缀

3. 属性值类型

// Headers 支持多种值类型
Map<String, Object> headers = new HashMap<>();
headers.put("format", "pdf");           // String
headers.put("priority", 1);             // Integer
headers.put("urgent", true);            // Boolean
headers.put("timestamp", new Date());   // Date

📝 本章小结

要点说明
路由依据消息头属性(Headers)
x-match=all所有属性必须匹配
x-match=any任一属性匹配即可
Routing Key被忽略,可为空
性能略低于 Direct/Fanout
适用场景多条件复杂路由

下一步

基础篇学习完成!接下来进入 进阶篇 学习更多高级特性!