RabbitMQ原生不支持延迟消息,目前主要通过死信交换机 + 消息TTL方案或者rabbitmq-delayed-message-exchange插件实现。
延迟队列应用场景
- 对消息生产和消费有时间窗口要求的场景。例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
- 通过消息触发延时任务的场景。例如,在指定时间段之后向用户发送提醒消息。
死信交换机 + 消息TTL方案
这个方案核心思想就是,创建一个没有消费者的队列,借助消息过期时间(TTL),当一条消息过期后会成为死信,这条死信消息会投递到死信交换机,死信交换机将消息发给死信队列,我们只要消费死信队列即可。
在这个方案中,消息过期时间就是消息延迟时间,例如: 消息TTL=30秒,因为这个队列没有消费者,消息30秒后过期,这条消息就变成死信,会被死信队列处理。
实现方案结合下面两个章节的教程设置下属性即可:
延迟消息插件方案
1.安装插件
rabbitmq-delayed-message-exchange的Github地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
从github的release页面的assets, 下载rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez文件,把文件放到rabbitmq插件目录(plugins目录)
提示:版本号可能跟本教程不一样,如果你的rabbitmq就是最新版本,插件也选择最新版本就行。
2.激活插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.定义交换机
通过x-delayed-type设置自定义交换机属性,支持发送延迟消息
props := make(map[string]interface{})
//关键参数,支持发送延迟消息
props["x-delayed-type"] = "direct"
// 声明交换机
err = ch.ExchangeDeclare(
"delay.queue", // 交换机名字
"fanout", // 交换机类型
true, // 是否持久化
false,
false,
false,
props, // 设置属性
)
4.发送延迟消息
通过消息头(x-delay),设置消息延迟时间。
msgHeaders := make(map[string]interface{})
// 通过消息头,设置消息延迟时间,单位毫秒
msgHeaders["x-delay"] = 6000
err = ch.Publish(
"delay.queue", // 交换机名字
"", // 路由参数
false,
false,
amqp.Publishing{
Headers:msgHeaders, // 设置消息头
ContentType: "text/plain",
Body: []byte(body),
})
提示:如果你直接使用阿里云的RabbitMQ消息云服务,通过消息头属性(delay),设置延迟时间即可,不用安装插件,阿里云已经对rabbitmq扩展了。