一架梯子,一头程序猿,仰望星空!
RabbitMQ教程 > 内容正文

Golang RabbitMQ发布订阅模式(广播模式、fanout模式)


golang RabbitMQ发布订阅模式(广播模式、fanout模式),就是一个生产者发送的消息会被多个消费者处理。

fanout模式

说明:

  • P 代表生产者 , C1、C2 代表消费者,红色代表队列, X代表交换机(Exchange)。
  • 交换机(Exchange)负责将消息转发至绑定交换机的所有队列。
  • 可以定义多个队列,分别绑定同一个交换机。
  • 每个队列可以有一个或者多个消费者。

提示:如果不了解RabbitMQ,请先阅读rabbitmq基础概念章节。

1.安装依赖包

go get github.com/streadway/amqp

2.发送消息

下面分步骤演示消息生产者如何发送消息

2.1. 连接RabbitMQ Server

// 连接RabbitMQ Server
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

连接地址说明:

amqp://账号:密码@RabbitMQ地址:端口/

2.2. 创建Channel

大部分操作都是在Channel(信道 )完成的。

ch, err := conn.Channel()
defer ch.Close()

2.3. 声明交换机

消息先发送到交换机(Exchange),由交换机根据策略转发消息到队列。

err = ch.ExchangeDeclare(
        "tizi365",   // 交换机名字
        "fanout", // 交换机类型,这里使用fanout类型,即: 发布订阅模式
        true,     // 是否持久化
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )

2.4. 推送消息

// 消息内容
body := "Hello Tizi365.com!"

// 推送消息
err = ch.Publish(
  "tizi365",     // exchange(交换机名字,跟前面声明对应)
  "", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain", // 消息内容类型,这里是普通文本
    Body:        []byte(body),  // 消息内容
  })

2.5.完整的消息推送代码

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接rabbitmq
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 创建信道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 声明交换机
    err = ch.ExchangeDeclare(
        "tizi365",   // 交换机名字
        "fanout", // 交换机类型,fanout发布订阅模式
        true,     // 是否持久化
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // 消息内容
    body := "Hello Tizi365.com!"
    // 推送消息
    err = ch.Publish(
        "tizi365",     // exchange(交换机名字,跟前面声明对应)
        "", // 路由参数,fanout类型交换机,自动忽略路由参数,填了也没用。
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain", // 消息内容类型,这里是普通文本
            Body:        []byte(body),  // 消息内容
        })

    log.Printf("发送内容 %s", body)
}

3.接收消息

接收消息前面三个步骤:连接RabbitMQ、创建信道、声明交换机跟发送消息一样,参考前面2.1、2.2、2.3章节即可。

3.1.声明队列

声明需要操作的队列

q, err := ch.QueueDeclare(
        "",    // 队列名字,不填则随机生成一个
        false, // 是否持久化队列
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )

3.2.队列绑定交换机

队列需要绑定到交换机才能接收到消息

err = ch.QueueBind(
        q.Name, // 队列名
        "",     // 路由参数,fanout类型交换机,自动忽略路由参数
        "tizi365", // 交换机名字,需要跟消息发送端定义的交换器保持一致
        false,
        nil)

提示:实际应用中,我们可以定义N个队列,分别绑定到同一个交换机上,就可以接收交换机转发过来的消息,这就是发布订阅模式体现的地方。

3.3.创建消费者

msgs, err := ch.Consume(
        q.Name, // 引用前面的队列名
        "",     // 消费者名字,不填自动生成一个
        true,   // 自动向队列确认消息已经处理
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

// 循环处理消息
for d := range msgs {
            log.Printf("接收消息=%s", d.Body)
        }

3.4.完整消费者代码

package main

import (
    "log"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    // 连接rabbitmq
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // 创建信道,通常一个消费者一个
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 声明交换机
    err = ch.ExchangeDeclare(
        "tizi365",   // 交换机名,需要跟消息发送方保持一致
        "fanout", // 交换机类型
        true,     // 是否持久化
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    // 声明需要操作的队列
    q, err := ch.QueueDeclare(
        "",    // 队列名字,不填则随机生成一个
        false, // 是否持久化队列
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    // 队列绑定指定的交换机
    err = ch.QueueBind(
        q.Name, // 队列名
        "",     // 路由参数,fanout类型交换机,自动忽略路由参数
        "tizi365", // 交换机名字,需要跟消息发送端定义的交换器保持一致
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    // 创建消费者
    msgs, err := ch.Consume(
        q.Name, // 引用前面的队列名
        "",     // 消费者名字,不填自动生成一个
        true,   // 自动向队列确认消息已经处理
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    // 循环消费队列中的消息
    for d := range msgs {
        log.Printf("接收消息=%s", d.Body)
    }
}

3.5.多个消费者

参考Work模式章节,使用协程启动多个消费者即可。


推荐教程