监控告警

完善的监控体系是保障系统稳定运行的关键。

🎯 本章目标

  • 了解 RabbitMQ 监控指标
  • 掌握监控工具的使用
  • 建立告警机制

📊 核心监控指标

指标说明告警阈值
消息堆积队列中待消费消息数> 10000
内存使用节点内存使用率> 80%
磁盘空间剩余磁盘空间< 20%
连接数客户端连接数量> 1000
通道数通道数量> 5000
消费速率消息消费速度下降 50%

🔧 管理 API

获取概览信息

curl -u admin:admin123 http://localhost:15672/api/overview

获取队列信息

# 所有队列
curl -u admin:admin123 http://localhost:15672/api/queues

# 指定队列
curl -u admin:admin123 http://localhost:15672/api/queues/%2F/my.queue

Java 监控服务

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMQMonitorService {
    
    private final RestTemplate restTemplate;
    
    private static final String BASE_URL = "http://localhost:15672/api";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin123";
    
    /**
     * 获取队列消息数
     */
    public int getQueueMessageCount(String queueName) {
        String url = BASE_URL + "/queues/%2F/" + queueName;
        
        HttpHeaders headers = createAuthHeaders();
        HttpEntity<String> entity = new HttpEntity<>(headers);
        
        try {
            ResponseEntity<Map> response = restTemplate.exchange(
                url, HttpMethod.GET, entity, Map.class);
            
            Map body = response.getBody();
            return (Integer) body.getOrDefault("messages", 0);
            
        } catch (Exception e) {
            log.error("获取队列信息失败: {}", queueName, e);
            return -1;
        }
    }
    
    /**
     * 获取节点内存使用率
     */
    public double getMemoryUsage() {
        String url = BASE_URL + "/nodes";
        
        HttpHeaders headers = createAuthHeaders();
        HttpEntity<String> entity = new HttpEntity<>(headers);
        
        try {
            ResponseEntity<List> response = restTemplate.exchange(
                url, HttpMethod.GET, entity, List.class);
            
            List<Map> nodes = response.getBody();
            if (nodes != null && !nodes.isEmpty()) {
                Map node = nodes.get(0);
                long memUsed = ((Number) node.get("mem_used")).longValue();
                long memLimit = ((Number) node.get("mem_limit")).longValue();
                return (double) memUsed / memLimit * 100;
            }
            
        } catch (Exception e) {
            log.error("获取内存信息失败", e);
        }
        return -1;
    }
    
    /**
     * 检查健康状态
     */
    public boolean isHealthy() {
        String url = BASE_URL + "/health/checks/alarms";
        
        try {
            HttpHeaders headers = createAuthHeaders();
            HttpEntity<String> entity = new HttpEntity<>(headers);
            
            ResponseEntity<Map> response = restTemplate.exchange(
                url, HttpMethod.GET, entity, Map.class);
            
            return response.getStatusCode().is2xxSuccessful();
            
        } catch (Exception e) {
            return false;
        }
    }
    
    private HttpHeaders createAuthHeaders() {
        HttpHeaders headers = new HttpHeaders();
        String auth = USERNAME + ":" + PASSWORD;
        String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes());
        headers.add("Authorization", "Basic " + encodedAuth);
        return headers;
    }
}

📈 Prometheus + Grafana

启用 Prometheus 插件

rabbitmq-plugins enable rabbitmq_prometheus

Prometheus 配置

# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['rabbitmq1:15692', 'rabbitmq2:15692', 'rabbitmq3:15692']
    metrics_path: /metrics

常用 Prometheus 指标

# 队列消息数
rabbitmq_queue_messages

# 消息发布速率
rabbitmq_channel_messages_published_total

# 消息消费速率
rabbitmq_channel_messages_delivered_total

# 连接数
rabbitmq_connections

# 内存使用
rabbitmq_process_resident_memory_bytes

Grafana Dashboard

导入 RabbitMQ 官方 Dashboard:

  • Dashboard ID: 10991

⚠️ 告警配置

告警规则

# alertmanager.yml
groups:
  - name: rabbitmq
    rules:
      # 消息堆积告警
      - alert: RabbitMQQueueMessages
        expr: rabbitmq_queue_messages > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "队列消息堆积"
          description: "队列 {{ $labels.queue }} 消息数: {{ $value }}"
      
      # 内存告警
      - alert: RabbitMQMemoryHigh
        expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 内存使用过高"
          description: "内存使用率: {{ $value | humanizePercentage }}"
      
      # 节点宕机
      - alert: RabbitMQNodeDown
        expr: up{job="rabbitmq"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 节点宕机"
          description: "节点 {{ $labels.instance }} 不可用"

定时检查任务

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMQHealthCheck {
    
    private final RabbitMQMonitorService monitorService;
    private final AlertService alertService;
    
    @Scheduled(fixedRate = 60000)  // 每分钟检查
    public void checkHealth() {
        // 检查服务状态
        if (!monitorService.isHealthy()) {
            alertService.sendAlert("RabbitMQ 服务异常", AlertLevel.CRITICAL);
            return;
        }
        
        // 检查内存
        double memoryUsage = monitorService.getMemoryUsage();
        if (memoryUsage > 80) {
            alertService.sendAlert(
                String.format("RabbitMQ 内存使用率: %.2f%%", memoryUsage),
                AlertLevel.WARNING
            );
        }
        
        // 检查队列堆积
        checkQueueBacklog("order.queue", 10000);
        checkQueueBacklog("payment.queue", 5000);
    }
    
    private void checkQueueBacklog(String queue, int threshold) {
        int messageCount = monitorService.getQueueMessageCount(queue);
        if (messageCount > threshold) {
            alertService.sendAlert(
                String.format("队列 %s 消息堆积: %d", queue, messageCount),
                AlertLevel.WARNING
            );
        }
    }
}

📝 本章小结

监控内容工具
基础指标Management API
详细指标Prometheus
可视化Grafana
告警AlertManager

下一步

监控体系建立后,请阅读 最佳实践 总结生产经验!