PHP RabbitMQ最简单的队列模式就是一个生产者和一个消费者, 架构如下图。
说明:
P 代表生产者 , C 代表消费者,红色代表队列。
目前PHP操作RabbitMQ,官方推荐使用的是php-amqplib这个包。
1.前置教程
请先阅读下面章节,了解相关知识
2.安装PHP依赖包
使用composer方式安装
composer require php-amqplib/php-amqplib
提示:php-amqplib最新版本要求PHP 7.0以上
导入php-amqplib包
require_once __DIR__ . '/vendor/autoload.php';
3.发送消息
3.1. 创建RabbitMQ连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
参数说明:
- new AMQPStreamConnection(‘rabbitMQ服务器地址’, 端口号, ‘账号’, ‘密码’);
3.2. 创建Channel
$channel = $connection->channel();
大部分操作都是在Channel(信道 )完成的。
3.3. 声明队列
$channel->queue_declare(
'tizi365_hello', // 队列名,必须唯一
false,
true, // 是否持久化
false,
false
);
3.4. 推送消息
// 定义消息对象,参数我们需要发送的消息内容
$msg = new AMQPMessage('Hello World!');
// 消息是否持久化
// $msg->set('delivery_mode', AMQPMessage::DELIVERY_MODE_PERSISTENT);
// 发送消息
$channel->basic_publish(
$msg, // 消息对象
'', // 忽略交换机(exchange)
'tizi365_hello' // 路由参数,这里使用队列名作为路由参数
);
3.5. PHP发送消息完整代码
<?php require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建channel
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('tizi365_hello', false, false, false, false);
// 定义消息对象
$msg = new AMQPMessage('Hello World!');
// 发送消息
$channel->basic_publish($msg, '', 'tizi365_hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
保存至文件: send.php
4.消费消息
消费消息编码上的前面三个步骤(创建rabbitmq连接、创建channel、声明队列)跟发送消息一样,分别对应3.1、3.2、3.3章节。
完整的消费者代码如下
<?php require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
// 创建rabbitmq连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 创建channel
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('tizi365_hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
// 定义消息处理函数(这里使用匿名函数)
$callback = function ($msg) {
// 消息处理逻辑
echo ' [x] Received ', $msg->body, "\n";
};
// 创建消费者
$channel->basic_consume(
'tizi365_hello', // 队列名,需要消费的队列名
'', // 消费者名,忽略,则自动生成一个唯一ID
false,
true, // 是否自动提交消息,即自动告诉rabbitmq消息已经处理成功。
false,
false,
$callback // 消息处理函数
);
// 如果信道没有关闭,则一直阻塞进程,避免进程退出
while ($channel->is_open()) {
$channel->wait();
}
// 释放资源
$channel->close();
$connection->close();
保存至文件: recv.php
5.执行DEMO
打开两个shell窗口, 分别执行两个脚本
# shell窗口1
php recv.php
# shell窗口2
php send.php