性能优化

本章介绍 RabbitMQ 性能优化的关键技巧。

🎯 本章目标

  • 了解影响性能的因素
  • 掌握生产者优化技巧
  • 掌握消费者优化技巧
  • 了解服务器端优化

📊 性能影响因素

因素影响优化方向
消息持久化降低吞吐量非关键消息可不持久化
消息确认增加延迟批量确认
消息大小影响传输速度压缩消息
连接数消耗资源使用连接池
预取数量影响消费速度调整 prefetch

📤 生产者优化

1. 批量发送

@Slf4j
@Service
@RequiredArgsConstructor
public class BatchProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 批量发送消息
     */
    public void sendBatch(List<String> messages) {
        rabbitTemplate.execute(channel -> {
            for (String message : messages) {
                channel.basicPublish(
                    "exchange",
                    "routing.key",
                    null,
                    message.getBytes()
                );
            }
            return null;
        });
        
        log.info("批量发送 {} 条消息", messages.size());
    }
}

2. 异步发送

@Slf4j
@Service
@RequiredArgsConstructor
public class AsyncProducer {
    
    private final RabbitTemplate rabbitTemplate;
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    
    /**
     * 异步发送消息
     */
    public CompletableFuture<Void> sendAsync(String message) {
        return CompletableFuture.runAsync(() -> {
            rabbitTemplate.convertAndSend("exchange", "key", message);
        }, executor);
    }
    
    /**
     * 批量异步发送
     */
    public CompletableFuture<Void> sendBatchAsync(List<String> messages) {
        List<CompletableFuture<Void>> futures = messages.stream()
            .map(this::sendAsync)
            .collect(Collectors.toList());
        
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}

3. 消息压缩

@Service
public class CompressedProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendCompressed(Object message) throws Exception {
        // 序列化
        byte[] data = objectMapper.writeValueAsBytes(message);
        
        // 压缩
        byte[] compressed = compress(data);
        
        // 发送
        rabbitTemplate.convertAndSend("exchange", "key", compressed, msg -> {
            msg.getMessageProperties().setContentEncoding("gzip");
            return msg;
        });
    }
    
    private byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
            gzip.write(data);
        }
        return baos.toByteArray();
    }
}

📥 消费者优化

1. 调整 Prefetch

spring:
  rabbitmq:
    listener:
      simple:
        # 每次预取消息数量
        prefetch: 50

Prefetch 建议值:

场景建议值
快速处理(<1ms)100-300
一般处理(1-10ms)30-100
慢速处理(>100ms)1-10

2. 增加消费者

spring:
  rabbitmq:
    listener:
      simple:
        # 并发消费者数量
        concurrency: 5
        max-concurrency: 20

3. 批量确认

@Slf4j
@Component
public class BatchAckConsumer {
    
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int BATCH_SIZE = 10;
    
    @RabbitListener(queues = "batch.queue")
    public void handleMessage(String message, Channel channel, Message amqpMessage) 
            throws IOException {
        
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        
        // 处理消息
        processMessage(message);
        
        // 批量确认
        if (counter.incrementAndGet() >= BATCH_SIZE) {
            channel.basicAck(deliveryTag, true);  // multiple=true
            counter.set(0);
            log.info("批量确认 {} 条消息", BATCH_SIZE);
        }
    }
}

4. 并行处理

@Slf4j
@Component
public class ParallelConsumer {
    
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    
    @RabbitListener(queues = "parallel.queue", concurrency = "5-10")
    public void handleMessage(String message, Channel channel, Message amqpMessage) {
        
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        
        // 异步处理
        executor.submit(() -> {
            try {
                processMessage(message);
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                try {
                    channel.basicNack(deliveryTag, false, true);
                } catch (IOException ex) {
                    log.error("确认失败", ex);
                }
            }
        });
    }
}

🖥️ 服务器优化

1. 内存配置

# 修改配置文件 /etc/rabbitmq/rabbitmq.conf

# 内存水位线(使用系统内存的 60%)
vm_memory_high_watermark.relative = 0.6

# 磁盘空间限制
disk_free_limit.relative = 1.5

2. 文件描述符

# 修改 /etc/security/limits.conf
rabbitmq soft nofile 65536
rabbitmq hard nofile 65536

3. 网络优化

# /etc/sysctl.conf
net.core.somaxconn = 4096
net.ipv4.tcp_max_syn_backlog = 4096
net.core.netdev_max_backlog = 4096

📊 性能测试

使用 PerfTest

# 安装
docker pull pivotalrabbitmq/perf-test

# 基础测试
docker run -it --rm pivotalrabbitmq/perf-test \
  --uri amqp://admin:admin123@rabbitmq:5672 \
  --producers 5 \
  --consumers 5 \
  --queue perf-test \
  --time 60

# 持久化消息测试
docker run -it --rm pivotalrabbitmq/perf-test \
  --uri amqp://admin:admin123@rabbitmq:5672 \
  --producers 5 \
  --consumers 5 \
  --queue perf-test-durable \
  --flag persistent \
  --time 60

📝 本章小结

优化点方法
生产者批量发送、异步发送、消息压缩
消费者调整 prefetch、增加并发、批量确认
服务器内存配置、文件描述符、网络调优

下一步

性能优化完成后,需要建立 监控告警 体系!