前言:
如今大家对“httpsqsphp”都比较关注,也都需要了解一些“httpsqsphp”的相关知识。小编在网上搜集了一些关于“httpsqsphp”的相关知识,希望各位老铁们能喜欢,我们快快来了解一下吧!
HTTPSQS是国内著名软件工程师张宴的作品,曾在金山有广泛应用,常用来进行异步解耦操作,适用于发送手机短信、发送电子邮件等场景。
HTTPSQS(HTTP Simple Queue Service)是一款基于HTTP GET/POST协议的轻量级开源简单消息队列服务,使用Tokyo Cabinet的B+Tree Key/Value数据库来做数据的持久化存储。
HTTPSQS具有以下特征:
- 非常简单,基于HTTP GET/POST 协议,支持HTTP协议的编程语言均可调用
- 非常快速,入队列、出队列速度超过10000次/秒
- 支持多队列
- 单个队列支持的最大队列数量高达10亿条
- 低内存消耗,海量数据存储,存储几十GB的数据只需不到100MB的物理内存缓冲区
- 可以在不停止服务的情况下便捷地修改单个队列的最大队列数量
- 可以实时查看队列状态(入队列位置、出队列位置、未读队列数量、最大队列数量)
- 十分轻量,源代码不超过800行
安装
安装libevent
libevent官网:
# 下载安装包
wget <libevent-2.1.10-stable.tar.gz 的下载地址>
# 解压
tar zxvf libevent-2.1.10-stable.tar.gz
cd libevent-2.1.10-stable
# 配置编译参数
./configure --prefix=/usr/local/libevent/
# 编译
make
# 安装
sudo make install
安装tokyocabinet
tokyocabinet官网:
# 下载安装包
wget <tokyocabinet-1.4.48.tar.gz 的下载地址>
# 解压
tar zxvf tokyocabinet-1.4.48.tar.gz
cd tokyocabinet-1.4.48
# 配置编译参数
./configure --prefix=/usr/local/tokyocabinet/
# 编译
make
# 安装
sudo make install
安装httpsqs
# 解压安装包
tar zxvf httpsqs-1.7.tar.gz
cd httpsqs-1.7
# 修改libevent和tokyocabinet的安装路径
vim Makefile
# 编译
make
# 安装
sudo make install # 安装到了/usr/bin下
# 习惯将安装路径放在/usr/local目录下
sudo mkdir -p /usr/local/httpsqs/bin/
sudo mkdir -p /usr/local/httpsqs/tmp/
sudo mv /usr/bin/httpsqs /usr/local/httpsqs/bin/
启动httpsqs: /usr/local/httpsqs/bin/httpsqs -d -p 端口号 -x 数据存放目录
关闭httpsqs: 直接kill掉httpsqs进程
HTTPSQS管理脚本
个人比较习惯使用类似service start/stop的命令来管理各项服务,下面分享一个httpsqs管理脚本:
#!/bin/bash
#
# 快速启动httpsqs脚本
#
EXEC=/usr/local/httpsqs/bin/httpsqs
PORT=1218
PIDFILE=/usr/local/httpsqs/tmp/httpsqs.pid
DATADIR=/usr/local/httpsqs/data

case "$1" in
    start)
        if [ -f $PIDFILE ]
        then
            echo "$PIDFILE exists, process is already running or crashed"
        else
            echo "Starting httpsqs ..."
            ulimit -SHn 65535
            $EXEC -d -p $PORT -x $DATADIR -i $PIDFILE
        fi
        ;;
    stop)
        if [ ! -f $PIDFILE ]
        then
            echo "$PIDFILE does not exist, process is not running"
        else
            PID=$(cat $PIDFILE)
            kill $PID
            while [ -x /proc/${PID} ]
            do
                echo "Waiting for httpsqs to shutdown ..."
                sleep 1
            done
            echo "httpsqs stopped"
        fi
        ;;
    *)
        echo "Please use start or stop as first argument"
        ;;
esac

HTTPSQS的使用
入列
入列:将文本消息放入队列
以curl命令为例:
curl "http://host:port/?name=your_queue_name&opt=put&data=经过URL编码的文本消息&auth=mypass123"
返回数据:
HTTPSQS_PUT_OK: 入列成功
HTTPSQS_PUT_ERROR: 入列失败
HTTPSQS_PUT_END: 队列已满
出列
出列:从队列中取出文本消息
以curl命令为例:
curl "http://host:port/?charset=utf-8&name=your_queue_name&opt=get&auth=mypass123"
返回消息队列的内容给客户端, 如果没有未取出的消息队列,则返回:HTTPSQS_GET_END
查看队列状态
以curl命令为例:
curl "http://host:port/?name=your_queue_name&opt=status_json&auth=mypass123"
返回数据示例:
{"name":"xoyo","maxqueue":1000000,"putpos":45,"putlap":1,"getpos":6,"getlap":1,"unread":39}
HTTPSQS客户端
由于HTTPSQS是基于HTTP协议实现通信的,可以依据接口,自行编写library。本文分享基于PHP CI框架的HTTPSQS客户端类:
<?php if (!defined('BASEPATH')) exit('No direct script access allowed');
/*
----------------------------------------------------------------------------------------------------------------
HTTP Simple Queue Service - httpsqs client class for PHP v1.7.1 (CodeIgniter library variant)

Author: Zhang Yan, E-mail: net@s135.com
This is free software, and you are welcome to modify and redistribute it under the New BSD License
----------------------------------------------------------------------------------------------------------------
Usage:

    // Every key is optional; a missing key falls back to the CodeIgniter config
    // item of the same name prefixed with "httpsqs_" (httpsqs_host, httpsqs_port, ...).
    $httpsqs = new Httpsqs(array(
        'host'    => $httpsqs_host,
        'port'    => $httpsqs_port,
        'auth'    => $httpsqs_auth,
        'charset' => $httpsqs_charset,
    ));

    1. $httpsqs->put($queue_name, $queue_data);
       PUT a text message into a queue. Returns true on success, false on error,
       or the text "HTTPSQS_PUT_END" when the queue is full.
    2. $httpsqs->get($queue_name);
       GET a text message from a queue. Returns the message, false on error, or the
       text "HTTPSQS_GET_END" when there is no unread message.
    3. $httpsqs->gets($queue_name);
       GET a text message together with its position, e.g.
       array("pos" => 7, "data" => "text message"). Returns false on error, or
       array("pos" => 0, "data" => "HTTPSQS_GET_END") when there is no unread message.
    4. $httpsqs->status($queue_name);
       View the queue status as text.
    5. $httpsqs->status_json($queue_name);
       View the queue status as JSON, e.g.
       {"name":"queue_name","maxqueue":5000000,"putpos":130,"putlap":1,"getpos":120,"getlap":1,"unread":10}
    6. $httpsqs->view($queue_name, $queue_pos);
       View the contents stored at the given queue position (id).
    7. $httpsqs->reset($queue_name);
       Reset the queue. Returns true on success, false otherwise.
    8. $httpsqs->maxqueue($queue_name, $num);
       Change the maximum length of one queue. Returns true on success, false if cancelled.
    9. $httpsqs->synctime($num);
       Change the interval for syncing updated contents to disk. Returns true on
       success, false if cancelled.

Note: queue names, the auth string and numeric arguments are placed into the request
URL exactly as given (no URL-encoding is applied here), so callers must pass URL-safe
values.
----------------------------------------------------------------------------------------------------------------
*/
class Httpsqs
{
    public $httpsqs_host;
    public $httpsqs_port;
    public $httpsqs_auth;
    public $httpsqs_charset;

    protected $_ci;

    /**
     * Reads connection settings from $params, falling back to CodeIgniter config items.
     *
     * @param array $params Optional keys: host, port, auth, charset.
     */
    public function __construct($params = array())
    {
        $this->_ci =& get_instance();

        foreach (array('host', 'port', 'auth', 'charset') as $key) {
            $property = 'httpsqs_' . $key;
            $this->$property = isset($params[$key])
                ? $params[$key]
                : $this->_ci->config->item($property);
        }
    }

    /**
     * Sends a GET request to the HTTPSQS server (5 second connect timeout).
     *
     * @param string $query Request target, e.g. "/?name=q&opt=get".
     * @return array|false array("pos" => int, "data" => string), or false when the
     *                     connection fails or the response has no Content-Length.
     */
    public function http_get($query)
    {
        $socket = fsockopen($this->httpsqs_host, $this->httpsqs_port, $errno, $errstr, 5);
        if (!$socket) {
            return false;
        }

        $request = "GET {$query} HTTP/1.1\r\n"
            . "Host: {$this->httpsqs_host}\r\n"
            . "Connection: close\r\n"
            . "\r\n";

        return $this->_exchange($socket, $request);
    }

    /**
     * Sends a POST request to the HTTPSQS server (1 second connect timeout).
     *
     * @param string $query Request target, e.g. "/?name=q&opt=put".
     * @param string $body  Raw request body (the queue message).
     * @return array|false Same shape as http_get().
     */
    public function http_post($query, $body)
    {
        $socket = fsockopen($this->httpsqs_host, $this->httpsqs_port, $errno, $errstr, 1);
        if (!$socket) {
            return false;
        }

        $body = (string) $body;
        $request = "POST {$query} HTTP/1.1\r\n"
            . "Host: {$this->httpsqs_host}\r\n"
            . "Content-Length: " . strlen($body) . "\r\n"
            . "Connection: close\r\n"
            . "\r\n"
            . $body;

        return $this->_exchange($socket, $request);
    }

    /**
     * Puts a text message into a queue.
     *
     * @return bool|string true on success, "HTTPSQS_PUT_END" when the queue is full,
     *                     false on any other outcome.
     */
    public function put($queue_name, $queue_data)
    {
        $result = $this->http_post($this->_build_query($queue_name, 'put'), $queue_data);
        if ($result === false) {
            return false;
        }
        if ($result['data'] === 'HTTPSQS_PUT_OK') {
            return true;
        }
        if ($result['data'] === 'HTTPSQS_PUT_END') {
            return $result['data'];
        }

        return false;
    }

    /**
     * Takes the next message from a queue.
     *
     * @return string|false The message text (or "HTTPSQS_GET_END"), false on error.
     */
    public function get($queue_name)
    {
        $result = $this->_fetch($this->_build_query($queue_name, 'get'));

        return $result === false ? false : $result['data'];
    }

    /**
     * Takes the next message from a queue together with its position.
     *
     * @return array|false array("pos" => int, "data" => string), false on error.
     */
    public function gets($queue_name)
    {
        return $this->_fetch($this->_build_query($queue_name, 'get'));
    }

    /**
     * Returns the queue status as text, or false on error.
     */
    public function status($queue_name)
    {
        $result = $this->_fetch($this->_build_query($queue_name, 'status'));

        return $result === false ? false : $result['data'];
    }

    /**
     * Returns the contents stored at position $queue_pos, or false on error.
     */
    public function view($queue_name, $queue_pos)
    {
        // The position must come from the $queue_pos parameter.
        $result = $this->_fetch($this->_build_query($queue_name, 'view', '&pos=' . $queue_pos));

        return $result === false ? false : $result['data'];
    }

    /**
     * Resets a queue. Returns true when the server answers HTTPSQS_RESET_OK.
     */
    public function reset($queue_name)
    {
        return $this->_expect($this->_build_query($queue_name, 'reset'), 'HTTPSQS_RESET_OK');
    }

    /**
     * Changes the maximum length of a queue.
     * Returns true when the server answers HTTPSQS_MAXQUEUE_OK.
     */
    public function maxqueue($queue_name, $num)
    {
        return $this->_expect(
            $this->_build_query($queue_name, 'maxqueue', '&num=' . $num),
            'HTTPSQS_MAXQUEUE_OK'
        );
    }

    /**
     * Returns the queue status as a JSON string, or false on error.
     */
    public function status_json($queue_name)
    {
        $result = $this->_fetch($this->_build_query($queue_name, 'status_json'));

        return $result === false ? false : $result['data'];
    }

    /**
     * Changes the disk sync interval.
     * Returns true when the server answers HTTPSQS_SYNCTIME_OK.
     */
    public function synctime($num)
    {
        return $this->_expect(
            $this->_build_query('httpsqs_synctime', 'synctime', '&num=' . $num),
            'HTTPSQS_SYNCTIME_OK'
        );
    }

    /**
     * Builds the request target shared by every operation.
     * Values are concatenated as given; no URL-encoding is applied.
     */
    protected function _build_query($queue_name, $opt, $suffix = '')
    {
        return '/?auth=' . $this->httpsqs_auth
            . '&charset=' . $this->httpsqs_charset
            . '&name=' . $queue_name
            . '&opt=' . $opt
            . $suffix;
    }

    /**
     * Runs a GET and normalises failures: returns false when the request fails,
     * the body is empty, or the server answers HTTPSQS_ERROR.
     */
    protected function _fetch($query)
    {
        $result = $this->http_get($query);

        // Strict checks so that a legitimate message such as "0" is not mistaken for an error.
        if ($result === false || $result['data'] === '' || $result['data'] === 'HTTPSQS_ERROR') {
            return false;
        }

        return $result;
    }

    /**
     * Runs a GET and reports whether the response body equals $expected.
     */
    protected function _expect($query, $expected)
    {
        $result = $this->http_get($query);

        return $result !== false && $result['data'] === $expected;
    }

    /**
     * Writes the request, reads the response and always closes the socket.
     */
    protected function _exchange($socket, $request)
    {
        $response = false;
        if (fwrite($socket, $request) !== false) {
            $response = $this->_read_response($socket);
        }
        fclose($socket);

        return $response;
    }

    /**
     * Parses an HTTP response from the socket.
     *
     * Only the Content-Length and Pos headers are used; the status line is read and
     * discarded because callers decide the outcome from the body text.
     *
     * @return array|false array("pos" => int, "data" => string), or false when no
     *                     valid Content-Length header was received.
     */
    protected function _read_response($socket)
    {
        if (fgets($socket) === false) {
            return false;
        }

        $length = -1;
        $pos = 0;
        while (($line = fgets($socket)) !== false) {
            $line = trim($line);
            if ($line === '') {
                break; // blank line terminates the header section
            }
            if (stripos($line, 'Content-Length:') === 0) {
                $length = (int) trim(substr($line, strlen('Content-Length:')));
            } elseif (stripos($line, 'Pos:') === 0) {
                $pos = (int) trim(substr($line, strlen('Pos:')));
            }
        }

        if ($length < 0) {
            return false;
        }

        // A single fread() on a network stream may return fewer bytes than requested,
        // so keep reading until the declared length is reached or the peer closes.
        $body = '';
        while (strlen($body) < $length && !feof($socket)) {
            $chunk = fread($socket, $length - strlen($body));
            if ($chunk === false || $chunk === '') {
                break;
            }
            $body .= $chunk;
        }

        return array('pos' => $pos, 'data' => $body);
    }
}
其他HTTPSQS用法可参考:
标签: #httpsqsphp