Spring Boot 集成

本章将学习如何在 Spring Boot 项目中优雅地使用 RabbitMQ。

🎯 本章目标

  • 搭建 Spring Boot + RabbitMQ 项目
  • 使用 RabbitTemplate 发送消息
  • 使用 @RabbitListener 接收消息
  • 掌握消息序列化和配置

🛠️ 项目搭建

创建 Spring Boot 项目

使用 Spring Initializr 或手动创建项目:

rabbitmq-springboot-demo/
├── pom.xml
└── src/
    └── main/
        ├── java/
        │   └── com/
        │       └── example/
        │           └── rabbitmq/
        │               ├── RabbitMQApplication.java
        │               ├── config/
        │               │   └── RabbitMQConfig.java
        │               ├── producer/
        │               │   └── MessageProducer.java
        │               ├── consumer/
        │               │   └── MessageConsumer.java
        │               ├── controller/
        │               │   └── MessageController.java
        │               └── entity/
        │                   └── Order.java
        └── resources/
            └── application.yml

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the rabbitmq-springboot-demo tutorial project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherit dependency and plugin management from Spring Boot; versions below are omitted on purpose -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-springboot-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Web (REST endpoints used to trigger message sending) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot AMQP (RabbitMQ) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Lombok (optional) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

spring:
  rabbitmq:
    # Broker connection settings
    host: localhost
    port: 5672
    username: admin
    password: admin123
    virtual-host: /
    
    # Connection timeout
    connection-timeout: 15000
    
    # Publisher confirms and returns (producer-side acknowledgement)
    publisher-confirm-type: correlated
    publisher-returns: true
    
    # Consumer (listener container) settings
    listener:
      simple:
        # Acknowledge mode: auto/manual/none
        acknowledge-mode: manual
        # Number of messages fetched per consumer at a time
        prefetch: 1
        # Number of concurrent consumers (initial and maximum)
        concurrency: 3
        max-concurrency: 10
        # Retry when message consumption fails
        # NOTE(review): retry presumably only triggers when the listener method
        # throws; the listeners in this tutorial catch exceptions and nack
        # manually - confirm these settings have the intended effect.
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2.0

# Logging configuration
logging:
  level:
    org.springframework.amqp: DEBUG

📦 配置类

RabbitMQ 配置

package com.example.rabbitmq.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Central RabbitMQ configuration.
 *
 * <p>Declares the queues, exchanges and bindings for the five messaging patterns used in
 * this demo (simple, work queue, fanout, direct, topic) and configures the JSON message
 * converter and the {@link RabbitTemplate} used for publishing.
 */
@Configuration
public class RabbitMQConfig {

    // ============ Constants ============

    // Simple mode
    public static final String SIMPLE_QUEUE = "simple.queue";

    // Work queue
    public static final String WORK_QUEUE = "work.queue";

    // Publish/subscribe (fanout)
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE_1 = "fanout.queue.1";
    public static final String FANOUT_QUEUE_2 = "fanout.queue.2";

    // Routing (direct)
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE_INFO = "direct.queue.info";
    public static final String DIRECT_QUEUE_ERROR = "direct.queue.error";
    public static final String DIRECT_ROUTING_KEY_INFO = "info";
    public static final String DIRECT_ROUTING_KEY_ERROR = "error";

    // Topic
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE_ORDER = "topic.queue.order";
    public static final String TOPIC_QUEUE_ALL = "topic.queue.all";
    public static final String TOPIC_PATTERN_ORDER = "order.*";
    public static final String TOPIC_PATTERN_ALL = "#";

    // ============ Message converter ============

    /**
     * Serializes message payloads as JSON.
     *
     * <p>The application's {@link ObjectMapper} is reused instead of the converter's default
     * one so that modules registered on it (for example {@code java.time} support, which
     * {@code Order.createTime} needs) also apply to messages.
     *
     * @param objectMapper the application-wide Jackson mapper
     * @return the JSON message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    /**
     * Builds the {@link RabbitTemplate} used by producers.
     *
     * @param connectionFactory    the broker connection factory
     * @param jsonMessageConverter the converter applied to outgoing payloads
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         MessageConverter jsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter);

        // Without the mandatory flag the broker silently drops unroutable messages and the
        // returns callback below is never invoked.
        rabbitTemplate.setMandatory(true);

        // Invoked when the broker refuses (nacks) a published message.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                System.err.println("消息未被确认: " + cause);
            }
        });

        // Invoked when a message could not be routed to any queue.
        rabbitTemplate.setReturnsCallback(returned -> {
            System.err.println("消息发送失败:");
            System.err.println("  交换机: " + returned.getExchange());
            System.err.println("  路由键: " + returned.getRoutingKey());
            System.err.println("  原因: " + returned.getReplyText());
        });

        return rabbitTemplate;
    }

    // ============ Simple mode ============

    /** Durable queue for the simple (single producer / single consumer) pattern. */
    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable(SIMPLE_QUEUE).build();
    }

    // ============ Work queue ============

    /** Durable queue shared by competing workers. */
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable(WORK_QUEUE).build();
    }

    // ============ Publish/subscribe (fanout) ============

    /** Fanout exchange used for broadcasting. */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /** First durable subscriber queue of the fanout exchange. */
    @Bean
    public Queue fanoutQueue1() {
        return QueueBuilder.durable(FANOUT_QUEUE_1).build();
    }

    /** Second durable subscriber queue of the fanout exchange. */
    @Bean
    public Queue fanoutQueue2() {
        return QueueBuilder.durable(FANOUT_QUEUE_2).build();
    }

    /** Binds the first subscriber queue to the fanout exchange. */
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /** Binds the second subscriber queue to the fanout exchange. */
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    // ============ Routing (direct) ============

    /** Direct exchange used for routing by exact key. */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    /** Durable queue bound with the {@code info} routing key. */
    @Bean
    public Queue directQueueInfo() {
        return QueueBuilder.durable(DIRECT_QUEUE_INFO).build();
    }

    /** Durable queue bound with the {@code error} routing key. */
    @Bean
    public Queue directQueueError() {
        return QueueBuilder.durable(DIRECT_QUEUE_ERROR).build();
    }

    /** Binds the info queue to the direct exchange with key {@code info}. */
    @Bean
    public Binding directBindingInfo() {
        return BindingBuilder.bind(directQueueInfo())
                .to(directExchange())
                .with(DIRECT_ROUTING_KEY_INFO);
    }

    /** Binds the error queue to the direct exchange with key {@code error}. */
    @Bean
    public Binding directBindingError() {
        return BindingBuilder.bind(directQueueError())
                .to(directExchange())
                .with(DIRECT_ROUTING_KEY_ERROR);
    }

    // ============ Topic ============

    /** Topic exchange used for pattern-based routing. */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /** Durable queue bound with the {@code order.*} pattern. */
    @Bean
    public Queue topicQueueOrder() {
        return QueueBuilder.durable(TOPIC_QUEUE_ORDER).build();
    }

    /** Durable queue bound with the {@code #} pattern. */
    @Bean
    public Queue topicQueueAll() {
        return QueueBuilder.durable(TOPIC_QUEUE_ALL).build();
    }

    /** Binds the order queue to the topic exchange; the pattern matches {@code order.xxx}. */
    @Bean
    public Binding topicBindingOrder() {
        return BindingBuilder.bind(topicQueueOrder())
                .to(topicExchange())
                .with(TOPIC_PATTERN_ORDER);
    }

    /** Binds the catch-all queue to the topic exchange; the pattern matches every key. */
    @Bean
    public Binding topicBindingAll() {
        return BindingBuilder.bind(topicQueueAll())
                .to(topicExchange())
                .with(TOPIC_PATTERN_ALL);
    }
}

📤 消息生产者

消息发送服务

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.RabbitMQConfig;
import com.example.rabbitmq.entity.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Publishing service: one method per messaging pattern, each logging the payload and then
 * handing it to the {@link RabbitTemplate}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Simple mode: sends a string using the simple queue name as routing key.
     *
     * @param message the text to send
     */
    public void sendSimpleMessage(String message) {
        final String queueName = RabbitMQConfig.SIMPLE_QUEUE;
        log.info("发送简单消息: {}", message);
        rabbitTemplate.convertAndSend(queueName, message);
    }

    /**
     * Work queue: sends a task description using the work queue name as routing key.
     *
     * @param task the task description
     */
    public void sendWorkMessage(String task) {
        final String queueName = RabbitMQConfig.WORK_QUEUE;
        log.info("发送工作任务: {}", task);
        rabbitTemplate.convertAndSend(queueName, task);
    }

    /**
     * Publish/subscribe: sends a map payload to the fanout exchange with an empty routing key.
     *
     * @param message the payload to broadcast
     */
    public void sendFanoutMessage(Map<String, Object> message) {
        final String exchange = RabbitMQConfig.FANOUT_EXCHANGE;
        final String noRoutingKey = "";
        log.info("广播消息: {}", message);
        rabbitTemplate.convertAndSend(exchange, noRoutingKey, message);
    }

    /**
     * Routing mode: sends a string to the direct exchange under the given routing key.
     *
     * @param routingKey the routing key
     * @param message    the text to send
     */
    public void sendDirectMessage(String routingKey, String message) {
        final String exchange = RabbitMQConfig.DIRECT_EXCHANGE;
        log.info("发送路由消息 [{}]: {}", routingKey, message);
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

    /**
     * Topic mode: sends an arbitrary payload to the topic exchange under the given routing key.
     *
     * @param routingKey the routing key
     * @param message    the payload to send
     */
    public void sendTopicMessage(String routingKey, Object message) {
        final String exchange = RabbitMQConfig.TOPIC_EXCHANGE;
        log.info("发送主题消息 [{}]: {}", routingKey, message);
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }

    /**
     * Sends an {@link Order} to the topic exchange with routing key {@code order.created}.
     *
     * @param order the order to publish
     */
    public void sendOrder(Order order) {
        final String exchange = RabbitMQConfig.TOPIC_EXCHANGE;
        final String routingKey = "order.created";
        log.info("发送订单消息: {}", order);
        rabbitTemplate.convertAndSend(exchange, routingKey, order);
    }
}

📥 消息消费者

消息接收服务

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.RabbitMQConfig;
import com.example.rabbitmq.entity.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Message consumers for the queues declared in {@link RabbitMQConfig}.
 *
 * <p>Every listener acknowledges deliveries itself through the supplied {@link Channel}:
 * {@code basicAck} after successful handling, {@code basicNack} when handling fails.
 * Failures are always logged with their stack trace, and a thread interruption caught
 * while handling a message is restored on the current thread instead of being swallowed.
 */
@Slf4j
@Component
public class MessageConsumer {

    /**
     * Simple mode consumer. Failed messages are rejected without requeue.
     *
     * @param message     the converted payload
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.SIMPLE_QUEUE)
    public void handleSimpleMessage(String message, Channel channel, Message amqpMessage)
            throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            log.info("【简单模式】收到消息: {}", message);

            // Business logic
            processMessage(message);

            // Manual acknowledgement
            channel.basicAck(deliveryTag, false);
            log.info("【简单模式】消息确认成功");

        } catch (Exception e) {
            // Passing the exception as the last argument keeps the stack trace in the log.
            log.error("【简单模式】消息处理失败: {}", e.getMessage(), e);
            // Reject without requeue
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Work queue consumer 1 (simulates one second of work per task).
     *
     * @param task    the task description
     * @param channel the channel used for manual acknowledgement
     * @param message the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.WORK_QUEUE)
    public void handleWorkMessage1(String task, Channel channel, Message message)
            throws IOException {
        runWorkTask("工作者1", 1000, task, channel, message);
    }

    /**
     * Work queue consumer 2 (simulates a slower worker: two seconds per task).
     *
     * @param task    the task description
     * @param channel the channel used for manual acknowledgement
     * @param message the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.WORK_QUEUE)
    public void handleWorkMessage2(String task, Channel channel, Message message)
            throws IOException {
        runWorkTask("工作者2", 2000, task, channel, message);
    }

    /**
     * Publish/subscribe consumer 1.
     *
     * @param message     the broadcast payload
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)
    public void handleFanoutMessage1(Map<String, Object> message, Channel channel,
            Message amqpMessage) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            log.info("【订阅者1】收到广播: {}", message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("【订阅者1】广播处理失败", e);
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Publish/subscribe consumer 2.
     *
     * @param message     the broadcast payload
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)
    public void handleFanoutMessage2(Map<String, Object> message, Channel channel,
            Message amqpMessage) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            log.info("【订阅者2】收到广播: {}", message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("【订阅者2】广播处理失败", e);
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Routing mode consumer for the INFO queue.
     *
     * @param message     the log text
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_INFO)
    public void handleDirectInfoMessage(String message, Channel channel,
            Message amqpMessage) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            log.info("【INFO日志】{}", message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("【INFO日志】消息处理失败", e);
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Routing mode consumer for the ERROR queue.
     *
     * @param message     the log text
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_ERROR)
    public void handleDirectErrorMessage(String message, Channel channel,
            Message amqpMessage) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            log.error("【ERROR日志】{}", message);
            // Alerting would go here
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("【ERROR日志】消息处理失败", e);
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Topic mode consumer for order messages.
     *
     * <p>NOTE(review): a payload that is not an {@link Order} (for example a plain string
     * published with an {@code order.*} routing key) presumably fails conversion before this
     * method is invoked - confirm how such messages should be handled.
     *
     * @param order       the converted order
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_ORDER)
    public void handleOrderMessage(Order order, Channel channel,
            Message amqpMessage) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            log.info("【订单服务】收到订单: {}", order);
            // Order business logic would go here
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("订单处理失败", e);
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Topic mode consumer for every message (audit / logging).
     *
     * @param message     the payload
     * @param channel     the channel used for manual acknowledgement
     * @param amqpMessage the raw message, used to read the delivery tag and routing key
     * @throws IOException if the negative acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_ALL)
    public void handleAllMessage(Object message, Channel channel,
            Message amqpMessage) throws IOException {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();

        try {
            log.info("【审计日志】[{}] {}", routingKey, message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("【审计日志】消息处理失败 [{}]", routingKey, e);
            reject(channel, deliveryTag, false, e);
        }
    }

    /**
     * Shared body of the two work-queue consumers: logs the task, sleeps to simulate work,
     * acknowledges, and requeues the message if anything fails.
     */
    private void runWorkTask(String worker, long simulatedMillis, String task,
            Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            log.info("【{}】处理任务: {}", worker, task);
            Thread.sleep(simulatedMillis);  // simulated processing time

            channel.basicAck(deliveryTag, false);
            log.info("【{}】任务完成", worker);

        } catch (Exception e) {
            log.error("【{}】任务处理失败,消息重新入队: {}", worker, task, e);
            reject(channel, deliveryTag, true, e);  // requeue
        }
    }

    /**
     * Negatively acknowledges a single delivery and, when the failure was a thread
     * interruption, restores the interrupt flag once the nack has been attempted.
     */
    private void reject(Channel channel, long deliveryTag, boolean requeue, Exception cause)
            throws IOException {
        try {
            channel.basicNack(deliveryTag, false, requeue);
        } finally {
            if (cause instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Simulated message processing: fails for payloads containing {@code "error"},
     * otherwise sleeps for half a second.
     */
    private void processMessage(String message) throws InterruptedException {
        if (message.contains("error")) {
            throw new RuntimeException("处理失败");
        }
        Thread.sleep(500);
    }
}

📋 实体类

Order 订单类

package com.example.rabbitmq.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * Order payload exchanged through RabbitMQ.
 *
 * <p>Accessors, builder and constructors are generated by Lombok.
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    private static final long serialVersionUID = 1L;

    // Order identifier
    private Long orderId;

    // Identifier of the user who placed the order
    private Long userId;

    // Product name
    private String productName;

    // Number of items ordered
    private Integer quantity;

    // Order amount
    private BigDecimal amount;

    // Order status
    private String status;

    // Creation time
    private LocalDateTime createTime;
}

🌐 REST 接口

测试控制器

package com.example.rabbitmq.controller;

import com.example.rabbitmq.entity.Order;
import com.example.rabbitmq.producer.MessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

/**
 * REST endpoints for manually exercising each messaging pattern through
 * {@link MessageProducer}.
 */
@RestController
@RequestMapping("/api/mq")
@RequiredArgsConstructor
public class MessageController {

    private final MessageProducer messageProducer;

    /**
     * Simple mode test.
     * GET /api/mq/simple?message=hello
     *
     * @param message the text to send
     * @return a confirmation string echoing the message
     */
    @GetMapping("/simple")
    public String sendSimple(@RequestParam String message) {
        messageProducer.sendSimpleMessage(message);
        return "简单消息已发送: " + message;
    }

    /**
     * Work queue test.
     * GET /api/mq/work?count=10
     *
     * @param count the number of tasks to enqueue; values below zero are treated as zero
     * @return a confirmation string with the number of tasks actually sent
     */
    @GetMapping("/work")
    public String sendWork(@RequestParam(defaultValue = "5") int count) {
        // Clamp so the response reports what was really sent instead of a negative number.
        int total = Math.max(count, 0);
        for (int i = 1; i <= total; i++) {
            messageProducer.sendWorkMessage("任务 #" + i);
        }
        return "已发送 " + total + " 个工作任务";
    }

    /**
     * Publish/subscribe test: broadcasts a fixed notification with the current time.
     * GET /api/mq/fanout
     *
     * @return a confirmation string
     */
    @GetMapping("/fanout")
    public String sendFanout() {
        Map<String, Object> message = new HashMap<>();
        message.put("type", "通知");
        message.put("content", "系统即将维护");
        message.put("time", LocalDateTime.now().toString());

        messageProducer.sendFanoutMessage(message);
        return "广播消息已发送";
    }

    /**
     * Routing mode test.
     * GET /api/mq/direct?level=info&message=操作成功
     * GET /api/mq/direct?level=error&message=操作失败
     *
     * @param level   the routing key, {@code info} when omitted
     * @param message the text to send
     * @return a confirmation string echoing the level and message
     */
    @GetMapping("/direct")
    public String sendDirect(
            @RequestParam(defaultValue = "info") String level,
            @RequestParam String message) {
        messageProducer.sendDirectMessage(level, message);
        return "路由消息已发送 [" + level + "]: " + message;
    }

    /**
     * Topic mode test.
     * GET /api/mq/topic?routingKey=order.created&message=新订单
     *
     * @param routingKey the routing key to publish under
     * @param message    the text to send
     * @return a confirmation string echoing the routing key and message
     */
    @GetMapping("/topic")
    public String sendTopic(
            @RequestParam String routingKey,
            @RequestParam String message) {
        messageProducer.sendTopicMessage(routingKey, message);
        return "主题消息已发送 [" + routingKey + "]: " + message;
    }

    /**
     * Sends an order; when the request has no body a sample order is generated.
     * POST /api/mq/order
     *
     * @param order the order to publish, may be {@code null}
     * @return a confirmation string with the order id
     */
    @PostMapping("/order")
    public String sendOrder(@RequestBody(required = false) Order order) {
        if (order == null) {
            order = Order.builder()
                    .orderId(System.currentTimeMillis())
                    .userId(1001L)
                    .productName("iPhone 15")
                    .quantity(1)
                    .amount(new BigDecimal("6999.00"))
                    .status("CREATED")
                    .createTime(LocalDateTime.now())
                    .build();
        }

        messageProducer.sendOrder(order);
        return "订单消息已发送: " + order.getOrderId();
    }
}

🚀 启动类

package com.example.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Application entry point: boots the Spring context for the RabbitMQ demo.
 */
@SpringBootApplication
public class RabbitMQApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(RabbitMQApplication.class);
        application.run(args);
    }
}

🧪 测试

启动应用

mvn spring-boot:run

测试接口

# Simple mode
curl "http://localhost:8080/api/mq/simple?message=Hello"

# Work queue
curl "http://localhost:8080/api/mq/work?count=10"

# Publish/subscribe
curl "http://localhost:8080/api/mq/fanout"

# Routing mode
curl "http://localhost:8080/api/mq/direct?level=info&message=操作成功"
curl "http://localhost:8080/api/mq/direct?level=error&message=操作失败"

# Topic mode
curl "http://localhost:8080/api/mq/topic?routingKey=order.created&message=新订单"

# Send an order (no request body)
curl -X POST "http://localhost:8080/api/mq/order" \
  -H "Content-Type: application/json"

📊 注解详解

@RabbitListener 注解

// Basic usage: listen on a queue by name
@RabbitListener(queues = "queue.name")
public void handleMessage(String message) { }

// Multiple queues handled by one method
@RabbitListener(queues = {"queue1", "queue2"})
public void handleMessage(String message) { }

// Declare the queue from the annotation itself
@RabbitListener(queuesToDeclare = @Queue("dynamic.queue"))
public void handleMessage(String message) { }

// Full declaration (exchange + queue + binding)
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "my.queue", durable = "true"),
    exchange = @Exchange(value = "my.exchange", type = "topic"),
    key = "routing.key.*"
))
public void handleMessage(String message) { }

消息参数类型

// Automatic deserialization into the parameter type
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) { }

// Receive the raw message
@RabbitListener(queues = "queue")
public void handleMessage(Message message) { }

// Receive the channel (for manual acknowledgement)
@RabbitListener(queues = "queue")
public void handleMessage(String msg, Channel channel, Message message) { }

// Receive a message header
@RabbitListener(queues = "queue")
public void handleMessage(
    @Payload String msg,
    @Header("x-custom-header") String header
) { }

📝 本章小结

本章学习了 Spring Boot 集成 RabbitMQ:

| 知识点 | 说明 |
| --- | --- |
| spring-boot-starter-amqp | Spring Boot RabbitMQ 依赖 |
| RabbitTemplate | 消息发送模板 |
| @RabbitListener | 消息监听注解 |
| Jackson2JsonMessageConverter | JSON 消息转换器 |
| 手动确认 | Channel.basicAck/basicNack |

最佳实践:

  1. ✅ 使用 JSON 序列化消息
  2. ✅ 启用手动确认模式
  3. ✅ 配置消息重试机制(注意:只有监听方法抛出异常时重试才会触发;如果在监听器内部捕获异常并手动 nack,则不会重试)
  4. ✅ 使用配置类统一管理队列和交换机

恭喜!

你已经完成了 RabbitMQ 基础篇的学习!接下来可以进入 进阶篇 学习更高级的特性。