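Message Tracing
Message tracing is an important tool for troubleshooting RabbitMQ. It shows the full path a message takes, from the moment it is published until it is delivered to a consumer.
🎯 Chapter Goals
- Master RabbitMQ Firehose message tracing
- Learn to use the rabbitmq_tracing plugin
- Understand tracing approaches suitable for production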
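📖 Overview of Tracing Approaches
| Approach | Description | Performance impact | Typical use |
|---|---|---|---|
| Firehose | Built-in broker tracing mechanism | High | Development and debugging |
| rabbitmq_tracing | Official plugin built on Firehose | High | Development and debugging |
| Application-level tracing | Instrumentation in application code | Low | Production |
| Log analysis | Analyze broker and application logs after the fact | None (offline) | Troubleshooting |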
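🔥 Firehose Tracing
What Is Firehose
Firehose is RabbitMQ's built-in message tracing mechanism. Once it is enabled for a virtual host, every message published into that vhost and every message delivered from it produces a copy. That copy is sent to the amq.rabbitmq.trace topic exchange. To observe the traffic, bind a queue to that exchange.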
                       ┌────────────────────┐
                       │    amq.rabbitmq    │
                       │       .trace       │
                       │  (topic exchange)  │
                       └─────────┬──────────┘
                                 │
          ┌──────────────────────┼──────────────────────┐
          │                      │                      │
          ▼                      ▼                      ▼
┌────────────────────┐ ┌────────────────────┐ ┌────────────────────┐
│     publish.#      │ │     deliver.#      │ │ publish.myexchange │
│  (all publishes)   │ │  (all deliveries)  │ │   (one exchange)   │
└────────────────────┘ └────────────────────┘ └────────────────────┘
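The bindings in the diagram use ordinary topic-exchange matching. Words are separated by dots, * matches exactly one word, and # matches zero or more words. The class below is a simplified sketch of that rule, useful for checking offline which trace routing keys a binding pattern will receive. TopicMatcher is a hypothetical helper, not a RabbitMQ API.

```java
import java.util.Arrays;
import java.util.List;

public class TopicMatcher {

    /** Returns true if routingKey matches an AMQP topic binding pattern (simplified). */
    public static boolean matches(String pattern, String routingKey) {
        return match(split(pattern), 0, split(routingKey), 0);
    }

    // Words are separated by dots; an empty string has zero words.
    private static List<String> split(String s) {
        return s.isEmpty() ? List.of() : Arrays.asList(s.split("\\.", -1));
    }

    private static boolean match(List<String> p, int pi, List<String> k, int ki) {
        if (pi == p.size()) {
            return ki == k.size(); // pattern used up: key must be used up too
        }
        String word = p.get(pi);
        if (word.equals("#")) {
            // '#' may swallow zero or more words: try every possible split point.
            for (int next = ki; next <= k.size(); next++) {
                if (match(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.size()) {
            return false;
        }
        // '*' matches exactly one word; any other word must match literally.
        return (word.equals("*") || word.equals(k.get(ki))) && match(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("publish.#", "publish.order.exchange"));         // true
        System.out.println(matches("publish.*", "publish.order.exchange"));         // false
        System.out.println(matches("deliver.#", "publish.order.exchange"));         // false
        System.out.println(matches("deliver.order.queue", "deliver.order.queue"));  // true
    }
}
```

Look at the second case. Exchange names such as order.exchange contain dots, so publish.* misses them. That is why the examples in this chapter bind with publish.# and deliver.#.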
Routing Key Format
| Event | Routing key format | Example |
|---|---|---|
| Publish | publish.{exchangename} | publish.order.exchange |
| Deliver | deliver.{queuename} | deliver.order.queue |
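The prefix gives the event type, and the rest of the routing key is the exchange or queue name. A consumer can therefore split the key at the first dot only. The sketch below does this; TraceKey is a hypothetical helper name.

```java
public class TraceKey {

    /** Kind of Firehose event, taken from the routing key prefix. */
    public enum Kind { PUBLISH, DELIVER }

    private final Kind kind;
    private final String resource; // exchange name (publish) or queue name (deliver)

    private TraceKey(Kind kind, String resource) {
        this.kind = kind;
        this.resource = resource;
    }

    public Kind kind() { return kind; }

    public String resource() { return resource; }

    /**
     * Splits a trace routing key at the FIRST dot only, because exchange and
     * queue names such as "order.exchange" may themselves contain dots.
     */
    public static TraceKey parse(String routingKey) {
        int dot = routingKey.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Not a trace routing key: " + routingKey);
        }
        String prefix = routingKey.substring(0, dot);
        String name = routingKey.substring(dot + 1);
        switch (prefix) {
            case "publish": return new TraceKey(Kind.PUBLISH, name);
            case "deliver": return new TraceKey(Kind.DELIVER, name);
            default: throw new IllegalArgumentException("Unknown trace prefix: " + prefix);
        }
    }

    public static void main(String[] args) {
        TraceKey k = parse("publish.order.exchange");
        System.out.println(k.kind() + " -> " + k.resource()); // PUBLISH -> order.exchange
    }
}
```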
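Enabling Firehose
# Enable Firehose for the default vhost "/" (Firehose is enabled per vhost, not globally)
rabbitmqctl trace_on
# Enable it for a specific vhost
rabbitmqctl trace_on -p /
# Disable Firehose (add -p <vhost> for vhosts other than the default)
rabbitmqctl trace_off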
Consuming Trace Messages
package com.example.rabbitmq.trace;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class FirehoseConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        // The connection is intentionally left open; its I/O thread keeps the JVM running
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the trace queue: non-durable, non-exclusive, auto-delete
        String traceQueue = "my.trace.queue";
        channel.queueDeclare(traceQueue, false, false, true, null);
        // Bind to the amq.rabbitmq.trace exchange (the broker creates it; do not declare it)
        // publish.# receives every published message
        channel.queueBind(traceQueue, "amq.rabbitmq.trace", "publish.#");
        // deliver.# receives every delivered message
        channel.queueBind(traceQueue, "amq.rabbitmq.trace", "deliver.#");
        log.info("🔍 Tracing messages...");
        DeliverCallback callback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            // The trace message body is the original message body
            String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
            // Trace metadata arrives as message headers
            AMQP.BasicProperties props = delivery.getProperties();
            log.info("═══════════════════════════════════════");
            log.info("📌 Event type: {}", routingKey.startsWith("publish.") ? "publish" : "deliver");
            log.info("📌 Routing key: {}", routingKey);
            log.info("📌 Body: {}", body);
            if (props.getHeaders() != null) {
                log.info("📌 Headers:");
                props.getHeaders().forEach((k, v) -> log.info("   {} = {}", k, v));
            }
        };
        // autoAck = true: trace copies are disposable
        channel.basicConsume(traceQueue, true, callback, consumerTag -> {});
    }
}
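The consumer above only logs the raw headers. The RabbitMQ Firehose documentation lists metadata headers on each trace message, including exchange_name, routing_keys, node, properties and, for deliver events only, redelivered. The message body is the original payload. The sketch below assumes those header names (verify them against your broker version) and copies a few of them into a typed object. It works on a plain Map, so you can pass it props.getHeaders() from the callback. TraceHeaders is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TraceHeaders {

    /** A flattened view of the Firehose metadata headers on one trace message. */
    public static final class TraceInfo {
        public final String exchange;
        public final List<String> routingKeys;
        public final String node;
        public final Boolean redelivered; // only present on deliver.* events

        TraceInfo(String exchange, List<String> routingKeys, String node, Boolean redelivered) {
            this.exchange = exchange;
            this.routingKeys = routingKeys;
            this.node = node;
            this.redelivered = redelivered;
        }
    }

    public static TraceInfo from(Map<String, Object> headers) {
        // Values may be java.lang.String (Spring AMQP) or the Java client's LongString;
        // toString() yields the text in both cases.
        String exchange = asString(headers.get("exchange_name"));
        String node = asString(headers.get("node"));
        List<String> keys = new ArrayList<>();
        Object rawKeys = headers.get("routing_keys");
        if (rawKeys instanceof List<?>) {
            for (Object key : (List<?>) rawKeys) {
                keys.add(String.valueOf(key));
            }
        }
        Object redelivered = headers.get("redelivered");
        return new TraceInfo(exchange, keys, node,
                redelivered instanceof Boolean ? (Boolean) redelivered : null);
    }

    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of(
                "exchange_name", "order.exchange",
                "routing_keys", List.of("order.created"),
                "node", "rabbit@localhost");
        TraceInfo info = from(headers);
        System.out.println(info.exchange + " " + info.routingKeys + " " + info.redelivered);
        // order.exchange [order.created] null
    }
}
```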
Spring Boot Approach
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FirehoseConfig {
    public static final String TRACE_QUEUE = "my.trace.queue";
    public static final String TRACE_EXCHANGE = "amq.rabbitmq.trace";
    @Bean
    public Queue traceQueue() {
        // durable = false, exclusive = false, autoDelete = true
        return new Queue(TRACE_QUEUE, false, false, true);
    }
    /**
     * Bind to the system trace exchange: trace all publishes.
     * The exchange is only referenced, not registered as a @Bean: the broker
     * creates it, and clients may not declare exchanges whose names start with "amq.".
     */
    @Bean
    public Binding tracePublishBinding() {
        return BindingBuilder.bind(traceQueue())
                .to(new TopicExchange(TRACE_EXCHANGE))
                .with("publish.#");
    }
    /**
     * Bind to the system trace exchange: trace all deliveries.
     */
    @Bean
    public Binding traceDeliverBinding() {
        return BindingBuilder.bind(traceQueue())
                .to(new TopicExchange(TRACE_EXCHANGE))
                .with("deliver.#");
    }
}
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.FirehoseConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class TraceConsumer {
    @RabbitListener(queues = FirehoseConfig.TRACE_QUEUE)
    public void handleTrace(Message message) {
        // publish.<exchange> or deliver.<queue>
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("═══════════════════════════════════════");
        log.info("📌 Trace routing key: {}", routingKey);
        log.info("📌 Body: {}", body);
        log.info("📌 Headers: {}", message.getMessageProperties().getHeaders());
    }
}
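📊 The rabbitmq_tracing Plugin
Enabling the Plugin
# Enable the tracing plugin
rabbitmq-plugins enable rabbitmq_tracing
# Recent RabbitMQ versions activate the plugin on the running node without a restart;
# on older versions, restart the service:
systemctl restart rabbitmq-server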
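Configuring via the Management UI
- Open the management UI at http://localhost:15672 and go to Admin → Tracing
- Add a new trace:
  - Name: my-trace
  - Format: Text or JSON
  - Max payload bytes: 1000 (longer payloads are truncated in the trace log)
  - Pattern: # (matches everything)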
Configuring via the HTTP API
# Create a trace named my-trace in vhost "/" (URL-encoded as %2f)
curl -u admin:admin123 -X PUT \
  http://localhost:15672/api/traces/%2f/my-trace \
  -H "Content-Type: application/json" \
  -d '{"format":"json","pattern":"#","max_payload_bytes":1000}'
# List the traces in vhost "/"
curl -u admin:admin123 http://localhost:15672/api/traces/%2f
# Delete the trace
curl -u admin:admin123 -X DELETE \
  http://localhost:15672/api/traces/%2f/my-trace
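You can make the same PUT call from Java without extra dependencies, using the JDK's java.net.http client (Java 11+). The sketch below reuses the endpoint, credentials and JSON body from the curl example above. TraceApiClient is a hypothetical class name.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class TraceApiClient {

    /** Builds the PUT request that creates a trace, mirroring the curl call. */
    public static HttpRequest createTraceRequest(String baseUrl, String vhost, String name,
                                                 String user, String password) {
        // The vhost is a path segment, so "/" must be percent-encoded (%2F).
        // URLEncoder uses form encoding (a space becomes '+'), fine for "/" and simple names.
        String encodedVhost = URLEncoder.encode(vhost, StandardCharsets.UTF_8);
        String encodedName = URLEncoder.encode(name, StandardCharsets.UTF_8);
        String auth = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        String body = "{\"format\":\"json\",\"pattern\":\"#\",\"max_payload_bytes\":1000}";
        return HttpRequest.newBuilder(
                        URI.create(baseUrl + "/api/traces/" + encodedVhost + "/" + encodedName))
                .header("Authorization", "Basic " + auth) // same as curl -u
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = createTraceRequest(
                "http://localhost:15672", "/", "my-trace", "admin", "admin123");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode());
    }
}
```

Keeping request construction separate from sending makes it easy to inspect the URL and headers before the broker is involved.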
Trace Log Format
Text format:
================================================================================
2024-01-15 10:30:45:678: Message published
Node: rabbit@localhost
Connection: 127.0.0.1:54321 -> 127.0.0.1:5672
Virtual host: /
User: admin
Channel: 1
Exchange: order.exchange
Routing keys: [order.created]
Properties: [{delivery_mode,2},{headers,[]}]
Payload: {"orderId":"12345","amount":99.99}
JSON format:
{
"timestamp": "2024-01-15T10:30:45.678Z",
"type": "published",
"node": "rabbit@localhost",
"connection": "127.0.0.1:54321 -> 127.0.0.1:5672",
"vhost": "/",
"user": "admin",
"channel": 1,
"exchange": "order.exchange",
"routing_keys": ["order.created"],
"properties": {
"delivery_mode": 2
},
"payload": "{\"orderId\":\"12345\",\"amount\":99.99}"
}
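If a script consumes Text-format logs, the main pitfall is that values can contain colons, as in connection addresses and timestamps. The minimal parser below assumes one "Key: value" field per line, as in the Text example above. TextTraceParser is a hypothetical name, and multi-line payloads are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TextTraceParser {

    /**
     * Parses one Text-format entry into ordered "Key -> value" pairs. Each line is
     * split at the first ": " so values like "127.0.0.1:54321 -> 127.0.0.1:5672" stay intact.
     */
    public static Map<String, String> parse(String entry) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String line : entry.split("\\R")) {
            String trimmed = line.strip();
            if (trimmed.isEmpty() || trimmed.startsWith("=")) {
                continue; // skip blank lines and the ===== separator
            }
            int sep = trimmed.indexOf(": ");
            if (sep > 0) {
                fields.put(trimmed.substring(0, sep).strip(), trimmed.substring(sep + 2).strip());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        String entry = String.join("\n",
                "====================",
                "Exchange: order.exchange",
                "Connection: 127.0.0.1:54321 -> 127.0.0.1:5672",
                "Routing keys: [order.created]");
        Map<String, String> f = parse(entry);
        System.out.println(f.get("Exchange"));   // order.exchange
        System.out.println(f.get("Connection")); // 127.0.0.1:54321 -> 127.0.0.1:5672
    }
}
```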
🏢 Tracing in Production
Application-Level Tracing (Recommended)
package com.example.rabbitmq.trace;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class TracedMessageService {
    private final RabbitTemplate rabbitTemplate;
    /**
     * Send a message that carries tracing headers.
     */
    public void sendWithTrace(String exchange, String routingKey, Object message) {
        String traceId = UUID.randomUUID().toString();
        String spanId = UUID.randomUUID().toString().substring(0, 8);
        MessagePostProcessor postProcessor = msg -> {
            MessageProperties props = msg.getMessageProperties();
            // Add the tracing information as message headers
            props.setHeader("X-Trace-Id", traceId);
            props.setHeader("X-Span-Id", spanId);
            props.setHeader("X-Timestamp", System.currentTimeMillis()); // send time, epoch millis
            props.setHeader("X-Source", "order-service");               // name of the sending service
            return msg;
        };
        rabbitTemplate.convertAndSend(exchange, routingKey, message, postProcessor);
        log.info("📤 Sent message [TraceId={}] exchange={}, routingKey={}",
                traceId, exchange, routingKey);
    }
}
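sendWithTrace generates a new traceId on every call. If a consumer forwards a message to another queue, the two hops therefore end up in unrelated traces. A common fix is to reuse the incoming trace id and create only a new span id. The sketch below does this on plain header maps. The X-Parent-Span-Id header and the TraceContext class are hypothetical additions, not part of Spring AMQP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TraceContext {

    // X-Trace-Id / X-Span-Id follow the convention used by sendWithTrace;
    // X-Parent-Span-Id is a hypothetical addition for this sketch.
    public static final String TRACE_ID = "X-Trace-Id";
    public static final String SPAN_ID = "X-Span-Id";
    public static final String PARENT_SPAN_ID = "X-Parent-Span-Id";

    /** Headers for a brand-new trace (first hop). */
    public static Map<String, Object> newTrace() {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TRACE_ID, UUID.randomUUID().toString());
        headers.put(SPAN_ID, newSpanId());
        return headers;
    }

    /**
     * Headers for a message sent while handling a message with {@code incoming}
     * headers: the trace id is kept so both hops join up, and the incoming span
     * becomes the parent of the new one.
     */
    public static Map<String, Object> childOf(Map<String, Object> incoming) {
        Object traceId = incoming.get(TRACE_ID);
        if (traceId == null) {
            return newTrace(); // upstream was not traced: start a new trace
        }
        Map<String, Object> headers = new HashMap<>();
        headers.put(TRACE_ID, traceId.toString());
        headers.put(SPAN_ID, newSpanId());
        Object parent = incoming.get(SPAN_ID);
        if (parent != null) {
            headers.put(PARENT_SPAN_ID, parent.toString());
        }
        return headers;
    }

    private static String newSpanId() {
        // Same 8-character span id scheme as sendWithTrace.
        return UUID.randomUUID().toString().substring(0, 8);
    }
}
```

In a MessagePostProcessor, copy the returned entries into the outgoing message with props.setHeader(key, value), as sendWithTrace does.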
Consumer-Side Tracing
package com.example.rabbitmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class TracedConsumer {
    @RabbitListener(queues = "order.queue")
    public void handleMessage(Message message) {
        // Extract the tracing headers set by the producer
        String traceId = getHeader(message, "X-Trace-Id");
        String spanId = getHeader(message, "X-Span-Id");
        Long timestamp = getLongHeader(message, "X-Timestamp");
        String source = getHeader(message, "X-Source");
        // End-to-end latency; only meaningful if producer and consumer clocks are in sync
        long latency = timestamp != null ? System.currentTimeMillis() - timestamp : -1;
        log.info("📥 Received message [TraceId={}] [SpanId={}] source={} latency={}ms",
                traceId, spanId, source, latency);
        // Business logic
        processMessage(new String(message.getBody(), StandardCharsets.UTF_8), traceId);
    }
    private String getHeader(Message message, String key) {
        Object value = message.getMessageProperties().getHeaders().get(key);
        return value != null ? value.toString() : null;
    }
    private Long getLongHeader(Message message, String key) {
        Object value = message.getMessageProperties().getHeaders().get(key);
        return value instanceof Number ? ((Number) value).longValue() : null;
    }
    private void processMessage(String content, String traceId) {
        log.info("Processing message [TraceId={}]: {}", traceId, content);
    }
}
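Integrating a Distributed Tracing System
Using Sleuth + Zipkin
Spring Cloud Sleuth targets Spring Boot 2.x. For Spring Boot 3, its functionality has moved to Micrometer Tracing, so the dependencies and properties below apply only to Boot 2.x projects.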
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
</dependency>
# application.yml
spring:
  zipkin:
    base-url: http://localhost:9411
  sleuth:
    sampler:
      probability: 1.0  # sample 100% of traces; use a lower value in production
    messaging:
      rabbit:
        enabled: true
Using Micrometer + Prometheus
package com.example.rabbitmq.metrics;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class MessageMetrics {
    private final Counter publishCounter;
    private final Counter consumeCounter;
    private final Timer consumeTimer;
    public MessageMetrics(MeterRegistry registry) {
        this.publishCounter = Counter.builder("rabbitmq.messages.published")
                .description("Total number of published messages")
                .register(registry);
        this.consumeCounter = Counter.builder("rabbitmq.messages.consumed")
                .description("Total number of consumed messages")
                .register(registry);
        this.consumeTimer = Timer.builder("rabbitmq.messages.consume.time")
                .description("Message processing time")
                .register(registry);
    }
    /** Call once per published message. */
    public void recordPublish() {
        publishCounter.increment();
    }
    /** Call once per consumed message, passing its processing time in milliseconds. */
    public void recordConsume(long processingTimeMs) {
        consumeCounter.increment();
        consumeTimer.record(processingTimeMs, TimeUnit.MILLISECONDS);
    }
}
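recordConsume expects the caller to measure the processing time. The helper sketched below times a handler with the monotonic System.nanoTime() clock and reports the duration even when the handler throws. TimedHandler is a hypothetical name. Its LongConsumer parameter matches the signature of recordConsume(long).

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

public class TimedHandler {

    /**
     * Runs {@code body}, then reports its duration in milliseconds to
     * {@code recordMs}. The finally block records the time even if body throws,
     * so failed messages still show up in the metrics.
     */
    public static <T> T timed(LongConsumer recordMs, Supplier<T> body) {
        long start = System.nanoTime(); // monotonic clock, safe for measuring durations
        try {
            return body.get();
        } finally {
            recordMs.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }

    public static void main(String[] args) {
        // With the MessageMetrics service, the first argument would be metrics::recordConsume.
        String result = timed(ms -> System.out.println("took " + ms + " ms"), () -> "processed");
        System.out.println(result);
    }
}
```

With the service above, a listener could call timed(metrics::recordConsume, () -> handle(message)), where handle stands for your own processing method. If you only need the timer and not the counter, Micrometer's Timer also provides record(Supplier).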
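🔍 Troubleshooting Examples
Scenario: Investigating Lost Messages
package com.example.rabbitmq.debug;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; // Spring Boot 3: jakarta.annotation.PostConstruct
import java.nio.charset.StandardCharsets;
/**
 * Requires in application.yml:
 *   spring.rabbitmq.publisher-confirm-type: correlated
 *   spring.rabbitmq.publisher-returns: true
 * A RabbitTemplate supports only one ConfirmCallback and one ReturnsCallback;
 * setting a different one later throws IllegalStateException.
 */
@Slf4j
@Component
public class MessageLossDebugger {
    private final RabbitTemplate rabbitTemplate;
    public MessageLossDebugger(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    @PostConstruct
    public void init() {
        // Publisher confirm callback: did the broker accept the message?
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            // correlationData is null unless the message was sent with a CorrelationData
            String id = correlationData != null ? correlationData.getId() : "unknown";
            if (ack) {
                log.debug("✅ Message reached the exchange [id={}]", id);
            } else {
                log.error("❌ Message did not reach the exchange [id={}] cause: {}", id, cause);
            }
        });
        // Returns callback: the message reached the exchange but could not be routed to any queue
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("❌ Message could not be routed:");
            log.error("   Exchange: {}", returned.getExchange());
            log.error("   RoutingKey: {}", returned.getRoutingKey());
            log.error("   ReplyCode: {}", returned.getReplyCode());
            log.error("   ReplyText: {}", returned.getReplyText());
            log.error("   Message: {}",
                    new String(returned.getMessage().getBody(), StandardCharsets.UTF_8));
        });
        // mandatory = true makes the broker return unroutable messages instead of dropping them
        rabbitTemplate.setMandatory(true);
    }
}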
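The confirm callback above logs "unknown" for messages sent without correlation data, so it cannot tell you which message went unconfirmed. To track outstanding publishes:

- Generate an id and send with new CorrelationData(id), for example via rabbitTemplate.convertAndSend(exchange, routingKey, payload, correlationData).
- Resolve that id in the confirm callback.
- Periodically list the ids that were never confirmed.

PendingConfirms below is a hypothetical helper for these steps. Timestamps are passed in explicitly to keep it testable.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class PendingConfirms {

    // id -> send time in epoch millis; thread-safe because confirms arrive on other threads
    private final Map<String, Long> sentAt = new ConcurrentHashMap<>();

    /** Call before sending; use the returned id as new CorrelationData(id). */
    public String track(long nowMs) {
        String id = UUID.randomUUID().toString();
        sentAt.put(id, nowMs);
        return id;
    }

    /** Call from the ConfirmCallback (ack or nack); returns false for unknown ids. */
    public boolean resolve(String id) {
        return sentAt.remove(id) != null;
    }

    /** Ids still unconfirmed after timeoutMs: candidates for messages lost before the exchange. */
    public List<String> overdue(long nowMs, long timeoutMs) {
        return sentAt.entrySet().stream()
                .filter(e -> nowMs - e.getValue() > timeoutMs)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```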
Scenario: Investigating Message Latency
package com.example.rabbitmq.debug;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class LatencyDebugger {
    private static final long LATENCY_THRESHOLD_MS = 1000; // warn above 1 second
    @RabbitListener(queues = "debug.queue")
    public void handleMessage(Message message) {
        // The producer must set X-Publish-Time (epoch millis) on every message
        Long publishTime = getLongHeader(message, "X-Publish-Time");
        if (publishTime == null) {
            log.debug("No X-Publish-Time header; skipping the latency check");
        } else {
            long now = System.currentTimeMillis();
            long latency = now - publishTime; // assumes producer and consumer clocks are in sync
            if (latency > LATENCY_THRESHOLD_MS) {
                log.warn("⚠️ High message latency: {}ms", latency);
                log.warn("   Message ID: {}", message.getMessageProperties().getMessageId());
                log.warn("   Published at: {}", publishTime);
                log.warn("   Received at: {}", now);
                log.warn("   Queue: {}", message.getMessageProperties().getConsumerQueue());
            }
        }
        // Process the message...
    }
    // Returns null when the header is missing (returning 0 would yield a bogus, huge latency)
    private Long getLongHeader(Message message, String key) {
        Object value = message.getMessageProperties().getHeaders().get(key);
        return value instanceof Number ? ((Number) value).longValue() : null;
    }
}
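⚠️ Caveats
1. Performance Impact
# Firehose generates an extra trace message for every publish and every delivery,
# which significantly reduces throughput
# Use it only in development/test environments
# In production, prefer application-level tracing
# or sampled tracing (trace only a fraction of messages)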
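Sampled tracing can be as simple as deciding, per trace id, whether to record it. The sketch below hashes the id with CRC32 and keeps the trace when the hash falls below the sampling rate. Every service that sees the same trace id therefore makes the same decision. TraceSampler is a hypothetical name, and CRC32 is just one convenient choice of hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class TraceSampler {

    private final double rate; // fraction of traces to keep, between 0.0 and 1.0

    public TraceSampler(double rate) {
        if (rate < 0.0 || rate > 1.0) {
            throw new IllegalArgumentException("rate must be in [0, 1]: " + rate);
        }
        this.rate = rate;
    }

    /**
     * Deterministic decision: the same trace id always gets the same answer, so
     * producers and consumers agree without coordinating.
     */
    public boolean shouldTrace(String traceId) {
        CRC32 crc = new CRC32();
        crc.update(traceId.getBytes(StandardCharsets.UTF_8));
        double bucket = crc.getValue() / (double) (1L << 32); // maps the 32-bit hash into [0, 1)
        return bucket < rate;
    }
}
```

For example, sendWithTrace could keep generating ids but log and propagate the trace only when shouldTrace(traceId) returns true.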
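2. Security
# Trace messages contain message bodies and may expose sensitive data
# Restrict access to trace queues: here trace_user may only configure and write my.trace.* queues,
# and may read from them and from amq.rabbitmq.trace (read on the exchange is needed to bind to it)
rabbitmqctl set_permissions -p / trace_user '^my\.trace\.' '^my\.trace\.' '^(my\.trace\..*|amq\.rabbitmq\.trace)$'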
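3. Storage Management
# Trace log files written by rabbitmq_tracing keep growing and consume disk space
# Clean them up regularly, and bound trace queues with a message TTL or a maximum length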
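RabbitMQ's standard queue arguments x-message-ttl (in milliseconds) and x-max-length limit how long and how many trace copies a queue keeps. When the length limit is reached, the oldest messages are dropped by default. The sketch below builds the argument map for queueDeclare. The values are illustrative, and TraceQueueArgs is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

public class TraceQueueArgs {

    /**
     * Arguments that keep a trace queue from growing without bound:
     * x-message-ttl discards trace copies older than ttlMs, and
     * x-max-length caps how many are kept at once.
     */
    public static Map<String, Object> bounded(int ttlMs, int maxLength) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMs);
        args.put("x-max-length", maxLength);
        return args;
    }

    public static void main(String[] args) {
        // With the Java client, the map is the last argument of queueDeclare, e.g.:
        // channel.queueDeclare("my.trace.queue", false, false, true, bounded(60_000, 10_000));
        System.out.println(bounded(60_000, 10_000));
    }
}
```

You cannot redeclare an existing queue with different arguments; the broker rejects it with PRECONDITION_FAILED. Delete my.trace.queue first, or apply the same limits through a policy.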
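📝 Chapter Summary
| Approach | Characteristics | Typical use |
|---|---|---|
| Firehose | Simple and direct; high performance cost | Development and debugging |
| rabbitmq_tracing | Managed visually through the management UI | Test environments |
| Application-level tracing | Flexible and controllable | Production |
| Distributed tracing | End-to-end tracing across services | Microservice architectures |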
Next Steps
Now that you understand message tracing, let's move on to best practices for message serialization!
