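Message Serialization
Message serialization converts objects into a byte stream. The format you choose directly affects throughput, message size, and compatibility between producers and consumers.
🎯 Chapter Goals
- Understand the common serialization formats and their trade-offs
- Configure different message converters in Spring Boot
- Choose a serialization format that fits the scenario
📊 Serialization Formats Compared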
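| Format | Performance | Human-readable | Cross-language | Size | Recommended for |
|---|---|---|---|---|---|
| JDK serialization | ⭐⭐ | ❌ | ❌ | Large | Not recommended |
| JSON | ⭐⭐⭐ | ✅ | ✅ | Medium | General use |
| Protobuf | ⭐⭐⭐⭐⭐ | ❌ | ✅ | Small | High-performance systems |
| Avro | ⭐⭐⭐⭐ | ❌ | ✅ | Small | Big data pipelines |
| MessagePack | ⭐⭐⭐⭐ | ❌ | ✅ | Small | High-performance systems |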
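To make the size column more concrete, the sketch below encodes the same three fields once as JSON text and once as a hand-rolled binary layout written with `DataOutputStream`. The class name `EncodingSizeDemo`, the field values, and the binary layout are assumptions made for illustration. The layout is not Protobuf's actual wire format; it only hints at why formats that leave out field names tend to produce smaller messages.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class EncodingSizeDemo {

    // Text encoding: the field names travel with every message.
    // (No escaping is done here, so this only works for simple values.)
    static byte[] asJson(String orderId, String userId, double amount) {
        String json = "{\"orderId\":\"" + orderId + "\",\"userId\":\"" + userId
                + "\",\"amount\":" + amount + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Binary encoding: only the values are written, in an order both sides agree on.
    static byte[] asBinary(String orderId, String userId, double amount) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(orderId);   // 2-byte length prefix + modified UTF-8 bytes
            out.writeUTF(userId);
            out.writeDouble(amount); // always 8 bytes
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] json = asJson("ORD-001", "U123", 99.99);
        byte[] binary = asBinary("ORD-001", "U123", 99.99);
        System.out.println("JSON bytes:   " + json.length);
        System.out.println("Binary bytes: " + binary.length);
    }
}
```

The price of the smaller payload is that both sides must share the layout (a schema) ahead of time, which is also why the binary formats in the table are not human-readable.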
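📦 JDK Serialization (Default, Not Recommended)
Out of the box, Spring AMQP uses SimpleMessageConverter. It sends String and byte[] payloads as they are and falls back to JDK serialization for any other Serializable object, which causes the following problems:
// The default SimpleMessageConverter uses JDK serialization for Serializable payloads
// Problems:
// 1. Poor performance
// 2. Large serialized payloads
// 3. Not usable across languages
// 4. Deserializing untrusted data is a known security risk
// ❌ Not recommended (Order must also implement java.io.Serializable for this call to succeed)
rabbitTemplate.convertAndSend("queue", new Order()); // serialized with JDK serialization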
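The security concern in item 4 comes from `ObjectInputStream` instantiating whatever classes the byte stream names. If JDK serialization cannot be avoided, one common mitigation is an allow-list filter. The following is a minimal sketch that assumes Java 9 or later (where `ObjectInputFilter` is available); the class name `FilteredDeserialization` and the filter pattern are illustrative choices, not part of Spring AMQP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

class FilteredDeserialization {

    // Serialize any Serializable value with plain JDK serialization.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    // Deserialize, but only accept classes allowed by the filter pattern.
    // Example pattern: "java.lang.String;!*" allows String and rejects everything else.
    static Object deserialize(byte[] data, String pattern)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            ois.setObjectInputFilter(ObjectInputFilter.Config.createFilter(pattern));
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        String allowOnlyStrings = "java.lang.String;!*";

        byte[] text = serialize("hello");
        System.out.println("accepted: " + deserialize(text, allowOnlyStrings));

        byte[] list = serialize(new ArrayList<String>());
        try {
            deserialize(list, allowOnlyStrings);
        } catch (InvalidClassException e) {
            // ArrayList is not on the allow-list, so the stream is rejected.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A filter narrows the attack surface but does not address the performance, size, or cross-language issues, so switching to JSON or Protobuf remains the more complete fix.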
🌟 JSON Serialization (Recommended)
Configuring the Jackson Converter
package com.example.rabbitmq.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQJsonConfig {
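/**
* Custom ObjectMapper.
* Note: declaring this bean replaces Spring Boot's auto-configured ObjectMapper for the whole application.
*/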
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
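// Support the Java 8 date/time types
mapper.registerModule(new JavaTimeModule());
// Write dates as ISO-8601 strings instead of numeric timestamps
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
// Ignore unknown properties so that consumers tolerate fields added by newer producers
mapper.configure(
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false
);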
return mapper;
}
/**
* JSON message converter
*/
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
/**
* Configure RabbitTemplate to use the JSON converter
*/
@Bean
public RabbitTemplate rabbitTemplate(
ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter);
return template;
}
}
Usage Example
package com.example.rabbitmq.model;
import lombok.Data;
import java.time.LocalDateTime;
import java.math.BigDecimal;
@Data
public class Order {
private String orderId;
private String userId;
private BigDecimal amount;
private LocalDateTime createTime;
private OrderStatus status;
public enum OrderStatus {
CREATED, PAID, SHIPPED, COMPLETED
}
}
package com.example.rabbitmq.producer;
import com.example.rabbitmq.model.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Slf4j
@Service
@RequiredArgsConstructor
public class JsonProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrder(Order order) {
// The configured converter serializes the order to JSON automatically
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
log.info("📤 Order sent: {}", order);
}
}
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class JsonConsumer {
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
// The payload is deserialized from JSON automatically
log.info("📥 Order received: {}", order);
}
}
Serialized Message Format
{
"orderId": "ORD-20240115-001",
"userId": "U123456",
"amount": 99.99,
"createTime": "2024-01-15T10:30:00",
"status": "CREATED"
}
🚀 Protobuf Serialization (High Performance)
Adding the Dependency
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.1</version>
</dependency>
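<!-- Maven plugin. ${os.detected.classifier} is supplied by the os-maven-plugin build extension (kr.motd.maven:os-maven-plugin), which must also be declared in the build. -->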
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
Defining the Proto File
// src/main/proto/order.proto
syntax = "proto3";
package com.example.rabbitmq.proto;
option java_package = "com.example.rabbitmq.proto";
option java_outer_classname = "OrderProto";
message Order {
string order_id = 1;
string user_id = 2;
double amount = 3;
int64 create_time = 4;
OrderStatus status = 5;
enum OrderStatus {
CREATED = 0;
PAID = 1;
SHIPPED = 2;
COMPLETED = 3;
}
}
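The proto definition stores `create_time` as an `int64` and `amount` as a `double`, whereas the Java model shown earlier uses `LocalDateTime` and `BigDecimal`. The source does not show how the two are mapped, so the following is only one possible sketch. It assumes `create_time` holds epoch milliseconds (consistent with the `System.currentTimeMillis()` call in the producer) and that the caller decides which time zone a `LocalDateTime` refers to. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class ProtoFieldMapping {

    // LocalDateTime carries no zone, so one must be chosen before converting.
    static long toEpochMillis(LocalDateTime time, ZoneId zone) {
        return time.atZone(zone).toInstant().toEpochMilli();
    }

    static LocalDateTime fromEpochMillis(long millis, ZoneId zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), zone);
    }

    // double cannot represent every decimal fraction exactly; this is acceptable
    // for display values but risky for arithmetic on money.
    static double toProtoAmount(BigDecimal amount) {
        return amount.doubleValue();
    }

    // BigDecimal.valueOf uses the double's canonical string form, which avoids
    // the long binary expansion produced by new BigDecimal(double).
    static BigDecimal fromProtoAmount(double amount) {
        return BigDecimal.valueOf(amount);
    }

    public static void main(String[] args) {
        LocalDateTime created = LocalDateTime.of(2024, 1, 15, 10, 30);
        long millis = toEpochMillis(created, ZoneOffset.UTC);
        System.out.println(fromEpochMillis(millis, ZoneOffset.UTC));
        System.out.println(fromProtoAmount(toProtoAmount(new BigDecimal("99.99"))));
    }
}
```

If exact monetary values matter, a frequently used alternative is to store the amount as an integer number of minor units (for example cents) or as a string, which would require changing the field type in the proto file.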
Protobuf Message Converter
package com.example.rabbitmq.config;
import com.example.rabbitmq.proto.OrderProto;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class ProtobufMessageConverter implements MessageConverter {
private static final String CONTENT_TYPE = "application/x-protobuf";
@Override
public Message toMessage(Object object, MessageProperties props)
throws MessageConversionException {
if (object instanceof OrderProto.Order) {
OrderProto.Order order = (OrderProto.Order) object;
props.setContentType(CONTENT_TYPE);
props.setHeader("x-protobuf-class", OrderProto.Order.class.getName());
return new Message(order.toByteArray(), props);
}
throw new MessageConversionException("Unsupported message type: " + object.getClass());
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String className = (String) message.getMessageProperties()
.getHeaders().get("x-protobuf-class");
try {
if (OrderProto.Order.class.getName().equals(className)) {
return OrderProto.Order.parseFrom(message.getBody());
}
} catch (InvalidProtocolBufferException e) {
throw new MessageConversionException("Protobuf deserialization failed", e);
}
throw new MessageConversionException("Unsupported Protobuf type: " + className);
}
}
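The converter above is not registered anywhere in the snippets shown, and a `RabbitTemplate` configured for JSON would not use it. One possible wiring, sketched under the assumption that Protobuf traffic gets its own template and listener container factory, is shown below. The class name `RabbitMQProtobufConfig` and the bean names `protobufRabbitTemplate` and `protobufListenerFactory` are hypothetical.

```java
package com.example.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQProtobufConfig {

    // Template used by producers that send Protobuf messages.
    @Bean
    public RabbitTemplate protobufRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new ProtobufMessageConverter());
        return template;
    }

    // Container factory used by listeners that receive Protobuf messages.
    @Bean
    public SimpleRabbitListenerContainerFactory protobufListenerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new ProtobufMessageConverter());
        return factory;
    }
}
```

With this setup, a producer would inject the template with `@Qualifier("protobufRabbitTemplate")`, and a consumer would reference the factory through `@RabbitListener(queues = "...", containerFactory = "protobufListenerFactory")`. The converter is created inline rather than exposed as a bean so that it does not compete with the JSON `MessageConverter` bean during injection.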
Usage Example
package com.example.rabbitmq.producer;
import com.example.rabbitmq.proto.OrderProto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class ProtobufProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrder() {
OrderProto.Order order = OrderProto.Order.newBuilder()
.setOrderId("ORD-001")
.setUserId("U123")
.setAmount(99.99)
.setCreateTime(System.currentTimeMillis())
.setStatus(OrderProto.Order.OrderStatus.CREATED)
.build();
// Requires a RabbitTemplate whose message converter is ProtobufMessageConverter
rabbitTemplate.convertAndSend("protobuf.exchange", "protobuf.key", order);
log.info("📤 Protobuf message sent: {}", order);
}
}
📝 Handling Multiple Message Types
Type-Aware Converter
package com.example.rabbitmq.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import java.lang.reflect.Type;
public class TypeAwareJsonConverter extends Jackson2JsonMessageConverter {
public TypeAwareJsonConverter(ObjectMapper objectMapper) {
super(objectMapper);
}
// Assumption: Spring AMQP 2.1 or later, where AbstractMessageConverter declares
// toMessage(...) final and the three-argument createMessage(...) is the extension point.
@Override
protected Message createMessage(Object object, MessageProperties props, Type genericType) {
// Add type information to the header. Jackson2JsonMessageConverter already writes
// __TypeId__ through its type mapper, so this only makes the default behavior explicit.
props.setHeader("__TypeId__", object.getClass().getName());
props.setContentType(MessageProperties.CONTENT_TYPE_JSON);
return super.createMessage(object, props, genericType);
}
}
Configuring Multi-Type Support
package com.example.rabbitmq.config;
import com.example.rabbitmq.model.Order;
import com.example.rabbitmq.model.Payment;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MultiTypeMessageConfig {
@Bean
public MessageConverter multiTypeConverter(ObjectMapper objectMapper) {
Jackson2JsonMessageConverter converter =
new Jackson2JsonMessageConverter(objectMapper);
// Map logical type ids to classes, so the header carries "order" rather than a Java class name
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("order", Order.class);
idClassMapping.put("payment", Payment.class);
classMapper.setIdClassMapping(idClassMapping);
classMapper.setTrustedPackages("com.example.rabbitmq.model");
converter.setClassMapper(classMapper);
return converter;
}
}
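The point of mapping ids such as "order" and "payment" to classes is that producer and consumer no longer need to agree on Java package names, only on the ids. The sketch below illustrates that idea with a small two-way registry. It is not how `DefaultClassMapper` is implemented; the class `TypeIdRegistry` and the nested stand-in types `Order` and `Payment` are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

class TypeIdRegistry {

    // Stand-ins for the real model classes.
    static class Order { }
    static class Payment { }

    private final Map<String, Class<?>> idToClass = new HashMap<>();
    private final Map<Class<?>, String> classToId = new HashMap<>();

    TypeIdRegistry register(String id, Class<?> type) {
        idToClass.put(id, type);
        classToId.put(type, id);
        return this;
    }

    // Producer side: which id goes into the message header for this payload?
    String idFor(Object payload) {
        String id = classToId.get(payload.getClass());
        if (id == null) {
            throw new IllegalArgumentException(
                    "No type id registered for " + payload.getClass().getName());
        }
        return id;
    }

    // Consumer side: which class should the body be deserialized into?
    // Unknown ids are rejected instead of being treated as class names.
    Class<?> classFor(String id) {
        Class<?> type = idToClass.get(id);
        if (type == null) {
            throw new IllegalArgumentException("Unknown type id: " + id);
        }
        return type;
    }

    public static void main(String[] args) {
        TypeIdRegistry registry = new TypeIdRegistry()
                .register("order", Order.class)
                .register("payment", Payment.class);
        System.out.println(registry.idFor(new Order()));
        System.out.println(registry.classFor("payment").getSimpleName());
    }
}
```

Rejecting unknown ids plays a role similar to the trusted-packages setting in the configuration above: the consumer only ever instantiates types it has explicitly agreed to accept.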
🔄 Version Compatibility
Backward-Compatible Message Definition
package com.example.rabbitmq.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
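/**
* @JsonIgnoreProperties(ignoreUnknown = true) makes Jackson skip unknown fields,
* so messages produced by a newer version can still be consumed by an older one.
*/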
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Order {
private String orderId;
private String userId;
private Double amount;
// New field with a default value (used when an older producer omits it)
private String remark = "";
// New field that is allowed to be null
private String couponCode;
}
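The two rules at work here (ignore what you do not recognize, supply defaults for what is missing) can be illustrated without Jackson. The sketch below reads a message that has already been parsed into a `Map`; the class `TolerantOrderReader`, the nested `OrderView` type, and the extra `giftWrap` key are hypothetical and only serve the demonstration.

```java
import java.util.HashMap;
import java.util.Map;

class TolerantOrderReader {

    // Minimal immutable view of the fields this consumer version knows about.
    static final class OrderView {
        final String orderId;
        final String remark;
        final String couponCode;

        OrderView(String orderId, String remark, String couponCode) {
            this.orderId = orderId;
            this.remark = remark;
            this.couponCode = couponCode;
        }
    }

    static OrderView read(Map<String, Object> fields) {
        String orderId = (String) fields.get("orderId");
        // Field added later: older producers omit it, so fall back to a default.
        String remark = (String) fields.getOrDefault("remark", "");
        // Field added later that may legitimately be absent.
        String couponCode = (String) fields.get("couponCode");
        // Any other keys in the map are simply not read, which is the
        // equivalent of ignoring unknown properties.
        return new OrderView(orderId, remark, couponCode);
    }

    public static void main(String[] args) {
        Map<String, Object> fromOldProducer = new HashMap<>();
        fromOldProducer.put("orderId", "ORD-001");

        Map<String, Object> fromNewProducer = new HashMap<>(fromOldProducer);
        fromNewProducer.put("remark", "leave at door");
        fromNewProducer.put("giftWrap", true); // unknown to this consumer

        System.out.println("[" + read(fromOldProducer).remark + "]");
        System.out.println("[" + read(fromNewProducer).remark + "]");
    }
}
```

The same rules imply what is not safe: renaming a field, changing its type, or removing a field that existing consumers require.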
Versioning Strategy
package com.example.rabbitmq.model;
import lombok.Data;
@Data
public class VersionedMessage<T> {
private String version; // message version
private String type; // message type
private T payload; // message body
private long timestamp; // timestamp
}
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.model.Order;
import com.example.rabbitmq.model.VersionedMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class VersionAwareConsumer {
@RabbitListener(queues = "versioned.queue")
public void handleVersionedMessage(VersionedMessage<Order> message) {
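// switch throws a NullPointerException on a null String, so treat a missing
// version as "" and let the default branch handle it
String version = message.getVersion() == null ? "" : message.getVersion();
switch (version) {
case "1.0":
handleV1(message.getPayload());
break;
case "2.0":
handleV2(message.getPayload());
break;
default:
log.warn("Unknown message version: {}", version);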
}
}
private void handleV1(Order order) {
log.info("Handling V1 order: {}", order);
}
private void handleV2(Order order) {
log.info("Handling V2 order: {}", order);
}
}
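Matching on exact strings such as "1.0" means that a producer sending "1.1" ends up in the default branch even when the change was compatible. A possible refinement, sketched here as an assumption rather than something the consumer above does, is to route on the major version only. The class `VersionRouting` and the returned handler names are hypothetical.

```java
class VersionRouting {

    // Extracts the major version: "2.1" gives 2, "3" gives 3.
    // Null, empty, or malformed input gives -1.
    static int majorVersion(String version) {
        if (version == null || version.isEmpty()) {
            return -1;
        }
        int dot = version.indexOf('.');
        String major = dot < 0 ? version : version.substring(0, dot);
        try {
            return Integer.parseInt(major);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // Returns the name of the handler that should process the message.
    static String route(String version) {
        switch (majorVersion(version)) {
            case 1:
                return "handleV1";
            case 2:
                return "handleV2";
            default:
                return "unsupported";
        }
    }

    public static void main(String[] args) {
        System.out.println(route("1.0"));
        System.out.println(route("1.1"));
        System.out.println(route("2.0"));
        System.out.println(route(null));
    }
}
```

This relies on the convention that minor versions only add optional fields while breaking changes bump the major version.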
📊 Performance Benchmark
package com.example.rabbitmq.benchmark;
import com.example.rabbitmq.model.Order;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
@Slf4j
public class SerializationBenchmark {
private static final int ITERATIONS = 100000;
public static void main(String[] args) throws Exception {
Order order = createOrder();
// JDK serialization
benchmarkJdk(order);
// JSON serialization
benchmarkJson(order);
}
// Note: Order must implement java.io.Serializable, otherwise writeObject
// throws NotSerializableException
private static void benchmarkJdk(Order order) throws Exception {
long start = System.currentTimeMillis();
byte[] data = null;
for (int i = 0; i < ITERATIONS; i++) {
// Serialize (closing the stream flushes it before the bytes are read)
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(order);
}
data = bos.toByteArray();
// Deserialize
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
ois.readObject();
}
}
log.info("JDK serialization: {} ms, size: {} bytes",
System.currentTimeMillis() - start, data.length);
}
private static void benchmarkJson(Order order) throws Exception {
ObjectMapper mapper = new ObjectMapper();
long start = System.currentTimeMillis();
byte[] data = null;
for (int i = 0; i < ITERATIONS; i++) {
// Serialize
data = mapper.writeValueAsBytes(order);
// Deserialize
mapper.readValue(data, Order.class);
}
log.info("JSON serialization: {} ms, size: {} bytes",
System.currentTimeMillis() - start, data.length);
}
private static Order createOrder() {
Order order = new Order();
order.setOrderId("ORD-001");
order.setUserId("U123");
order.setAmount(99.99);
return order;
}
}
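The benchmark above times the loop from a cold start, so JIT compilation and class loading are counted against whichever format runs first. A slightly fairer hand-rolled measurement runs some untimed warm-up iterations and uses `System.nanoTime()`. The helper below is a minimal sketch of that idea; the class `WarmedUpTimer` and its parameters are assumptions, and for results that need to be trusted a dedicated harness such as JMH is the usual choice.

```java
class WarmedUpTimer {

    // Runs the task `warmup` times without timing it, then returns the
    // average number of nanoseconds per call over `iterations` timed calls.
    static long averageNanos(Runnable task, int warmup, int iterations) {
        if (iterations <= 0) {
            throw new IllegalArgumentException("iterations must be positive");
        }
        for (int i = 0; i < warmup; i++) {
            task.run();
        }
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            task.run();
        }
        return (System.nanoTime() - start) / iterations;
    }

    public static void main(String[] args) {
        StringBuilder sink = new StringBuilder();
        // Placeholder workload; replace with a serialize/deserialize round trip.
        Runnable task = () -> {
            sink.setLength(0);
            sink.append("ORD-001").append("U123").append(99.99);
        };
        long nanos = averageNanos(task, 10_000, 100_000);
        System.out.println("average: " + nanos + " ns per call");
    }
}
```

Each format's round trip can be wrapped in a `Runnable` and passed to the same helper, which keeps the measurement conditions identical across formats.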
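Sample Results
The figures below are illustrative and will vary with hardware, JVM version, and payload. The benchmark code above measures only JDK and JSON serialization; the Protobuf row is not produced by it.
| Format | Time for 100,000 iterations | Message size |
|---|---|---|
| JDK | 8500 ms | 285 bytes |
| JSON | 2100 ms | 89 bytes |
| Protobuf | 580 ms | 23 bytes |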
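📝 Chapter Summary
| Topic | Notes |
|---|---|
| Default serialization | JDK serialization; not recommended |
| Recommended formats | JSON (general use) or Protobuf (high performance) |
| Version compatibility | Use @JsonIgnoreProperties(ignoreUnknown = true) |
| Type identification | Through a message header or a ClassMapper |
| Performance | Protobuf > JSON > JDK |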
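Selection Guidelines
| Scenario | Recommended format |
|---|---|
| Internal systems | JSON (easy to debug) |
| High-concurrency systems | Protobuf (high performance) |
| Cross-language systems | JSON or Protobuf |
| Big data pipelines | Avro |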
Next Steps
That completes the intermediate part. Next, move on to the advanced part to learn about clustering and operations.
