核心概念
在开始编写代码之前,我们需要先了解 RabbitMQ 的核心概念。这些概念是学习 RabbitMQ 的基础。
🏗️ 整体架构
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ Server │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ Virtual Host │ │
│ │ ┌──────────┐ ┌──────────────┐ ┌──────────────────────┐ │ │
│ │ │ Producer │───→│ Exchange │───→│ Queue │ │ │
│ │ │ 生产者 │ │ 交换机 │ │ 队列 │ │ │
│ │ └──────────┘ └──────────────┘ └──────────────────────┘ │ │
│ │ │ │ │ │
│ │ Binding(绑定) │ │ │
│ │ Routing Key ↓ │ │
│ │ ┌──────────┐ │ │
│ │ │ Consumer │ │ │
│ │ │ 消费者 │ │ │
│ │ └──────────┘ │ │
│ └───────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
📦 核心组件详解
1. Producer(生产者)
生产者是消息的发送方,负责创建消息并发送到 RabbitMQ。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
// 创建连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
String queueName = "hello";
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息已发送: " + message);
}
}
}
2. Consumer(消费者)
消费者是消息的接收方,负责从队列中获取消息并处理。
import com.rabbitmq.client.*;
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "hello";
channel.queueDeclare(queueName, false, false, false, null);
System.out.println("等待接收消息...");
// 定义消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息: " + message);
};
// 开始消费消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
3. Connection(连接)
连接是生产者/消费者与 RabbitMQ 服务器之间的 TCP 连接。
ConnectionFactory factory = new ConnectionFactory();
// 基本配置
factory.setHost("localhost"); // 服务器地址
factory.setPort(5672); // 端口号
factory.setUsername("admin"); // 用户名
factory.setPassword("admin123"); // 密码
factory.setVirtualHost("/"); // 虚拟主机
// 连接池配置
factory.setConnectionTimeout(30000); // 连接超时时间
factory.setRequestedHeartbeat(60); // 心跳检测间隔
// 创建连接
Connection connection = factory.newConnection();
注意
创建连接是比较耗费资源的操作,应该尽量复用连接,避免频繁创建和关闭。
4. Channel(通道)
通道是在连接内部创建的虚拟连接,大多数 API 操作都是通过通道完成的。
// 一个连接可以创建多个通道
Connection connection = factory.newConnection();
Channel channel1 = connection.createChannel(); // 通道1
Channel channel2 = connection.createChannel(); // 通道2
Channel channel3 = connection.createChannel(); // 通道3
// 通道是线程不安全的,每个线程应该使用自己的通道
为什么使用通道?
- 创建和销毁 TCP 连接开销大
- 一个连接可以创建多个通道
- 通道之间相互隔离,互不影响
- 通道轻量级,创建和销毁开销小
5. Queue(队列)
队列是消息的容器,消息存储在队列中等待消费者处理。
// 声明队列
channel.queueDeclare(
"myQueue", // 队列名称
true, // 是否持久化
false, // 是否排他(仅创建者可用)
false, // 是否自动删除(无消费者时删除)
null // 其他参数
);
队列参数详解:
| 参数 | 类型 | 说明 |
|---|---|---|
| queue | String | 队列名称,必须唯一 |
| durable | boolean | 持久化,服务器重启后队列仍存在 |
| exclusive | boolean | 排他队列,仅创建它的连接可用 |
| autoDelete | boolean | 自动删除,没有消费者时自动删除 |
| arguments | Map | 额外参数,如消息TTL、队列长度等 |
常用队列参数:
Map<String, Object> arguments = new HashMap<>();
// 消息过期时间(毫秒)
arguments.put("x-message-ttl", 60000);
// 队列最大长度
arguments.put("x-max-length", 1000);
// 队列最大字节数
arguments.put("x-max-length-bytes", 1048576);
// 死信交换机
arguments.put("x-dead-letter-exchange", "dlx.exchange");
// 死信路由键
arguments.put("x-dead-letter-routing-key", "dlx.routing.key");
channel.queueDeclare("myQueue", true, false, false, arguments);
6. Exchange(交换机)
交换机接收生产者发送的消息,并根据路由规则将消息路由到对应的队列。
// 声明交换机
channel.exchangeDeclare(
"myExchange", // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
null // 其他参数
);
交换机类型:
| 类型 | 说明 | 路由规则 |
|---|---|---|
| direct | 直连交换机 | 完全匹配 Routing Key |
| fanout | 扇出交换机 | 忽略 Routing Key,广播到所有绑定队列 |
| topic | 主题交换机 | 模糊匹配 Routing Key(支持通配符) |
| headers | 头部交换机 | 根据消息头属性匹配(较少使用) |
7. Binding(绑定)
绑定是交换机和队列之间的关联关系。
// 绑定队列到交换机
channel.queueBind(
"myQueue", // 队列名称
"myExchange", // 交换机名称
"routing.key" // 路由键
);
8. Routing Key(路由键)
路由键是消息的路由规则,交换机根据路由键决定将消息发送到哪个队列。
// 发送消息时指定路由键
channel.basicPublish(
"myExchange", // 交换机名称
"order.created", // 路由键
null, // 消息属性
message.getBytes() // 消息内容
);
9. Virtual Host(虚拟主机)
虚拟主机是 RabbitMQ 的逻辑隔离单元,类似于数据库的 Schema。
// 连接到指定虚拟主机
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/myapp"); // 指定虚拟主机
虚拟主机的作用:
- 隔离不同应用的资源
- 权限控制
- 资源配额管理
# 创建虚拟主机
rabbitmqctl add_vhost /myapp
# 设置用户权限
rabbitmqctl set_permissions -p /myapp admin ".*" ".*" ".*"
# 列出虚拟主机
rabbitmqctl list_vhosts
🔄 消息流转过程
1. 生产者创建消息
↓
2. 生产者连接到 RabbitMQ(Connection + Channel)
↓
3. 生产者将消息发送到交换机(Exchange)
↓
4. 交换机根据路由键(Routing Key)和绑定规则
将消息路由到对应的队列(Queue)
↓
5. 消息在队列中等待
↓
6. 消费者连接到 RabbitMQ(Connection + Channel)
↓
7. 消费者从队列中获取消息
↓
8. 消费者处理消息并确认(ACK)
↓
9. RabbitMQ 从队列中删除已确认的消息
📋 完整示例
Maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtil {
private static ConnectionFactory factory;
static {
factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin123");
factory.setVirtualHost("/");
}
/**
* 获取连接
*/
public static Connection getConnection() throws Exception {
return factory.newConnection();
}
}
生产者示例
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ProducerDemo {
private static final String EXCHANGE_NAME = "demo.exchange";
private static final String QUEUE_NAME = "demo.queue";
private static final String ROUTING_KEY = "demo.routing.key";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 2. 创建通道
Channel channel = connection.createChannel();
// 3. 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
// 4. 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 5. 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 6. 发送消息
for (int i = 1; i <= 10; i++) {
String message = "消息 " + i;
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("发送消息: " + message);
}
// 7. 关闭资源
channel.close();
connection.close();
System.out.println("消息发送完成!");
}
}
消费者示例
import com.rabbitmq.client.*;
public class ConsumerDemo {
private static final String QUEUE_NAME = "demo.queue";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 2. 创建通道
Channel channel = connection.createChannel();
// 3. 声明队列(确保队列存在)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("等待接收消息...");
// 4. 定义消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息: " + message);
// 处理业务逻辑...
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 取消消费回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费被取消: " + consumerTag);
};
// 5. 开始消费(false 表示手动确认)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
// 保持程序运行
System.out.println("按 Ctrl+C 退出...");
}
}
📊 概念关系图
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ ┌─────────────────────────────────────────────────────────┐│
│ │ Virtual Host: / ││
│ │ ││
│ │ ┌─────────┐ ││
│ │ │Producer │ ││
│ │ └────┬────┘ ││
│ │ │ Connection ││
│ │ ▼ ││
│ │ ┌─────────┐ Binding ┌─────────┐ ││
│ │ │Exchange │────────────────→│ Queue │ ││
│ │ │(direct) │ routing.key │ │ ││
│ │ └─────────┘ └────┬────┘ ││
│ │ │ ││
│ │ ▼ ││
│ │ ┌─────────┐ ││
│ │ │Consumer │ ││
│ │ └─────────┘ ││
│ └─────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────┘
📝 本章小结
本章介绍了 RabbitMQ 的核心概念:
| 概念 | 作用 |
|---|---|
| Producer | 消息的发送方 |
| Consumer | 消息的接收方 |
| Connection | 与服务器的 TCP 连接 |
| Channel | 连接内的虚拟通道 |
| Queue | 存储消息的容器 |
| Exchange | 接收并路由消息 |
| Binding | 交换机与队列的绑定关系 |
| Routing Key | 消息的路由规则 |
| Virtual Host | 逻辑隔离单元 |
下一步
理解了核心概念后,让我们开始编写第一个 RabbitMQ 程序:Hello World!
