优先级队列

优先级队列允许为消息设置优先级,高优先级的消息会被优先消费。

🎯 本章目标

  • 理解优先级队列的工作原理
  • 掌握优先级队列的配置和使用
  • 了解优先级队列的适用场景

📋 应用场景

场景说明
VIP 用户优先VIP 用户的请求优先处理
紧急工单紧急工单优先于普通工单
支付订单支付订单优先于浏览日志
错误告警错误级别告警优先处理

🔧 配置优先级队列

配置类

package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 优先级队列配置
 */
@Configuration
public class PriorityQueueConfig {
    
    public static final String PRIORITY_EXCHANGE = "priority.exchange";
    public static final String PRIORITY_QUEUE = "priority.queue";
    public static final String PRIORITY_ROUTING_KEY = "priority.key";
    
    // 最大优先级(0-255,建议不要太大,会消耗更多资源)
    public static final int MAX_PRIORITY = 10;
    
    @Bean
    public DirectExchange priorityExchange() {
        return new DirectExchange(PRIORITY_EXCHANGE);
    }
    
    @Bean
    public Queue priorityQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置队列最大优先级
        args.put("x-max-priority", MAX_PRIORITY);
        
        return QueueBuilder.durable(PRIORITY_QUEUE)
                .withArguments(args)
                .build();
    }
    
    @Bean
    public Binding priorityBinding() {
        return BindingBuilder.bind(priorityQueue())
                .to(priorityExchange())
                .with(PRIORITY_ROUTING_KEY);
    }
}

生产者

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.PriorityQueueConfig;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * 优先级消息生产者
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class PriorityProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 发送带优先级的消息
     * @param message 消息内容
     * @param priority 优先级 (0-10, 数字越大优先级越高)
     */
    public void sendWithPriority(String message, int priority) {
        // 确保优先级在有效范围内
        int validPriority = Math.min(Math.max(priority, 0), PriorityQueueConfig.MAX_PRIORITY);
        
        rabbitTemplate.convertAndSend(
            PriorityQueueConfig.PRIORITY_EXCHANGE,
            PriorityQueueConfig.PRIORITY_ROUTING_KEY,
            message,
            msg -> {
                msg.getMessageProperties().setPriority(validPriority);
                return msg;
            }
        );
        
        log.info("📤 发送消息 [优先级={}]: {}", validPriority, message);
    }
    
    /**
     * 发送普通优先级消息(默认优先级 5)
     */
    public void sendNormal(String message) {
        sendWithPriority(message, 5);
    }
    
    /**
     * 发送高优先级消息
     */
    public void sendHigh(String message) {
        sendWithPriority(message, 10);
    }
    
    /**
     * 发送低优先级消息
     */
    public void sendLow(String message) {
        sendWithPriority(message, 1);
    }
}

消费者

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.PriorityQueueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 优先级消息消费者
 */
@Slf4j
@Component
public class PriorityConsumer {
    
    @RabbitListener(queues = PriorityQueueConfig.PRIORITY_QUEUE)
    public void handleMessage(String message, Channel channel, Message amqpMessage) 
            throws IOException {
        
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        Integer priority = amqpMessage.getMessageProperties().getPriority();
        
        log.info("📥 收到消息 [优先级={}]: {}", priority, message);
        
        try {
            // 处理消息
            processMessage(message, priority);
            
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("消息处理失败", e);
            channel.basicNack(deliveryTag, false, false);
        }
    }
    
    private void processMessage(String message, Integer priority) throws InterruptedException {
        // 模拟处理时间
        Thread.sleep(500);
        log.info("✅ 处理完成 [优先级={}]: {}", priority, message);
    }
}

🔍 实战案例:工单系统

package com.example.rabbitmq.ticket;

import lombok.Data;
import java.time.LocalDateTime;

/**
 * 工单实体
 */
@Data
public class Ticket {
    private String ticketId;
    private String title;
    private String content;
    private TicketLevel level;  // 工单级别
    private String userId;
    private LocalDateTime createTime;
    
    public enum TicketLevel {
        NORMAL(3),      // 普通
        URGENT(7),      // 紧急
        CRITICAL(10);   // 严重
        
        private final int priority;
        
        TicketLevel(int priority) {
            this.priority = priority;
        }
        
        public int getPriority() {
            return priority;
        }
    }
}
@Slf4j
@Service
@RequiredArgsConstructor
public class TicketService {
    
    private final RabbitTemplate rabbitTemplate;
    
    /**
     * 提交工单
     */
    public void submitTicket(Ticket ticket) {
        ticket.setTicketId(generateTicketId());
        ticket.setCreateTime(LocalDateTime.now());
        
        // 根据工单级别设置优先级
        int priority = ticket.getLevel().getPriority();
        
        rabbitTemplate.convertAndSend(
            "ticket.exchange",
            "ticket.submit",
            ticket,
            msg -> {
                msg.getMessageProperties().setPriority(priority);
                return msg;
            }
        );
        
        log.info("工单提交成功: {} [{}]", ticket.getTicketId(), ticket.getLevel());
    }
}

@Slf4j
@Component
public class TicketProcessor {
    
    @RabbitListener(queues = "ticket.queue")
    public void processTicket(Ticket ticket, Channel channel, Message message) 
            throws IOException {
        
        log.info("📋 处理工单: {} [{}] - {}", 
            ticket.getTicketId(), 
            ticket.getLevel(), 
            ticket.getTitle()
        );
        
        try {
            // 分配给对应的处理人员
            assignTicket(ticket);
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
    
    private void assignTicket(Ticket ticket) {
        // 根据级别分配处理人员
        switch (ticket.getLevel()) {
            case CRITICAL:
                log.info("🚨 分配给高级工程师处理");
                break;
            case URGENT:
                log.info("⚠️ 分配给值班工程师处理");
                break;
            default:
                log.info("📝 加入普通工单队列");
        }
    }
}

⚠️ 注意事项

1. 优先级只在队列有堆积时生效

情况1:队列空闲
消息 A(优先级1) → 立即被消费
消息 B(优先级10) → 立即被消费
结果:A 先于 B 被消费(按到达顺序)

情况2:队列有堆积
队列中已有 [消息A(优先级1), 消息B(优先级10)]
消费顺序:B → A(按优先级排序)

2. 设置合适的 prefetch

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1  # 每次只取一条,让优先级生效

3. 不要设置过大的最大优先级

// 不推荐
args.put("x-max-priority", 255);

// 推荐
args.put("x-max-priority", 10);

📝 本章小结

要点说明
x-max-priority声明队列时设置最大优先级
setPriority发送消息时设置优先级
生效条件队列有积压时才按优先级排序
prefetch建议设置为 1

恭喜!

你已经完成了进阶篇的学习!接下来可以进入 高级篇 学习集群和运维知识。