RPC 模式
RPC(Remote Procedure Call)远程过程调用是一种请求/响应模式,客户端发送请求消息并等待服务端的响应。
🎯 本章目标
- 理解 RPC 模式的原理
- 掌握 RabbitMQ 实现 RPC 的方法
- 了解 RPC 的适用场景
📖 什么是 RPC
传统 RPC vs 消息队列 RPC
传统 RPC(同步阻塞):
┌────────┐ HTTP/gRPC ┌────────┐
│ Client │ ───────────→ │ Server │
│ │ ←─────────── │ │
└────────┘ Response └────────┘
↑
└── 客户端阻塞等待
消息队列 RPC(异步解耦):
┌────────┐ Request ┌─────────┐ Request ┌────────┐
│ Client │ ───────────→ │ RabbitMQ│ ───────────→ │ Server │
│ │ ←─────────── │ │ ←─────────── │ │
└────────┘ Response └─────────┘ Response └────────┘
↑
└── 通过消息队列解耦
RPC 工作流程
1. 客户端发送请求
┌────────┐ ┌─────────────┐
│ Client │ ──── Request ────→ │ rpc_queue │
└────────┘ └─────────────┘
↑ │
│ ↓
│ ┌────────────┐
│ │ Server │
│ └────────────┘
│ │
│ ↓
┌─────────────────┐ ┌─────────────┐
│ reply_to_queue │ ←──────── │ Response │
└─────────────────┘ └─────────────┘
2. 关键属性:
- reply_to: 响应队列名称
- correlation_id: 请求关联ID
📦 核心概念
消息属性
| 属性 | 说明 | 用途 |
|---|---|---|
| reply_to | 回复队列名称 | 告诉服务端响应发送到哪里 |
| correlation_id | 关联ID | 匹配请求和响应 |
回调队列
客户端创建一个临时队列用于接收响应:
// 创建临时队列(排他、自动删除)
String replyQueueName = channel.queueDeclare().getQueue();
💻 原生 Java 实现
Maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
RPC 服务端
package com.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明 RPC 队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
// 一次只处理一个请求
channel.basicQos(1);
System.out.println(" [x] RPC 服务端启动,等待请求...");
// 定义消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 构建响应属性(使用请求的 correlationId)
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] 收到请求: fib(" + message + ")");
response = String.valueOf(fibonacci(n));
} catch (RuntimeException e) {
System.out.println(" [.] 处理异常: " + e.getMessage());
response = "error: " + e.getMessage();
} finally {
// 发送响应到 reply_to 队列
channel.basicPublish(
"",
delivery.getProperties().getReplyTo(),
replyProps,
response.getBytes("UTF-8")
);
// 确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] 发送响应: " + response);
}
};
// 开始消费
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
/**
* 斐波那契计算(模拟业务处理)
*/
private static int fibonacci(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fibonacci(n - 1) + fibonacci(n - 2);
}
}
RPC 客户端
package com.example.rabbitmq.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
connection = factory.newConnection();
channel = connection.createChannel();
}
/**
* 发送 RPC 请求
*/
public String call(String message) throws IOException, InterruptedException, ExecutionException {
// 生成唯一的关联ID
final String correlationId = UUID.randomUUID().toString();
// 创建临时回复队列
String replyQueueName = channel.queueDeclare().getQueue();
// 设置请求属性
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(correlationId)
.replyTo(replyQueueName)
.build();
// 发送请求
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
// 异步等待响应
final CompletableFuture<String> response = new CompletableFuture<>();
// 消费回复队列
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
// 校验 correlationId
if (delivery.getProperties().getCorrelationId().equals(correlationId)) {
response.complete(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {});
// 获取响应结果
String result = response.get();
// 取消消费
channel.basicCancel(ctag);
return result;
}
@Override
public void close() throws IOException {
connection.close();
}
public static void main(String[] args) {
try (RPCClient client = new RPCClient()) {
for (int i = 0; i <= 10; i++) {
String result = client.call(String.valueOf(i));
System.out.println(" [x] fib(" + i + ") = " + result);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
🌱 Spring Boot 实现
配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RpcConfig {
public static final String RPC_QUEUE = "rpc.queue";
public static final String RPC_EXCHANGE = "rpc.exchange";
public static final String RPC_ROUTING_KEY = "rpc.key";
@Bean
public DirectExchange rpcExchange() {
return new DirectExchange(RPC_EXCHANGE);
}
@Bean
public Queue rpcQueue() {
return new Queue(RPC_QUEUE);
}
@Bean
public Binding rpcBinding() {
return BindingBuilder.bind(rpcQueue())
.to(rpcExchange())
.with(RPC_ROUTING_KEY);
}
/**
* 配置 RabbitTemplate 支持 RPC
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange(RPC_EXCHANGE);
template.setRoutingKey(RPC_ROUTING_KEY);
// 设置回复超时时间(毫秒)
template.setReplyTimeout(10000);
return template;
}
}
RPC 服务端
package com.example.rabbitmq.rpc;
import com.example.rabbitmq.config.RpcConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class RpcServer {
/**
* 处理 RPC 请求,返回值自动发送到 reply_to 队列
*/
@RabbitListener(queues = RpcConfig.RPC_QUEUE)
public String handleRpcRequest(String message) {
log.info("📥 收到 RPC 请求: {}", message);
try {
int n = Integer.parseInt(message);
int result = fibonacci(n);
log.info("✅ 计算完成: fib({}) = {}", n, result);
return String.valueOf(result);
} catch (Exception e) {
log.error("❌ 处理失败: {}", e.getMessage());
return "error: " + e.getMessage();
}
}
private int fibonacci(int n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
}
RPC 客户端
package com.example.rabbitmq.rpc;
import com.example.rabbitmq.config.RpcConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class RpcClient {
private final RabbitTemplate rabbitTemplate;
/**
* 同步 RPC 调用
*/
public String callSync(String message) {
log.info("📤 发送 RPC 请求: {}", message);
// convertSendAndReceive 会自动处理 reply_to 和 correlation_id
Object response = rabbitTemplate.convertSendAndReceive(
RpcConfig.RPC_EXCHANGE,
RpcConfig.RPC_ROUTING_KEY,
message
);
String result = response != null ? response.toString() : "null";
log.info("📥 收到 RPC 响应: {}", result);
return result;
}
/**
* 带超时的 RPC 调用
*/
public String callWithTimeout(String message, long timeoutMs) {
log.info("📤 发送 RPC 请求(超时 {}ms): {}", timeoutMs, message);
// 临时修改超时时间
rabbitTemplate.setReplyTimeout(timeoutMs);
try {
Object response = rabbitTemplate.convertSendAndReceive(
RpcConfig.RPC_EXCHANGE,
RpcConfig.RPC_ROUTING_KEY,
message
);
if (response == null) {
throw new RuntimeException("RPC 调用超时");
}
return response.toString();
} finally {
// 恢复默认超时
rabbitTemplate.setReplyTimeout(10000);
}
}
}
使用示例
package com.example.rabbitmq.controller;
import com.example.rabbitmq.rpc.RpcClient;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/rpc")
@RequiredArgsConstructor
public class RpcController {
private final RpcClient rpcClient;
@GetMapping("/fibonacci/{n}")
public String fibonacci(@PathVariable int n) {
return rpcClient.callSync(String.valueOf(n));
}
@GetMapping("/fibonacci/{n}/timeout/{ms}")
public String fibonacciWithTimeout(@PathVariable int n, @PathVariable long ms) {
return rpcClient.callWithTimeout(String.valueOf(n), ms);
}
}
🔄 异步 RPC
使用 AsyncRabbitTemplate
package com.example.rabbitmq.config;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AsyncRpcConfig {
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
return new AsyncRabbitTemplate(rabbitTemplate);
}
}
异步 RPC 客户端
package com.example.rabbitmq.rpc;
import com.example.rabbitmq.config.RpcConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncRpcClient {
private final AsyncRabbitTemplate asyncTemplate;
/**
* 异步 RPC 调用
*/
public CompletableFuture<String> callAsync(String message) {
log.info("📤 发送异步 RPC 请求: {}", message);
AsyncRabbitTemplate.RabbitConverterFuture<Object> future =
asyncTemplate.convertSendAndReceive(
RpcConfig.RPC_EXCHANGE,
RpcConfig.RPC_ROUTING_KEY,
message
);
return future.toCompletableFuture().thenApply(response -> {
String result = response != null ? response.toString() : "null";
log.info("📥 收到异步响应: {}", result);
return result;
});
}
/**
* 批量异步调用
*/
public void batchCallAsync(int... numbers) {
for (int n : numbers) {
callAsync(String.valueOf(n))
.thenAccept(result -> log.info("fib({}) = {}", n, result))
.exceptionally(ex -> {
log.error("fib({}) 调用失败: {}", n, ex.getMessage());
return null;
});
}
}
}
⚠️ 注意事项
1. 超时处理
// 设置合理的超时时间
rabbitTemplate.setReplyTimeout(5000); // 5秒
// 处理超时
try {
Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, message);
if (response == null) {
// 超时返回 null
throw new TimeoutException("RPC 调用超时");
}
} catch (Exception e) {
// 异常处理
}
2. 避免阻塞
// ❌ 错误:在 RPC 服务端中调用 RPC(可能死锁)
@RabbitListener(queues = "queue.a")
public String handleA(String msg) {
// 这里又发起 RPC 调用,可能导致死锁
return rpcClient.callSync(msg);
}
// ✅ 正确:使用异步或避免嵌套调用
@RabbitListener(queues = "queue.a")
public String handleA(String msg) {
return processLocally(msg);
}
3. 错误处理
@RabbitListener(queues = RpcConfig.RPC_QUEUE)
public String handleRpc(String message) {
try {
return doProcess(message);
} catch (BusinessException e) {
// 业务异常,返回错误信息
return "ERROR:" + e.getMessage();
} catch (Exception e) {
// 系统异常,记录日志
log.error("RPC 处理异常", e);
return "ERROR:SYSTEM_ERROR";
}
}
🎯 适用场景
| 场景 | 是否适合 | 说明 |
|---|---|---|
| 需要同步响应 | ✅ | 如计算服务、查询服务 |
| 高并发请求 | ⚠️ | 需要注意超时和性能 |
| 长耗时任务 | ❌ | 建议使用异步消息 |
| 微服务调用 | ⚠️ | 考虑使用 gRPC/HTTP |
📊 RPC vs 普通消息
| 对比项 | RPC 模式 | 普通消息 |
|---|---|---|
| 通信方式 | 请求-响应 | 单向发送 |
| 等待响应 | 是 | 否 |
| 耦合度 | 较高 | 较低 |
| 复杂度 | 较高 | 较低 |
| 适用场景 | 需要结果 | 异步任务 |
📝 本章小结
- RPC 原理:通过 reply_to 和 correlation_id 实现请求响应匹配
- 实现方式:原生 Java 或 Spring Boot(推荐)
- 注意事项:超时处理、避免死锁、错误处理
- 适用场景:需要同步获取响应的场景
下一步
学习完 RPC 模式后,让我们了解另一种交换机类型:Headers 交换机!
