团队系统架构是基于RabbitMQ做的消息总线,所以需要在laravel中实现对RMQ的监控功能,在网上看到的现有的扩展包(诸如:laravel-queue-rabbitmq )都是实现了基于rmq实现了laravel框架的queue功能,但是无法实现自定义监听rmq的消息,所以自己写了一套监听rmq自定义消息的功能。这个过程,关键的是自定义Artisan命令以及使用事件(Event)实现对队列消息的个性化处理。

加载amqp的SDK包

composer require php-amqplib/php-amqplib

创建配置文件:config/rabbitMQ.php

<?php

return [
    //rmq账号密码
    'rmq' => [
        'host'                      => env('RMQ_HOST'),
        'port'                      => env('RMQ_PORT'),
        'user'                      => env('RMQ_USER'),
        'pass'                      => env('RMQ_PASS'),
        'vhost'                     => env('RMQ_VHOST'),
    ],


//队列预警监听队列
'WarningNotice' => [
    'consumer_tag'              => 'rmq_warning_notice',
    'exchange'                  => 'rmq_e_pvm_data',
    'queue'                     => 'rmq_q_warning_notice',
    'routing_key'               => 'rmq_r_warning_notice',
    'event_name'                => 'WarningNotice',//定义事件名称,在EventServiceProvider会用到
],
];


自定义Artisan命令:

在app\Console\Kernel.php中的commands中增加

protected $commands = [
        Commands\rmq::class
    ];

在app\Console\Commands下创建rmq.php文件

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Event;

class rmq extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'rmq:q {name?}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'rmq队列命令';

    /**
     * rmq constructor.
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $queueName = $this->argument('name');//接收队列名称
        $q = config('rabbitMQ.rmq');
        $c = config('rabbitMQ.' . $queueName);//获取队列配置
        //判定合法
        if (empty($queueName)) {
            die('请指定队列' . PHP_EOL);
        } elseif (empty($c)) {
            die('队列' . $queueName . '未配置' . PHP_EOL);
        }
        //初始化队列监听
        $conn = new AMQPStreamConnection($q['host'], $q['port'], $q['user'], $q['pass'], $q['vhost'],
                                        $insist = false,
                                        $login_method = 'AMQPLAIN',
                                        $login_response = null,
                                        $locale = 'en_US',
                                        $connection_timeout = 10,
                                        $read_write_timeout = 120,
                                        $context = null,
                                        $keepalive = true,
                                        $heartbeat = 60);
        $ch = $conn->channel();
        $ch->queue_declare($c['queue'], false, true, false, false);
        $ch->exchange_declare($c['exchange'], 'topic', false, true, false);
        $ch->queue_bind($c['queue'], $c['exchange'], $c['routing_key']);
        $ch->basic_consume($c['queue'], $c['consumer_tag'], false, false, false, false, function ($msg) use ($queueName) {
            //接收队列消息,触发对应事件
            \Log::info('队列消息接收:' . $msg->body);
            $dataArr = json_decode($msg->body, 1);
            $object = "App\\Events\\" . $queueName;//queueName 在配置文件中设置
            Event::fire(new $object($dataArr));
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            // Send a message with the string "quit" to cancel the consumer.
            if ($msg->body == 'quit') {
                $msg->delivery_info['channel']->basic_cancel($msg->delivery_info['consumer_tag']);
            }
        });
        while (count($ch->callbacks)) {
            $read = array($conn->getSocket()); // add here other sockets that you need to attend
            $write = null;
            $except = null;
            if (false === ($numChangedStreams = stream_select($read, $write, $except, 60))) {
                /* Error handling */
                \Log::error('rmq监听异常');
            } elseif ($numChangedStreams > 0) {
                $ch->wait();
            }
        }
    }

}

监听Listenter配置

上面全做完之后,我们就可以在app\Providers\EventServiceProvider.php中订阅事件,

protected $listen = [
        //预警消息
        'App\Events\WarningNotice' => [
            'App\Listeners\SendSms',//发送预警短信
        ],
    ];

其中“WarningNotice”事件的名称是在rabbitMQ.php中配置的,而“SendSms”则是监听者,如实现发短信的动作。

自此,代码部分全部定义完成,不同的队列,只需要在配置文件中定义就可以使用。

在控制台进行队列监听指令:


php artisan rqm:q {queue}

其中 “queue”即为队列名称,在RMQ中和配置文件中必须一一对应即可。

后台用运行队列监听

与laravel队列监听一样使用supervisor

[program:pvm-center-rmq-warning]                                                                                                                                                                                                         
[
process_name=%(program_name)s_%(process_num)02d                                                                                                                                                                                          
command=php /home/deploy/{path}/artisan rmq:q {queue}                                                                                                                                                   
autostart=true                                                                                                                                                                                                                           
autorestart=true                                                                                                                                                                                                                         
user=www-data                                                                                                                                                                                                                            
numprocs=2                                                                                                                                                                                                                               
redirect_stderr=true                                                                                                                                                                                                                     
stdout_logfile=/home/deploy/{path}/storage/logs/worker.log