消息序列化

消息序列化是将对象转换为字节流的过程,选择合适的序列化方式对性能和兼容性至关重要。

🎯 本章目标

  • 了解常见的序列化方式及其优缺点
  • 掌握 Spring Boot 中配置不同序列化器
  • 学会根据场景选择合适的序列化方案

📊 序列化方式对比

方式性能可读性跨语言体积推荐场景
JDK 序列化⭐⭐不推荐
JSON⭐⭐⭐通用场景
Protobuf⭐⭐⭐⭐⭐高性能场景
Avro⭐⭐⭐⭐大数据场景
MessagePack⭐⭐⭐⭐高性能场景

📦 JDK 序列化(默认,不推荐)

Spring AMQP 默认的 SimpleMessageConverter 会直接发送 String 和 byte[],对其他实现了 Serializable 的对象则使用 JDK 序列化,存在以下问题:

// The default SimpleMessageConverter uses JDK serialization
// Drawbacks:
// 1. Poor performance
// 2. Large serialized size
// 3. No cross-language support
// 4. Risk of security vulnerabilities

// ❌ Not recommended
rabbitTemplate.convertAndSend("queue", new Order()); // uses JDK serialization

🌟 JSON 序列化(推荐)

配置 Jackson 序列化器

package com.example.rabbitmq.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQJsonConfig {

    /**
     * Builds the {@link ObjectMapper} used for JSON message payloads.
     *
     * <p>The mapper supports the Java 8 date/time types, writes dates in ISO form rather than
     * as numeric timestamps, and ignores unknown properties so that payloads carrying extra
     * fields can still be read.
     *
     * @return the configured mapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
                .configure(
                        com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
                        false);
    }

    /**
     * Exposes a Jackson-backed converter for message bodies.
     *
     * @param objectMapper mapper the converter serializes and deserializes with
     * @return the JSON message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        MessageConverter jacksonConverter = new Jackson2JsonMessageConverter(objectMapper);
        return jacksonConverter;
    }

    /**
     * Creates the {@link RabbitTemplate} and installs the JSON converter on it.
     *
     * @param connectionFactory    connection factory the template is built on
     * @param jsonMessageConverter converter applied to payloads sent through the template
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(
            ConnectionFactory connectionFactory,
            MessageConverter jsonMessageConverter) {

        RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
        jsonTemplate.setMessageConverter(jsonMessageConverter);
        return jsonTemplate;
    }
}

使用示例

package com.example.rabbitmq.model;

import lombok.Data;
import java.time.LocalDateTime;
import java.math.BigDecimal;

/**
 * Order payload sent through RabbitMQ in the JSON examples.
 */
@Data
public class Order {
    private String orderId;
    private String userId;
    // BigDecimal keeps the monetary amount exact
    private BigDecimal amount;
    // Written as an ISO-8601 string when the ObjectMapper disables WRITE_DATES_AS_TIMESTAMPS
    private LocalDateTime createTime;
    private OrderStatus status;
    
    /** Lifecycle states of an order. */
    public enum OrderStatus {
        CREATED, PAID, SHIPPED, COMPLETED
    }
}
package com.example.rabbitmq.producer;

import com.example.rabbitmq.model.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * Publishes {@link Order} payloads through the injected {@link RabbitTemplate}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class JsonProducer {

    /** Exchange that order messages are published to. */
    private static final String ORDER_EXCHANGE = "order.exchange";

    /** Routing key used for newly created orders. */
    private static final String ORDER_CREATED_ROUTING_KEY = "order.created";

    private final RabbitTemplate rabbitTemplate;

    /**
     * Sends the order and logs it afterwards.
     *
     * @param order payload to publish; it is converted by the template's message converter
     *              (JSON when the template is configured as in RabbitMQJsonConfig)
     */
    public void sendOrder(Order order) {
        rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_CREATED_ROUTING_KEY, order);
        log.info("📤 发送订单: {}", order);
    }
}
package com.example.rabbitmq.consumer;

import com.example.rabbitmq.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener component that consumes {@link Order} messages from the {@code order.queue} queue.
 */
@Slf4j
@Component
public class JsonConsumer {
    
    /**
     * Logs each order received from {@code order.queue}.
     *
     * @param order the payload, already converted to an {@link Order} before this method runs
     */
    @RabbitListener(queues = "order.queue")
    public void handleOrder(Order order) {
        // Payload is deserialized from JSON automatically
        log.info("📥 收到订单: {}", order);
    }
}

序列化后的消息格式

{
  "orderId": "ORD-20240115-001",
  "userId": "U123456",
  "amount": 99.99,
  "createTime": "2024-01-15T10:30:00",
  "status": "CREATED"
}

🚀 Protobuf 序列化(高性能)

添加依赖

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.25.1</version>
</dependency>

<!-- Maven plugin; the protoc version below matches the protobuf-java runtime version (3.25.1). -->
<!-- NOTE(review): ${os.detected.classifier} is normally supplied by the kr.motd.maven:os-maven-plugin
     build extension. Confirm it is declared under <build><extensions>, otherwise the property stays unresolved. -->
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.6.1</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}</protocArtifact>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>

定义 Proto 文件

// src/main/proto/order.proto
syntax = "proto3";

package com.example.rabbitmq.proto;

// Generated Java classes go into this package, nested inside the OrderProto outer class.
option java_package = "com.example.rabbitmq.proto";
option java_outer_classname = "OrderProto";

// Order message used by the Protobuf producer and converter examples.
message Order {
    string order_id = 1;
    string user_id = 2;
    // NOTE(review): double cannot represent decimal currency values exactly; confirm this is acceptable.
    double amount = 3;
    // The producer example fills this with System.currentTimeMillis(), i.e. epoch milliseconds.
    int64 create_time = 4;
    OrderStatus status = 5;
    
    // Lifecycle states of an order.
    // CREATED is the zero value, so an unset status field also reads as CREATED in proto3.
    enum OrderStatus {
        CREATED = 0;
        PAID = 1;
        SHIPPED = 2;
        COMPLETED = 3;
    }
}

Protobuf 消息转换器

package com.example.rabbitmq.config;

import com.example.rabbitmq.proto.OrderProto;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * {@link MessageConverter} that sends and receives {@link OrderProto.Order} payloads in the
 * Protobuf binary format.
 *
 * <p>Outgoing messages carry the content type {@code application/x-protobuf} and the payload
 * class name in the {@code x-protobuf-class} header; incoming messages are dispatched on that
 * header. Only {@code OrderProto.Order} is supported.
 */
public class ProtobufMessageConverter implements MessageConverter {

    private static final String CONTENT_TYPE = "application/x-protobuf";

    /** Header that carries the fully qualified name of the Protobuf message class. */
    private static final String CLASS_HEADER = "x-protobuf-class";

    /**
     * Serializes an {@link OrderProto.Order} into a message body.
     *
     * @param object payload to convert; must be an {@code OrderProto.Order}
     * @param props  properties to attach; content type and class header are set on them
     * @return a message whose body is the Protobuf encoding of the order
     * @throws MessageConversionException if the payload is {@code null} or of any other type
     */
    @Override
    public Message toMessage(Object object, MessageProperties props)
            throws MessageConversionException {

        if (object instanceof OrderProto.Order) {
            OrderProto.Order order = (OrderProto.Order) object;

            props.setContentType(CONTENT_TYPE);
            props.setHeader(CLASS_HEADER, OrderProto.Order.class.getName());

            return new Message(order.toByteArray(), props);
        }

        // A null payload must surface as a conversion error, not as a NullPointerException.
        String typeName = (object == null) ? "null" : object.getClass().toString();
        throw new MessageConversionException("不支持的消息类型: " + typeName);
    }

    /**
     * Parses the message body according to the class named in the {@code x-protobuf-class} header.
     *
     * @param message incoming message
     * @return the parsed {@code OrderProto.Order}
     * @throws MessageConversionException if the header is missing or names an unsupported type,
     *                                    or if the body is not valid Protobuf
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // Read the header as Object: an unchecked String cast would throw ClassCastException
        // for a non-String header value instead of a conversion error.
        Object header = message.getMessageProperties().getHeaders().get(CLASS_HEADER);
        String className = (header == null) ? null : header.toString();

        try {
            if (OrderProto.Order.class.getName().equals(className)) {
                return OrderProto.Order.parseFrom(message.getBody());
            }
        } catch (InvalidProtocolBufferException e) {
            throw new MessageConversionException("Protobuf 反序列化失败", e);
        }

        throw new MessageConversionException("不支持的 Protobuf 类型: " + className);
    }
}

使用示例

package com.example.rabbitmq.producer;

import com.example.rabbitmq.proto.OrderProto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Publishes a sample {@link OrderProto.Order} through the injected {@link RabbitTemplate}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class ProtobufProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Builds a fixed sample order stamped with the current time, sends it to
     * {@code protobuf.exchange} with routing key {@code protobuf.key}, and logs it.
     */
    public void sendOrder() {
        OrderProto.Order.Builder draft = OrderProto.Order.newBuilder();
        draft.setOrderId("ORD-001");
        draft.setUserId("U123");
        draft.setAmount(99.99);
        draft.setCreateTime(System.currentTimeMillis());
        draft.setStatus(OrderProto.Order.OrderStatus.CREATED);
        OrderProto.Order order = draft.build();

        rabbitTemplate.convertAndSend("protobuf.exchange", "protobuf.key", order);
        log.info("📤 发送 Protobuf 消息: {}", order);
    }
}

📝 多类型消息处理

类型识别转换器

package com.example.rabbitmq.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

/**
 * JSON message converter that stamps the payload's class name and a JSON content type onto
 * outgoing message properties before delegating to {@link Jackson2JsonMessageConverter}.
 */
public class TypeAwareJsonConverter extends Jackson2JsonMessageConverter {
    
    /**
     * @param objectMapper mapper handed to the parent converter
     */
    public TypeAwareJsonConverter(ObjectMapper objectMapper) {
        super(objectMapper);
    }
    
    /**
     * Records the payload's fully qualified class name in the {@code __TypeId__} header, then
     * delegates to the parent implementation.
     *
     * NOTE(review): Spring AMQP's AbstractMessageConverter appears to declare
     * toMessage(Object, MessageProperties) as final. Confirm this override compiles against the
     * version in use; if it does not, the header logic belongs in createMessage.
     * NOTE(review): a null payload fails on object.getClass() before reaching the parent.
     */
    @Override
    public Message toMessage(Object object, MessageProperties props) 
            throws MessageConversionException {
        
        // Add the type information to the header
        props.setHeader("__TypeId__", object.getClass().getName());
        
        return super.toMessage(object, props);
    }
    
    /**
     * Sets the JSON content type on the properties, then lets the parent build the message.
     */
    @Override
    protected Message createMessage(Object object, MessageProperties props) {
        props.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        return super.createMessage(object, props);
    }
}

配置多类型支持

package com.example.rabbitmq.config;

import com.example.rabbitmq.model.Order;
import com.example.rabbitmq.model.Payment;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MultiTypeMessageConfig {

    /**
     * Creates a JSON converter whose class mapper resolves short type ids to model classes.
     *
     * @param objectMapper mapper the converter serializes and deserializes with
     * @return the converter with the id-to-class mapping installed
     */
    @Bean
    public MessageConverter multiTypeConverter(ObjectMapper objectMapper) {
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter(objectMapper);
        jsonConverter.setClassMapper(createClassMapper());
        return jsonConverter;
    }

    /** Builds the class mapper: ids "order" and "payment", trusted package limited to the model package. */
    private static DefaultClassMapper createClassMapper() {
        Map<String, Class<?>> typesById = new HashMap<>();
        typesById.put("order", Order.class);
        typesById.put("payment", Payment.class);

        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setIdClassMapping(typesById);
        mapper.setTrustedPackages("com.example.rabbitmq.model");
        return mapper;
    }
}

🔄 版本兼容性处理

向后兼容的消息定义

package com.example.rabbitmq.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;

/**
 * Order payload that tolerates schema evolution.
 *
 * <p>{@code @JsonIgnoreProperties(ignoreUnknown = true)} makes Jackson skip unknown fields, so
 * messages produced by a newer version can still be consumed by an older one.
 *
 * <p>The class implements {@link java.io.Serializable} because SerializationBenchmark writes it
 * with {@code ObjectOutputStream}, which rejects non-serializable objects with
 * {@code NotSerializableException}.
 *
 * <p>NOTE(review): {@code amount} is a {@code Double}, which cannot represent decimal currency
 * values exactly. It is kept because existing callers pass a double literal to the setter.
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Order implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String orderId;
    private String userId;
    private Double amount;

    // Newly added field with a default value, used when the incoming message omits it
    private String remark = "";

    // Newly added field that may be null
    private String couponCode;
}

版本控制策略

package com.example.rabbitmq.model;

import lombok.Data;

/**
 * Envelope that wraps a payload together with version and type metadata.
 *
 * @param <T> type of the wrapped payload
 */
@Data
public class VersionedMessage<T> {
    private String version;  // message version number
    private String type;     // message type
    private T payload;       // message content
    private long timestamp;  // timestamp
}
package com.example.rabbitmq.consumer;

import com.example.rabbitmq.model.Order;
import com.example.rabbitmq.model.VersionedMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener that dispatches versioned order messages to a handler for their version.
 */
@Slf4j
@Component
public class VersionAwareConsumer {

    /**
     * Routes the payload to the handler that matches the envelope's version.
     *
     * <p>Versions other than "1.0" and "2.0", including a missing version, are logged as a
     * warning and the message is otherwise ignored.
     *
     * @param message envelope received from {@code versioned.queue}
     */
    @RabbitListener(queues = "versioned.queue")
    public void handleVersionedMessage(VersionedMessage<Order> message) {
        String version = message.getVersion();

        // switch on a null String throws NullPointerException, so treat a missing version
        // the same way as an unknown one.
        if (version == null) {
            log.warn("未知消息版本: {}", version);
            return;
        }

        switch (version) {
            case "1.0":
                handleV1(message.getPayload());
                break;
            case "2.0":
                handleV2(message.getPayload());
                break;
            default:
                log.warn("未知消息版本: {}", version);
        }
    }

    /** Handles an order from a version 1.0 message. */
    private void handleV1(Order order) {
        log.info("处理 V1 版本订单: {}", order);
    }

    /** Handles an order from a version 2.0 message. */
    private void handleV2(Order order) {
        log.info("处理 V2 版本订单: {}", order);
    }
}

📊 性能测试对比

package com.example.rabbitmq.benchmark;

import com.example.rabbitmq.model.Order;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

import java.io.*;

/**
 * Rough comparison of JDK and JSON serialization for a small {@link Order}.
 *
 * <p>Each benchmark runs {@link #ITERATIONS} serialize/deserialize round trips and logs the
 * elapsed milliseconds and the size of the last serialized form. There is no warm-up phase, so
 * the numbers are indicative only.
 *
 * <p>The JDK benchmark requires {@code Order} to implement {@code java.io.Serializable};
 * otherwise {@code ObjectOutputStream.writeObject} throws {@code NotSerializableException}.
 */
@Slf4j
public class SerializationBenchmark {

    private static final int ITERATIONS = 100000;

    /**
     * Runs the JDK benchmark followed by the JSON benchmark on the same sample order.
     *
     * @param args unused
     * @throws Exception if serialization or deserialization fails
     */
    public static void main(String[] args) throws Exception {
        Order order = createOrder();

        // JDK serialization
        benchmarkJdk(order);

        // JSON serialization
        benchmarkJson(order);
    }

    /**
     * Measures JDK serialization round trips.
     *
     * @param order sample payload
     * @throws Exception if the object cannot be written or read back
     */
    private static void benchmarkJdk(Order order) throws Exception {
        long start = System.nanoTime();
        byte[] data = null;

        for (int i = 0; i < ITERATIONS; i++) {
            // Serialize; closing the object stream flushes it before the bytes are read.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(order);
            }
            data = bos.toByteArray();

            // Deserialize
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
                ois.readObject();
            }
        }

        log.info("JDK 序列化: {}ms, 大小: {} bytes",
                elapsedMillis(start), data.length);
    }

    /**
     * Measures Jackson JSON round trips with a default {@link ObjectMapper}.
     *
     * @param order sample payload
     * @throws Exception if the object cannot be written or read back
     */
    private static void benchmarkJson(Order order) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        long start = System.nanoTime();
        byte[] data = null;

        for (int i = 0; i < ITERATIONS; i++) {
            // Serialize
            data = mapper.writeValueAsBytes(order);

            // Deserialize
            mapper.readValue(data, Order.class);
        }

        log.info("JSON 序列化: {}ms, 大小: {} bytes",
                elapsedMillis(start), data.length);
    }

    /** Milliseconds elapsed since {@code startNanos}; nanoTime is unaffected by wall-clock changes. */
    private static long elapsedMillis(long startNanos) {
        return (System.nanoTime() - startNanos) / 1_000_000;
    }

    /** Creates the fixed sample order used by both benchmarks. */
    private static Order createOrder() {
        Order order = new Order();
        order.setOrderId("ORD-001");
        order.setUserId("U123");
        order.setAmount(99.99);
        return order;
    }
}

测试结果示例

| 方式 | 10万次耗时 | 消息大小 |
| --- | --- | --- |
| JDK | 8500ms | 285 bytes |
| JSON | 2100ms | 89 bytes |
| Protobuf | 580ms | 23 bytes |

📝 本章小结

| 要点 | 说明 |
| --- | --- |
| 默认序列化 | JDK 序列化,不推荐使用 |
| 推荐方案 | JSON(通用)或 Protobuf(高性能) |
| 版本兼容 | 使用 @JsonIgnoreProperties |
| 类型识别 | 通过 Header 或 ClassMapper |
| 性能差异 | Protobuf > JSON > JDK |

选型建议

| 场景 | 推荐方案 |
| --- | --- |
| 内部系统 | JSON(易于调试) |
| 高并发系统 | Protobuf(高性能) |
| 跨语言系统 | JSON 或 Protobuf |
| 大数据场景 | Avro |

下一步

进阶篇学习完成!接下来进入 高级篇 学习集群和运维知识!