在 Laravel 框架的 Command 中使用 php-amqplib 扩展包对接 RabbitMQ 的流程步骤
Laravel  /  管理员 发布于 10个月前   412
php-amqplib扩展包是 AMQP 0-9-1 协议的纯 PHP 实现。它已通过 RabbitMQ 测试。
RabbitMQ in Action 的 PHP 示例和 RabbitMQ 官方教程都使用了该库。
进入安装步骤:
1、安装php-amqplib
composer require php-amqplib/php-amqplib
2、在 config 文件夹中创建 rabbit_mq.php 配置对应的参数
return [
    // Broker address; each value falls back to the local default when the
    // corresponding environment variable is not set.
    'host'     => env('RABBIT_MQ_HOST', '127.0.0.1'),
    'port'     => env('RABBIT_MQ_PORT', '5672'),

    // Credentials used to open the connection.
    'user'     => env('RABBIT_MQ_USER', 'guest'),
    'password' => env('RABBIT_MQ_PASSWORD', 'guest'),

    // Virtual host to connect to.
    'vhost'    => env('RABBIT_MQ_VHOST', '/'),
];
3、创建 RabbitMqService 接口,并编写其实现类 RabbitMqServiceImpl
/**
 * Contract for a small RabbitMQ client: topology setup (exchanges, queues,
 * bindings), publishing with confirm/return callbacks, and consuming.
 */
interface RabbitMqService
{
    /**
     * Implementations are constructed without arguments.
     */
    public function __construct();

    /**
     * Declare an exchange.
     *
     * @param string $exchangeName Name of the exchange.
     * @param string $type         Exchange type (defaults to 'direct').
     * @param bool   $passive      Passed through as the declare "passive" flag.
     * @param bool   $durable      Passed through as the declare "durable" flag.
     * @param bool   $autoDelete   Passed through as the declare "auto delete" flag.
     */
    public function createExchange(string $exchangeName, string $type = 'direct', bool $passive = false, bool $durable = true, bool $autoDelete = false);

    /**
     * Declare a queue.
     *
     * @param string $queueName  Name of the queue.
     * @param bool   $passive    Passed through as the declare "passive" flag.
     * @param bool   $durable    Passed through as the declare "durable" flag.
     * @param bool   $exclusive  Passed through as the declare "exclusive" flag.
     * @param bool   $autoDelete Passed through as the declare "auto delete" flag.
     */
    public function createQueue(string $queueName, bool $passive = false, bool $durable = true, bool $exclusive = false, bool $autoDelete = false);

    /**
     * Bind a queue to an exchange with the given routing key.
     */
    public function bindQueue(string $queueName, string $exchangeName, string $routingKey = '');

    /**
     * Register a callback that is told whether a published message was
     * acked or nacked.
     *
     * @param callable $callback Receives (bool $ack, message, channel).
     */
    public function confirm(callable $callback);

    /**
     * Register a callback for messages that are returned to the publisher.
     *
     * @param callable $callback Receives (string $error, message, channel).
     */
    public function returnListen(callable $callback);

    /**
     * Publish a message body to an exchange.
     *
     * The nullable type is spelled out explicitly: relying on "= null" to make
     * a parameter nullable is deprecated as of PHP 8.4.
     *
     * @param string      $exchangeName  Target exchange.
     * @param string      $messageBody   Raw message payload.
     * @param string|null $correlationId Correlation id; null lets the implementation pick one.
     */
    public function publishMessage(string $exchangeName, string $messageBody, ?string $correlationId = null);

    /**
     * Block until outstanding publisher acks/returns have been received.
     */
    public function waitAck();

    /**
     * Consume the given queue, invoking the callback for every message.
     *
     * @param callable $callback Receives (message, channel).
     */
    public function handle(string $queueName, callable $callback);

    /**
     * Implementations release their resources on destruction.
     */
    public function __destruct();
}
/**
 * php-amqplib backed implementation of RabbitMqService.
 *
 * Opens one connection and one channel on construction and closes both on
 * destruction. Connection parameters are read from config/rabbit_mq.php.
 */
class RabbitMqServiceImpl implements RabbitMqService
{
    private AMQPStreamConnection $connection;
    private AMQPChannel $channel;

    /**
     * Connect to the broker and open the channel used by every other method.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            config('rabbit_mq.host'),
            config('rabbit_mq.port'),
            config('rabbit_mq.user'),
            config('rabbit_mq.password'),
            config('rabbit_mq.vhost')
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Declare an exchange on the channel.
     */
    public function createExchange(string $exchangeName, string $type = 'direct', bool $passive = false, bool $durable = true, bool $autoDelete = false)
    {
        $this->channel->exchange_declare($exchangeName, $type, $passive, $durable, $autoDelete);
    }

    /**
     * Declare a queue on the channel.
     */
    public function createQueue(string $queueName, bool $passive = false, bool $durable = true, bool $exclusive = false, bool $autoDelete = false)
    {
        $this->channel->queue_declare($queueName, $passive, $durable, $exclusive, $autoDelete);
    }

    /**
     * Bind a queue to an exchange with the given routing key.
     */
    public function bindQueue(string $queueName, string $exchangeName, string $routingKey = '')
    {
        $this->channel->queue_bind($queueName, $exchangeName, $routingKey);
    }

    /**
     * Switch the channel to confirm mode and forward ack/nack notifications.
     *
     * @param callable $callback Called as $callback(bool $ack, AMQPMessage $message, AMQPChannel $channel).
     */
    public function confirm(callable $callback)
    {
        $this->channel->confirm_select();
        $this->channel->set_ack_handler(function (AMQPMessage $message) use ($callback) {
            $callback(true, $message, $this->channel);
        });
        $this->channel->set_nack_handler(function (AMQPMessage $message) use ($callback) {
            $callback(false, $message, $this->channel);
        });
    }

    /**
     * Forward returned messages to the callback together with a readable error.
     *
     * @param callable $callback Called as $callback(string $error, AMQPMessage $message, AMQPChannel $channel).
     */
    public function returnListen(callable $callback)
    {
        $this->channel->set_return_listener(function (int $replyCode, string $replyText, string $exchange, string $routingKey, AMQPMessage $message) use ($callback) {
            $error = $exchange . '[' . $replyCode . '] ' . $replyText;
            $callback($error, $message, $this->channel);
        });
    }

    /**
     * Publish a message to the exchange with an empty routing key and the
     * "mandatory" flag set, so return listeners can observe returned messages.
     *
     * @param string      $exchangeName  Target exchange.
     * @param string      $messageBody   Raw message payload.
     * @param string|null $correlationId Correlation id; a UUID is generated when null.
     */
    public function publishMessage(string $exchangeName, string $messageBody, ?string $correlationId = null)
    {
        // Str::uuid() returns a UUID object, not a string; cast it so the
        // message property always carries a plain string.
        $correlationId ??= (string) Str::uuid();

        $properties = [
            'correlation_id' => $correlationId,
            // Queues are declared durable by default; mark messages persistent
            // as well, otherwise they are dropped when the broker restarts.
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];

        $message = new AMQPMessage($messageBody, $properties);
        $this->channel->basic_publish($message, $exchangeName, '', true);
    }

    /**
     * Block until pending publisher acks and returns have arrived.
     */
    public function waitAck()
    {
        $this->channel->wait_for_pending_acks_returns();
    }

    /**
     * Consume a queue with manual acknowledgement and a prefetch count of 1.
     *
     * Blocks for as long as the channel has registered consumer callbacks.
     *
     * @param callable $callback Called as $callback(AMQPMessage $message, AMQPChannel $channel).
     */
    public function handle(string $queueName, callable $callback)
    {
        // Prefetch count 1: request one message at a time on this channel.
        $this->channel->basic_qos(null, 1, false);
        $this->channel->basic_consume($queueName, '', false, false, false, false, function ($message) use ($callback) {
            $callback($message, $this->channel);
        });
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /**
     * Close the channel first, then the underlying connection.
     */
    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
4、注册服务(在服务提供者的 register 方法中,例如 AppServiceProvider,将接口绑定到实现类)
// Map the RabbitMqService contract to RabbitMqServiceImpl in the container,
// so type-hinting the interface resolves to the php-amqplib implementation.
$this->app->bind(RabbitMqService::class, RabbitMqServiceImpl::class);
5、创建 console command
/**
 * Artisan command that declares the demo exchange and its two queues,
 * then binds the queues to the exchange.
 */
class SetupRabbitMQ extends Command
{
    /**
     * Command name used on the artisan CLI.
     *
     * @var string
     */
    protected $signature = 'rabbitmq:setup';

    /**
     * Short description of the command.
     *
     * @var string
     */
    protected $description = 'Setup RabbitMQ exchanges and queues';

    /**
     * Client used to talk to the broker.
     */
    protected RabbitMqService $rabbitMqService;

    /**
     * Store the injected RabbitMQ client.
     *
     * @return void
     */
    public function __construct(RabbitMqService $rabbitMqService)
    {
        parent::__construct();
        $this->rabbitMqService = $rabbitMqService;
    }

    /**
     * Declare the exchange, declare both queues, then bind them.
     *
     * @return void
     */
    public function handle()
    {
        $exchange = 'exchange1';
        $queues = ['exchange1-queue1', 'exchange1-queue2'];

        // Exchange first, with the service's default settings.
        $this->rabbitMqService->createExchange($exchange);

        // Declare every queue before any binding is made.
        foreach ($queues as $queue) {
            $this->rabbitMqService->createQueue($queue);
        }

        // Bind each queue to the exchange using the default routing key.
        foreach ($queues as $queue) {
            $this->rabbitMqService->bindQueue($queue, $exchange);
        }

        $this->info('Exchange and queue have been set up successfully.');
    }
}
6、制作生产者(这里用一个 PHPUnit 测试用例来演示消息发送)
/**
 * Demonstrates the producer side: publishes ten messages to "exchange1"
 * and prints confirm / return notifications.
 */
class RabbitMqTest extends TestCase
{
    /**
     * Publish a batch of messages and wait for the broker's acks/returns.
     */
    public function testSend()
    {
        $rabbitMqService = new RabbitMqServiceImpl();

        // Report whether each message was acked or nacked.
        $rabbitMqService->confirm(function (bool $ack, AMQPMessage $message) {
            if ($ack) {
                echo "id " . $message->get('correlation_id') . " 消息成功发送至Exchange" . PHP_EOL;
            } else {
                echo "id " . $message->get('correlation_id') . " 消息发送到Exchange失败" . PHP_EOL;
            }
        });

        // Report messages that were returned to the publisher.
        $rabbitMqService->returnListen(function (string $error, AMQPMessage $message) {
            echo "id " . $message->get('correlation_id') . " " . $error . PHP_EOL;
        });

        $count = 10;
        for ($i = 1; $i <= $count; $i++) {
            // The correlation id parameter is typed string; cast explicitly so
            // the call does not depend on scalar coercion (a TypeError under
            // strict_types).
            $rabbitMqService->publishMessage('exchange1', 'Hello, RabbitMQ! My Count Is: ' . $i . ' ', (string) $i);
        }

        $rabbitMqService->waitAck();
        $this->assertTrue(true);
    }
}
7、制作消费者 (只举例一个队列)
/**
 * Artisan command that runs a consumer for the "exchange1-queue1" queue.
 */
class RabbitMQExchange1Queue1Job extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rabbitmq:exchange1-queue1';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Starts the RabbitMQ [exchange1-queue1]worker';

    /**
     * Client used to consume from the broker.
     */
    protected RabbitMqService $rabbitMqService;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct(RabbitMqService $rabbitMqService)
    {
        parent::__construct();
        $this->rabbitMqService = $rabbitMqService;
    }

    /**
     * Consume the queue: ack on success; on failure requeue the message once,
     * and reject it (without requeue) if it had already been redelivered.
     *
     * @return void
     */
    public function handle()
    {
        $this->rabbitMqService->handle('exchange1-queue1', function (AMQPMessage $message, AMQPChannel $channel) {
            try {
                // Process the message.
                $this->info('[exchange1-queue1][' . $message->get('correlation_id') . '] Received ' . $message->body . ' At ' . Carbon::now()->format('Y-m-d H:i:s'));
                // No exception was raised: acknowledge the message.
                $channel->basic_ack($message->delivery_info['delivery_tag']);
            } catch (\Throwable $e) {
                // Catch \Throwable so that \Error (e.g. TypeError) also goes
                // through the requeue/reject path instead of killing the
                // worker, and report the failure rather than swallowing it.
                $this->error('[exchange1-queue1] Failed to process message: ' . $e->getMessage());

                if ($message->delivery_info['redelivered']) {
                    // Already redelivered once: reject without requeueing.
                    $channel->basic_reject($message->delivery_info['delivery_tag'], false);
                } else {
                    // First failure: nack with requeue so it is delivered again.
                    $channel->basic_nack($message->delivery_info['delivery_tag'], false, true);
                }
            }
        });
    }
}
8、运行消费者(区分windows/linux系统)
windows:
php artisan rabbitmq:exchange1-queue1
linux:(也可以用系统自带的定时任务来拉起;但消费者是常驻进程,更推荐用 Supervisor 之类的进程管理工具守护)
这里我是在 docker 中安装了 Supervisor,配置如下:
; Supervisor program section that keeps the exchange1-queue1 consumer running.
[program:exchange1-queue1]
; Process names carry the process number so the numprocs instances are distinct.
process_name=%(program_name)s_%(process_num)02d
; The artisan consumer command to run.
command=php /www/artisan rabbitmq:exchange1-queue1
; Start together with supervisord and restart the worker when it exits.
autostart=true
autorestart=true
; NOTE(review): the worker runs as root; confirm this is intended.
user=root
; Number of consumer processes to run.
numprocs=2
; Send stderr to the same log file as stdout.
redirect_stderr=true
stdout_logfile=/www/storage/logs/exchange1-queue1.log
这样就完成了一个简单的mq对接示例,
有兴趣的自行测试,有业务逻辑的根据实际逻辑稍作修改。
123 在
Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..原梓番博客 在
在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..博主 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..1111 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..路人 在
php中使用hyperf框架调用讯飞星火大模型实现国内版chatgpt功能示例中评论 教程很详细,如果加个前端chatgpt对话页面就完美了..Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号