龙空技术网

PHP|Rabbitmq 安装与使用

小和尚唱情歌 341

前言:

如今看官们对“phprabbitmq”大致比较关怀,姐妹们都需要剖析一些“phprabbitmq”的相关内容。那么小编同时在网络上搜集了一些关于“phprabbitmq””的相关资讯,希望小伙伴们能喜欢,咱们一起来学习一下吧!

本文工具:thinkphp5.1,rabbitmq

1.安装Erlang环境

wget

yum install erlang-22.2-1.el7.x86_64.rpm

2.下载rabbitMQ的rpm包

wget

3.安装RabbitMQ

yum install rabbitmq-server-3.8.1-1.el7.noarch.rpm

4.启动RabbitMQ

chkconfig rabbitmq-server on #设置rabbitmq 服务为开机启动

/sbin/service rabbitmq-server start #启动

/sbin/service rabbitmq-server stop #关闭

或者

/bin/systemctl start rabbitmq-server.service #启动

/bin/systemctl start rabbitmq-server.service #关闭

5.启动UI插件

创建登录用户 rabbitmq-plugins enable rabbitmq_management #启动管理插件,下次无需再手动启动该插件

PHP

composer "yarayzw/rabbitmq" : "dev-master",

创建worker配置文件

<?php// +----------------------------------------------------------------------// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]// +----------------------------------------------------------------------// | Copyright (c) 2006-2018  All rights reserved.// +----------------------------------------------------------------------// | Licensed (  )// +----------------------------------------------------------------------// | Author: liu21st <liu21st@gmail.com>// +----------------------------------------------------------------------use core\base\BaseMqService;use core\helper\SysHelper;// +----------------------------------------------------------------------// | Workerman设置 仅对 php think worker 指令有效// +----------------------------------------------------------------------return [    // 扩展自身需要的配置    //自动化 处理redis 3677    //sendplan 11001 - 11010    //sendmsg 11020 - 11030    'host'                  => '0.0.0.0', // 监听地址    'port'                  => 11020, // 监听端口    'root'                  => '', // WEB 根目录 默认会定位public目录    'app_path'              => '', // 应用目录 守护进程模式必须设置(绝对路径)    'file_monitor'          => false, // 是否开启PHP文件更改监控(调试模式下自动开启)    'file_monitor_interval' => 2, // 文件监控检测时间间隔(秒)    'file_monitor_path'     => [], // 文件监控目录 默认监控application和config目录    // 支持workerman的所有配置参数    'name'                  => 'xx-tj',    'count'                 => 2,    'daemonize'             => false,//    'pidFile'               => App::getRuntimePath().'worker/' . 'worker.pid',    'onWorkerStart' => static function ($worker) {        switch ($worker->id) {            case 0:                //mq消费监听                BaseMqService::setConfig(SysHelper::getConf('rabbitmq.'));                BaseMqService::receive();                break;            case 1:                break;            case 2:                break;            case 3:                break;            default:                throw new Exception('无效进程');        }    },];

启动worker

php think worker

创建mq基础类

<?phpnamespace core\base;use app\admin\service\UserPromulgatingService;use core\common\BaseException;use Exception;use hq\mq\MqSendDataStruct;use hq\mq\MqService;class BaseMqService extends MqService{    protected static $appName = 'xx-tj';        protected static $config = [        'host' => '127.0.0.1',        'port' => '5672',        'user' => 'yara',        'password' => 'qwer1234',        'vhost' => 'xx',        'is_delay' => true,        //是否需要开启延迟队列        'pre_exchange' => 'xx',       //交换机前缀        'exchange_type' => 'topic',     //默认topic类型        'exchange_key' => '',        'passive' => false,     //查询某一个队列是否已存在,如果不存在,不想建立该队列        'durable' => true,      //是否持久化        'auto_delete' => false, //是否自动删除        'exclusive' => false,   //队列的排他性        'no_local' => false,        'no_ack' => false,       //是否需不需要应答        'nowait' => false,      //该方法需要应答确认        'consumer_tag' => ''    ];    protected static $consumer = [        'yara' => [            'name' => 'yara',            'exchange' => 'xx.yara',            'route' => 'xx.yara.*',            'queue' => 'xx-tj.yara',            'operations' => [                ['name' => 'yara', 'title' => '测试', 'queue' => 'xx-tj.yara.yaratest', 'route' => 'xx.yara.yaratest', 'class' => UserPromulgatingService::class, 'method' => 'yarayzwtest'],            ]        ],    ];    //延迟队列    protected static $delays = [//        'order_timeout_cancel' => ['name' => 'order_timeout_cancel', 'title' => '订单超时取消订单', 'expiry' => 30 * 60, 'class' => MqDelayOrderService::class, 'method' => 'cancel'],//        'order_assemble_timeout_cancel' => ['name' => 'order_assemble_timeout_cancel', 'title' => '拼团订单超时退订单', 'expiry' => 24 * 60 * 60, 'class' => MqDelayOrderService::class, 'method' => 'refundAssemble'],    ];    /**     * @param MqSendDataStruct $data     * @param $routeKey     * @throws Exception     */    public static function sendEventMq(MqSendDataStruct $data, string $routeKey): void    {        if (empty($routeKey)) {            throw new BaseException('mq路由key必须填写!');        }        if (empty($data->getEvent())) {            throw new BaseException('请在事件类中定义event属性,并设置默认event_code值,参照Event类中事件对应的code!');        }        if (empty($data->getUserId())) {            throw new BaseException('请在事件类中定义user_id属性');        }        $eventLog = [            'mq_router_key' => $routeKey,            # 'user_type' => 0,            'user_id' => $data->getUserId(),            'user_name' => $data->getUserName(),            'event_code' => $data->getEvent(),            'send_data' => json_encode($data->toArray()),            'create_at' => date('Y-m-d H:i:s')        ];//        EventLogService::insert($eventLog);        self::send($data, $routeKey);    }}

创建字段约定类

<?phpnamespace entity\mq;use core\base\BaseEntity;use hq\mq\MqSendDataStruct;class GetOrder extends MqSendDataStruct{    use BaseEntity;    private $thirdparty;    private $channel_id;    private $start_time;    private $end_time;    /**     * @return mixed     */    public function getThirdparty()    {        return $this->thirdparty;    }    /**     * @param mixed $thirdparty     */    public function setThirdparty($thirdparty): void    {        $this->thirdparty = $thirdparty;    }    /**     * @return mixed     */    public function getChannelId()    {        return $this->channel_id;    }    /**     * @param mixed $channel_id     */    public function setChannelId($channel_id): void    {        $this->channel_id = $channel_id;    }    /**     * @return mixed     */    public function getStartTime()    {        return $this->start_time;    }    /**     * @param mixed $start_time     */    public function setStartTime($start_time): void    {        $this->start_time = $start_time;    }    /**     * @return mixed     */    public function getEndTime()    {        return $this->end_time;    }    /**     * @param mixed $end_time     */    public function setEndTime($end_time): void    {        $this->end_time = $end_time;    }}

发送mq消息

public function setDataByMysql(){    $sendData = new GetOrder([        'channel_id' => 0,        'start_time' => 'T00:00:00+08:00',        'end_time' => 'T23:59:59+08:00',        'thirdparty' => 1,    ]);    try {        BaseMqService::send($sendData, 'xx.yara.yaratest');    } catch (\Exception $e) {        FilterHelper::writeLog('dsrw_error','推广链接数据采集-error',$e->getMessage());    }}

标签: #phprabbitmq