一架梯子,一头程序猿,仰望星空!
RabbitMQ教程 > 内容正文

PHP RabbitMQ 发布订阅模式(Fanout模式)


PHP RabbitMQ发布订阅模式(又称广播模式、fanout模式),就是一个生产者发送的消息会被多个队列的消费者处理,架构如下图

RabbitMQ Work模式

Fanout交换机可以将消息转发给所有绑定的队列。

提示:无论使用RabbitMQ那种工作模式,区别就是使用的交换机(Exchange)类型和路由参数不一样。

1.前置教程

请先阅读下面章节,了解相关知识

2.定义Fanout交换机

通过channel的exchange_declare函数定义交换机

$channel->exchange_declare(
        'tizi365.fanout', // 交换机名,需要唯一,不能重复
        'fanout', // 交换机类型
        false,
        false, // 是否持久化
        false
    );

提示: 无论是消息生产者还是消费者都需要交换机。

3.发送消息

我们将消息发送给交换机,由交换机根据路由规则投递消息到对应的队列。

<?php require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建rabbitmq连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建Channel
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare(
    'tizi365.fanout', // 交换机名,需要唯一,不能重复
    'fanout', // 交换机类型
    false,
    false, // 是否持久化
    false
);
// 消息对象,参数是消息内容
$msg = new AMQPMessage("hello tizi365.com");

$channel->basic_publish(
    $msg, // 消息对象
    'tizi365.fanout' // 交换机名字
);

echo ' [x] Sent ', $msg->getBody(), "\n";

// 释放资源
$channel->close();
$connection->close();

4.接收消息

4.1.定义队列&绑定交换机

要想消费队列消息,需要先定义一个队列,然后将队列绑定到目标交换机上。
下面定义一个队列,绑定到指定交换机上

// 声明队列,如果队列名为空,则自动生成一个唯一ID,且返回队列名
list($queue_name, ,) = $channel->queue_declare(
    "", // 队列名,不允许重复,如果为空则自动生成一个唯一ID,这个时候就是一个匿名队列
    false,
    false, // 是否持久化
    true,
    false
);

// 队列绑定指定交换机
$channel->queue_bind(
    $queue_name, // 队列名
    'tizi365.fanout' // 交换机名字
);

4.2.接收消息完整代码

<?php require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 创建rabbitmq连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建Channel
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare(
    'tizi365.fanout', // 交换机名,需要唯一,不能重复
    'fanout', // 交换机类型
    false,
    false, // 是否持久化
    false
);

// 声明队列
list($queue_name, ,) = $channel->queue_declare(
    "", // 队列名,不允许重复,如果为空则自动生成一个唯一ID,这个时候就是一个匿名队列
    false,
    false, // 是否持久化
    true,
    false
);

// 队列绑定指定交换机
$channel->queue_bind(
    $queue_name, // 队列名
    'tizi365.fanout' // 交换机名字
);

echo " [*] Waiting for message. To exit press CTRL+C\n";

// 定义消息处理函数(这里使用匿名函数)
$callback = function ($msg) {
    // 消息处理逻辑
    echo ' [x] ', $msg->body, "\n";
};

// 创建消费者
$channel->basic_consume(
    $queue_name, // 队列名,需要消费的队列名
    '', // 消费者名,忽略,则自动生成一个唯一ID
    false,
    true, // 是否自动提交消息,即自动告诉rabbitmq消息已经处理成功。
    false,
    false,
    $callback // 消息处理函数
);

// 如果信道没有关闭,则一直阻塞进程,避免进程退出
while ($channel->is_open()) {
    $channel->wait();
}

// 释放资源
$channel->close();
$connection->close();

因为前面定义交换机(exchange)的时候使用的是fanout类型,所以每一条消息,都会分发给所有绑定到当前交换机的队列中。


推荐教程