golang RabbitMQ发布订阅模式(广播模式、fanout模式),就是一个生产者发送的消息会被多个消费者处理。
说明:
- P 代表生产者 , C1、C2 代表消费者,红色代表队列, X代表交换机(Exchange)。
- 交换机(Exchange)负责将消息转发至绑定交换机的所有队列。
- 可以定义多个队列,分别绑定同一个交换机。
- 每个队列可以有一个或者多个消费者。
提示:如果不了解RabbitMQ,请先阅读rabbitmq基础概念章节。
1.安装依赖包
go get github.com/streadway/amqp
2.发送消息
下面分步骤演示消息生产者如何发送消息
2.1. 连接RabbitMQ Server
// 连接RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
连接地址说明:
amqp://账号:密码@RabbitMQ地址:端口/
2.2. 创建Channel
大部分操作都是在Channel(信道 )完成的。
ch, err := conn.Channel()
defer ch.Close()
2.3. 声明交换机
消息先发送到交换机(Exchange),由交换机根据策略转发消息到队列。
err = ch.ExchangeDeclare(
"tizi365", // 交换机名字
"fanout", // 交换机类型,这里使用fanout类型,即: 发布订阅模式
true, // 是否持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
2.4. 推送消息
// 消息内容
body := "Hello Tizi365.com!"
// 推送消息
err = ch.Publish(
"tizi365", // exchange(交换机名字,跟前面声明对应)
"", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain", // 消息内容类型,这里是普通文本
Body: []byte(body), // 消息内容
})
2.5.完整的消息推送代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接rabbitmq
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建信道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换机
err = ch.ExchangeDeclare(
"tizi365", // 交换机名字
"fanout", // 交换机类型,fanout发布订阅模式
true, // 是否持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 消息内容
body := "Hello Tizi365.com!"
// 推送消息
err = ch.Publish(
"tizi365", // exchange(交换机名字,跟前面声明对应)
"", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain", // 消息内容类型,这里是普通文本
Body: []byte(body), // 消息内容
})
log.Printf("发送内容 %s", body)
}
3.接收消息
接收消息前面三个步骤:连接RabbitMQ、创建信道、声明交换机跟发送消息一样,参考前面2.1、2.2、2.3章节即可。
3.1.声明队列
声明需要操作的队列
q, err := ch.QueueDeclare(
"", // 队列名字,不填则随机生成一个
false, // 是否持久化队列
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
3.2.队列绑定交换机
队列需要绑定到交换机才能接收到消息
err = ch.QueueBind(
q.Name, // 队列名
"", // 路由参数,fanout类型交换机,自动忽略路由参数
"tizi365", // 交换机名字,需要跟消息发送端定义的交换器保持一致
false,
nil)
提示:实际应用中,我们可以定义N个队列,分别绑定到同一个交换机上,就可以接收交换机转发过来的消息,这就是发布订阅模式体现的地方。
3.3.创建消费者
msgs, err := ch.Consume(
q.Name, // 引用前面的队列名
"", // 消费者名字,不填自动生成一个
true, // 自动向队列确认消息已经处理
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
// 循环处理消息
for d := range msgs {
log.Printf("接收消息=%s", d.Body)
}
3.4.完整消费者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接rabbitmq
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建信道,通常一个消费者一个
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换机
err = ch.ExchangeDeclare(
"tizi365", // 交换机名,需要跟消息发送方保持一致
"fanout", // 交换机类型
true, // 是否持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 声明需要操作的队列
q, err := ch.QueueDeclare(
"", // 队列名字,不填则随机生成一个
false, // 是否持久化队列
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 队列绑定指定的交换机
err = ch.QueueBind(
q.Name, // 队列名
"", // 路由参数,fanout类型交换机,自动忽略路由参数
"tizi365", // 交换机名字,需要跟消息发送端定义的交换器保持一致
false,
nil)
failOnError(err, "Failed to bind a queue")
// 创建消费者
msgs, err := ch.Consume(
q.Name, // 引用前面的队列名
"", // 消费者名字,不填自动生成一个
true, // 自动向队列确认消息已经处理
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// 循环消费队列中的消息
for d := range msgs {
log.Printf("接收消息=%s", d.Body)
}
}
3.5.多个消费者
参考Work模式章节,使用协程启动多个消费者即可。