性能优化
本章介绍 RabbitMQ 性能优化的关键技巧。
🎯 本章目标
- 了解影响性能的因素
- 掌握生产者优化技巧
- 掌握消费者优化技巧
- 了解服务器端优化
📊 性能影响因素
| 因素 | 影响 | 优化方向 |
|---|---|---|
| 消息持久化 | 降低吞吐量 | 非关键消息可不持久化 |
| 消息确认 | 增加延迟 | 批量确认 |
| 消息大小 | 影响传输速度 | 压缩消息 |
| 连接数 | 消耗资源 | 使用连接池 |
| 预取数量 | 影响消费速度 | 调整 prefetch |
📤 生产者优化
1. 批量发送
@Slf4j
@Service
@RequiredArgsConstructor
public class BatchProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 批量发送消息
*/
public void sendBatch(List<String> messages) {
rabbitTemplate.execute(channel -> {
for (String message : messages) {
channel.basicPublish(
"exchange",
"routing.key",
null,
message.getBytes()
);
}
return null;
});
log.info("批量发送 {} 条消息", messages.size());
}
}
2. 异步发送
@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncProducer {
private final RabbitTemplate rabbitTemplate;
private final ExecutorService executor = Executors.newFixedThreadPool(10);
/**
* 异步发送消息
*/
public CompletableFuture<Void> sendAsync(String message) {
return CompletableFuture.runAsync(() -> {
rabbitTemplate.convertAndSend("exchange", "key", message);
}, executor);
}
/**
* 批量异步发送
*/
public CompletableFuture<Void> sendBatchAsync(List<String> messages) {
List<CompletableFuture<Void>> futures = messages.stream()
.map(this::sendAsync)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
}
3. 消息压缩
@Service
public class CompressedProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCompressed(Object message) throws Exception {
// 序列化
byte[] data = objectMapper.writeValueAsBytes(message);
// 压缩
byte[] compressed = compress(data);
// 发送
rabbitTemplate.convertAndSend("exchange", "key", compressed, msg -> {
msg.getMessageProperties().setContentEncoding("gzip");
return msg;
});
}
private byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
gzip.write(data);
}
return baos.toByteArray();
}
}
📥 消费者优化
1. 调整 Prefetch
spring:
rabbitmq:
listener:
simple:
# 每次预取消息数量
prefetch: 50
Prefetch 建议值:
| 场景 | 建议值 |
|---|---|
| 快速处理(<1ms) | 100-300 |
| 一般处理(1-10ms) | 30-100 |
| 慢速处理(>100ms) | 1-10 |
2. 增加消费者
spring:
rabbitmq:
listener:
simple:
# 并发消费者数量
concurrency: 5
max-concurrency: 20
3. 批量确认
@Slf4j
@Component
public class BatchAckConsumer {
private final AtomicInteger counter = new AtomicInteger(0);
private final int BATCH_SIZE = 10;
@RabbitListener(queues = "batch.queue")
public void handleMessage(String message, Channel channel, Message amqpMessage)
throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
// 处理消息
processMessage(message);
// 批量确认
if (counter.incrementAndGet() >= BATCH_SIZE) {
channel.basicAck(deliveryTag, true); // multiple=true
counter.set(0);
log.info("批量确认 {} 条消息", BATCH_SIZE);
}
}
}
4. 并行处理
@Slf4j
@Component
public class ParallelConsumer {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
@RabbitListener(queues = "parallel.queue", concurrency = "5-10")
public void handleMessage(String message, Channel channel, Message amqpMessage) {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
// 异步处理
executor.submit(() -> {
try {
processMessage(message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
log.error("确认失败", ex);
}
}
});
}
}
🖥️ 服务器优化
1. 内存配置
# 修改配置文件 /etc/rabbitmq/rabbitmq.conf
# 内存水位线(使用系统内存的 60%)
vm_memory_high_watermark.relative = 0.6
# 磁盘空间限制
disk_free_limit.relative = 1.5
2. 文件描述符
# 修改 /etc/security/limits.conf
rabbitmq soft nofile 65536
rabbitmq hard nofile 65536
3. 网络优化
# /etc/sysctl.conf
net.core.somaxconn = 4096
net.ipv4.tcp_max_syn_backlog = 4096
net.core.netdev_max_backlog = 4096
📊 性能测试
使用 PerfTest
# 安装
docker pull pivotalrabbitmq/perf-test
# 基础测试
docker run -it --rm pivotalrabbitmq/perf-test \
--uri amqp://admin:admin123@rabbitmq:5672 \
--producers 5 \
--consumers 5 \
--queue perf-test \
--time 60
# 持久化消息测试
docker run -it --rm pivotalrabbitmq/perf-test \
--uri amqp://admin:admin123@rabbitmq:5672 \
--producers 5 \
--consumers 5 \
--queue perf-test-durable \
--flag persistent \
--time 60
📝 本章小结
| 优化点 | 方法 |
|---|---|
| 生产者 | 批量发送、异步发送、消息压缩 |
| 消费者 | 调整 prefetch、增加并发、批量确认 |
| 服务器 | 内存配置、文件描述符、网络调优 |
下一步
性能优化完成后,需要建立 监控告警 体系!
