消息队列设计模式:从基础到高级应用
一、消息队列概述
1.1 什么是消息队列
消息队列(Message Queue)是一种异步通信机制,通过将消息发送到队列中,实现生产者和消费者的解耦。核心特点包括:
- 异步通信:生产者发送消息后无需等待响应
- 解耦:生产者和消费者无需知道彼此的存在
- 削峰填谷:缓冲突发流量
- 可靠性:消息持久化和重试机制
1.2 常见消息队列对比
| 特性 | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| 协议 | AMQP | TCP自定义 | 自定义协议 | AMQP/MQTT |
| 吞吐量 | 中等 | 极高 | 高 | 中等 |
| 延迟 | 低 | 中 | 低 | 中 |
| 持久化 | 支持 | 支持 | 支持 | 支持 |
| 分布式 | 支持 | 原生支持 | 原生支持 | 支持 |
| 事务 | 支持 | 支持 | 强支持 | 支持 |
二、消息队列核心模式
2.1 点对点模式(Point-to-Point)
一对一的消息传递模式,每条消息只能被一个消费者消费。
@Component public class P2PPublisher { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String queueName, Object message) { rabbitTemplate.convertAndSend(queueName, message); } } @Component public class P2PConsumer { @RabbitListener(queues = "direct.queue") public void receiveMessage(String message) { System.out.println("Received: " + message); } }适用场景:任务分发、工单处理、异步任务
2.2 发布/订阅模式(Publish/Subscribe)
一对多的消息传递模式,消息会被广播到所有订阅者。
@Component public class PubSubPublisher { @Autowired private RabbitTemplate rabbitTemplate; public void publishMessage(String exchangeName, Object message) { rabbitTemplate.convertAndSend(exchangeName, "", message); } } @Component public class PubSubConsumer { @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "fanout.exchange", type = ExchangeTypes.FANOUT) )) public void receiveMessage(String message) { System.out.println("Subscriber received: " + message); } }适用场景:事件通知、日志广播、实时更新
2.3 请求/回复模式(Request/Reply)
客户端发送请求消息,等待服务端的响应消息。
@Component public class RequestReplyClient { @Autowired private RabbitTemplate rabbitTemplate; public String sendRequest(String request) throws InterruptedException { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); Message reply = rabbitTemplate.sendAndReceive( "request.exchange", "request.routing.key", MessageBuilder.withBody(request.getBytes()).build(), correlationId ); return reply != null ? new String(reply.getBody()) : null; } } @Component public class RequestReplyServer { @RabbitListener(queues = "request.queue") public String handleRequest(String request) { // 处理请求 return "Response for: " + request; } }适用场景:RPC调用、异步查询
三、高级消息模式
3.1 死信队列(Dead Letter Queue)
处理无法正常消费的消息,实现延迟重试或死信归档。
@Configuration public class DeadLetterConfig { @Bean public Queue mainQueue() { return QueueBuilder.durable("main.queue") .withArgument("x-dead-letter-exchange", "dlx.exchange") .withArgument("x-dead-letter-routing-key", "dlx.routing.key") .withArgument("x-message-ttl", 60000) .build(); } @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dlq.queue").build(); } @Bean public DirectExchange dlxExchange() { return new DirectExchange("dlx.exchange"); } @Bean public Binding dlqBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(dlxExchange()) .with("dlx.routing.key"); } }适用场景:消息重试、错误处理、审计日志
3.2 延迟队列(Delay Queue)
实现消息的延迟投递,用于定时任务、订单超时处理等场景。
@Component public class DelayQueueProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayedMessage(String message, long delayMs) { rabbitTemplate.convertAndSend( "delay.exchange", "delay.routing.key", message, messagePostProcessor -> { messagePostProcessor.getMessageProperties() .setDelay((int) delayMs); return messagePostProcessor; } ); } } @Configuration public class DelayQueueConfig { @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("delay.exchange", "x-delayed-message", true, false, args); } @Bean public Queue delayQueue() { return new Queue("delay.queue", true); } @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with("delay.routing.key") .noargs(); } }适用场景:订单超时取消、定时通知、重试机制
3.3 优先级队列(Priority Queue)
根据消息优先级进行消费排序,高优先级消息优先处理。
@Component public class PriorityQueueProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendPriorityMessage(String message, int priority) { MessageProperties properties = new MessageProperties(); properties.setPriority(priority); Message messageObj = new Message(message.getBytes(), properties); rabbitTemplate.send("priority.exchange", "priority.routing.key", messageObj); } } @Configuration public class PriorityQueueConfig { @Bean public Queue priorityQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-max-priority", 10); return new Queue("priority.queue", true, false, false, args); } }适用场景:紧急任务处理、VIP服务、资源调度
四、消息可靠性保障
4.1 消息持久化
确保消息在Broker重启后不丢失。
@Configuration public class PersistenceConfig { @Bean public Queue durableQueue() { return QueueBuilder.durable("persistent.queue") .build(); } @Bean public DirectExchange durableExchange() { return ExchangeBuilder.directExchange("persistent.exchange") .durable(true) .build(); } }4.2 消息确认机制
生产者确认(Publisher Confirm)和消费者确认(Consumer ACK)。
@Component public class ConfirmProducer { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("Message confirmed: " + correlationData.getId()); } else { System.err.println("Message rejected: " + cause); // 重试或持久化到数据库 } }); rabbitTemplate.setReturnsCallback(returnedMessage -> { System.err.println("Message returned: " + returnedMessage.getMessage()); }); } } @Component public class AckConsumer { @RabbitListener(queues = "ack.queue") public void receiveMessage(Message message, Channel channel) throws IOException { try { System.out.println("Processing message: " + new String(message.getBody())); // 手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 拒绝并重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }4.3 幂等性保障
防止消息重复处理导致数据不一致。
@Component public class IdempotentConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @RabbitListener(queues = "idempotent.queue") public void processMessage(Message message) { String messageId = message.getMessageProperties().getMessageId(); // 使用Redis实现幂等 Boolean processed = redisTemplate.opsForValue().setIfAbsent( "message:" + messageId, "processed", Duration.ofMinutes(30) ); if (Boolean.TRUE.equals(processed)) { // 首次处理 processBusinessLogic(message); } else { // 重复消息,跳过处理 System.out.println("Duplicate message: " + messageId); } } private void processBusinessLogic(Message message) { // 业务处理逻辑 } }五、消息路由模式
5.1 直接路由(Direct Routing)
基于精确匹配路由键进行消息分发。
@Configuration public class DirectRoutingConfig { @Bean public Queue orderQueue() { return new Queue("order.queue"); } @Bean public Queue paymentQueue() { return new Queue("payment.queue"); } @Bean public DirectExchange directExchange() { return new DirectExchange("direct.exchange"); } @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(directExchange()) .with("order.created"); } @Bean public Binding paymentBinding() { return BindingBuilder.bind(paymentQueue()) .to(directExchange()) .with("payment.completed"); } }5.2 主题路由(Topic Routing)
基于通配符匹配路由键进行消息分发。
@Configuration public class TopicRoutingConfig { @Bean public Queue auditQueue() { return new Queue("audit.queue"); } @Bean public Queue alertQueue() { return new Queue("alert.queue"); } @Bean public TopicExchange topicExchange() { return new TopicExchange("topic.exchange"); } @Bean public Binding auditBinding() { return BindingBuilder.bind(auditQueue()) .to(topicExchange()) .with("order.*.*"); } @Bean public Binding alertBinding() { return BindingBuilder.bind(alertQueue()) .to(topicExchange()) .with("*.error.*"); } }5.3 头部路由(Headers Routing)
基于消息头部属性进行路由匹配。
@Configuration public class HeadersRoutingConfig { @Bean public Queue highPriorityQueue() { return new Queue("high.priority.queue"); } @Bean public Queue lowPriorityQueue() { return new Queue("low.priority.queue"); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.exchange"); } @Bean public Binding highPriorityBinding() { Map<String, Object> headers = new HashMap<>(); headers.put("priority", "high"); return BindingBuilder.bind(highPriorityQueue()) .to(headersExchange()) .whereAll(headers).match(); } @Bean public Binding lowPriorityBinding() { Map<String, Object> headers = new HashMap<>(); headers.put("priority", "low"); return BindingBuilder.bind(lowPriorityQueue()) .to(headersExchange()) .whereAll(headers).match(); } }六、消息流处理模式
6.1 消息过滤(Message Filtering)
在消费端或Broker端对消息进行过滤。
@Component public class FilteredConsumer { @RabbitListener(queues = "filtered.queue") public void processMessage(Message message) { MessageProperties properties = message.getMessageProperties(); // 根据头部信息过滤 String contentType = properties.getContentType(); if (!"application/json".equals(contentType)) { return; // 跳过非JSON消息 } processMessageBody(message.getBody()); } }6.2 消息聚合(Message Aggregation)
将多个相关消息聚合成一个完整的消息进行处理。
@Component public class MessageAggregator { private final Map<String, List<Message>> messageGroups = new ConcurrentHashMap<>(); private static final int GROUP_SIZE = 10; @RabbitListener(queues = "aggregate.queue") public void receiveMessage(Message message) { String groupId = message.getMessageProperties().getHeader("groupId"); messageGroups.compute(groupId, (key, messages) -> { if (messages == null) { messages = new ArrayList<>(); } messages.add(message); if (messages.size() >= GROUP_SIZE) { processGroup(groupId, messages); return new ArrayList<>(); } return messages; }); } private void processGroup(String groupId, List<Message> messages) { // 聚合处理逻辑 System.out.println("Processing group: " + groupId + ", count: " + messages.size()); } }6.3 消息拆分(Message Splitting)
将大消息拆分成多个小消息进行处理。
@Component public class MessageSplitter { @Autowired private RabbitTemplate rabbitTemplate; public void splitAndSend(String exchange, String routingKey, List<String> items) { for (String item : items) { rabbitTemplate.convertAndSend(exchange, routingKey, item); } // 发送结束标记 rabbitTemplate.convertAndSend(exchange, routingKey, "END_OF_BATCH"); } }七、分布式事务处理
7.1 两阶段提交(2PC)
基于XA协议的分布式事务处理。
@Transactional public void processOrderWithTransaction(Order order) { // 1. 本地数据库操作 orderRepository.save(order); // 2. 发送消息(参与XA事务) rabbitTemplate.convertAndSend("order.exchange", "order.created", order); // 3. 如果上述操作都成功,事务提交;否则回滚 }7.2 本地消息表模式
通过本地消息表实现最终一致性。
@Component public class LocalMessageService { @Autowired private MessageLogRepository messageLogRepository; @Autowired private RabbitTemplate rabbitTemplate; @Transactional public void sendMessage(String exchange, String routingKey, Object payload) { // 1. 保存消息到本地消息表 MessageLog log = new MessageLog(); log.setMessageId(UUID.randomUUID().toString()); log.setExchange(exchange); log.setRoutingKey(routingKey); log.setPayload(payload.toString()); log.setStatus(MessageStatus.PENDING); messageLogRepository.save(log); try { // 2. 发送消息 rabbitTemplate.convertAndSend(exchange, routingKey, payload); // 3. 更新消息状态为已发送 log.setStatus(MessageStatus.SENT); messageLogRepository.save(log); } catch (Exception e) { // 4. 发送失败,消息保持PENDING状态,由重试任务处理 throw new RuntimeException("Message send failed", e); } } } @Component public class MessageRetryTask { @Scheduled(fixedRate = 60000) public void retryPendingMessages() { List<MessageLog> pendingMessages = messageLogRepository.findByStatus(MessageStatus.PENDING); for (MessageLog log : pendingMessages) { try { rabbitTemplate.convertAndSend(log.getExchange(), log.getRoutingKey(), log.getPayload()); log.setStatus(MessageStatus.SENT); } catch (Exception e) { log.setRetryCount(log.getRetryCount() + 1); if (log.getRetryCount() >= 3) { log.setStatus(MessageStatus.FAILED); } } messageLogRepository.save(log); } } }八、消息队列监控与运维
8.1 消息队列监控指标
@Component public class QueueMonitor { @Autowired private RabbitAdmin rabbitAdmin; public Map<String, Object> getQueueStats(String queueName) { Map<String, Object> stats = new HashMap<>(); try { QueueInfo info = rabbitAdmin.getQueueInfo(queueName); stats.put("queueName", info.getName()); stats.put("messageCount", info.getMessageCount()); stats.put("consumerCount", info.getConsumerCount()); stats.put("memory", info.getMemory()); } catch (Exception e) { stats.put("error", e.getMessage()); } return stats; } }8.2 队列限流与流量控制
@Component public class RateLimitedConsumer { private final Semaphore semaphore = new Semaphore(100); @RabbitListener(queues = "rate.limited.queue") public void processMessage(String message) throws InterruptedException { semaphore.acquire(); try { processBusinessLogic(message); } finally { semaphore.release(); } } private void processBusinessLogic(String message) { // 业务处理 } }九、性能优化策略
9.1 消息批量处理
@Component public class BatchConsumer { @RabbitListener(queues = "batch.queue", containerFactory = "batchContainerFactory") public void processBatch(List<Message> messages) { // 批量处理消息 for (Message message : messages) { processMessage(message); } } @Bean public SimpleRabbitListenerContainerFactory batchContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setBatchListener(true); factory.setBatchSize(100); factory.setReceiveTimeout(5000); return factory; } }9.2 消息压缩
@Component public class CompressedMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendCompressedMessage(String exchange, String routingKey, String message) throws IOException { byte[] compressed = compress(message.getBytes(StandardCharsets.UTF_8)); MessageProperties properties = new MessageProperties(); properties.setContentEncoding("gzip"); properties.setContentType("application/json"); Message msg = new Message(compressed, properties); rabbitTemplate.send(exchange, routingKey, msg); } private byte[] compress(byte[] data) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (GZIPOutputStream gzos = new GZIPOutputStream(baos)) { gzos.write(data); } return baos.toByteArray(); } }9.3 异步消费优化
@Component public class AsyncConsumer { @Async @RabbitListener(queues = "async.queue") public CompletableFuture<Void> processAsync(String message) { return CompletableFuture.runAsync(() -> { // 异步处理 processMessage(message); }); } }十、总结
消息队列是构建高可用、高性能分布式系统的关键组件。本文介绍了多种消息队列设计模式:
- 基础模式:点对点、发布/订阅、请求/回复
- 高级模式:死信队列、延迟队列、优先级队列
- 可靠性保障:持久化、消息确认、幂等性
- 路由模式:直接路由、主题路由、头部路由
- 流处理模式:消息过滤、聚合、拆分
- 分布式事务:2PC、本地消息表模式
- 监控运维:指标监控、限流控制
- 性能优化:批量处理、消息压缩、异步消费
选择合适的消息队列和设计模式,能够有效提升系统的可扩展性、可靠性和性能。