Hello World
本章将带你编写第一个 RabbitMQ 程序,实现最简单的消息发送和接收。
🎯 本章目标
- 创建 Maven 项目并引入 RabbitMQ 依赖
- 编写生产者发送消息
- 编写消费者接收消息
- 理解最简单的消息模式
📦 简单模式介绍
简单模式是 RabbitMQ 最基础的工作模式:
┌──────────┐ ┌─────────┐ ┌──────────┐
│ Producer │ ───────→ │ Queue │ ───────→ │ Consumer │
│ 生产者 │ │ 队列 │ │ 消费者 │
└──────────┘ └─────────┘ └──────────┘
特点:
- 一个生产者对应一个消费者
- 消息直接发送到队列
- 不使用交换机(使用默认交换机)
🛠️ 项目搭建
创建 Maven 项目
创建一个新的 Maven 项目,项目结构如下:
rabbitmq-demo/
├── pom.xml
└── src/
└── main/
└── java/
└── com/
└── example/
└── rabbitmq/
├── util/
│ └── RabbitMQUtil.java
└── helloworld/
├── Producer.java
└── Consumer.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- RabbitMQ Java 客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
<!-- SLF4J 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
</project>
🔧 连接工具类
首先创建一个工具类,用于获取 RabbitMQ 连接:
package com.example.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ 连接工具类
*/
public class RabbitMQUtil {
// RabbitMQ 服务器地址
private static final String HOST = "localhost";
// 端口号
private static final int PORT = 5672;
// 用户名
private static final String USERNAME = "admin";
// 密码
private static final String PASSWORD = "admin123";
// 虚拟主机
private static final String VIRTUAL_HOST = "/";
private static ConnectionFactory factory;
static {
// 创建连接工厂
factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
}
/**
* 获取连接
* @return Connection 连接对象
* @throws Exception 连接异常
*/
public static Connection getConnection() throws Exception {
return factory.newConnection();
}
}
📤 生产者代码
创建生产者,发送 "Hello RabbitMQ!" 消息:
package com.example.rabbitmq.helloworld;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* Hello World 示例 - 生产者
*
* 功能:发送一条简单的消息到队列
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 1. 获取连接
connection = RabbitMQUtil.getConnection();
System.out.println("✅ 连接 RabbitMQ 成功!");
// 2. 创建通道
channel = connection.createChannel();
System.out.println("✅ 创建通道成功!");
// 3. 声明队列
// 参数说明:
// - queue: 队列名称
// - durable: 是否持久化(false 表示服务器重启后队列会丢失)
// - exclusive: 是否排他(false 表示其他连接也可以使用)
// - autoDelete: 是否自动删除(false 表示不自动删除)
// - arguments: 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("✅ 声明队列成功:" + QUEUE_NAME);
// 4. 发送消息
String message = "Hello RabbitMQ!";
// 参数说明:
// - exchange: 交换机名称(空字符串表示使用默认交换机)
// - routingKey: 路由键(简单模式下就是队列名称)
// - props: 消息属性
// - body: 消息内容(字节数组)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("✅ 消息发送成功:" + message);
} catch (Exception e) {
System.err.println("❌ 发生错误:" + e.getMessage());
e.printStackTrace();
} finally {
// 5. 关闭资源
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
System.out.println("✅ 资源关闭成功!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
代码详解
1. 获取连接
connection = RabbitMQUtil.getConnection();
通过工具类获取与 RabbitMQ 服务器的 TCP 连接。
2. 创建通道
channel = connection.createChannel();
在连接上创建通道,后续的操作都通过通道完成。
3. 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
声明一个队列。如果队列不存在则创建,如果已存在则不做任何操作。
幂等性
queueDeclare 是幂等操作,多次调用不会重复创建队列。
4. 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
- 第一个参数是交换机名称,空字符串表示使用默认交换机
- 第二个参数是路由键,简单模式下就是队列名称
- 第三个参数是消息属性
- 第四个参数是消息内容(字节数组)
📥 消费者代码
创建消费者,从队列接收消息:
package com.example.rabbitmq.helloworld;
import com.example.rabbitmq.util.RabbitMQUtil;
import com.rabbitmq.client.*;
/**
* Hello World 示例 - 消费者
*
* 功能:从队列接收消息并打印
*/
public class Consumer {
// 队列名称(必须与生产者一致)
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
try {
// 1. 获取连接
Connection connection = RabbitMQUtil.getConnection();
System.out.println("✅ 连接 RabbitMQ 成功!");
// 2. 创建通道
Channel channel = connection.createChannel();
System.out.println("✅ 创建通道成功!");
// 3. 声明队列(确保队列存在)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("✅ 声明队列成功:" + QUEUE_NAME);
System.out.println("⏳ 等待接收消息... 按 Ctrl+C 退出");
System.out.println("=====================================");
// 4. 定义消息接收回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容
String message = new String(delivery.getBody(), "UTF-8");
// 打印消息信息
System.out.println("📨 收到消息:");
System.out.println(" - 内容:" + message);
System.out.println(" - 消费者标签:" + consumerTag);
System.out.println(" - 投递标签:" + delivery.getEnvelope().getDeliveryTag());
System.out.println(" - 交换机:" + delivery.getEnvelope().getExchange());
System.out.println(" - 路由键:" + delivery.getEnvelope().getRoutingKey());
System.out.println("=====================================");
};
// 5. 定义取消消费回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("⚠️ 消费被取消:" + consumerTag);
};
// 6. 开始消费消息
// 参数说明:
// - queue: 队列名称
// - autoAck: 是否自动确认(true 表示自动确认)
// - deliverCallback: 消息接收回调
// - cancelCallback: 取消消费回调
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
// 注意:消费者程序需要保持运行,不能关闭连接
// 这里不关闭连接,程序会一直等待消息
} catch (Exception e) {
System.err.println("❌ 发生错误:" + e.getMessage());
e.printStackTrace();
}
}
}
代码详解
DeliverCallback(消息接收回调)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息:" + message);
};
consumerTag:消费者标签,用于标识消费者delivery:投递对象,包含消息内容和元数据
CancelCallback(取消消费回调)
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费被取消:" + consumerTag);
};
当消费者被取消时触发(例如队列被删除)。
basicConsume(开始消费)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
autoAck = true:自动确认模式,消息被接收后自动确认
🚀 运行测试
步骤一:启动消费者
先启动消费者,等待接收消息:
# 运行 Consumer.java
输出:
✅ 连接 RabbitMQ 成功!
✅ 创建通道成功!
✅ 声明队列成功:hello
⏳ 等待接收消息... 按 Ctrl+C 退出
=====================================
步骤二:启动生产者
再启动生产者,发送消息:
# 运行 Producer.java
输出:
✅ 连接 RabbitMQ 成功!
✅ 创建通道成功!
✅ 声明队列成功:hello
✅ 消息发送成功:Hello RabbitMQ!
✅ 资源关闭成功!
步骤三:查看消费者输出
消费者收到消息后会打印:
📨 收到消息:
- 内容:Hello RabbitMQ!
- 消费者标签:amq.ctag-xxxxx
- 投递标签:1
- 交换机:
- 路由键:hello
=====================================
🎯 进阶练习
练习1:发送多条消息
修改生产者,循环发送 10 条消息:
// 发送10条消息
for (int i = 1; i <= 10; i++) {
String message = "消息 #" + i + " - " + System.currentTimeMillis();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("✅ 发送消息:" + message);
// 模拟间隔
Thread.sleep(500);
}
练习2:发送 JSON 消息
发送结构化的 JSON 数据:
import com.fasterxml.jackson.databind.ObjectMapper;
// 创建消息对象
Map<String, Object> messageMap = new HashMap<>();
messageMap.put("id", 1001);
messageMap.put("name", "张三");
messageMap.put("action", "注册");
messageMap.put("timestamp", System.currentTimeMillis());
// 转换为 JSON 字符串
ObjectMapper objectMapper = new ObjectMapper();
String jsonMessage = objectMapper.writeValueAsString(messageMap);
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, jsonMessage.getBytes("UTF-8"));
System.out.println("✅ 发送 JSON 消息:" + jsonMessage);
练习3:设置消息属性
发送带属性的消息:
import com.rabbitmq.client.AMQP;
import java.util.HashMap;
import java.util.Map;
// 设置消息属性
Map<String, Object> headers = new HashMap<>();
headers.put("source", "hello-world-demo");
headers.put("version", "1.0");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.contentEncoding("UTF-8")
.headers(headers)
.deliveryMode(2) // 2 表示持久化
.priority(5) // 优先级
.build();
// 发送消息
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));
🔍 通过管理界面查看
打开 RabbitMQ 管理界面 http://localhost:15672:
Queues 标签页:可以看到
hello队列点击队列名称查看详情:
- Ready:等待消费的消息数
- Unacked:已投递但未确认的消息数
- Total:消息总数
Get messages:可以手动获取队列中的消息查看
❓ 常见问题
Q1:连接失败怎么办?
检查以下几点:
- RabbitMQ 服务是否已启动
- 用户名密码是否正确
- 端口是否正确(默认 5672)
- 防火墙是否放行端口
Q2:消息发送成功但消费者收不到?
- 检查队列名称是否一致
- 先启动消费者再发送消息
- 查看管理界面确认消息是否在队列中
Q3:为什么消费者程序不能关闭连接?
消费者需要持续监听队列,如果关闭连接就无法接收新消息了。消费者程序通常是常驻进程。
📝 本章小结
本章我们完成了:
- ✅ 创建 Maven 项目并添加依赖
- ✅ 编写连接工具类
- ✅ 实现生产者发送消息
- ✅ 实现消费者接收消息
- ✅ 了解简单模式的工作原理
关键知识点:
| 方法 | 作用 |
|---|---|
queueDeclare() | 声明队列 |
basicPublish() | 发送消息 |
basicConsume() | 消费消息 |
下一步
简单模式虽然容易理解,但功能有限。接下来学习 工作队列模式,实现多消费者负载均衡!
