RPC Pattern

RPC (Remote Procedure Call) is a request/response pattern: the client sends a request message and waits for the server's response.

🎯 Chapter Goals

  • Understand how the RPC pattern works
  • Learn how to implement RPC with RabbitMQ
  • Know when RPC is a good fit

📖 What Is RPC

Traditional RPC vs Message-Queue RPC

Traditional RPC (synchronous, blocking):
┌────────┐  HTTP/gRPC   ┌────────┐
│ Client │ ───────────→ │ Server │
│        │ ←─────────── │        │
└────────┘   Response   └────────┘
    ↑
    └── The client blocks while waiting

Message-queue RPC (asynchronous, decoupled):
┌────────┐   Request    ┌─────────┐   Request    ┌────────┐
│ Client │ ───────────→ │ RabbitMQ│ ───────────→ │ Server │
│        │ ←─────────── │         │ ←─────────── │        │
└────────┘   Response   └─────────┘   Response   └────────┘
    ↑
    └── Decoupled through the message queue

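RPC Workflow

1. The client publishes a request to rpc_queue; the server publishes its response to the queue named in reply_to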
   ┌────────┐                    ┌─────────────┐
   │ Client │ ──── Request ────→ │ rpc_queue   │
   └────────┘                    └─────────────┘
         ↑                              │
         │                              ↓
         │                       ┌────────────┐
         │                       │   Server   │
         │                       └────────────┘
         │                              │
         │                              ↓
   ┌─────────────────┐           ┌─────────────┐
   │ reply_to_queue  │ ←──────── │  Response   │
   └─────────────────┘           └─────────────┘

2. Key properties:
   - reply_to: name of the queue the response should be sent to
   - correlation_id: ID that ties a response to its request

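📦 Core Concepts

Message Properties

| Property | Meaning | Purpose |
|---|---|---|
| reply_to | Reply queue name | Tells the server where to send the response |
| correlation_id | Correlation ID | Matches a response to its request |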
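
To make the role of correlation_id concrete, here is a minimal plain-Java sketch of how a client could track several in-flight requests and hand each reply to the right caller. It does not talk to RabbitMQ; `PendingReplies`, `register`, and `onReply` are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the RabbitMQ client): tracks in-flight
// requests by correlation ID so that each reply reaches the right caller.
class PendingReplies {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request and returns the correlation ID to set on the outgoing message. */
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    /** Completes the matching request; returns false for unknown or duplicate replies. */
    boolean onReply(String correlationId, String body) {
        if (correlationId == null) {
            return false;
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    /** Number of requests still waiting for a reply. */
    int inFlight() {
        return pending.size();
    }
}
```

In a real client, `onReply` would be called from the reply-queue consumer with the correlation_id and body of each delivery; replies with an unknown ID are simply dropped.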

Callback Queue

The client creates a temporary queue to receive responses:

// Create a temporary queue (server-named, exclusive, auto-delete)
String replyQueueName = channel.queueDeclare().getQueue();

💻 Plain Java Implementation

Maven Dependency

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

RPC Server

package com.example.rabbitmq.rpc;

import com.rabbitmq.client.*;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
        // Create the connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the RPC queue and drop requests left over from a previous run
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.queuePurge(RPC_QUEUE_NAME);

        // Deliver at most one unacknowledged request to this consumer at a time
        channel.basicQos(1);

        System.out.println(" [x] RPC server started, waiting for requests...");

        // Callback invoked for every request
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Build the reply properties, echoing the request's correlationId
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(delivery.getProperties().getCorrelationId())
                    .build();

            String response = "";

            try {
                String message = new String(delivery.getBody(), "UTF-8");
                int n = Integer.parseInt(message);

                System.out.println(" [.] Received request: fib(" + message + ")");
                response = String.valueOf(fibonacci(n));

            } catch (RuntimeException e) {
                System.out.println(" [.] Processing failed: " + e.getMessage());
                response = "error: " + e.getMessage();
            } finally {
                // Publish the response to the queue named in reply_to
                channel.basicPublish(
                    "",
                    delivery.getProperties().getReplyTo(),
                    replyProps,
                    response.getBytes("UTF-8")
                );

                // Acknowledge the request
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                System.out.println(" [x] Sent response: " + response);
            }
        };

        // Start consuming with manual acknowledgements
        channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }

    /**
     * Fibonacci calculation (stands in for real business logic)
     */
    private static int fibonacci(int n) {
        // Reject negative input, which would otherwise recurse without end
        if (n < 0) throw new IllegalArgumentException("n must be non-negative: " + n);
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}
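
The recursive fibonacci above is fine as a stand-in for business logic, but it takes exponential time and its int result overflows from n = 47 onward. Since the server handles one request at a time, a single large n can hold up every other caller. As a rough sketch, an iterative variant with input validation might look like this; `Fib` and `MAX_N` are illustrative names, not part of the original example.

```java
// Illustrative alternative to the recursive method: linear time, long result.
class Fib {

    // fib(92) is the largest Fibonacci number that fits in a signed 64-bit long
    static final int MAX_N = 92;

    static long fibonacci(int n) {
        if (n < 0 || n > MAX_N) {
            throw new IllegalArgumentException("n must be between 0 and " + MAX_N + ": " + n);
        }
        long current = 0; // fib(0)
        long next = 1;    // fib(1)
        for (int i = 0; i < n; i++) {
            long sum = current + next; // may wrap on the last step for n = 92, but is never returned
            current = next;
            next = sum;
        }
        return current;
    }
}
```

The IllegalArgumentException is a RuntimeException, so the server's existing catch block would turn an out-of-range request into an error reply.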

RPC Client

package com.example.rabbitmq.rpc;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {
    
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    
    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        connection = factory.newConnection();
        channel = connection.createChannel();
    }
    
    /**
     * Sends an RPC request and blocks until the matching response arrives
     */
    public String call(String message) throws IOException, InterruptedException, ExecutionException {
        // Generate a unique correlation ID
        final String correlationId = UUID.randomUUID().toString();

        // Create a temporary reply queue
        String replyQueueName = channel.queueDeclare().getQueue();

        // Set the request properties
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();

        // Publish the request
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        // Completed by the consumer callback once the reply arrives
        final CompletableFuture<String> response = new CompletableFuture<>();

        // Consume from the reply queue
        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            // Ignore replies whose correlationId does not match (null-safe comparison)
            if (correlationId.equals(delivery.getProperties().getCorrelationId())) {
                response.complete(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {});

        // Block until the response arrives (no timeout: waits forever if the server never replies)
        String result = response.get();

        // Cancel the consumer
        channel.basicCancel(ctag);

        return result;
    }
    
    @Override
    public void close() throws IOException {
        connection.close();
    }
    
    public static void main(String[] args) {
        try (RPCClient client = new RPCClient()) {
            for (int i = 0; i <= 10; i++) {
                String result = client.call(String.valueOf(i));
                System.out.println(" [x] fib(" + i + ") = " + result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
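
The call method above waits on response.get() with no upper bound, so a lost request or a crashed server blocks the caller indefinitely. One way to bound the wait, shown here as a plain-Java sketch that is independent of RabbitMQ, is the timed overload of CompletableFuture.get; `ReplyWaiter`, `awaitReply`, and the fallback value are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for an RPC reply, but only for a bounded time.
class ReplyWaiter {

    /** Returns the reply, or the fallback if nothing arrives within timeoutMs. */
    static String awaitReply(CompletableFuture<String> reply, long timeoutMs, String fallback) {
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No reply in time: give up instead of blocking forever
            return fallback;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("reply future failed", e.getCause());
        }
    }
}
```

In the client, this would replace the bare response.get() call; the consumer should still be cancelled afterwards so that a late reply does not leave a dangling consumer.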

🌱 Spring Boot Implementation

Configuration Class

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RpcConfig {
    
    public static final String RPC_QUEUE = "rpc.queue";
    public static final String RPC_EXCHANGE = "rpc.exchange";
    public static final String RPC_ROUTING_KEY = "rpc.key";
    
    @Bean
    public DirectExchange rpcExchange() {
        return new DirectExchange(RPC_EXCHANGE);
    }
    
    @Bean
    public Queue rpcQueue() {
        return new Queue(RPC_QUEUE);
    }
    
    @Bean
    public Binding rpcBinding() {
        return BindingBuilder.bind(rpcQueue())
                .to(rpcExchange())
                .with(RPC_ROUTING_KEY);
    }
    
    /**
     * Configures a RabbitTemplate for RPC
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange(RPC_EXCHANGE);
        template.setRoutingKey(RPC_ROUTING_KEY);

        // Reply timeout in milliseconds
        template.setReplyTimeout(10000);
        
        return template;
    }
}

RPC Server

package com.example.rabbitmq.rpc;

import com.example.rabbitmq.config.RpcConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class RpcServer {
    
    /**
     * Handles an RPC request; the return value is sent to the reply_to queue automatically
     */
    @RabbitListener(queues = RpcConfig.RPC_QUEUE)
    public String handleRpcRequest(String message) {
        log.info("📥 Received RPC request: {}", message);

        try {
            int n = Integer.parseInt(message);
            int result = fibonacci(n);

            log.info("✅ Computed: fib({}) = {}", n, result);
            return String.valueOf(result);

        } catch (Exception e) {
            log.error("❌ Processing failed: {}", e.getMessage());
            return "error: " + e.getMessage();
        }
    }
    
    private int fibonacci(int n) {
        if (n <= 1) return n;
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}

RPC Client

package com.example.rabbitmq.rpc;

import com.example.rabbitmq.config.RpcConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class RpcClient {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * Synchronous RPC call
     */
    public String callSync(String message) {
        log.info("📤 Sending RPC request: {}", message);

        // convertSendAndReceive sets reply_to and correlation_id automatically
        Object response = rabbitTemplate.convertSendAndReceive(
                RpcConfig.RPC_EXCHANGE,
                RpcConfig.RPC_ROUTING_KEY,
                message
        );

        // A null response means no reply arrived within the reply timeout
        String result = response != null ? response.toString() : "null";
        log.info("📥 Received RPC response: {}", result);

        return result;
    }

    /**
     * RPC call with a caller-supplied timeout.
     * Caution: setReplyTimeout changes the shared RabbitTemplate, so concurrent
     * callers can see each other's timeout; a dedicated template is safer.
     */
    public String callWithTimeout(String message, long timeoutMs) {
        log.info("📤 Sending RPC request (timeout {}ms): {}", timeoutMs, message);

        // Temporarily change the timeout
        rabbitTemplate.setReplyTimeout(timeoutMs);

        try {
            Object response = rabbitTemplate.convertSendAndReceive(
                    RpcConfig.RPC_EXCHANGE,
                    RpcConfig.RPC_ROUTING_KEY,
                    message
            );

            if (response == null) {
                throw new RuntimeException("RPC call timed out");
            }

            return response.toString();
        } finally {
            // Restore the default timeout
            rabbitTemplate.setReplyTimeout(10000);
        }
    }
}

Usage Example

package com.example.rabbitmq.controller;

import com.example.rabbitmq.rpc.RpcClient;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/rpc")
@RequiredArgsConstructor
public class RpcController {
    
    private final RpcClient rpcClient;
    
    @GetMapping("/fibonacci/{n}")
    public String fibonacci(@PathVariable int n) {
        return rpcClient.callSync(String.valueOf(n));
    }
    
    @GetMapping("/fibonacci/{n}/timeout/{ms}")
    public String fibonacciWithTimeout(@PathVariable int n, @PathVariable long ms) {
        return rpcClient.callWithTimeout(String.valueOf(n), ms);
    }
}

🔄 Asynchronous RPC

Using AsyncRabbitTemplate

package com.example.rabbitmq.config;

import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AsyncRpcConfig {
    
    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
        return new AsyncRabbitTemplate(rabbitTemplate);
    }
}

Asynchronous RPC Client

package com.example.rabbitmq.rpc;

import com.example.rabbitmq.config.RpcConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncRpcClient {
    
    private final AsyncRabbitTemplate asyncTemplate;
    
    /**
     * Asynchronous RPC call
     */
    public CompletableFuture<String> callAsync(String message) {
        log.info("📤 Sending async RPC request: {}", message);

        // Assumes Spring AMQP 3.x, where the returned RabbitConverterFuture is a
        // CompletableFuture; on 2.x the template returns a ListenableFuture instead.
        CompletableFuture<Object> future =
                asyncTemplate.convertSendAndReceive(
                        RpcConfig.RPC_EXCHANGE,
                        RpcConfig.RPC_ROUTING_KEY,
                        message
                );

        return future.thenApply(response -> {
            String result = response != null ? response.toString() : "null";
            log.info("📥 Received async response: {}", result);
            return result;
        });
    }

    /**
     * Fires several asynchronous calls and logs each result as it arrives
     */
    public void batchCallAsync(int... numbers) {
        for (int n : numbers) {
            callAsync(String.valueOf(n))
                    .thenAccept(result -> log.info("fib({}) = {}", n, result))
                    .exceptionally(ex -> {
                        log.error("fib({}) call failed: {}", n, ex.getMessage());
                        return null;
                    });
        }
    }
}
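
batchCallAsync above logs each result as it arrives. When the caller needs all results together, the individual futures can be combined with CompletableFuture.allOf. The following plain-Java sketch shows the idea under that assumption; `BatchCalls` and `callAll` are hypothetical names, and the `call` function stands in for a method such as `callAsync`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: runs one asynchronous call per input and gathers the results.
class BatchCalls {

    /** Returns a future holding all results, in the same order as the inputs. */
    static <T> CompletableFuture<List<String>> callAll(
            List<T> inputs, Function<T, CompletableFuture<String>> call) {

        // Start every call before waiting on any of them
        List<CompletableFuture<String>> futures =
                inputs.stream().map(call).collect(Collectors.toList());

        // allOf completes once every future is done (or fails if any of them fails)
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all futures are complete here
                        .collect(Collectors.toList()));
    }
}
```

With the client from this section, a call could look like `BatchCalls.callAll(inputs, n -> asyncRpcClient.callAsync(String.valueOf(n)))`, where `inputs` is a list of integers and `asyncRpcClient` is an injected AsyncRpcClient.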

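⚠️ Caveats

1. Timeout Handling

// Set a reasonable timeout
rabbitTemplate.setReplyTimeout(5000); // 5 seconds

// Handle the timeout
try {
    Object response = rabbitTemplate.convertSendAndReceive(exchange, routingKey, message);
    if (response == null) {
        // A timeout is reported as a null response, not as an exception
        throw new TimeoutException("RPC call timed out");
    }
} catch (AmqpException e) {
    // Messaging failure (e.g. broker unreachable); the TimeoutException above still reaches the caller
}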

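2. Avoid Blocking

// ❌ Wrong: making a blocking RPC call from inside an RPC handler (can deadlock)
@RabbitListener(queues = "queue.a")
public String handleA(String msg) {
    // The listener thread blocks here; if the nested request needs that same thread, neither call completes
    return rpcClient.callSync(msg);
}

// ✅ Right: process locally, or use an asynchronous call instead of nesting
@RabbitListener(queues = "queue.a")
public String handleA(String msg) {
    return processLocally(msg);
}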
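
Why the nested call can hang is easier to see in a plain-Java analogy. The sketch below assumes the worst case, where the nested request must be served by the same single listener thread that is already blocked waiting for it; a single-thread executor plays the role of the listener. `NestedCallDemo` and its method are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Analogy only: a single-thread executor stands in for a listener with one consumer thread.
class NestedCallDemo {

    /** Returns "starved" when the nested call cannot run within timeoutMs. */
    static String nestedBlockingCall(long timeoutMs) {
        ExecutorService listener = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = listener.submit(() -> {
                // The nested request is queued behind the task that is waiting for it
                Future<String> inner = listener.submit(() -> "inner result");
                try {
                    return inner.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // Without a timeout this wait would never end
                    return "starved";
                }
            });
            return outer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("outer task failed", e.getCause());
        } finally {
            listener.shutdownNow();
        }
    }
}
```

The inner task can only start once the outer task returns, so the timed wait always expires. With more listener threads the hang becomes less likely but can still occur under load, which is why the handler above avoids nesting altogether.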

3. Error Handling

@RabbitListener(queues = RpcConfig.RPC_QUEUE)
public String handleRpc(String message) {
    try {
        return doProcess(message);
    } catch (BusinessException e) {
        // Business exception: return the error message to the caller
        return "ERROR:" + e.getMessage();
    } catch (Exception e) {
        // System exception: log it and return a generic error code
        log.error("RPC processing failed", e);
        return "ERROR:SYSTEM_ERROR";
    }
}
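
Because the handler above reports failures inside the reply body, the client has to tell error replies apart from normal results. A minimal sketch of the client side, assuming the same "ERROR:" prefix convention, could look like this; `RpcReply` and the `NO_REPLY` code for a missing reply are hypothetical.

```java
// Hypothetical client-side view of a reply that follows the "ERROR:" prefix convention.
final class RpcReply {

    private static final String ERROR_PREFIX = "ERROR:";

    final boolean ok;
    final String value; // the result on success, the error code or message on failure

    private RpcReply(boolean ok, String value) {
        this.ok = ok;
        this.value = value;
    }

    /** Interprets a raw reply; null (no reply in time) is mapped to an assumed NO_REPLY code. */
    static RpcReply parse(String raw) {
        if (raw == null) {
            return new RpcReply(false, "NO_REPLY");
        }
        if (raw.startsWith(ERROR_PREFIX)) {
            return new RpcReply(false, raw.substring(ERROR_PREFIX.length()));
        }
        return new RpcReply(true, raw);
    }
}
```

A string prefix is the simplest option but it is ambiguous if a valid result can itself start with "ERROR:"; a structured reply (for example a JSON object with a status field) avoids that at the cost of a message converter.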

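🎯 When to Use It

| Scenario | Suitable? | Notes |
|---|---|---|
| Synchronous response required | ✅ | e.g. computation or query services |
| High-concurrency requests | ⚠️ | Watch timeouts and performance |
| Long-running tasks | ❌ | Prefer asynchronous messaging |
| Calls between microservices | ⚠️ | Consider gRPC/HTTP |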

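📊 RPC vs Plain Messaging

| Aspect | RPC pattern | Plain messaging |
|---|---|---|
| Communication style | Request-response | One-way send |
| Waits for a response | ✅ | ❌ |
| Coupling | Higher | Lower |
| Complexity | Higher | Lower |
| Typical use | A result is needed | Asynchronous tasks |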

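📝 Chapter Summary

  1. How RPC works: reply_to and correlation_id match each response to its request
  2. Implementation: plain Java or Spring Boot (recommended)
  3. Caveats: timeout handling, avoiding deadlocks, error handling
  4. When to use it: scenarios that need a synchronous response

Next Steps

Now that you have covered the RPC pattern, the next chapter looks at another exchange type: the Headers exchange.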