Spring Boot 集成
本章将学习如何在 Spring Boot 项目中优雅地使用 RabbitMQ。
🎯 本章目标
- 搭建 Spring Boot + RabbitMQ 项目
- 使用 RabbitTemplate 发送消息
- 使用 @RabbitListener 接收消息
- 掌握消息序列化和配置
🛠️ 项目搭建
创建 Spring Boot 项目
使用 Spring Initializr 或手动创建项目:
rabbitmq-springboot-demo/
├── pom.xml
└── src/
└── main/
├── java/
│ └── com/
│ └── example/
│ └── rabbitmq/
│ ├── RabbitMQApplication.java
│ ├── config/
│ │ └── RabbitMQConfig.java
│ ├── producer/
│ │ └── MessageProducer.java
│ ├── consumer/
│ │ └── MessageConsumer.java
│ └── entity/
│ └── Order.java
└── resources/
└── application.yml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitmq-springboot-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot AMQP (RabbitMQ) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Lombok(可选) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- JSON 序列化 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
rabbitmq:
# 连接配置
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
# 连接超时
connection-timeout: 15000
# 发布确认(生产者确认)
publisher-confirm-type: correlated
publisher-returns: true
# 消费者配置
listener:
simple:
# 确认模式: auto/manual/none
acknowledge-mode: manual
# 每次获取消息数量
prefetch: 1
# 并发消费者数量
concurrency: 3
max-concurrency: 10
# 消费失败重试
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 2.0
# 日志配置
logging:
level:
org.springframework.amqp: DEBUG
📦 配置类
RabbitMQ 配置
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 配置类
*/
@Configuration
public class RabbitMQConfig {
// ============ 常量定义 ============
// 简单模式
public static final String SIMPLE_QUEUE = "simple.queue";
// 工作队列
public static final String WORK_QUEUE = "work.queue";
// 发布订阅
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE_1 = "fanout.queue.1";
public static final String FANOUT_QUEUE_2 = "fanout.queue.2";
// 路由模式
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE_INFO = "direct.queue.info";
public static final String DIRECT_QUEUE_ERROR = "direct.queue.error";
// 主题模式
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE_ORDER = "topic.queue.order";
public static final String TOPIC_QUEUE_ALL = "topic.queue.all";
// ============ 消息转换器 ============
/**
* 使用 JSON 序列化消息
*/
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 配置 RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
// 消息发送失败回调
rabbitTemplate.setReturnsCallback(returned -> {
System.err.println("消息发送失败:");
System.err.println(" 交换机: " + returned.getExchange());
System.err.println(" 路由键: " + returned.getRoutingKey());
System.err.println(" 原因: " + returned.getReplyText());
});
return rabbitTemplate;
}
// ============ 简单模式 ============
@Bean
public Queue simpleQueue() {
return QueueBuilder.durable(SIMPLE_QUEUE).build();
}
// ============ 工作队列 ============
@Bean
public Queue workQueue() {
return QueueBuilder.durable(WORK_QUEUE).build();
}
// ============ 发布订阅模式 ============
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Queue fanoutQueue1() {
return QueueBuilder.durable(FANOUT_QUEUE_1).build();
}
@Bean
public Queue fanoutQueue2() {
return QueueBuilder.durable(FANOUT_QUEUE_2).build();
}
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
// ============ 路由模式 ============
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Queue directQueueInfo() {
return QueueBuilder.durable(DIRECT_QUEUE_INFO).build();
}
@Bean
public Queue directQueueError() {
return QueueBuilder.durable(DIRECT_QUEUE_ERROR).build();
}
@Bean
public Binding directBindingInfo() {
return BindingBuilder.bind(directQueueInfo())
.to(directExchange())
.with("info");
}
@Bean
public Binding directBindingError() {
return BindingBuilder.bind(directQueueError())
.to(directExchange())
.with("error");
}
// ============ 主题模式 ============
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueueOrder() {
return QueueBuilder.durable(TOPIC_QUEUE_ORDER).build();
}
@Bean
public Queue topicQueueAll() {
return QueueBuilder.durable(TOPIC_QUEUE_ALL).build();
}
@Bean
public Binding topicBindingOrder() {
return BindingBuilder.bind(topicQueueOrder())
.to(topicExchange())
.with("order.*"); // 匹配 order.xxx
}
@Bean
public Binding topicBindingAll() {
return BindingBuilder.bind(topicQueueAll())
.to(topicExchange())
.with("#"); // 匹配所有
}
}
📤 消息生产者
消息发送服务
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.RabbitMQConfig;
import com.example.rabbitmq.entity.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* 消息生产者服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 简单模式 - 发送字符串消息
*/
public void sendSimpleMessage(String message) {
log.info("发送简单消息: {}", message);
rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE, message);
}
/**
* 工作队列 - 发送任务
*/
public void sendWorkMessage(String task) {
log.info("发送工作任务: {}", task);
rabbitTemplate.convertAndSend(RabbitMQConfig.WORK_QUEUE, task);
}
/**
* 发布订阅 - 广播消息
*/
public void sendFanoutMessage(Map<String, Object> message) {
log.info("广播消息: {}", message);
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, "", message);
}
/**
* 路由模式 - 发送到指定路由
*/
public void sendDirectMessage(String routingKey, String message) {
log.info("发送路由消息 [{}]: {}", routingKey, message);
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, routingKey, message);
}
/**
* 主题模式 - 发送主题消息
*/
public void sendTopicMessage(String routingKey, Object message) {
log.info("发送主题消息 [{}]: {}", routingKey, message);
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message);
}
/**
* 发送订单消息(对象序列化)
*/
public void sendOrder(Order order) {
log.info("发送订单消息: {}", order);
rabbitTemplate.convertAndSend(
RabbitMQConfig.TOPIC_EXCHANGE,
"order.created",
order
);
}
}
📥 消息消费者
消息接收服务
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.RabbitMQConfig;
import com.example.rabbitmq.entity.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 消息消费者
*/
@Slf4j
@Component
public class MessageConsumer {
/**
* 简单模式消费者
*/
@RabbitListener(queues = RabbitMQConfig.SIMPLE_QUEUE)
public void handleSimpleMessage(String message, Channel channel, Message amqpMessage)
throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
log.info("【简单模式】收到消息: {}", message);
// 处理业务逻辑
processMessage(message);
// 手动确认
channel.basicAck(deliveryTag, false);
log.info("【简单模式】消息确认成功");
} catch (Exception e) {
log.error("【简单模式】消息处理失败: {}", e.getMessage());
// 拒绝消息,不重新入队
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 工作队列消费者 1
*/
@RabbitListener(queues = RabbitMQConfig.WORK_QUEUE)
public void handleWorkMessage1(String task, Channel channel, Message message)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("【工作者1】处理任务: {}", task);
Thread.sleep(1000); // 模拟处理时间
channel.basicAck(deliveryTag, false);
log.info("【工作者1】任务完成");
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true); // 重新入队
}
}
/**
* 工作队列消费者 2
*/
@RabbitListener(queues = RabbitMQConfig.WORK_QUEUE)
public void handleWorkMessage2(String task, Channel channel, Message message)
throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("【工作者2】处理任务: {}", task);
Thread.sleep(2000); // 模拟较慢的处理
channel.basicAck(deliveryTag, false);
log.info("【工作者2】任务完成");
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);
}
}
/**
* 发布订阅消费者 1
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)
public void handleFanoutMessage1(Map<String, Object> message, Channel channel,
Message amqpMessage) throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
log.info("【订阅者1】收到广播: {}", message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 发布订阅消费者 2
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)
public void handleFanoutMessage2(Map<String, Object> message, Channel channel,
Message amqpMessage) throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
log.info("【订阅者2】收到广播: {}", message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 路由模式 - INFO 级别
*/
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_INFO)
public void handleDirectInfoMessage(String message, Channel channel,
Message amqpMessage) throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
log.info("【INFO日志】{}", message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 路由模式 - ERROR 级别
*/
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE_ERROR)
public void handleDirectErrorMessage(String message, Channel channel,
Message amqpMessage) throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
log.error("【ERROR日志】{}", message);
// 发送告警...
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 主题模式 - 订单相关消息
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_ORDER)
public void handleOrderMessage(Order order, Channel channel,
Message amqpMessage) throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
try {
log.info("【订单服务】收到订单: {}", order);
// 处理订单业务...
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("订单处理失败", e);
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 主题模式 - 所有消息(用于审计/日志)
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_ALL)
public void handleAllMessage(Object message, Channel channel,
Message amqpMessage) throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
try {
log.info("【审计日志】[{}] {}", routingKey, message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
}
}
/**
* 模拟消息处理
*/
private void processMessage(String message) throws Exception {
if (message.contains("error")) {
throw new RuntimeException("处理失败");
}
Thread.sleep(500);
}
}
📋 实体类
Order 订单类
package com.example.rabbitmq.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单实体
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
/** 订单ID */
private Long orderId;
/** 用户ID */
private Long userId;
/** 商品名称 */
private String productName;
/** 商品数量 */
private Integer quantity;
/** 订单金额 */
private BigDecimal amount;
/** 订单状态 */
private String status;
/** 创建时间 */
private LocalDateTime createTime;
}
🌐 REST 接口
测试控制器
package com.example.rabbitmq.controller;
import com.example.rabbitmq.entity.Order;
import com.example.rabbitmq.producer.MessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
* 消息测试接口
*/
@RestController
@RequestMapping("/api/mq")
@RequiredArgsConstructor
public class MessageController {
private final MessageProducer messageProducer;
/**
* 简单模式测试
* GET /api/mq/simple?message=hello
*/
@GetMapping("/simple")
public String sendSimple(@RequestParam String message) {
messageProducer.sendSimpleMessage(message);
return "简单消息已发送: " + message;
}
/**
* 工作队列测试
* GET /api/mq/work?count=10
*/
@GetMapping("/work")
public String sendWork(@RequestParam(defaultValue = "5") int count) {
for (int i = 1; i <= count; i++) {
messageProducer.sendWorkMessage("任务 #" + i);
}
return "已发送 " + count + " 个工作任务";
}
/**
* 发布订阅测试
* GET /api/mq/fanout
*/
@GetMapping("/fanout")
public String sendFanout() {
Map<String, Object> message = new HashMap<>();
message.put("type", "通知");
message.put("content", "系统即将维护");
message.put("time", LocalDateTime.now().toString());
messageProducer.sendFanoutMessage(message);
return "广播消息已发送";
}
/**
* 路由模式测试
* GET /api/mq/direct?level=info&message=操作成功
* GET /api/mq/direct?level=error&message=操作失败
*/
@GetMapping("/direct")
public String sendDirect(
@RequestParam(defaultValue = "info") String level,
@RequestParam String message) {
messageProducer.sendDirectMessage(level, message);
return "路由消息已发送 [" + level + "]: " + message;
}
/**
* 主题模式测试
* GET /api/mq/topic?routingKey=order.created&message=新订单
*/
@GetMapping("/topic")
public String sendTopic(
@RequestParam String routingKey,
@RequestParam String message) {
messageProducer.sendTopicMessage(routingKey, message);
return "主题消息已发送 [" + routingKey + "]: " + message;
}
/**
* 发送订单
* POST /api/mq/order
*/
@PostMapping("/order")
public String sendOrder(@RequestBody(required = false) Order order) {
if (order == null) {
order = Order.builder()
.orderId(System.currentTimeMillis())
.userId(1001L)
.productName("iPhone 15")
.quantity(1)
.amount(new BigDecimal("6999.00"))
.status("CREATED")
.createTime(LocalDateTime.now())
.build();
}
messageProducer.sendOrder(order);
return "订单消息已发送: " + order.getOrderId();
}
}
🚀 启动类
package com.example.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
}
🧪 测试
启动应用
mvn spring-boot:run
测试接口
# 简单模式
curl "http://localhost:8080/api/mq/simple?message=Hello"
# 工作队列
curl "http://localhost:8080/api/mq/work?count=10"
# 发布订阅
curl "http://localhost:8080/api/mq/fanout"
# 路由模式
curl "http://localhost:8080/api/mq/direct?level=info&message=操作成功"
curl "http://localhost:8080/api/mq/direct?level=error&message=操作失败"
# 主题模式
curl "http://localhost:8080/api/mq/topic?routingKey=order.created&message=新订单"
# 发送订单
curl -X POST "http://localhost:8080/api/mq/order" \
-H "Content-Type: application/json"
📊 注解详解
@RabbitListener 注解
// 基本用法
@RabbitListener(queues = "queue.name")
public void handleMessage(String message) { }
// 多个队列
@RabbitListener(queues = {"queue1", "queue2"})
public void handleMessage(String message) { }
// 动态声明队列
@RabbitListener(queuesToDeclare = @Queue("dynamic.queue"))
public void handleMessage(String message) { }
// 完整声明(交换机 + 队列 + 绑定)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "my.queue", durable = "true"),
exchange = @Exchange(value = "my.exchange", type = "topic"),
key = "routing.key.*"
))
public void handleMessage(String message) { }
消息参数类型
// 自动反序列化
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) { }
// 获取原始消息
@RabbitListener(queues = "queue")
public void handleMessage(Message message) { }
// 获取通道(用于手动确认)
@RabbitListener(queues = "queue")
public void handleMessage(String msg, Channel channel, Message message) { }
// 获取消息头
@RabbitListener(queues = "queue")
public void handleMessage(
@Payload String msg,
@Header("x-custom-header") String header
) { }
📝 本章小结
本章学习了 Spring Boot 集成 RabbitMQ:
| 知识点 | 说明 |
|---|---|
| spring-boot-starter-amqp | Spring Boot RabbitMQ 依赖 |
| RabbitTemplate | 消息发送模板 |
| @RabbitListener | 消息监听注解 |
| Jackson2JsonMessageConverter | JSON 消息转换器 |
| 手动确认 | Channel.basicAck/basicNack |
最佳实践:
- ✅ 使用 JSON 序列化消息
- ✅ 启用手动确认模式
- ✅ 配置消息重试机制
- ✅ 使用配置类统一管理队列和交换机
恭喜!
你已经完成了 RabbitMQ 基础篇的学习!接下来可以进入 进阶篇 学习更高级的特性。
