news 2026/4/16 16:04:37

【RabbitMQ】SpringBoot整合RabbitMQ:工作队列 发布/订阅模式 路由模式 通配符模式

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【RabbitMQ】SpringBoot整合RabbitMQ:工作队列 发布/订阅模式 路由模式 通配符模式

文章目录

    • 一、Work Queue(工作队列模式)
      • 引入依赖
      • 添加配置
      • 编写生产者代码
      • 编写消费者代码
      • 运行结果
    • 二、Publish/Subscribe(发布/订阅模式)
      • 编写生产者代码
      • 编写消费者代码
    • 消费者另一种写法
    • 三、Routing(路由模式)
      • 编写生产者代码
      • 编写消费者代码
    • 四、Topics(通配符模式)
      • 编写生产者代码
      • 编写消费者代码


Spring 官方: Spring AMQP
RabbitMQ 官方: RabbitMQ tutorial - “Hello World!” | RabbitMQ

一、Work Queue(工作队列模式)

步骤:(后面其它模式也是如此)

  1. 引入依赖

  2. 编写 yml 配置文件,基本信息配置

  3. 编写生产者代码

  4. 编写消费者代码

    1. 定义监听类,使用@RabbitListener注解完成队列监听
  5. 运行观察结果

引入依赖

<!--Spring MVC相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--RabbitMQ相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

添加配置

# 配置RabbitMQ的基本信息spring:rabbitmq:host:127.0.0.1port:5672username:lirenpassword:123123virtual-host:lirendada

编写生产者代码

常量类:

publicclassConstants{publicstaticfinalStringWORK_QUEUE="work_queue";}

然后在 config 包中声明队列:(注意包要导对~)

importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.QueueBuilder;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@Bean("workQueue")publicQueueWorkQueue(){returnQueueBuilder.durable(Constants.WORK_QUEUE).build();}}

最后在需要发送消息的地方调用RabbitTemplate发送消息:

@RequestMapping("/producer")@RestControllerpublicclassProducerController{@AutowiredprivateRabbitTemplaterabbitTemplate;@RequestMapping("/work")publicStringwork(){for(inti=0;i<10;i++){// 使用内置交换机发送消息, routingKey和队列名称保持一致rabbitTemplate.convertAndSend("",Constants.WORK_QUEUE,"hello spring amqp: work...");}return"发送成功";}}

编写消费者代码

定义监听类,用于消费队列中的消息:(注意包要导对~)

importcom.liren.springbootrabbitmq.constant.Constants;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassWorkListener{@RabbitListener(queues=Constants.WORK_QUEUE)publicvoidworkqueue1(Messagemessage){System.out.println("workqueue1 ["+Constants.WORK_QUEUE+"]收到消息:"+message);}@RabbitListener(queues=Constants.WORK_QUEUE)publicvoidworkqueue2(Stringmessage){System.out.println("workqueue2 ["+Constants.WORK_QUEUE+"]收到消息:"+message);}}

@RabbitListener是 Spring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息。该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息。

以下是一些常用的参数类型:

  1. String:返回消息的内容
  2. Message(org.springframework.amqp.core.Message):Spring AMQP 的Message类,返回原始的消息体以及消息的属性,如消息ID、内容、队列信息等。
  3. Channel(com.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行更高级的操作,如手动确认消息。

运行结果

运行程序,然后发起请求,会有三个队列接收消息,如下所示:

管理页面中可以看到三个消费者以及一个生产者通道:

二、Publish/Subscribe(发布/订阅模式)

RabbitMQ 交换机常见三种类型:fanoutdirecttopic,不同类型有着不同的路由策略。

  1. Fanout广播策略,将消息交给所有绑定到该交换机的队列(Publish/Subscribe 模式
  2. Direct定向策略,把消息交给符合指定routing key的队列(Routing 模式
  3. Topic通配符策略,把消息交给符合routing pattern的队列(Topics 模式

编写生产者代码

常量类:

// 发布订阅模式publicstaticfinalStringFANOUT_QUEUE1="fanout.queue1";publicstaticfinalStringFANOUT_QUEUE2="fanout.queue2";publicstaticfinalStringFANOUT_EXCHANGE="fanout.exchange";

然后在 config 包中声明队列:(注意包要导对~)

// 发布订阅模式@Bean("publishConfirmQueue1")publicQueuepublishConfirmQueue1(){returnQueueBuilder.durable(Constants.FANOUT_QUEUE1).build();// 声明队列}@Bean("publishConfirmQueue2")publicQueuepublishConfirmQueue2(){returnQueueBuilder.durable(Constants.FANOUT_QUEUE2).build();// 声明队列}@Bean("fanoutExchange")publicFanoutExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).build();// 声明交换机}@Bean("fanoutBinding1")publicBindingfanoutBinding1(@Qualifier("publishConfirmQueue1")Queuequeue,@Qualifier("fanoutExchange")FanoutExchangeexchange){returnBindingBuilder.bind(queue).to(exchange);// 绑定交换机和队列}@Bean("fanoutBinding2")publicBindingfanoutBinding2(@Qualifier("publishConfirmQueue2")Queuequeue,@Qualifier("fanoutExchange")FanoutExchangeexchange){returnBindingBuilder.bind(queue).to(exchange);// 绑定交换机和队列}

使用接口发送消息

@RequestMapping("/fanout")publicStringfanout(){rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp: fanout...");return"发送成功!";}

编写消费者代码

@ComponentpublicclassFanoutListener{@RabbitListener(queues=Constants.FANOUT_QUEUE1)publicvoidfanoutQueue1(Stringmessage){System.out.println("fanoutQueue1 ["+Constants.FANOUT_QUEUE1+"]收到消息:"+message);}@RabbitListener(queues=Constants.FANOUT_QUEUE2)publicvoidfanoutQueue2(Stringmessage){System.out.println("fanoutQueue2 ["+Constants.FANOUT_QUEUE2+"]收到消息:"+message);}}

消费者另一种写法

@RabbitListener是一个功能强大的注解。这个注解里面可以配置@QueueBinding@Queue@Exchange,直接通过这个组合注解一次性搞定多个交换机、绑定、路由、并且配置监听功能等

@Slf4j@ComponentpublicclassUserRegisterListener{@RabbitListener(bindings=@QueueBinding(value=@Queue(value=Constants.USER_QUEUE_NANE,// 队列名durable="true"// 是否持久化),exchange=@Exchange(value=Constants.USER_EXCHANGE_NAME,// 交换机名type=ExchangeTypes.FANOUT// fanout 交换机)// fanout 不需要 routingKey))publicvoidMailListenerQueue(Messagemessage,Channelchannel)throwsIOException{longdeliveryTag=message.getMessageProperties().getDeliveryTag();try{// 处理用户注册消息Stringbody=newString(message.getBody());log.info("用户注册消息处理成功,deliveryTag={}, message={}",deliveryTag,body);// 发送邮件TODO// 确认消息channel.basicAck(deliveryTag,true);}catch(Exceptione){// 异常拒绝消息,进行重发channel.basicNack(deliveryTag,true,true);log.error("用户注册消息处理失败,拒绝消息,deliveryTag={}",deliveryTag,e);}}}

启动时 Spring AMQP 会做的事情,顺序大致是:

  1. QueueDeclare
    1. 声明一个 durable 队列
  2. ExchangeDeclare
    1. 声明一个 fanout 交换机
  3. QueueBind
    1. 把队列绑定到交换机

三、Routing(路由模式)

RabbitMQ 交换机常见三种类型:fanoutdirecttopic,不同类型有着不同的路由策略。

  1. Fanout广播策略,将消息交给所有绑定到该交换机的队列(Publish/Subscribe 模式
  2. Direct定向策略,把消息交给符合指定routing key的队列(Routing 模式
  3. Topic通配符策略,把消息交给符合routing pattern的队列(Topics 模式

路由模式采用的是 RabbitMQ 中的Direct定向策略,生产者发送消息的时候,交换机需要根据消息中的Routing Key将消息发送给指定的队列,而不是发给每一个队列了!

此时,队列和交换机的绑定,不能是任意的绑定了,而是要指定一个Binding Key

只有队列绑定时的Binding Key和消息中的Routing Key完全一致,队列才会接收到消息

编写生产者代码

常量类:

// 路由模式publicstaticfinalStringDIRECT_EXCHANGE="direct.exchange";publicstaticfinalStringDIRECT_QUEUE1="direct.queue1";publicstaticfinalStringDIRECT_QUEUE2="direct.queue2";

和发布订阅模式的区别是:交换机类型不同、绑定队列的Binding Key不同。

// 路由模式(direct模式)@Bean("directQueue1")publicQueuedirectQueue1(){returnQueueBuilder.durable(Constants.DIRECT_QUEUE1).build();// 声明队列}@Bean("directQueue2")publicQueuedirectQueue2(){returnQueueBuilder.durable(Constants.DIRECT_QUEUE2).build();// 声明队列}@Bean("directExchange")publicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).build();// 声明交换机}// 队列1绑定orange@Bean("directBinding1")publicBindingdirectBinding1(@Qualifier("directQueue1")Queuequeue,@Qualifier("directExchange")DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("orange");// 绑定交换机和队列,以及bindingKey}// 队列2绑定green、black@Bean("directBinding2")publicBindingdirectBinding2(@Qualifier("directQueue2")Queuequeue,@Qualifier("directExchange")DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("green");// 绑定交换机和队列,以及bindingKey}@Bean("directBinding3")publicBindingdirectBinding3(@Qualifier("directQueue2")Queuequeue,@Qualifier("directExchange")DirectExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("black");// 绑定交换机和队列,以及bindingKey}

使用接口发送消息:

@RequestMapping("/direct/{routing_key}")publicStringdirct(@PathVariable("routing_key")Stringrouting_key){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routing_key,"hello spring amqp: direct..."+routing_key);return"发送成功!";}

编写消费者代码

@ComponentpublicclassDirectListener{@RabbitListener(queues=Constants.DIRECT_QUEUE1)publicvoiddirectQueue1(Stringmessage){System.out.println("directQueue1 ["+Constants.DIRECT_QUEUE1+"]收到消息:"+message);}@RabbitListener(queues=Constants.DIRECT_QUEUE2)publicvoiddirectQueue2(Stringmessage){System.out.println("directQueue2 ["+Constants.DIRECT_QUEUE2+"]收到消息:"+message);}}

分别请求三个不同的 routingkey,结果如下所示:

四、Topics(通配符模式)

Topics 和 Routing 模式的区别是:

  1. 交换机类型不同:Topics 模式使用的交换机类型为topic;Routing 模式使用的交换机类型为direct
  2. 匹配规则不同:topic类型的交换机在匹配规则上进行了扩展,Binding Key支持通配符匹配;direct类型的交换机路由规则是Binding KeyRouting Key完全匹配。

匹配规则有如下要求:

  1. Routing Key是一系列由点.分隔的单词,比如 “stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  2. Binding KeyRouting Key一样,也是点.分割的字符串
  3. Binding Key中可以存在两种特殊字符串,用于模糊匹配
    1. \*:表示一个单词
    2. #:表示多个单词(0-N个)

编写生产者代码

常量类:

// 通配符模式publicstaticfinalStringTOPIC_EXCHANGE="topic.exchange";publicstaticfinalStringTOPIC_QUEUE1="topic.queue1";publicstaticfinalStringTOPIC_QUEUE2="topic.queue2";

生产者代码如下所示:

// 通配符模式(topics模式)@Bean("topicQueue1")publicQueuetopicQueue1(){returnQueueBuilder.durable(Constants.TOPIC_QUEUE1).build();// 声明队列}@Bean("topicQueue2")publicQueuetopicQueue2(){returnQueueBuilder.durable(Constants.TOPIC_QUEUE2).build();// 声明队列}@Bean("topicExchange")publicTopicExchangetopicExchange(){returnExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).build();// 声明交换机}// 队列1绑定error, 仅接收error信息@Bean("topicBinding1")publicBindingtopicBinding1(@Qualifier("topicQueue1")Queuequeue,@Qualifier("topicExchange")TopicExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("*.error");// 绑定交换机和队列,以及bindingKey}// 队列2绑定info, error: error,info信息都接收@Bean("topicBinding2")publicBindingtopicBinding2(@Qualifier("topicQueue2")Queuequeue,@Qualifier("topicExchange")TopicExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("*.error");// 绑定交换机和队列,以及bindingKey}@Bean("topicBinding3")publicBindingtopicBinding3(@Qualifier("topicQueue2")Queuequeue,@Qualifier("topicExchange")TopicExchangeexchange){returnBindingBuilder.bind(queue).to(exchange).with("#.info");// 绑定交换机和队列,以及bindingKey}

使用接口发送消息:

@RequestMapping("/topics/{routing_key}")publicStringtopics(@PathVariable("routing_key")Stringrouting_key){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routing_key,"hello spring amqp: topics..."+routing_key);return"发送成功!";}

编写消费者代码

@ComponentpublicclassTopicListener{@RabbitListener(queues=Constants.TOPIC_QUEUE1)publicvoidtopicQueue1(Stringmessage){System.out.println("topicQueue1 ["+Constants.TOPIC_QUEUE1+"]收到消息:"+message);}@RabbitListener(queues=Constants.TOPIC_QUEUE2)publicvoidtopicQueue2(Stringmessage){System.out.println("topicQueue2 ["+Constants.TOPIC_QUEUE2+"]收到消息:"+message);}}

分别请求两个不同的请求以及参数之后,运行结果如下:

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 15:25:41

用Arduino打造智能大棚监控系统(附全套资源)

基于Arduino的温室大棚环境监测与控制系统&#xff1a; 1.使用DHT11温湿度传感器&#xff0c;实时监测大棚温湿度&#xff0c;数据一方面实时显示在OLED屏&#xff0c;另一方面上传手机APP&#xff0c;湿度过低时自动控制加湿器进行加湿&#xff0c;达到一定湿度后停止加湿&…

作者头像 李华
网站建设 2026/4/16 13:57:58

Multisim14仿真建模新手教程:零基础完成LED驱动设计

从零开始用Multisim14设计LED驱动电路&#xff1a;新手也能看懂的实战教程你是不是也曾经面对一堆电子元件和复杂的电路图&#xff0c;心里直打鼓&#xff1a;“这玩意儿怎么连&#xff1f;接错了会不会烧&#xff1f;”别担心&#xff0c;现在我们有了像Multisim14这样的仿真工…

作者头像 李华
网站建设 2026/4/10 23:16:07

IDA Pro下载文件校验:确保安全安装的步骤

如何安全下载 IDA Pro&#xff1a;绕过陷阱&#xff0c;确保工具链可信 你有没有想过&#xff0c;你每天依赖的逆向工程“利器”本身&#xff0c;会不会已经被人动了手脚&#xff1f; 在安全研究的世界里&#xff0c;我们习惯于剖析恶意软件、追踪攻击者踪迹&#xff0c;却常…

作者头像 李华
网站建设 2026/4/16 14:11:51

原子级薄材料显著缩小量子比特体积

原子级薄材料显著缩小量子比特体积 麻省理工学院的研究人员使用二维材料制造量子电路中的电容&#xff0c;以推动量子处理器的规模化。 量子计算的挑战 量子计算是一项极其复杂的技术&#xff0c;其发展面临诸多技术障碍。其中两个关键问题尤为突出&#xff1a;微型化和量子比特…

作者头像 李华