Java RabbitMQ最简单的队列模式就是一个生产者和一个消费者。
目前Java操作RabbitMQ主要使用Springboot的spring-boot-starter-amqp包, 其实就是使用Spring AMQP操作队列。
1.前置教程
请先阅读下面章节,了解相关知识
2.依赖包
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
3.配置RabbitMQ
修改application.yml配置
spring:
rabbitmq:
# rabbitMQ服务器地址
host: localhost
port: 5672
username: guest
password: guest
4.声明队列
通过springboot的configuration类配置队列
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
@Bean
public Queue helloQueue() {
// 声明队列, 队列名需要唯一
return new Queue("hello");
}
}
提示: 你可以根据业务需要定义多个队列,队列名和Queue的bean id不一样即可, 在这里方法名就是bean的id。
5.发送消息
发送消息需要用到RabbitTemplate类,springboot已经帮我们初始化,注入实例即可
package com.tizi365.rabbitmq.service;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class SendService {
// 注入RabbitTemplate实例
@Autowired
private RabbitTemplate template;
// 注入前面定义的队列
@Autowired
@Qualifier("helloQueue")
private Queue helloQueue;
// 为了演示,这里使用spring内置的定时任务,定时发送消息(每秒发送一条消息)
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
public void send() {
// 消息内容
String message = "Hello World!";
// 发送消息
// 第一个参数是路由参数,这里使用队列名作为路由参数
// 第二个参数是消息内容,支持任意类型,只要支持序列化
template.convertAndSend(helloQueue.getName(), message);
System.out.println("发送消息 '" + message + "'");
}
}
提示:这里没有直接使用交换机(exchange),底层会使用默认的交换机(Direct交换机)。
6.接收消息
消费者接收消息也很简单
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 声明消息监听器,通过queues参数指定监听的队列,需要跟前面的队列名保持一致
@RabbitListener(queues = "hello")
public class HelloListener {
// 使用RabbitHandler标记消息处理器,用来执行消息处理逻辑
@RabbitHandler
public void receive(String msg) {
System.out.println("消费者 - 收到消息 '" + msg + "'");
}
}
7.自定义消息类型
前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义Java对象类型的数据。
定义一个实体对象
package com.tizi365.rabbitmq.domain;
import java.io.Serializable;
import lombok.Data;
// 博客内容
@Data
public class Blog implements Serializable {
// id
private Integer id;
// 标题
private String title;
}
发送自定义类型消息
Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 RabbitMQ教程");
// 发送消息
template.convertAndSend(helloQueue.getName(), blog);
接收自定义类型消息
@RabbitHandler
// 方法参数改为自定义消息类型即可
public void receive(Blog msg) {
System.out.println("消费者 - 收到消息 '" + msg.getTitle() + "'");
}
使用Json序列化消息内容
RabbitMQ发送Java实体对象数据的时候,默认使用JDK的对象序列化工具,我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。
修改前面的配置类,增加下面配置, 使用Jackson json解析器对消息数据进行序列化和反序列化。
@Bean
public Jackson2JsonMessageConverter messageConverter() {
// 设置默认消息转换器
return new Jackson2JsonMessageConverter();
}