Hello World

本章将带你编写第一个 RabbitMQ 程序,实现最简单的消息发送和接收。

🎯 本章目标

  • 创建 Maven 项目并引入 RabbitMQ 依赖
  • 编写生产者发送消息
  • 编写消费者接收消息
  • 理解最简单的消息模式

📦 简单模式介绍

简单模式是 RabbitMQ 最基础的工作模式:

┌──────────┐          ┌─────────┐          ┌──────────┐
│ Producer │ ───────→ │  Queue  │ ───────→ │ Consumer │
│   生产者  │          │   队列   │          │   消费者  │
└──────────┘          └─────────┘          └──────────┘

特点:

  • 一个生产者对应一个消费者
  • 消息直接发送到队列
  • 不使用交换机(使用默认交换机)

🛠️ 项目搭建

创建 Maven 项目

创建一个新的 Maven 项目,项目结构如下:

rabbitmq-demo/
├── pom.xml
└── src/
    └── main/
        └── java/
            └── com/
                └── example/
                    └── rabbitmq/
                        ├── util/
                        │   └── RabbitMQUtil.java
                        └── helloworld/
                            ├── Producer.java
                            └── Consumer.java

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- RabbitMQ Java 客户端 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>
        
        <!-- SLF4J 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>
    </dependencies>
</project>

🔧 连接工具类

首先创建一个工具类,用于获取 RabbitMQ 连接:

package com.example.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ 连接工具类
 */
public class RabbitMQUtil {
    
    // RabbitMQ 服务器地址
    private static final String HOST = "localhost";
    // 端口号
    private static final int PORT = 5672;
    // 用户名
    private static final String USERNAME = "admin";
    // 密码
    private static final String PASSWORD = "admin123";
    // 虚拟主机
    private static final String VIRTUAL_HOST = "/";
    
    private static ConnectionFactory factory;
    
    static {
        // 创建连接工厂
        factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VIRTUAL_HOST);
    }
    
    /**
     * 获取连接
     * @return Connection 连接对象
     * @throws Exception 连接异常
     */
    public static Connection getConnection() throws Exception {
        return factory.newConnection();
    }
}

📤 生产者代码

创建生产者,发送 "Hello RabbitMQ!" 消息:

package com.example.rabbitmq.helloworld;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Hello World 示例 - 生产者
 * 
 * 功能:发送一条简单的消息到队列
 */
public class Producer {
    
    // 队列名称
    private static final String QUEUE_NAME = "hello";
    
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        
        try {
            // 1. 获取连接
            connection = RabbitMQUtil.getConnection();
            System.out.println("✅ 连接 RabbitMQ 成功!");
            
            // 2. 创建通道
            channel = connection.createChannel();
            System.out.println("✅ 创建通道成功!");
            
            // 3. 声明队列
            // 参数说明:
            // - queue: 队列名称
            // - durable: 是否持久化(false 表示服务器重启后队列会丢失)
            // - exclusive: 是否排他(false 表示其他连接也可以使用)
            // - autoDelete: 是否自动删除(false 表示不自动删除)
            // - arguments: 其他参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("✅ 声明队列成功:" + QUEUE_NAME);
            
            // 4. 发送消息
            String message = "Hello RabbitMQ!";
            
            // 参数说明:
            // - exchange: 交换机名称(空字符串表示使用默认交换机)
            // - routingKey: 路由键(简单模式下就是队列名称)
            // - props: 消息属性
            // - body: 消息内容(字节数组)
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            
            System.out.println("✅ 消息发送成功:" + message);
            
        } catch (Exception e) {
            System.err.println("❌ 发生错误:" + e.getMessage());
            e.printStackTrace();
        } finally {
            // 5. 关闭资源
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
                System.out.println("✅ 资源关闭成功!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

代码详解

1. 获取连接

connection = RabbitMQUtil.getConnection();

通过工具类获取与 RabbitMQ 服务器的 TCP 连接。

2. 创建通道

channel = connection.createChannel();

在连接上创建通道,后续的操作都通过通道完成。

3. 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

声明一个队列。如果队列不存在则创建,如果已存在则不做任何操作。

幂等性

queueDeclare 是幂等操作,多次调用不会重复创建队列。

4. 发送消息

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
  • 第一个参数是交换机名称,空字符串表示使用默认交换机
  • 第二个参数是路由键,简单模式下就是队列名称
  • 第三个参数是消息属性
  • 第四个参数是消息内容(字节数组)

📥 消费者代码

创建消费者,从队列接收消息:

package com.example.rabbitmq.helloworld;

import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;

/**
 * Hello World 示例 - 消费者
 * 
 * 功能:从队列接收消息并打印
 */
public class Consumer {
    
    // 队列名称(必须与生产者一致)
    private static final String QUEUE_NAME = "hello";
    
    public static void main(String[] args) {
        try {
            // 1. 获取连接
            Connection connection = RabbitMQUtil.getConnection();
            System.out.println("✅ 连接 RabbitMQ 成功!");
            
            // 2. 创建通道
            Channel channel = connection.createChannel();
            System.out.println("✅ 创建通道成功!");
            
            // 3. 声明队列(确保队列存在)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("✅ 声明队列成功:" + QUEUE_NAME);
            
            System.out.println("⏳ 等待接收消息... 按 Ctrl+C 退出");
            System.out.println("=====================================");
            
            // 4. 定义消息接收回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // 获取消息内容
                String message = new String(delivery.getBody(), "UTF-8");
                
                // 打印消息信息
                System.out.println("📨 收到消息:");
                System.out.println("   - 内容:" + message);
                System.out.println("   - 消费者标签:" + consumerTag);
                System.out.println("   - 投递标签:" + delivery.getEnvelope().getDeliveryTag());
                System.out.println("   - 交换机:" + delivery.getEnvelope().getExchange());
                System.out.println("   - 路由键:" + delivery.getEnvelope().getRoutingKey());
                System.out.println("=====================================");
            };
            
            // 5. 定义取消消费回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("⚠️ 消费被取消:" + consumerTag);
            };
            
            // 6. 开始消费消息
            // 参数说明:
            // - queue: 队列名称
            // - autoAck: 是否自动确认(true 表示自动确认)
            // - deliverCallback: 消息接收回调
            // - cancelCallback: 取消消费回调
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
            
            // 注意:消费者程序需要保持运行,不能关闭连接
            // 这里不关闭连接,程序会一直等待消息
            
        } catch (Exception e) {
            System.err.println("❌ 发生错误:" + e.getMessage());
            e.printStackTrace();
        }
    }
}

代码详解

DeliverCallback(消息接收回调)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("收到消息:" + message);
};
  • consumerTag:消费者标签,用于标识消费者
  • delivery:投递对象,包含消息内容和元数据

CancelCallback(取消消费回调)

CancelCallback cancelCallback = consumerTag -> {
    System.out.println("消费被取消:" + consumerTag);
};

当消费者被取消时触发(例如队列被删除)。

basicConsume(开始消费)

channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  • autoAck = true:自动确认模式,消息被接收后自动确认

🚀 运行测试

步骤一:启动消费者

先启动消费者,等待接收消息:

# 运行 Consumer.java

输出:

✅ 连接 RabbitMQ 成功!
✅ 创建通道成功!
✅ 声明队列成功:hello
⏳ 等待接收消息... 按 Ctrl+C 退出
=====================================

步骤二:启动生产者

再启动生产者,发送消息:

# 运行 Producer.java

输出:

✅ 连接 RabbitMQ 成功!
✅ 创建通道成功!
✅ 声明队列成功:hello
✅ 消息发送成功:Hello RabbitMQ!
✅ 资源关闭成功!

步骤三:查看消费者输出

消费者收到消息后会打印:

📨 收到消息:
   - 内容:Hello RabbitMQ!
   - 消费者标签:amq.ctag-xxxxx
   - 投递标签:1
   - 交换机:
   - 路由键:hello
=====================================

🎯 进阶练习

练习1:发送多条消息

修改生产者,循环发送 10 条消息:

// 发送10条消息
for (int i = 1; i <= 10; i++) {
    String message = "消息 #" + i + " - " + System.currentTimeMillis();
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("✅ 发送消息:" + message);
    
    // 模拟间隔
    Thread.sleep(500);
}

练习2:发送 JSON 消息

发送结构化的 JSON 数据:

import com.fasterxml.jackson.databind.ObjectMapper;

// 创建消息对象
Map<String, Object> messageMap = new HashMap<>();
messageMap.put("id", 1001);
messageMap.put("name", "张三");
messageMap.put("action", "注册");
messageMap.put("timestamp", System.currentTimeMillis());

// 转换为 JSON 字符串
ObjectMapper objectMapper = new ObjectMapper();
String jsonMessage = objectMapper.writeValueAsString(messageMap);

// 发送消息
channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes("UTF-8"));
System.out.println("✅ 发送 JSON 消息:" + jsonMessage);

练习3:设置消息属性

发送带属性的消息:

import com.rabbitmq.client.AMQP;
import java.util.HashMap;
import java.util.Map;

// 设置消息属性
Map<String, Object> headers = new HashMap<>();
headers.put("source", "hello-world-demo");
headers.put("version", "1.0");

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .contentType("text/plain")
    .contentEncoding("UTF-8")
    .headers(headers)
    .deliveryMode(2)  // 2 表示持久化
    .priority(5)      // 优先级
    .build();

// 发送消息
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));

🔍 通过管理界面查看

打开 RabbitMQ 管理界面 http://localhost:15672

  1. Queues 标签页:可以看到 hello 队列

  2. 点击队列名称查看详情:

    • Ready:等待消费的消息数
    • Unacked:已投递但未确认的消息数
    • Total:消息总数
  3. Get messages:可以手动获取队列中的消息查看

❓ 常见问题

Q1:连接失败怎么办?

检查以下几点:

  1. RabbitMQ 服务是否已启动
  2. 用户名密码是否正确
  3. 端口是否正确(默认 5672)
  4. 防火墙是否放行端口

Q2:消息发送成功但消费者收不到?

  1. 检查队列名称是否一致
  2. 先启动消费者再发送消息
  3. 查看管理界面确认消息是否在队列中

Q3:为什么消费者程序不能关闭连接?

消费者需要持续监听队列,如果关闭连接就无法接收新消息了。消费者程序通常是常驻进程。

📝 本章小结

本章我们完成了:

  1. ✅ 创建 Maven 项目并添加依赖
  2. ✅ 编写连接工具类
  3. ✅ 实现生产者发送消息
  4. ✅ 实现消费者接收消息
  5. ✅ 了解简单模式的工作原理

关键知识点:

方法作用
queueDeclare()声明队列
basicPublish()发送消息
basicConsume()消费消息

下一步

简单模式虽然容易理解,但功能有限。接下来学习 工作队列模式,实现多消费者负载均衡!