核心概念

在开始编写代码之前,我们需要先了解 RabbitMQ 的核心概念。这些概念是学习 RabbitMQ 的基础。

🏗️ 整体架构

┌─────────────────────────────────────────────────────────────────────┐
│                         RabbitMQ Server                              │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                      Virtual Host                              │  │
│  │  ┌──────────┐    ┌──────────────┐    ┌──────────────────────┐ │  │
│  │  │ Producer │───→│   Exchange   │───→│        Queue         │ │  │
│  │  │  生产者   │    │    交换机     │    │         队列         │ │  │
│  │  └──────────┘    └──────────────┘    └──────────────────────┘ │  │
│  │                         │                       │              │  │
│  │                    Binding(绑定)            │              │  │
│  │                    Routing Key             ↓              │  │
│  │                                      ┌──────────┐         │  │
│  │                                      │ Consumer │         │  │
│  │                                      │  消费者   │         │  │
│  │                                      └──────────┘         │  │
│  └───────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

📦 核心组件详解

1. Producer(生产者)

生产者是消息的发送方,负责创建消息并发送到 RabbitMQ。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        // 创建连接和通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明队列
            String queueName = "hello";
            channel.queueDeclare(queueName, false, false, false, null);
            
            // 发送消息
            String message = "Hello RabbitMQ!";
            channel.basicPublish("", queueName, null, message.getBytes());
            
            System.out.println("消息已发送: " + message);
        }
    }
}

2. Consumer(消费者)

消费者是消息的接收方,负责从队列中获取消息并处理。

import com.rabbitmq.client.*;

public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        String queueName = "hello";
        channel.queueDeclare(queueName, false, false, false, null);
        
        System.out.println("等待接收消息...");
        
        // 定义消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("收到消息: " + message);
        };
        
        // 开始消费消息
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

3. Connection(连接)

连接是生产者/消费者与 RabbitMQ 服务器之间的 TCP 连接。

ConnectionFactory factory = new ConnectionFactory();

// 基本配置
factory.setHost("localhost");        // 服务器地址
factory.setPort(5672);               // 端口号
factory.setUsername("admin");        // 用户名
factory.setPassword("admin123");     // 密码
factory.setVirtualHost("/");         // 虚拟主机

// 连接池配置
factory.setConnectionTimeout(30000); // 连接超时时间
factory.setRequestedHeartbeat(60);   // 心跳检测间隔

// 创建连接
Connection connection = factory.newConnection();

注意

创建连接是比较耗费资源的操作,应该尽量复用连接,避免频繁创建和关闭。

4. Channel(通道)

通道是在连接内部创建的虚拟连接,大多数 API 操作都是通过通道完成的。

// 一个连接可以创建多个通道
Connection connection = factory.newConnection();

Channel channel1 = connection.createChannel();  // 通道1
Channel channel2 = connection.createChannel();  // 通道2
Channel channel3 = connection.createChannel();  // 通道3

// 通道是线程不安全的,每个线程应该使用自己的通道

为什么使用通道?

  • 创建和销毁 TCP 连接开销大
  • 一个连接可以创建多个通道
  • 通道之间相互隔离,互不影响
  • 通道轻量级,创建和销毁开销小

5. Queue(队列)

队列是消息的容器,消息存储在队列中等待消费者处理。

// 声明队列
channel.queueDeclare(
    "myQueue",    // 队列名称
    true,         // 是否持久化
    false,        // 是否排他(仅创建者可用)
    false,        // 是否自动删除(无消费者时删除)
    null          // 其他参数
);

队列参数详解:

参数类型说明
queueString队列名称,必须唯一
durableboolean持久化,服务器重启后队列仍存在
exclusiveboolean排他队列,仅创建它的连接可用
autoDeleteboolean自动删除,没有消费者时自动删除
argumentsMap额外参数,如消息TTL、队列长度等

常用队列参数:

Map<String, Object> arguments = new HashMap<>();

// 消息过期时间(毫秒)
arguments.put("x-message-ttl", 60000);

// 队列最大长度
arguments.put("x-max-length", 1000);

// 队列最大字节数
arguments.put("x-max-length-bytes", 1048576);

// 死信交换机
arguments.put("x-dead-letter-exchange", "dlx.exchange");

// 死信路由键
arguments.put("x-dead-letter-routing-key", "dlx.routing.key");

channel.queueDeclare("myQueue", true, false, false, arguments);

6. Exchange(交换机)

交换机接收生产者发送的消息,并根据路由规则将消息路由到对应的队列。

// 声明交换机
channel.exchangeDeclare(
    "myExchange",    // 交换机名称
    "direct",        // 交换机类型
    true,            // 是否持久化
    false,           // 是否自动删除
    null             // 其他参数
);

交换机类型:

类型说明路由规则
direct直连交换机完全匹配 Routing Key
fanout扇出交换机忽略 Routing Key,广播到所有绑定队列
topic主题交换机模糊匹配 Routing Key(支持通配符)
headers头部交换机根据消息头属性匹配(较少使用)

7. Binding(绑定)

绑定是交换机和队列之间的关联关系。

// 绑定队列到交换机
channel.queueBind(
    "myQueue",       // 队列名称
    "myExchange",    // 交换机名称
    "routing.key"    // 路由键
);

8. Routing Key(路由键)

路由键是消息的路由规则,交换机根据路由键决定将消息发送到哪个队列。

// 发送消息时指定路由键
channel.basicPublish(
    "myExchange",      // 交换机名称
    "order.created",   // 路由键
    null,              // 消息属性
    message.getBytes() // 消息内容
);

9. Virtual Host(虚拟主机)

虚拟主机是 RabbitMQ 的逻辑隔离单元,类似于数据库的 Schema。

// 连接到指定虚拟主机
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/myapp");  // 指定虚拟主机

虚拟主机的作用:

  • 隔离不同应用的资源
  • 权限控制
  • 资源配额管理
# 创建虚拟主机
rabbitmqctl add_vhost /myapp

# 设置用户权限
rabbitmqctl set_permissions -p /myapp admin ".*" ".*" ".*"

# 列出虚拟主机
rabbitmqctl list_vhosts

🔄 消息流转过程

1. 生产者创建消息
        ↓
2. 生产者连接到 RabbitMQ(Connection + Channel)
        ↓
3. 生产者将消息发送到交换机(Exchange)
        ↓
4. 交换机根据路由键(Routing Key)和绑定规则
   将消息路由到对应的队列(Queue)
        ↓
5. 消息在队列中等待
        ↓
6. 消费者连接到 RabbitMQ(Connection + Channel)
        ↓
7. 消费者从队列中获取消息
        ↓
8. 消费者处理消息并确认(ACK)
        ↓
9. RabbitMQ 从队列中删除已确认的消息

📋 完整示例

Maven 依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

连接工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtil {
    
    private static ConnectionFactory factory;
    
    static {
        factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin123");
        factory.setVirtualHost("/");
    }
    
    /**
     * 获取连接
     */
    public static Connection getConnection() throws Exception {
        return factory.newConnection();
    }
}

生产者示例

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ProducerDemo {
    
    private static final String EXCHANGE_NAME = "demo.exchange";
    private static final String QUEUE_NAME = "demo.queue";
    private static final String ROUTING_KEY = "demo.routing.key";
    
    public static void main(String[] args) throws Exception {
        // 1. 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        
        // 2. 创建通道
        Channel channel = connection.createChannel();
        
        // 3. 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        
        // 4. 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 5. 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        
        // 6. 发送消息
        for (int i = 1; i <= 10; i++) {
            String message = "消息 " + i;
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
            System.out.println("发送消息: " + message);
        }
        
        // 7. 关闭资源
        channel.close();
        connection.close();
        
        System.out.println("消息发送完成!");
    }
}

消费者示例

import com.rabbitmq.client.*;

public class ConsumerDemo {
    
    private static final String QUEUE_NAME = "demo.queue";
    
    public static void main(String[] args) throws Exception {
        // 1. 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        
        // 2. 创建通道
        Channel channel = connection.createChannel();
        
        // 3. 声明队列(确保队列存在)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        System.out.println("等待接收消息...");
        
        // 4. 定义消息处理回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("收到消息: " + message);
            
            // 处理业务逻辑...
            
            // 手动确认消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        
        // 取消消费回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("消费被取消: " + consumerTag);
        };
        
        // 5. 开始消费(false 表示手动确认)
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
        
        // 保持程序运行
        System.out.println("按 Ctrl+C 退出...");
    }
}

📊 概念关系图

┌─────────────────────────────────────────────────────────────┐
│                      RabbitMQ Broker                         │
│  ┌─────────────────────────────────────────────────────────┐│
│  │                   Virtual Host: /                        ││
│  │                                                          ││
│  │   ┌─────────┐                                           ││
│  │   │Producer │                                           ││
│  │   └────┬────┘                                           ││
│  │        │ Connection                                      ││
│  │        ▼                                                 ││
│  │   ┌─────────┐     Binding      ┌─────────┐             ││
│  │   │Exchange │────────────────→│  Queue  │             ││
│  │   │(direct) │  routing.key     │         │             ││
│  │   └─────────┘                  └────┬────┘             ││
│  │                                     │                    ││
│  │                                     ▼                    ││
│  │                                ┌─────────┐              ││
│  │                                │Consumer │              ││
│  │                                └─────────┘              ││
│  └─────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────┘

📝 本章小结

本章介绍了 RabbitMQ 的核心概念:

概念作用
Producer消息的发送方
Consumer消息的接收方
Connection与服务器的 TCP 连接
Channel连接内的虚拟通道
Queue存储消息的容器
Exchange接收并路由消息
Binding交换机与队列的绑定关系
Routing Key消息的路由规则
Virtual Host逻辑隔离单元

下一步

理解了核心概念后,让我们开始编写第一个 RabbitMQ 程序:Hello World