优先级队列
优先级队列允许为消息设置优先级,高优先级的消息会被优先消费。
🎯 本章目标
- 理解优先级队列的工作原理
- 掌握优先级队列的配置和使用
- 了解优先级队列的适用场景
📋 应用场景
| 场景 | 说明 |
|---|---|
| VIP 用户优先 | VIP 用户的请求优先处理 |
| 紧急工单 | 紧急工单优先于普通工单 |
| 支付订单 | 支付订单优先于浏览日志 |
| 错误告警 | 错误级别告警优先处理 |
🔧 配置优先级队列
配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 优先级队列配置
*/
@Configuration
public class PriorityQueueConfig {
public static final String PRIORITY_EXCHANGE = "priority.exchange";
public static final String PRIORITY_QUEUE = "priority.queue";
public static final String PRIORITY_ROUTING_KEY = "priority.key";
// 最大优先级(0-255,建议不要太大,会消耗更多资源)
public static final int MAX_PRIORITY = 10;
@Bean
public DirectExchange priorityExchange() {
return new DirectExchange(PRIORITY_EXCHANGE);
}
@Bean
public Queue priorityQueue() {
Map<String, Object> args = new HashMap<>();
// 设置队列最大优先级
args.put("x-max-priority", MAX_PRIORITY);
return QueueBuilder.durable(PRIORITY_QUEUE)
.withArguments(args)
.build();
}
@Bean
public Binding priorityBinding() {
return BindingBuilder.bind(priorityQueue())
.to(priorityExchange())
.with(PRIORITY_ROUTING_KEY);
}
}
生产者
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.PriorityQueueConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* 优先级消息生产者
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PriorityProducer {
private final RabbitTemplate rabbitTemplate;
/**
* 发送带优先级的消息
* @param message 消息内容
* @param priority 优先级 (0-10, 数字越大优先级越高)
*/
public void sendWithPriority(String message, int priority) {
// 确保优先级在有效范围内
int validPriority = Math.min(Math.max(priority, 0), PriorityQueueConfig.MAX_PRIORITY);
rabbitTemplate.convertAndSend(
PriorityQueueConfig.PRIORITY_EXCHANGE,
PriorityQueueConfig.PRIORITY_ROUTING_KEY,
message,
msg -> {
msg.getMessageProperties().setPriority(validPriority);
return msg;
}
);
log.info("📤 发送消息 [优先级={}]: {}", validPriority, message);
}
/**
* 发送普通优先级消息(默认优先级 5)
*/
public void sendNormal(String message) {
sendWithPriority(message, 5);
}
/**
* 发送高优先级消息
*/
public void sendHigh(String message) {
sendWithPriority(message, 10);
}
/**
* 发送低优先级消息
*/
public void sendLow(String message) {
sendWithPriority(message, 1);
}
}
消费者
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.PriorityQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 优先级消息消费者
*/
@Slf4j
@Component
public class PriorityConsumer {
@RabbitListener(queues = PriorityQueueConfig.PRIORITY_QUEUE)
public void handleMessage(String message, Channel channel, Message amqpMessage)
throws IOException {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
Integer priority = amqpMessage.getMessageProperties().getPriority();
log.info("📥 收到消息 [优先级={}]: {}", priority, message);
try {
// 处理消息
processMessage(message, priority);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息处理失败", e);
channel.basicNack(deliveryTag, false, false);
}
}
private void processMessage(String message, Integer priority) throws InterruptedException {
// 模拟处理时间
Thread.sleep(500);
log.info("✅ 处理完成 [优先级={}]: {}", priority, message);
}
}
🔍 实战案例:工单系统
package com.example.rabbitmq.ticket;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 工单实体
*/
@Data
public class Ticket {
private String ticketId;
private String title;
private String content;
private TicketLevel level; // 工单级别
private String userId;
private LocalDateTime createTime;
public enum TicketLevel {
NORMAL(3), // 普通
URGENT(7), // 紧急
CRITICAL(10); // 严重
private final int priority;
TicketLevel(int priority) {
this.priority = priority;
}
public int getPriority() {
return priority;
}
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class TicketService {
private final RabbitTemplate rabbitTemplate;
/**
* 提交工单
*/
public void submitTicket(Ticket ticket) {
ticket.setTicketId(generateTicketId());
ticket.setCreateTime(LocalDateTime.now());
// 根据工单级别设置优先级
int priority = ticket.getLevel().getPriority();
rabbitTemplate.convertAndSend(
"ticket.exchange",
"ticket.submit",
ticket,
msg -> {
msg.getMessageProperties().setPriority(priority);
return msg;
}
);
log.info("工单提交成功: {} [{}]", ticket.getTicketId(), ticket.getLevel());
}
}
@Slf4j
@Component
public class TicketProcessor {
@RabbitListener(queues = "ticket.queue")
public void processTicket(Ticket ticket, Channel channel, Message message)
throws IOException {
log.info("📋 处理工单: {} [{}] - {}",
ticket.getTicketId(),
ticket.getLevel(),
ticket.getTitle()
);
try {
// 分配给对应的处理人员
assignTicket(ticket);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
private void assignTicket(Ticket ticket) {
// 根据级别分配处理人员
switch (ticket.getLevel()) {
case CRITICAL:
log.info("🚨 分配给高级工程师处理");
break;
case URGENT:
log.info("⚠️ 分配给值班工程师处理");
break;
default:
log.info("📝 加入普通工单队列");
}
}
}
⚠️ 注意事项
1. 优先级只在队列有堆积时生效
情况1:队列空闲
消息 A(优先级1) → 立即被消费
消息 B(优先级10) → 立即被消费
结果:A 先于 B 被消费(按到达顺序)
情况2:队列有堆积
队列中已有 [消息A(优先级1), 消息B(优先级10)]
消费顺序:B → A(按优先级排序)
2. 设置合适的 prefetch
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只取一条,让优先级生效
3. 不要设置过大的最大优先级
// 不推荐
args.put("x-max-priority", 255);
// 推荐
args.put("x-max-priority", 10);
📝 本章小结
| 要点 | 说明 |
|---|---|
| x-max-priority | 声明队列时设置最大优先级 |
| setPriority | 发送消息时设置优先级 |
| 生效条件 | 队列有积压时才按优先级排序 |
| prefetch | 建议设置为 1 |
恭喜!
你已经完成了进阶篇的学习!接下来可以进入 高级篇 学习集群和运维知识。
