news 2026/5/9 12:25:47

消息队列模式:异步处理最佳实践

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
消息队列模式:异步处理最佳实践

消息队列模式:异步处理最佳实践

核心概念

消息队列是实现异步通信的重要工具,可以解耦系统组件、提高系统的可扩展性和可靠性。本文将介绍常见的消息队列模式和最佳实践。

消息队列模式

1. 点对点模式

// 点对点生产者 @Component public class PointToPointProducer { private final RabbitTemplate rabbitTemplate; public PointToPointProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(String queueName, Object message) { rabbitTemplate.convertAndSend(queueName, message); } public void sendMessageWithCorrelation(String queueName, Object message, String correlationId) { rabbitTemplate.convertAndSend(queueName, message, msg -> { msg.getMessageProperties().setCorrelationId(correlationId); return msg; }); } } // 点对点消费者 @Component public class PointToPointConsumer { @RabbitListener(queues = "task-queue") public void handleMessage(String message) { System.out.println("Received message: " + message); processMessage(message); } private void processMessage(String message) { // 处理消息逻辑 } }

2. 发布/订阅模式

// 发布者 @Component public class Publisher { private final RabbitTemplate rabbitTemplate; public Publisher(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void publish(String exchangeName, String routingKey, Object message) { rabbitTemplate.convertAndSend(exchangeName, routingKey, message); } } // 订阅者 1 @Component public class Subscriber1 { @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "events", type = ExchangeTypes.TOPIC), key = "user.created" )) public void handleUserCreated(UserCreatedEvent event) { System.out.println("User created: " + event.getUserId()); } } // 订阅者 2 @Component public class Subscriber2 { @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "events", type = ExchangeTypes.TOPIC), key = "user.*" )) public void handleUserEvents(Object event) { System.out.println("User event received: " + event); } }

3. 请求/回复模式

// 请求者 @Component public class Requestor { private final RabbitTemplate rabbitTemplate; public Requestor(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setReplyTimeout(60000); } public String sendRequest(String request) { return rabbitTemplate.convertSendAndReceive("request-queue", request, String.class); } public CompletableFuture<String> sendAsyncRequest(String request) { return rabbitTemplate.convertSendAndReceiveAsynchronously("request-queue", request) .thenApply(o -> (String) o); } } // 回复者 @Component public class Replier { @RabbitListener(queues = "request-queue") public String handleRequest(String request) { String response = processRequest(request); return response; } private String processRequest(String request) { // 处理请求并返回响应 return "Response for: " + request; } }

消息可靠性

// 消息确认配置 @Configuration public class RabbitMqConfig { @Bean public Queue durableQueue() { return QueueBuilder.durable("durable-queue") .build(); } @Bean public Exchange durableExchange() { return ExchangeBuilder.topicExchange("durable-exchange") .durable(true) .build(); } @Bean public Binding binding(Queue durableQueue, Exchange durableExchange) { return BindingBuilder.bind(durableQueue) .to(durableExchange) .with("routing.key") .noargs(); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); return factory; } } // 手动确认消费者 @Component public class ManualAckConsumer { @RabbitListener(queues = "manual-ack-queue", containerFactory = "rabbitListenerContainerFactory") public void handleMessage(String message, Channel channel, Message messageObj) throws IOException { try { processMessage(message); channel.basicAck(messageObj.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 拒绝并重新入队 channel.basicNack(messageObj.getMessageProperties().getDeliveryTag(), false, true); } } private void processMessage(String message) { // 处理消息逻辑 } }

死信队列

// 死信队列配置 @Configuration public class DeadLetterConfig { @Bean public Queue mainQueue() { return QueueBuilder.durable("main-queue") .deadLetterExchange("dead-letter-exchange") .deadLetterRoutingKey("dead-letter-key") .build(); } @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dead-letter-queue") .build(); } @Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("dead-letter-exchange") .build(); } @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with("dead-letter-key") .noargs(); } } // 死信队列消费者 @Component public class DeadLetterConsumer { @RabbitListener(queues = "dead-letter-queue") public void handleDeadLetter(Message message) { System.out.println("Dead letter received: " + new String(message.getBody())); // 记录日志、发送告警、人工处理等 } }

消息幂等性

// 幂等性处理器 @Component public class IdempotentMessageHandler { private final RedisTemplate<String, String> redisTemplate; private static final String PROCESSING_PREFIX = "processing:"; private static final String PROCESSED_PREFIX = "processed:"; public IdempotentMessageHandler(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } public boolean processIfNotProcessed(String messageId, Runnable handler) { String processingKey = PROCESSING_PREFIX + messageId; String processedKey = PROCESSED_PREFIX + messageId; // 检查是否已处理 if (Boolean.TRUE.equals(redisTemplate.hasKey(processedKey))) { return false; } // 尝试获取处理锁 Boolean acquired = redisTemplate.opsForValue() .setIfAbsent(processingKey, "processing", 5, TimeUnit.MINUTES); if (Boolean.FALSE.equals(acquired)) { return false; } try { handler.run(); // 标记为已处理 redisTemplate.opsForValue().set(processedKey, "true", 24, TimeUnit.HOURS); return true; } finally { // 释放处理锁 redisTemplate.delete(processingKey); } } } // 使用幂等处理器 @Service public class OrderService { private final IdempotentMessageHandler idempotentHandler; public OrderService(IdempotentMessageHandler idempotentHandler) { this.idempotentHandler = idempotentHandler; } public void processOrder(OrderCreatedEvent event) { boolean processed = idempotentHandler.processIfNotProcessed( event.getMessageId(), () -> { // 实际处理逻辑 createOrder(event); } ); if (!processed) { System.out.println("Message already processed: " + event.getMessageId()); } } private void createOrder(OrderCreatedEvent event) { // 创建订单逻辑 } }

消息重试

// 重试配置 @Configuration public class RetryConfig { @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(1000); retryTemplate.setRetryPolicy(retryPolicy); retryTemplate.setBackOffPolicy(backOffPolicy); return retryTemplate; } } // 使用重试的消息处理器 @Component public class RetryableConsumer { private final RetryTemplate retryTemplate; public RetryableConsumer(RetryTemplate retryTemplate) { this.retryTemplate = retryTemplate; } @RabbitListener(queues = "retry-queue") public void handleMessage(String message) { retryTemplate.execute(context -> { processMessage(message); return null; }); } private void processMessage(String message) { // 可能失败的处理逻辑 } }

Kafka 集成

// Kafka 生产者配置 @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); config.put(ProducerConfig.ACKS_CONFIG, "all"); config.put(ProducerConfig.RETRIES_CONFIG, 3); return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } } // Kafka 消费者配置 @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(config); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } } // Kafka 消费者 @Component public class KafkaMessageConsumer { @KafkaListener(topics = "orders", containerFactory = "kafkaListenerContainerFactory") public void consume(ConsumerRecord<String, OrderCreatedEvent> record, Acknowledgment acknowledgment) { try { OrderCreatedEvent event = record.value(); processOrder(event); acknowledgment.acknowledge(); } catch (Exception e) { // 处理异常,可能需要重试或发送到死信队列 } } private void processOrder(OrderCreatedEvent event) { // 处理订单创建事件 } }

最佳实践

  1. 消息持久化:确保消息在 broker 重启后不丢失
  2. 消息确认:使用手动确认确保消息被正确处理
  3. 幂等性:确保重复消息不会导致重复处理
  4. 死信队列:处理无法处理的消息
  5. 重试机制:对失败的消息进行重试
  6. 监控告警:监控消息队列的状态和性能
  7. 消息过期:设置消息过期时间避免消息积压
  8. 批量处理:对大量消息进行批量处理

实际应用场景

  • 异步任务处理:将耗时操作异步化
  • 事件驱动架构:实现松耦合的组件通信
  • 流量削峰:通过队列缓冲突发流量
  • 日志收集:收集和处理日志数据

总结

消息队列是构建高可用、高扩展系统的重要组件。通过合理使用消息队列模式,可以实现系统组件的解耦和异步处理。在实际应用中,需要注意消息可靠性、幂等性和监控等方面。

别叫我大神,叫我 Alex 就好。这其实可以更优雅一点,合理的消息队列设计让系统变得更加可靠和高效。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/9 12:25:46

分子三维结构生成:从2D到3D的AI跨越与药物研发应用

1. 分子三维结构生成&#xff1a;从二维蓝图到三维世界的跨越在药物研发和材料科学的实验室里&#xff0c;我们常常面对一个核心矛盾&#xff1a;分子的三维结构决定了它的性质与功能&#xff0c;但获取这个三维结构却异常昂贵和缓慢。传统的量子化学计算方法&#xff0c;如密度…

作者头像 李华
网站建设 2026/5/9 12:25:37

【Voxel-SLAM】验证报告(十)

验证报告 / Verification Report 生成日期 / Generated: 2026-04-28 源代码版本 / Source Revision: 70fc8a2 (branch: main) 源代码总行数 / Total Source Lines: 8,262 文档总行数 / Total Doc Lines: 9,895 (不含本报告 / excluding this report) 源代码引用标签总数 / Tota…

作者头像 李华
网站建设 2026/5/9 12:25:27

神经网络变分蒙特卡洛的计算负载优化与GPU性能分析

1. 神经网络变分蒙特卡洛的计算负载特性深度解析量子化学计算领域近年来迎来了一项突破性技术——神经网络变分蒙特卡洛&#xff08;NNVMC&#xff09;。作为一名长期从事高性能计算与量子化学交叉研究的从业者&#xff0c;我见证了这项技术从理论构想到实际应用的完整发展历程…

作者头像 李华
网站建设 2026/5/9 12:24:11

CANN/runtime IPC事件同步示例

2_ipcevent_sample 【免费下载链接】runtime 本项目提供CANN运行时组件和维测功能组件。 项目地址: https://gitcode.com/cann/runtime 描述 本样例展示了两个进程之间通过 IPC Event 进行任务同步。 进程A&#xff08;生产者&#xff09;&#xff1a;创建IPC事件&…

作者头像 李华
网站建设 2026/5/9 12:24:10

CANN 3D高斯溅射优化

基于昇腾平台的3D Gausssian Spaltting的训推优化实践 【免费下载链接】cann-recipes-spatial-intelligence 本项目针对空间智能业务中的典型模型、加速算法&#xff0c;提供基于CANN平台的优化样例 项目地址: https://gitcode.com/cann/cann-recipes-spatial-intelligence …

作者头像 李华
网站建设 2026/5/9 12:23:49

CANN/cann-recipes-infer SwigluClipQuant算子

custom.npu_swiglu_clip_quant 【免费下载链接】cann-recipes-infer 本项目针对LLM与多模态模型推理业务中的典型模型、加速算法&#xff0c;提供基于CANN平台的优化样例 项目地址: https://gitcode.com/cann/cann-recipes-infer 产品支持情况 产品是否支持 Atlas A3 …

作者头像 李华