前言:
如今看官们对“phprabbitmq”大致比较关怀,姐妹们都需要剖析一些“phprabbitmq”的相关内容。那么小编同时在网络上搜集了一些关于“phprabbitmq””的相关资讯,希望小伙伴们能喜欢,咱们一起来学习一下吧!本文工具:thinkphp5.1,rabbitmq
1.安装Erlang环境
wget
yum install erlang-22.2-1.el7.x86_64.rpm
2.下载rabbitMQ的rpm包
wget
3.安装RabbitMQ
yum install rabbitmq-server-3.8.1-1.el7.noarch.rpm
4.启动RabbitMQ
chkconfig rabbitmq-server on #设置rabbitmq 服务为开机启动
/sbin/service rabbitmq-server start #启动
/sbin/service rabbitmq-server stop #关闭
或者
/bin/systemctl start rabbitmq-server.service #启动
/bin/systemctl start rabbitmq-server.service #关闭
5.启动UI插件
创建登录用户 rabbitmq-plugins enable rabbitmq_management #启动管理插件,下次无需再手动启动该插件
PHP
composer "yarayzw/rabbitmq" : "dev-master",
创建worker配置文件
<?php// +----------------------------------------------------------------------// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]// +----------------------------------------------------------------------// | Copyright (c) 2006-2018 All rights reserved.// +----------------------------------------------------------------------// | Licensed ( )// +----------------------------------------------------------------------// | Author: liu21st <liu21st@gmail.com>// +----------------------------------------------------------------------use core\base\BaseMqService;use core\helper\SysHelper;// +----------------------------------------------------------------------// | Workerman设置 仅对 php think worker 指令有效// +----------------------------------------------------------------------return [ // 扩展自身需要的配置 //自动化 处理redis 3677 //sendplan 11001 - 11010 //sendmsg 11020 - 11030 'host' => '0.0.0.0', // 监听地址 'port' => 11020, // 监听端口 'root' => '', // WEB 根目录 默认会定位public目录 'app_path' => '', // 应用目录 守护进程模式必须设置(绝对路径) 'file_monitor' => false, // 是否开启PHP文件更改监控(调试模式下自动开启) 'file_monitor_interval' => 2, // 文件监控检测时间间隔(秒) 'file_monitor_path' => [], // 文件监控目录 默认监控application和config目录 // 支持workerman的所有配置参数 'name' => 'xx-tj', 'count' => 2, 'daemonize' => false,// 'pidFile' => App::getRuntimePath().'worker/' . 'worker.pid', 'onWorkerStart' => static function ($worker) { switch ($worker->id) { case 0: //mq消费监听 BaseMqService::setConfig(SysHelper::getConf('rabbitmq.')); BaseMqService::receive(); break; case 1: break; case 2: break; case 3: break; default: throw new Exception('无效进程'); } },];
启动worker
php think worker
创建mq基础类
<?phpnamespace core\base;use app\admin\service\UserPromulgatingService;use core\common\BaseException;use Exception;use hq\mq\MqSendDataStruct;use hq\mq\MqService;class BaseMqService extends MqService{ protected static $appName = 'xx-tj'; protected static $config = [ 'host' => '127.0.0.1', 'port' => '5672', 'user' => 'yara', 'password' => 'qwer1234', 'vhost' => 'xx', 'is_delay' => true, //是否需要开启延迟队列 'pre_exchange' => 'xx', //交换机前缀 'exchange_type' => 'topic', //默认topic类型 'exchange_key' => '', 'passive' => false, //查询某一个队列是否已存在,如果不存在,不想建立该队列 'durable' => true, //是否持久化 'auto_delete' => false, //是否自动删除 'exclusive' => false, //队列的排他性 'no_local' => false, 'no_ack' => false, //是否需不需要应答 'nowait' => false, //该方法需要应答确认 'consumer_tag' => '' ]; protected static $consumer = [ 'yara' => [ 'name' => 'yara', 'exchange' => 'xx.yara', 'route' => 'xx.yara.*', 'queue' => 'xx-tj.yara', 'operations' => [ ['name' => 'yara', 'title' => '测试', 'queue' => 'xx-tj.yara.yaratest', 'route' => 'xx.yara.yaratest', 'class' => UserPromulgatingService::class, 'method' => 'yarayzwtest'], ] ], ]; //延迟队列 protected static $delays = [// 'order_timeout_cancel' => ['name' => 'order_timeout_cancel', 'title' => '订单超时取消订单', 'expiry' => 30 * 60, 'class' => MqDelayOrderService::class, 'method' => 'cancel'],// 'order_assemble_timeout_cancel' => ['name' => 'order_assemble_timeout_cancel', 'title' => '拼团订单超时退订单', 'expiry' => 24 * 60 * 60, 'class' => MqDelayOrderService::class, 'method' => 'refundAssemble'], ]; /** * @param MqSendDataStruct $data * @param $routeKey * @throws Exception */ public static function sendEventMq(MqSendDataStruct $data, string $routeKey): void { if (empty($routeKey)) { throw new BaseException('mq路由key必须填写!'); } if (empty($data->getEvent())) { throw new BaseException('请在事件类中定义event属性,并设置默认event_code值,参照Event类中事件对应的code!'); } if (empty($data->getUserId())) { throw new BaseException('请在事件类中定义user_id属性'); } $eventLog = [ 'mq_router_key' => $routeKey, # 'user_type' => 0, 'user_id' => $data->getUserId(), 'user_name' => $data->getUserName(), 'event_code' => $data->getEvent(), 'send_data' => json_encode($data->toArray()), 'create_at' => date('Y-m-d H:i:s') ];// EventLogService::insert($eventLog); self::send($data, $routeKey); }}
创建字段约定类
<?phpnamespace entity\mq;use core\base\BaseEntity;use hq\mq\MqSendDataStruct;class GetOrder extends MqSendDataStruct{ use BaseEntity; private $thirdparty; private $channel_id; private $start_time; private $end_time; /** * @return mixed */ public function getThirdparty() { return $this->thirdparty; } /** * @param mixed $thirdparty */ public function setThirdparty($thirdparty): void { $this->thirdparty = $thirdparty; } /** * @return mixed */ public function getChannelId() { return $this->channel_id; } /** * @param mixed $channel_id */ public function setChannelId($channel_id): void { $this->channel_id = $channel_id; } /** * @return mixed */ public function getStartTime() { return $this->start_time; } /** * @param mixed $start_time */ public function setStartTime($start_time): void { $this->start_time = $start_time; } /** * @return mixed */ public function getEndTime() { return $this->end_time; } /** * @param mixed $end_time */ public function setEndTime($end_time): void { $this->end_time = $end_time; }}
发送mq消息
public function setDataByMysql(){ $sendData = new GetOrder([ 'channel_id' => 0, 'start_time' => 'T00:00:00+08:00', 'end_time' => 'T23:59:59+08:00', 'thirdparty' => 1, ]); try { BaseMqService::send($sendData, 'xx.yara.yaratest'); } catch (\Exception $e) { FilterHelper::writeLog('dsrw_error','推广链接数据采集-error',$e->getMessage()); }}
标签: #phprabbitmq