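Message Tracing

Message tracing is an important tool for troubleshooting RabbitMQ: it reveals the complete path a message takes through the broker, from publication to delivery.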

🎯 Chapter Goals

  • Master message tracing with RabbitMQ's Firehose
  • Learn to use the rabbitmq_tracing plugin
  • Understand tracing options for production environments

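📖 Overview of Tracing Approaches

Approach                    Description                    Performance impact   Typical use
Firehose                    Built-in tracing mechanism     High                 Development and debugging
rabbitmq_tracing            Official plugin                High                 Development and debugging
Application-level tracing   Instrumentation in app code    Low                  Production
Log analysis                Analyzing logs                 Negligible           Troubleshooting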

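🔥 Firehose Tracing

What Is Firehose

Firehose is RabbitMQ's built-in tracing mechanism. Once enabled for a virtual host, it sends a copy of every message published to or delivered from that vhost to a dedicated exchange, amq.rabbitmq.trace.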

                    ┌─────────────────┐
                    │   amq.rabbitmq  │
                    │     .trace      │
                    │ (topic exchange)│
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ publish.#       │ │ deliver.#       │ │ publish.orders  │
│ (all publishes) │ │ (all deliveries)│ │ (one exchange)  │
└─────────────────┘ └─────────────────┘ └─────────────────┘
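amq.rabbitmq.trace is an ordinary topic exchange, so the three bindings above follow the usual topic rules: routing keys are split into dot-separated words, * matches exactly one word and # matches zero or more words. The following stdlib-only sketch makes those rules concrete; the TopicMatcher class is hypothetical and a simplification, not the broker's implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: a simplified topic-exchange pattern matcher for illustration. */
final class TopicMatcher {

    private TopicMatcher() {}

    /** Returns true if routingKey matches bindingKey under topic-exchange rules. */
    static boolean matches(String bindingKey, String routingKey) {
        // limit -1 keeps empty words, e.g. "publish." -> ["publish", ""]
        List<String> pattern = Arrays.asList(bindingKey.split("\\.", -1));
        List<String> words = Arrays.asList(routingKey.split("\\.", -1));
        return match(pattern, 0, words, 0);
    }

    private static boolean match(List<String> p, int pi, List<String> w, int wi) {
        if (pi == p.size()) {
            // Pattern exhausted: it is a match only if every word was consumed too
            return wi == w.size();
        }
        String token = p.get(pi);
        if (token.equals("#")) {
            // '#' matches zero or more words: try every possible split point
            for (int k = wi; k <= w.size(); k++) {
                if (match(p, pi + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (wi == w.size()) {
            return false; // words exhausted, but the pattern still expects one
        }
        // '*' matches exactly one word; any other token must match literally
        boolean ok = token.equals("*") || token.equals(w.get(wi));
        return ok && match(p, pi + 1, w, wi + 1);
    }
}
```

Because order.exchange itself contains a dot, publish.* does not match publish.order.exchange; a binding of publish.order.# or the exact key publish.order.exchange does.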

Routing Key Format

Type      Format                   Example
Publish   publish.{exchangename}   publish.order.exchange
Deliver   deliver.{queuename}      deliver.order.queue
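Exchange and queue names may themselves contain dots (order.exchange above), so a trace consumer should split the routing key only at the first dot rather than on every dot. A minimal sketch, using a hypothetical FirehoseRoutingKey class:

```java
/** Hypothetical helper: splits a Firehose routing key into event type and entity name. */
final class FirehoseRoutingKey {

    final String type; // "publish" or "deliver"
    final String name; // exchange name for publish events, queue name for deliver events

    private FirehoseRoutingKey(String type, String name) {
        this.type = type;
        this.name = name;
    }

    static FirehoseRoutingKey parse(String routingKey) {
        // Only the first dot separates the prefix; the name may contain further dots
        int dot = routingKey.indexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Not a Firehose routing key: " + routingKey);
        }
        String type = routingKey.substring(0, dot);
        if (!type.equals("publish") && !type.equals("deliver")) {
            throw new IllegalArgumentException("Unknown trace type: " + type);
        }
        // May be empty if nothing follows the prefix
        return new FirehoseRoutingKey(type, routingKey.substring(dot + 1));
    }

    boolean isPublish() {
        return type.equals("publish");
    }
}
```

Usage: FirehoseRoutingKey.parse("publish.order.exchange") yields type publish and name order.exchange.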

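Enabling Firehose

# Enable Firehose for the default vhost (/); tracing is enabled per vhost, not globally
rabbitmqctl trace_on

# Enable it for a specific vhost
rabbitmqctl trace_on -p /

# Disable Firehose (also per vhost; pass -p for other vhosts)
rabbitmqctl trace_off

# Firehose state is not persisted: it is off again after a node restart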

Consuming Trace Messages

package com.example.rabbitmq.trace;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class FirehoseConsumer {
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        
        // The connection is deliberately left open: the consumer keeps running
        // on the client's threads after main() returns
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // Declare the trace queue (non-durable, non-exclusive, auto-delete)
        String traceQueue = "my.trace.queue";
        channel.queueDeclare(traceQueue, false, false, true, null);
        
        // Bind to the amq.rabbitmq.trace exchange
        // publish.# receives every published message
        channel.queueBind(traceQueue, "amq.rabbitmq.trace", "publish.#");
        // deliver.# receives every delivered message
        channel.queueBind(traceQueue, "amq.rabbitmq.trace", "deliver.#");
        
        log.info("🔍 Tracing messages...");
        
        DeliverCallback callback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            // Decode explicitly as UTF-8 instead of the platform default charset
            String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
            
            // Firehose stores the trace metadata in the message headers
            AMQP.BasicProperties props = delivery.getProperties();
            
            log.info("═══════════════════════════════════════");
            log.info("📌 Trace type: {}", routingKey.startsWith("publish.") ? "publish" : "deliver");
            log.info("📌 Routing key: {}", routingKey);
            log.info("📌 Body: {}", body);
            
            if (props.getHeaders() != null) {
                log.info("📌 Headers:");
                props.getHeaders().forEach((k, v) -> log.info("    {} = {}", k, v));
            }
        };
        
        // autoAck = true: trace copies are informational, so no manual acks are needed
        channel.basicConsume(traceQueue, true, callback, consumerTag -> {});
    }
}
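Firehose carries the trace metadata in headers rather than in the body; the RabbitMQ Firehose documentation lists exchange_name, routing_keys, properties and node, plus redelivered on deliver events. Instead of dumping every header as the callback above does, a consumer could summarize the useful ones. This is a stdlib-only sketch; the FirehoseHeaders class is hypothetical, and because the Java client may return header strings as LongString objects, the code relies on toString() via StringBuilder.append(Object).

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turns Firehose trace headers into a one-line summary. */
final class FirehoseHeaders {

    private FirehoseHeaders() {}

    static String summarize(Map<String, Object> headers) {
        if (headers == null) {
            return "(no headers)";
        }
        Object exchange = headers.get("exchange_name");
        Object routingKeys = headers.get("routing_keys");
        Object node = headers.get("node");
        Object redelivered = headers.get("redelivered"); // only present on deliver events

        // append(Object) calls toString(), which also works for LongString values
        StringBuilder sb = new StringBuilder();
        sb.append("exchange=").append(exchange);
        sb.append(" routing_keys=").append(routingKeys instanceof List ? routingKeys : "[]");
        sb.append(" node=").append(node);
        if (redelivered != null) {
            sb.append(" redelivered=").append(redelivered);
        }
        return sb.toString();
    }
}
```

Inside the DeliverCallback it could be used as log.info("📌 {}", FirehoseHeaders.summarize(props.getHeaders())).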

Spring Boot Approach

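package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FirehoseConfig {
    
    public static final String TRACE_QUEUE = "my.trace.queue";
    public static final String TRACE_EXCHANGE = "amq.rabbitmq.trace";
    
    @Bean
    public Queue traceQueue() {
        // durable = false, exclusive = false, autoDelete = true
        return new Queue(TRACE_QUEUE, false, false, true);
    }
    
    /**
     * Reference to the built-in trace exchange. Deliberately not a @Bean:
     * RabbitAdmin then declares only the bindings and does not try to
     * redeclare the reserved amq.* exchange.
     */
    private TopicExchange traceExchange() {
        return new TopicExchange(TRACE_EXCHANGE);
    }
    
    /**
     * Bind to the system trace exchange: trace every publish
     */
    @Bean
    public Binding tracePublishBinding() {
        return BindingBuilder.bind(traceQueue())
                .to(traceExchange())
                .with("publish.#");
    }
    
    /**
     * Bind to the system trace exchange: trace every delivery
     */
    @Bean
    public Binding traceDeliverBinding() {
        return BindingBuilder.bind(traceQueue())
                .to(traceExchange())
                .with("deliver.#");
    }
}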

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.FirehoseConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class TraceConsumer {
    
    @RabbitListener(queues = FirehoseConfig.TRACE_QUEUE)
    public void handleTrace(Message message) {
        // publish.{exchangename} or deliver.{queuename}
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        
        log.info("═══════════════════════════════════════");
        log.info("📌 Trace routing key: {}", routingKey);
        log.info("📌 Body: {}", body);
        log.info("📌 Headers: {}", message.getMessageProperties().getHeaders());
    }
}

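📊 The rabbitmq_tracing Plugin

Enabling the Plugin

# Enable the tracing plugin
rabbitmq-plugins enable rabbitmq_tracing

# Recent RabbitMQ versions start the plugin on the running node right away,
# so a restart is normally unnecessary. If your installation does need one:
# systemctl restart rabbitmq-server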

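Configuring via the Management UI

  1. Open the management UI: http://localhost:15672
  2. Go to Admin → Tracing
  3. Add a new trace:
    • Name: my-trace
    • Format: Text or JSON
    • Max payload bytes: 1000 (longer payloads are truncated)
    • Pattern: # (matches everything; publish.# would capture only publishes)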

Configuring via the HTTP API

# Create a trace in the default vhost (%2f is the URL-encoded "/")
curl -u admin:admin123 -X PUT \
  http://localhost:15672/api/traces/%2f/my-trace \
  -H "Content-Type: application/json" \
  -d '{"format":"json","pattern":"#","max_payload_bytes":1000}'

# List the traces in that vhost
curl -u admin:admin123 http://localhost:15672/api/traces/%2f

# Delete the trace
curl -u admin:admin123 -X DELETE \
  http://localhost:15672/api/traces/%2f/my-trace
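The same PUT call can be issued from Java with the JDK's java.net.http client (Java 11+). The sketch below only builds the request, reusing the endpoint, credentials and body from the curl example; the TraceApiRequests class and its method names are hypothetical. The vhost is a path segment, so "/" must be percent-encoded (URLEncoder emits %2F, which is equivalent to %2f).

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper: builds a PUT /api/traces/{vhost}/{name} request. */
final class TraceApiRequests {

    private TraceApiRequests() {}

    /** Percent-encodes one path segment; URLEncoder turns spaces into '+', paths need %20. */
    static String segment(String raw) {
        // A literal '+' in raw is already encoded as %2B, so only spaces are affected here
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static HttpRequest createTrace(String baseUrl, String vhost, String name,
                                   String user, String password, int maxPayloadBytes) {
        String auth = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        // Same body as the curl example: JSON format, match every event, cap payload size
        String body = "{\"format\":\"json\",\"pattern\":\"#\",\"max_payload_bytes\":"
                + maxPayloadBytes + "}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/traces/" + segment(vhost) + "/" + segment(name)))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

It can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), checking for a 2xx status.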

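Trace Log Format

The plugin writes each trace to a log file on the broker node (by default under /var/tmp/rabbitmq-tracing).

Text format: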

================================================================================
2024-01-15 10:30:45:678: Message published

Node:         rabbit@localhost
Connection:   127.0.0.1:54321 -> 127.0.0.1:5672
Virtual host: /
User:         admin
Channel:      1
Exchange:     order.exchange
Routing keys: [order.created]
Properties:   [{delivery_mode,2},{headers,[]}]
Payload:      {"orderId":"12345","amount":99.99}

JSON format:

{
  "timestamp": "2024-01-15T10:30:45.678Z",
  "type": "published",
  "node": "rabbit@localhost",
  "connection": "127.0.0.1:54321 -> 127.0.0.1:5672",
  "vhost": "/",
  "user": "admin",
  "channel": 1,
  "exchange": "order.exchange",
  "routing_keys": ["order.created"],
  "properties": {
    "delivery_mode": 2
  },
  "payload": "{\"orderId\":\"12345\",\"amount\":99.99}"
}

🏢 Tracing in Production

Application-Level Tracing (Recommended)

package com.example.rabbitmq.trace;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class TracedMessageService {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * Send a message with tracing headers attached
     */
    public void sendWithTrace(String exchange, String routingKey, Object message) {
        String traceId = UUID.randomUUID().toString();
        String spanId = UUID.randomUUID().toString().substring(0, 8);
        
        MessagePostProcessor postProcessor = msg -> {
            MessageProperties props = msg.getMessageProperties();
            
            // Attach the tracing information as custom headers
            props.setHeader("X-Trace-Id", traceId);
            props.setHeader("X-Span-Id", spanId);
            props.setHeader("X-Timestamp", System.currentTimeMillis()); // producer clock, epoch ms
            props.setHeader("X-Source", "order-service");               // name of the sending service
            
            return msg;
        };
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, postProcessor);
        
        log.info("📤 Message sent [TraceId={}] exchange={}, routingKey={}", 
                traceId, exchange, routingKey);
    }
}
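sendWithTrace always starts a new trace, which suits the edge of a system, but a service that publishes while handling an incoming message would normally keep the incoming trace id and only create a new span. A minimal sketch under that assumption; the TraceHeaders class and the X-Parent-Span-Id header are hypothetical conventions, not a standard (tracing libraries typically use W3C traceparent or B3 headers instead).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper: builds outgoing trace headers, continuing an incoming trace if present. */
final class TraceHeaders {

    static final String TRACE_ID = "X-Trace-Id";
    static final String SPAN_ID = "X-Span-Id";
    static final String PARENT_SPAN_ID = "X-Parent-Span-Id"; // assumed header name, not a standard

    private TraceHeaders() {}

    static Map<String, Object> next(Map<String, Object> incoming, String source) {
        Map<String, Object> out = new HashMap<>();
        Object traceId = incoming == null ? null : incoming.get(TRACE_ID);
        Object parentSpan = incoming == null ? null : incoming.get(SPAN_ID);

        // Reuse the caller's trace id so every hop shares it; otherwise start a new trace
        out.put(TRACE_ID, traceId != null ? traceId.toString() : UUID.randomUUID().toString());
        // Every hop gets its own span id (first 8 hex chars of a UUID, as in sendWithTrace)
        out.put(SPAN_ID, UUID.randomUUID().toString().substring(0, 8));
        if (parentSpan != null) {
            out.put(PARENT_SPAN_ID, parentSpan.toString());
        }
        out.put("X-Timestamp", System.currentTimeMillis());
        out.put("X-Source", source);
        return out;
    }
}
```

Inside the MessagePostProcessor this could replace the individual setHeader calls: TraceHeaders.next(incomingHeaders, "order-service").forEach(props::setHeader), where incomingHeaders is the header map of the message currently being handled, or null at the start of a trace.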

Consumer-Side Tracing

package com.example.rabbitmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class TracedConsumer {
    
    @RabbitListener(queues = "order.queue")
    public void handleMessage(Message message) {
        // Extract the tracing headers set by the producer
        String traceId = getHeader(message, "X-Trace-Id");
        String spanId = getHeader(message, "X-Span-Id");
        Long timestamp = getLongHeader(message, "X-Timestamp");
        String source = getHeader(message, "X-Source");
        
        // End-to-end latency; only meaningful if producer and consumer clocks are in sync
        long latency = timestamp != null ? System.currentTimeMillis() - timestamp : -1;
        
        log.info("📥 Message received [TraceId={}] [SpanId={}] source={} latency={}ms", 
                traceId, spanId, source, latency);
        
        // Business logic
        processMessage(new String(message.getBody(), StandardCharsets.UTF_8), traceId);
    }
    
    private String getHeader(Message message, String key) {
        Object value = message.getMessageProperties().getHeaders().get(key);
        return value != null ? value.toString() : null;
    }
    
    private Long getLongHeader(Message message, String key) {
        Object value = message.getMessageProperties().getHeaders().get(key);
        return value instanceof Number ? ((Number) value).longValue() : null;
    }
    
    private void processMessage(String content, String traceId) {
        log.info("Processing message [TraceId={}]: {}", traceId, content);
    }
}

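Integrating a Distributed Tracing System

Using Sleuth + Zipkin

Spring Cloud Sleuth supports Spring Boot 2.x only; on Spring Boot 3 its successor is Micrometer Tracing.

<!-- pom.xml (versions are managed by the spring-cloud-dependencies BOM) -->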
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-sleuth-zipkin</artifactId>
</dependency>

# application.yml
spring:
  zipkin:
    base-url: http://localhost:9411
  sleuth:
    sampler:
      probability: 1.0  # sample 100% of traces; lower this in production
    messaging:
      rabbit:
        enabled: true

Using Micrometer + Prometheus

package com.example.rabbitmq.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class MessageMetrics {
    
    private final Counter publishCounter;
    private final Counter consumeCounter;
    private final Timer consumeTimer;
    
    public MessageMetrics(MeterRegistry registry) {
        this.publishCounter = Counter.builder("rabbitmq.messages.published")
                .description("Total number of messages published")
                .register(registry);
        
        this.consumeCounter = Counter.builder("rabbitmq.messages.consumed")
                .description("Total number of messages consumed")
                .register(registry);
        
        this.consumeTimer = Timer.builder("rabbitmq.messages.consume.time")
                .description("Message processing time")
                .register(registry);
    }
    
    public void recordPublish() {
        publishCounter.increment();
    }
    
    public void recordConsume(long processingTimeMs) {
        consumeCounter.increment();
        consumeTimer.record(processingTimeMs, TimeUnit.MILLISECONDS);
    }
}
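recordConsume expects the processing time in milliseconds, so a listener has to measure it somewhere. One way, sketched here with a hypothetical Timing helper, is to wrap the handler and report the elapsed time through a LongConsumer; MessageMetrics::recordConsume fits that shape. System.nanoTime is used because it is monotonic, unlike currentTimeMillis.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical helper: runs a handler and reports its duration in milliseconds. */
final class Timing {

    private Timing() {}

    static <T> T timed(Supplier<T> work, LongConsumer recordMillis) {
        long start = System.nanoTime(); // monotonic clock, unaffected by wall-clock changes
        try {
            return work.get();
        } finally {
            // Reported even if the handler throws, so failures show up in the timer too
            recordMillis.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }
}
```

For example, Timing.timed(() -> process(message), metrics::recordConsume), where process is a stand-in for the real handler. If only the timer is needed, Micrometer's own Timer.record(Runnable) does the same job without a custom wrapper.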

🔍 Troubleshooting Examples

Scenario: Investigating Message Loss

package com.example.rabbitmq.debug;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct; // Spring Boot 3: jakarta.annotation.PostConstruct
import java.nio.charset.StandardCharsets;

@Slf4j
@Component
public class MessageLossDebugger {
    
    private final RabbitTemplate rabbitTemplate;
    
    public MessageLossDebugger(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
    
    @PostConstruct
    public void init() {
        // Publisher confirm callback. It only fires when confirms are enabled,
        // e.g. spring.rabbitmq.publisher-confirm-type=correlated.
        // A RabbitTemplate accepts only one ConfirmCallback.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String id = correlationData != null ? correlationData.getId() : "unknown";
            
            if (ack) {
                log.debug("✅ Message reached the exchange [id={}]", id);
            } else {
                log.error("❌ Message did not reach the exchange [id={}] cause: {}", id, cause);
            }
        });
        
        // Returns callback: invoked when a mandatory message cannot be routed to any queue
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("❌ Message could not be routed:");
            log.error("   Exchange: {}", returned.getExchange());
            log.error("   RoutingKey: {}", returned.getRoutingKey());
            log.error("   ReplyCode: {}", returned.getReplyCode());
            log.error("   ReplyText: {}", returned.getReplyText());
            log.error("   Message: {}", new String(returned.getMessage().getBody(), StandardCharsets.UTF_8));
        });
        
        // mandatory = true makes the broker return unroutable messages instead of dropping them
        rabbitTemplate.setMandatory(true);
    }
}
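The callbacks above log problems as they are reported, but it also helps to know which messages were sent and have not been confirmed yet. A minimal stdlib sketch, assuming each publish passes a CorrelationData whose id is also handed to the hypothetical PendingConfirms tracker; Spring AMQP's RabbitTemplate.getUnconfirmed(long age) offers similar information when correlated confirms are enabled.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks publishes until the broker confirms them. */
final class PendingConfirms {

    private final Map<String, Long> pending = new ConcurrentHashMap<>();
    private final Set<String> nacked = ConcurrentHashMap.newKeySet();

    /** Call before publishing, with the same id passed in CorrelationData. */
    void track(String correlationId) {
        pending.put(correlationId, System.currentTimeMillis());
    }

    /** Call from the ConfirmCallback with its ack flag. */
    void onConfirm(String correlationId, boolean ack) {
        pending.remove(correlationId);
        if (!ack) {
            nacked.add(correlationId);
        }
    }

    /** Ids still unconfirmed after maxAgeMs: candidates for lost messages. */
    Set<String> overdue(long maxAgeMs) {
        long cutoff = System.currentTimeMillis() - maxAgeMs;
        Set<String> result = new HashSet<>();
        pending.forEach((id, sentAt) -> {
            if (sentAt <= cutoff) {
                result.add(id);
            }
        });
        return result;
    }

    /** Ids the broker explicitly nacked. */
    Set<String> nacked() {
        return nacked;
    }
}
```

Call tracker.track(id) before rabbitTemplate.convertAndSend(exchange, routingKey, payload, new CorrelationData(id)), call tracker.onConfirm(id, ack) inside the single ConfirmCallback, and periodically log tracker.overdue(5000) to list messages that have waited more than five seconds.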

Scenario: Investigating Message Latency

package com.example.rabbitmq.debug;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LatencyDebugger {
    
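    @RabbitListener(queues = "debug.queue")
    public void handleMessage(Message message) {
        // Assumes the producer sets an X-Publish-Time header (epoch milliseconds)
        long publishTime = getLongHeader(message, "X-Publish-Time");
        long now = System.currentTimeMillis();
        
        // getLongHeader returns 0 when the header is missing; skip the check then,
        // otherwise "latency" would be the entire epoch. Cross-host values also
        // assume producer and consumer clocks are synchronized (e.g. via NTP).
        if (publishTime > 0) {
            long latency = now - publishTime;
            if (latency > 1000) {  // latency above 1 second
                log.warn("⚠️ High message latency: {}ms", latency);
                log.warn("   Message ID: {}", message.getMessageProperties().getMessageId());
                log.warn("   Published at: {}", publishTime);
                log.warn("   Received at: {}", now);
                log.warn("   Queue: {}", message.getMessageProperties().getConsumerQueue());
            }
        }
        
        // Process the message...
    }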
    
    private long getLongHeader(Message message, String key) {
        Object value = message.getMessageProperties().getHeaders().get(key);
        return value instanceof Number ? ((Number) value).longValue() : 0;
    }
}

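⚠️ Caveats

1. Performance Impact

# Firehose copies every message published to and delivered from the vhost,
# which adds significant load; use it only in development/test environments

# In production, prefer application-level tracing
# or sampled tracing (trace only a fraction of messages)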
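Sampled tracing only works well if every hop makes the same decision for a given message; an independent random draw in each service would leave fragments of traces. Hashing the trace id gives a stable decision. A minimal sketch with a hypothetical TraceSampler; CRC32 is used only because it ships with the JDK, and any stable hash would do.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper: deterministic trace sampling keyed on the trace id. */
final class TraceSampler {

    private final double rate; // fraction of traces to keep, from 0.0 to 1.0

    TraceSampler(double rate) {
        if (rate < 0.0 || rate > 1.0) {
            throw new IllegalArgumentException("rate must be within [0, 1]: " + rate);
        }
        this.rate = rate;
    }

    /** The same trace id always yields the same answer, so all hops of a trace agree. */
    boolean sample(String traceId) {
        CRC32 crc = new CRC32();
        crc.update(traceId.getBytes(StandardCharsets.UTF_8));
        // getValue() is in [0, 2^32); map it to one of 10,000 buckets
        long bucket = crc.getValue() % 10_000;
        return bucket < Math.round(rate * 10_000);
    }
}
```

A producer could then skip the tracing headers, or a consumer skip its trace logging, whenever sampler.sample(traceId) returns false.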

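2. Security Considerations

# Trace messages may contain sensitive data (full payloads and headers)
# Restrict access to the trace queues. Example: trace_user may only consume
# from queues matching my.trace.* (configure and write permissions are denied)
rabbitmqctl set_permissions -p / trace_user "" "" "^my\.trace\."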

3. Storage Management

# Trace logs take up disk space
# Clean them up regularly or set an expiry (TTL)
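For trace queues, one way to set an expiry is through the standard x-message-ttl and x-max-length queue arguments, so old trace copies are discarded instead of piling up. A minimal sketch; the TraceQueueArgs helper and the limits in the usage line are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: queue arguments that bound how much a trace queue can accumulate. */
final class TraceQueueArgs {

    private TraceQueueArgs() {}

    static Map<String, Object> bounded(int ttlMillis, int maxLength) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis); // discard trace copies older than this
        args.put("x-max-length", maxLength);  // by default, drop the oldest messages beyond this count
        return args;
    }
}
```

channel.queueDeclare("my.trace.queue", false, false, true, TraceQueueArgs.bounded(60_000, 100_000)) keeps at most 100,000 trace messages, each for up to one minute. Redeclaring an existing queue with different arguments fails, so for queues that already exist a policy (rabbitmqctl set_policy) is the usual way to apply the same limits.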

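📝 Chapter Summary

Approach                    Characteristics                              Typical use
Firehose                    Simple and direct; high performance impact   Development and debugging
rabbitmq_tracing            Visual management via the UI                 Test environments
Application-level tracing   Flexible and controllable                    Production
Distributed tracing         End-to-end tracing                           Microservice architectures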

Next Steps

Now that you understand message tracing, let's move on to best practices for message serialization!