Java RabbitMQ主题模式(Topic模式),使用的交换机类型为TopicExchange,跟路由模式(Direct)的区别就路由参数支持模糊匹配,因为路由匹配比较灵活,所以是比较常用的模式,架构如下图
提示:无论使用RabbitMQ那种工作模式,区别就是使用的交换机(Exchange)类型和路由参数不一样。
1.前置教程
请先阅读下面章节,了解相关知识
- RabbitMQ基础概念
- RabbitMQ 主题模式原理
- RabbitMQ Java快速入门章节 (必读,因为后续章节不会重复贴代码,仅展示关键代码)
- Java RabbitMQ 发布订阅模式章节 (必读,因为代码写法几乎一样,就是交换机类型和路由参数不一样)
2.定义Topic交换机
在Spring AMQP中Direct交换机对应的类就是TopicExchange,我们通过Springboot配置类,定义交换机。
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// 定义交换机
// 参数为交换机名字,不能重复
return new TopicExchange("tizi365.topic");
}
}
提示: 无论是消息生产者还是消费者都需要交换机。
3.发送消息
我们将消息发送给交换机,由交换机根据路由规则投递消息到对应的队列。
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
@Autowired
private RabbitTemplate template;
@Autowired
private TopicExchange topic;
// 为演示,这里使用定时任务,每秒发送一条消息
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// 消息内容
String message = "Hello World!";
// 发送消息
// 第一个参数是交换机名字
// 第二个参数是路由参数,topic交换机将消息投递到路由参数匹配的队列
// 第三个参数是消息内容,支持任意类型,只要支持序列化
template.convertAndSend(topic.getName(), "www.tizi365.com", message);
System.out.println("发送消息 '" + message + "'");
}
}
提示:注意convertAndSend方法中的第二个参数”www.tizi365.com”,这是个关键参数。
4.接收消息
4.1.定义队列&绑定交换机
要想消费队列消息,需要先定义一个队列,然后将队列绑定到目标交换机上。
下面定义两个队列,分别绑定到同一个交换机上
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public TopicExchange topic() {
// 定义交换机
// 参数为交换机名字,不能重复
return new TopicExchange("tizi365.topic");
}
@Bean
public Queue queue1() {
// 定义队列1
return new Queue("tizi365.topic.queue1");
}
@Bean
public Queue queue2() {
// 定义队列2
return new Queue("tizi365.topic.queue2");
}
@Bean
public Binding binding1(TopicExchange topic, Queue queue1) {
// 定义一个绑定关系,将队列1绑定到topic交换机上, 路由参数为:*.tizi365.com
return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
}
@Bean
public Binding binding2(TopicExchange topic, Queue queue2) {
// 定义一个绑定关系,将队列2绑定到direct交换机上, 路由参数为:*.baidu.com
return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
}
}
提示: 队列1和队列2绑定交换机的时候,设置的路由参数都使用了*(星号)通配符,可以匹配一个单词,如果改成 # (井号),则可以匹配多个单词。
4.2.定义队列监听器
通过RabbitListener注解定义消息监听器,消费指定队列的消息。
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// 将当前类交给Spring管理
@Component
public class DemoListener {
// 定义一个监听器,通过queues参数指定监听那个队列
@RabbitListener(queues = "tizi365.topic.queue1")
public void receive1(String msg) {
System.out.println("收到队列1的消息 = " + msg);
}
// 定义一个监听器,通过queues参数指定监听那个队列
@RabbitListener(queues = "tizi365.topic.queue2")
public void receive2(String msg) {
System.out.println("收到队列2的消息 = " + msg);
}
}
只有队列1,绑定交换机的时候设置的路由参数为*.tizi365.com, 匹配消息的路由参数(www.tizi365.com), 所以只有队列1可以收到消息,队列2因为路由参数不匹配,所提收不到。