监控告警
完善的监控体系是保障系统稳定运行的关键。
🎯 本章目标
- 了解 RabbitMQ 监控指标
- 掌握监控工具的使用
- 建立告警机制
📊 核心监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 消息堆积 | 队列中待消费消息数 | > 10000 |
| 内存使用 | 节点内存使用率 | > 80% |
| 磁盘空间 | 剩余磁盘空间 | < 20% |
| 连接数 | 客户端连接数量 | > 1000 |
| 通道数 | 通道数量 | > 5000 |
| 消费速率 | 消息消费速度 | 下降 50% |
🔧 管理 API
获取概览信息
# Fetch the overview document from the management HTTP API; -u supplies the user:password pair for HTTP basic auth.
curl -u admin:admin123 http://localhost:15672/api/overview
获取队列信息
# All queues
curl -u admin:admin123 http://localhost:15672/api/queues
# A single queue. The path is /api/queues/{vhost}/{name}; %2F is the URL-encoded "/" (the default vhost).
curl -u admin:admin123 http://localhost:15672/api/queues/%2F/my.queue
Java 监控服务
@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMQMonitorService {
private final RestTemplate restTemplate;
private static final String BASE_URL = "http://localhost:15672/api";
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin123";
/**
* 获取队列消息数
*/
public int getQueueMessageCount(String queueName) {
String url = BASE_URL + "/queues/%2F/" + queueName;
HttpHeaders headers = createAuthHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
try {
ResponseEntity<Map> response = restTemplate.exchange(
url, HttpMethod.GET, entity, Map.class);
Map body = response.getBody();
return (Integer) body.getOrDefault("messages", 0);
} catch (Exception e) {
log.error("获取队列信息失败: {}", queueName, e);
return -1;
}
}
/**
* 获取节点内存使用率
*/
public double getMemoryUsage() {
String url = BASE_URL + "/nodes";
HttpHeaders headers = createAuthHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
try {
ResponseEntity<List> response = restTemplate.exchange(
url, HttpMethod.GET, entity, List.class);
List<Map> nodes = response.getBody();
if (nodes != null && !nodes.isEmpty()) {
Map node = nodes.get(0);
long memUsed = ((Number) node.get("mem_used")).longValue();
long memLimit = ((Number) node.get("mem_limit")).longValue();
return (double) memUsed / memLimit * 100;
}
} catch (Exception e) {
log.error("获取内存信息失败", e);
}
return -1;
}
/**
* 检查健康状态
*/
public boolean isHealthy() {
String url = BASE_URL + "/health/checks/alarms";
try {
HttpHeaders headers = createAuthHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
ResponseEntity<Map> response = restTemplate.exchange(
url, HttpMethod.GET, entity, Map.class);
return response.getStatusCode().is2xxSuccessful();
} catch (Exception e) {
return false;
}
}
private HttpHeaders createAuthHeaders() {
HttpHeaders headers = new HttpHeaders();
String auth = USERNAME + ":" + PASSWORD;
String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes());
headers.add("Authorization", "Basic " + encodedAuth);
return headers;
}
}
📈 Prometheus + Grafana
启用 Prometheus 插件
# Enable the rabbitmq_prometheus plugin so the node exposes metrics for Prometheus to scrape.
rabbitmq-plugins enable rabbitmq_prometheus
Prometheus 配置
# prometheus.yml
# Scrape the metrics endpoint of each RabbitMQ node.
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['rabbitmq1:15692', 'rabbitmq2:15692', 'rabbitmq3:15692']
    metrics_path: /metrics
常用 Prometheus 指标
# Number of messages in queues
rabbitmq_queue_messages
# Messages published; the _total suffix indicates a cumulative counter, so wrap it in rate(...[5m]) to get a publish rate
rabbitmq_channel_messages_published_total
# Messages delivered to consumers; also a cumulative counter, so use rate(...[5m]) for the consumption rate
rabbitmq_channel_messages_delivered_total
# Number of open connections
rabbitmq_connections
# Resident memory used by the node process, in bytes
rabbitmq_process_resident_memory_bytes
Grafana Dashboard
导入 RabbitMQ 官方 Dashboard:
- Dashboard ID: 10991
⚠️ 告警配置
告警规则
# rabbitmq_alerts.yml
# Prometheus alerting rules. Load this file through `rule_files` in
# prometheus.yml; it is not Alertmanager configuration (alertmanager.yml only
# describes routing and receivers for alerts that Prometheus has already fired).
groups:
  - name: rabbitmq
    rules:
      # Queue backlog.
      # NOTE(review): the `queue` label is only present when per-object metrics
      # are scraped; with the default aggregated /metrics output it will be
      # empty. Confirm against your scrape setup.
      - alert: RabbitMQQueueMessages
        expr: rabbitmq_queue_messages > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "队列消息堆积"
          description: "队列 {{ $labels.queue }} 消息数: {{ $value }}"

      # Memory usage above 80% of the configured memory limit.
      - alert: RabbitMQMemoryHigh
        expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 内存使用过高"
          description: "内存使用率: {{ $value | humanizePercentage }}"

      # Scrape target unreachable for one minute.
      - alert: RabbitMQNodeDown
        expr: up{job="rabbitmq"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 节点宕机"
          description: "节点 {{ $labels.instance }} 不可用"
定时检查任务
/**
 * Scheduled job that polls {@link RabbitMQMonitorService} and raises alerts
 * through {@link AlertService} when the broker looks unhealthy, memory usage
 * is high, or a watched queue is backing up.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMQHealthCheck {

    /** Memory usage (percent) above which a warning is sent. */
    private static final double MEMORY_WARN_PERCENT = 80;

    private final RabbitMQMonitorService monitorService;
    private final AlertService alertService;

    /**
     * Runs all checks. When the health check itself fails, a critical alert is
     * sent and the remaining checks are skipped.
     */
    @Scheduled(fixedRate = 60000) // once per minute
    public void checkHealth() {
        // Broker health
        if (!monitorService.isHealthy()) {
            alertService.sendAlert("RabbitMQ 服务异常", AlertLevel.CRITICAL);
            return;
        }

        // Memory usage. A negative value is the monitor service's "could not
        // collect" sentinel; log it instead of treating it as a healthy 0%.
        double memoryUsage = monitorService.getMemoryUsage();
        if (memoryUsage < 0) {
            log.warn("无法获取 RabbitMQ 内存使用率,跳过内存检查");
        } else if (memoryUsage > MEMORY_WARN_PERCENT) {
            alertService.sendAlert(
                    String.format("RabbitMQ 内存使用率: %.2f%%", memoryUsage),
                    AlertLevel.WARNING
            );
        }

        // Queue backlog
        checkQueueBacklog("order.queue", 10000);
        checkQueueBacklog("payment.queue", 5000);
    }

    /**
     * Sends a warning when a queue holds more messages than allowed.
     *
     * @param queue     queue name to inspect
     * @param threshold maximum acceptable number of queued messages
     */
    private void checkQueueBacklog(String queue, int threshold) {
        int messageCount = monitorService.getQueueMessageCount(queue);
        if (messageCount < 0) {
            // -1 means the lookup failed, not that the queue is empty.
            log.warn("无法获取队列 {} 的消息数,跳过堆积检查", queue);
            return;
        }
        if (messageCount > threshold) {
            alertService.sendAlert(
                    String.format("队列 %s 消息堆积: %d", queue, messageCount),
                    AlertLevel.WARNING
            );
        }
    }
}
📝 本章小结
| 监控内容 | 工具 |
|---|---|
| 基础指标 | Management API |
| 详细指标 | Prometheus |
| 可视化 | Grafana |
| 告警 | AlertManager |
下一步
监控体系建立后,请继续阅读《最佳实践》,了解生产环境中的经验总结!
