Hello Go

Mac 安装 GO

本人使用了Brew来安装。

安装前首先更新Brew。
brew update && brew upgrade
安装Go

使用brew安装go

brew install go
设置$GOPATH

Go从1.1版本开始必须设置这个变量,也就是说通过以上方式安装后就必须要设置$GOPATH了。

这个目录用来存放Go源码,可运行文件,和编译之后的包文件。

Mac系统设置

在bash中加入:

export GOPATH=/your/path

以上目录需要存在,不存在就自己手动创建一个。

然后运行

source ~/.zshrc

替换成你自己使用的shell,如.bashrc等。

然后,需要在/your/path目录下,新建 pkg,bin,src目录。

​ src目录存放源代码,如hello.go

​ pkg目录存放编译后的文件,如hello.a

​ bin目录存放生成后的可执行文件

src目录就是主要开发目录。如hello项目,则在src下新建hello目录。

到这里,就可以开始Go之旅了。

Hello Go

需要注意的是,在Go中,包名和文件名是可以不同的,包名为main的时候即为可以独立运行的包。

在src目录新建hello目录,创建hello.go文件。

package main //包名

import "fmt" //导入一个系统级别的fmt包

//使用func定义函数
//main函数没有参数,没有返回值
func main() {
  fmt.Println("Hello Go") //调用包函数的方法为 pkgName.funcName
}

一个hello.go就写好了,在命令行输入

go run hello.go

就可以看到输出Hello Go了。

PHP Curl 上传文件

有时候会遇到上传文件给第三方服务的情况,比如本身程序并不需要存储附件,而是把附件发送给一个公共的服务。

最近正好碰到这个问题,记录一下。

上代码。

发送端:

<?php

// 接口地址
$api = 'http://api.example.com/uploadfile';
$file = $_FILES['file'];//保存$_FILES到变量中。

// 此处可能存在上传失败等问题,需验证$_FILES["file"]["error"]。
// 做业务对应的规则验证,如文件格式,文件大小等。

// 创建一个 cURL 句柄
$ch = curl_init($api);

// 创建一个 CURLFile 对象
// 上传文件的路径,文件的Mimetype,文件名
$cfile = curl_file_create($file['tmp_name'],$file['type'],$file['name']);

$data = [
	'type'=>'image',
	'data'=>$cfile
];

// 设置 POST 数据
curl_setopt($ch, CURLOPT_POST,1);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);

$resp = curl_exec($ch);

if(!$resp) {
  die('Error: "' . curl_error($ch) . '" - Code: ' . curl_errno($ch));
} else {
  echo "Response HTTP Status Code : " . curl_getinfo($ch, CURLINFO_HTTP_CODE);
  echo "\nResponse HTTP Body : " . $resp;
}

// Close request to clear up some resources
curl_close($ch);

接收端:

<?php 
var_dump($_FILES);// 接收文件内容
va_dump($_POST);// 接收type

前端:

前端可使用Form或其他Ajax方式上传。

一个简单的php+redis队列示例

虽然RabbitMQ的坑年前就开始填了,但是并没有机会在项目中实际使用,机缘巧合,换工作后第一个比较重要的事就是做一个直播的页面,如果数据直接插入或读取自数据库,数据库端的压力就太大了,当时RabbitMQ还没看完,而其他的队列程序更是没有用过,只是稍微对Redis熟悉些,于是就使用了Redis做。

Redis比较常见的是作为数据缓存工具使用,数据存储在内存中,减少了数据库的连接和查询,效率高,又方便。

而其实Redis也可以用来做消息队列。

发送

发送端首先当然是做好数据的接收和检测,比如为空的情况下给个默认值或返回错误。

特殊的情况下可能还要正则处理。

处理完插入到Redis的list(列表)中。如果插入失败抛出异常。

<?php

$redis = new Redis();
$redis->connect('127.0.0.1');

//如果redis需要认证
$redis->auth('redispasswd');

//命令行中使用,可以使用把信息作为参数的方式
$data = $argv[1];
if(empty($data)) $data = "Hello World..!":

try{
	//便于调试,在信息后边加上当前时间
	$data .= '-at-'.date('Y-m-d H:i:s');
	$redis->LPUSH('redis_queue_1',json_encode($data));
}catch(Exception $e){
	echo $e->getMessage()."\n";
}

执行

php push.php "testinfo"

接收

由于消息是源源不断发送到队列中,所以接收端程序一般会以后台进程的方式运行。

<?php
$redis = new Redis();
$redis->connect('127.0.0.1');

$redis->auth('redispasswd');

while (true) {

	try {
		$data = $redis->rPopLPush('redis_queue_1','redis_queue_1_bak')."\n";
		if($data){
			echo $data;
		}else{
			echo "没有数据\n";
		}
	} catch (Exception $e) {
		echo $e->getMessage()."\n";
	}

	//每秒读取1次
	sleep(1);
}

执行

php get.php

更进一步

如果想把数据写入到MySQL中,该怎么办?

大多数情况下数据最终是需要持久化的,仅仅存在于redis中,也许内存会不够呢。

改造一下get.php。

<?php
$redis = new Redis();
$redis->connect('127.0.0.1');

$redis->auth('redispasswd');

$dsn = 'mysql:dbname=test;host=127.0.0.1';

try{
	$pdo = new PDO($dsn,'root','mysqlpasswd',array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES utf8'));
}catch(PDOExcetion $e){
	echo $e->getMessage()."\n";
}

while(true){
	$data = $redis->rPopLPush('redis_queue_1','redis_queue_1_bak');
	if($data){
		$sth = $pdo->prepare("INSERT INTO tb_test(msg,time) VALUES (:msg,:time)");
		
		$time = time();
		
		$sth->bindParam(':msg',$data,PDO::PARAM_STR);
		$sth->bindParam(':time',$time,PDO::PARAM_INT);
		
		$sth->execute();
		$id = $pdo->lastInsertId();
      
		echo "插入成功,ID=$id \n";
	}else{
		echo "没有数据 \n";
	}
	
	//实际任务中可能需要尽可能快的把数据导入到MySQL中
	sleep(1);
}

注意

​ 这里使用了PDO;

bindParam方法的第一个参数对应insert语句中的对应顺序的值,如第一个bindParam方法对应第一个value值。

​ bindParam方法第二个参数为实际的值,传入变量或固定值,传入方法调用时如time()会提示:

Strict standards: Only variables should be passed by reference in /www/get.php on line 24。

解决Discuz无法发布爱奇艺视频的问题

最近碰到需要在Discuz论坛中插入爱奇艺视频的问题,之前没关注过,搜索后有些答案说DZ不支持爱奇艺,有些说爱奇艺不支持DZ,并没有真正能解决问题的。

下午突然想到也许是DZ根据粘贴进来的flash地址生成的标签代码不对,试验后发现果然是这个原因。

打卡/static/js/editor.js 文件第1299行查看这段代码:

case 'vid':
	var mediaUrl = $(ctrlid + '_param_1').value;
	var auto = '';
	var posque = mediaUrl.lastIndexOf('?');
	posque = posque === -1 ? mb_strlen(mediaUrl) : posque;
	var ext = mediaUrl.lastIndexOf('.') === -1 ? '' : mediaUrl.substring(mediaUrl.lastIndexOf('.') + 1, posque).toLowerCase();
	ext = in_array(ext, ['mp3', 'wma', 'ra', 'rm', 'ram', 'mid', 'asx', 'wmv', 'avi', 'mpg', 'mpeg', 'rmvb', 'asf', 'mov', 'flv', 'swf']) ? ext : 'x';
	if(ext == 'x') {
		if(/^mms:\/\//.test(mediaUrl)) {
			ext = 'mms';
		} else if(/^(rtsp|pnm):\/\//.test(mediaUrl)) {
			ext = 'rtsp';
		}
	}
	var str = '[media=' + ext + ',' + $(ctrlid + '_param_2').value + ',' + $(ctrlid + '_param_3').value + ']' + squarestrip(mediaUrl) + '[/media]';
	insertText(str, str.length, 0, false, sel);
	break;

Discuz根据主流的视频网站的视频地址格式写的规则,生成discuz专用的[media]标签,在前台输出的时候再解析成embed这样的HTML代码。

解析之后的差不多就是这样:

<embed src="http://player.video.qiyi.com/7b42a1a27ff121c201ee5e6c6d757817/0/0/v_19rrklq2bs.swf-albumId=406283300-tvId=406283300-isPurchase=2-cnId=1" allowFullScreen="true" quality="high" width="480" height="350" align="middle" allowScriptAccess="always" type="application/x-shockwave-flash"></embed>

而不那么主流的爱奇艺的flash地址则是:”http://player.video.qiyi.com/7b42a1a27ff121c201ee5e6c6d757817/0/0/v_19rrklq2bs.swf-albumId=406283300-tvId=406283300-isPurchase=2-cnId=1”,查看上一段代码可以看到,discuz会用正则去看粘贴的地址最后一个"."后的后缀,如果这个后缀不在自己的已知flash格式数组中,就把类型设为"x",也就是生成的标签成了[media=x,500,350]。

再到了前台解析,不认识,直接生成一个a标签。

所以,我的解决办法是:

if(ext == 'x') {
	if(/^mms:\/\//.test(mediaUrl)) {
		ext = 'mms';
	} else if(/^(rtsp|pnm):\/\//.test(mediaUrl)) {
		ext = 'rtsp';
	} else if (mediaUrl.indexOf('player.video.qiyi.com')) {
  		ext = 'swf';	
	}
}

增加一个else if 判断是否包含爱奇艺播放器的域名,当用户粘贴的flash地址包含这个字符串时,就认为是粘贴了一个爱奇艺的视频,存储的格式也就成了[media=swf]。

但是这里其实不是十分严谨,如果一个非法的flash地址很巧合的包含了这个字符串,也会认为是flash了,那这种情况下就会出错了。

PHP RabbitMQ 教程(六) - 远程调用

远程调用

第二节中,我们学习了如何使用工作队列在多个worker中分发耗时任务。

但是如果我们需要在远程运行一个函数并等待返回结果怎么办?这是两码事,这个模式通常被称为远程过程调用(Remote Procedure Call,RPC)。

在本节中我们准备使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务端。由于我们没有值得分发的耗时任务,我们准备创建一个假的返回斐波那契数列的RPC服务。

客户端接口

我们创建一个简单的客户端类来说明RPC服务如何使用。这个类会展示call方法如何发送一个RPC请求并且阻塞,直到接收到返回值。

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo "[.] Got ", $response, "\n";

RPC注意事项

尽管RPC在计算机学中很常见,但它十分挑剔。当程序员不知道是否是调用一个本地的方法还是一个很慢的RPC会出现这个问题。这样的困惑便导致不可预测的系统并增加不必要的调试复杂性。比起简化的软件,误用RPC会导致不可维护的无头绪代码。

记住刚才的内容,考虑下面的建议:

​ 确保可以明显的看出哪个方法调用的是本地的哪个是远程的。

​ 系统文档化。让组件之间的依赖变得清晰可见。

​ 错误处理。当RPC服务长时间关闭客户端该作何反应?

如果有疑问,则尽量避免使用RPC。如果可以话,你应该使用异步管道——而不是RPC——像阻塞,结果被异步推送到下个计算阶段。

回调队列

通常在RabbitMQ上做RPC很简单。客户端发送请求消息,服务端回复消息。为了接收响应消息,我们需要在请求中附带一个”callback”队列地址,我们可以使用默认的队列。来试一试:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(
	$payload,
	array('reply_to' => $queue_name));

$channel->basic_publish($msg, '', 'rpc_queue');

# ... then code to read a response message from the callback_queue

消息属性

AMQP协议定义了14个消息属性。大部分不常用,下面的除外:

​ delivery_mode:值为2时表示持久化,1为临时的。也许你还记得这个属性来自第二节。

​ content_type:编码格式,比如经常用的JSON格式,良好的做法是设置为:application/json。

​ reply_to:通常用来定义回调队列名称。

​ correlation_id:用来关联RPC的响应和请求。

Correlation Id

在上面的方法中我们建议为每一个RPC请求创建一个回调队列。这样非常低效,但是幸运的是有更好的办法 - 我们可以为每一个客户端创建一个单独的回调队列。

这样又带来一个新的问题,在队列接收到响应时,并不知道属于哪个请求。这也正是correlation_id属性要发挥的作用。我们为每一个请求的设定一个唯一的correlation_id值,然后,当在回调队列接收到消息时会查看它的属性,基于此,我们就可以把响应和请求进行匹配。如果发现一个未知的correlation_id值,可以安全的忽略掉这条消息 - 因为它不属于任何请求。

也许你会问,为什么应该忽略回调队列里的未知消息,而不是返回一个错误?因为服务可能会出现紊乱的情况,虽然不太可能,但是如果发生这种情况,RPC服务会在发送完响应后挂掉,但是还没有进行消息确认。如果发生了,重启RPC服务后会再次处理这个请求。这就是为什么在客户端我们必须适当的处理重复请求,而RPC服务最好的幂等的。

总结

RPC工作流程:

​ 当客户端开始运行时会创建一个匿名独有回调队列。

​ RPC请求中,客户端消息带有两个属性:reply_to用来设置回调队列,correlation_id用来唯一标识每一个请求。

​ 请求被发送到rpc_queue队列。

​ RPC worker(又称worker)在队列中守护,等待新请求。当请求到达,它会进行处理,然后把结果以消息的形式发送回客户端的队列,队列名便是客户端消息带有的reply_to的值。

​ 客户端等待回调队列中的数据。当消息到达,检查它的correlation_id的值。如果符合客户端发送给RPC服务器中请求的值,客户端会返回响应内容到应用中。

整合

斐波那契方法:

function fib($n){
  if($n == 0)
  	return 0;
  if($n == 1)
  	return 1;
  return fib($n-1) + fib($n-2);
}

定义完斐波那契方法。假定它仅接受数字类型的输入。(别期望它能处理大的数字,它很可能非常慢的处理完。)

RPC服务处理程序rpc_server.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

fuction fib($n){
  if($n == 0)
    return 0;
  if($n == 1)
    return 1;
  
  return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";

$callback = function($req){
  $n = intval($req->body);
  echo " [.] fib(",$n,")\n";
  
  $msg = new AMQPMessage(
  	(string)fib($n),
    array('correlation_id' => $req->get('correlation_id'))
  );
  
  $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
  $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)){
  $channel->wait();
}

$channel->close();
$connection->close();

?>

服务端代码相当简单:

​ 和以往一样我们会从创建连接,频道,和声明队列开始

​ 也许我们想要运行更多的进程。为了在多个服务器之间负载均衡,需要在$channel.basic_qos中设置prefech_count;

​ 我们使用basic_consume访问队列。然后进入while循环等待请求消息,处理,然后返回响应消息。

RPC客户端 rpc_client.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FibonacciRpcClient{
	private $connection;
	private $channel;
	private $callback_queue;
	private $response;
	private $corr_id;
  
	public function __construct(){
		$this->connection = new AMQPStreamConnection(
		  'localhost', 5672, 'guest', 'guest'
		);
		$this->channel = $this->connection->channel();
		list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);
		$this->channel->basic_consume(
			$this->callback_queue, '', false, false, false, false,
			array($this, 'on_response'));
	}
  
 	public function on_response($req){
 		if($req->get('correlation_id') == $this->corr_id){
 			$this->response = $req->body;
 		}
 	}
  	
	public function call($n){
		$this->response = null;
		$this->corr_id = uniqid();

		$msg = new AMQPMessage(
  		(string) $n,
  		array(
  			'correlation_id' => $this->corr_id,
  			'reply_to' => $this->callback_queue)
  		);
    )
      
    $this->channel->basic_publish($msg, '', 'rpc_queue');
    while(!$this->response){
    	$this->channel->wait();
    }
    return intval($this->response);
  }
};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

?>

现在可以查看示例的完整代码了。rpc_client.phprpc_server.php

现在RPC服务端可以运行了:

php rpc_server.php
[x] Awiting RPC resquests

接收斐波那契数列运行:

php rpc_client.php
[x] Requesting fib(30)

这里展现的并不是RPC服务的唯一可能实现,但它有一些重要的优势:

​ 如果RPC服务太慢,可以按比例增加运行数量。试试在新控制台裕兴第二个rpc_server.php服务。

​ 在客户端,RPC要求只发送和接收一条消息。不能有像队列声明一样的异步调用。结果就是,对于单一的RPC请求,客户端仅需要一个网络往返。

现在的代码还是过于简单,并没有想解决更复杂(更重要)的问题,比如:

​ 要是没有服务端守护运行,客户端作何反应?

​ RPC客户端是否需要设置超时?

​ 如果服务端引发异常,是否该把它发送到客户端?

​ 处理前阻止无效消息(如检查范围,类型)进入?

如果想尝试,可以在rabbitmq-management 里找到一些有用的查看队列的插件。

原文地址:Remote procedure call (RPC)

PHP RabbitMQ 教程(五) - 主题

主题

(使用php-amqplib)

上一节我们改善了日志系统(logging system,以下简称日志系统),为了替代fanout类型的交换器,我们使用了一个direct类型的交换器,带来的好处是可以有选择的接收日志。

虽然使用direct交换器改善了系统,但是仍然有局限性 - 它不能根据多个条件进行路由。

在日志系统中,我们也许不仅仅想订阅严重等级的日志,也想订阅基于消息发布源的内容。也许你已经知道这个概念来自于UNIX的syslog工具,基于严重性(info/warn/crit…)和设备路由日志(auth/cron/kern…)的工具。

这可以提高灵活性 - 我们也许只想要监听来自’cron’的关键错误而不是来自’kern’的全部日志。

为了在我们的日志系统上实现这个功能,需要学习一个更复杂的 topic 交换器。

Topic 交换器

发送到 topic 交换器的消息不能随意设置 routing_key - 它必须是一个单词列表,以’.‘分隔。单词可以是任何内容,但是通常会具体说明消息的功能。一些有效的routing key示例:样:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。routing key 可以是任何长度的你喜欢的单词,最大255个字节。

binding key 也必须是同样的格式,topic交换器的逻辑和direct交换器类似 - 带有特定routing key的消息会被派发到所有绑定了binding key的队列,然而对于binding key依然有两个重要的特殊情况:

*可以代替一个单词

#可以代替0个或多个单词

下图比较好的解释了这个情况:

在这个例子中,我们准备全部发送描述动物的消息。这些消息带有由三个单词(两个点号分隔)组成的routing key,其中第一个单词表示速度,第二个表示颜色,第三个表示种类:”..“。

我们创建三个绑定:Q1的binding key为”*.orange”,Q2的binding key为”*.*.rabbit”和”lazy.#“。

这些绑定可以概括为:

Q1 关注所有orange的动物

Q2 想知道所有关于兔子(rabbits)和懒惰动物(lazy animals)的消息。

routing key为”quick.orange.rabbit”的消息会被发送到两个队列,”lazy.orange.elephant”也会被发送到这两个队列。而”quick.orange.fox”则只会发送到第一个队列,”lazy.brown.fox”会被只发送到第二个队列。”lazy.pink.rabbit”会只被发送到第二个队列一次,即使它匹配两个绑定。”quick.brown.fox”不匹配任何绑定,所以会被丢弃。

如果打破规则,发送一条带有一个或四个单词,如”orange”或”quick.orange.male.rabbit”会怎么样?好吧,消息会丢失,因为它不匹配任何一个绑定。

但是,”lazy.orange.male.rabbit”这种消息,即使它有4个单词,依然会匹配最后一个绑定,然后被发送到第二个队列。

topic 交换器

topic 交换器非常强大,可以表现得跟其他交换器一样。

当一个队列的binding key为"#"时,它会接收所有消息,忽略routing key,像fanout交换器一样。

当绑定中不存在"*"和"#"时,topic交换器会表现的跟direct交换器一样。

整合

我们准备在日志系统中使用topic交换器。假定日志的routing key由两个单词:”.“组成。

代码与上一节的几乎一致。

emit_log_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($agrv, 2));
if(empty($data)) $data = "Hello Wrold!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

?>

receive_logs_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys) ){
  file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
  exit(1);
}

foreach($binding_keys as $binding_key){
  $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [*] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
}

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)){
  $channel->wait();
}

$channel->close();
$connection->close();

?>

接收所有日志:

php receive_logs_topic.php '#'

接收所有来自”kern”的日志:

php receive_logs_topic.php "kern.*"

只接收”致命(critical)“日志:

php receive_logs_topic.php "*.critical"

创建多个绑定:

php receive_logs_topic.php "kern.*" "*.critical"

发布routing key为”kern.critical”的日志就输入:

php emit_log_topic.php "kern.critical" "A critical kernel error"

注意此代码并没有做路由或捆绑的例子,也许你想试一下两个以上的routing key参数。

一些问题:

“*“会匹配routing key为空的消息吗?

”#.*“会匹配内容为”..“的消息吗?会匹配一个单词的消息吗?

“a.*.#“和”a.#“的区别是什么?

emit_log_topic.php完整代码 receive_logs_topic.php完整代码

下一步,在第六节中学习像远程过程调用一样完成消息往返。

原文地址:Topics

PHP RabbitMQ 教程(四) - 路由

路由

(使用php-amqplib) 在上一节中,我们创建了一个简单的日志系统(logging system)。我们已经可以广播日志消息到多个接收者了。

在本节中,我们要给它增加一个功能-使它能够只订阅消息的一个子集。比如,只把严重的错误信息写入到日志文件(存储到磁盘)中,但同时仍然会把所有日志信息输出到控制台中。

绑定

在上一节中我们已经创建了绑定(bindings),代码如下:

$channel->queue_bind($queue_name,'logs');

绑定(bindings)是指交换器(exchange)和队列(queue)的关系。可以简单的理解为:这个队列对这个交换器中的消息感兴趣。

绑定的时候可以带一个额外的 routing_key 参数。为了避免与$channel::basic_publish的参数混淆,我们把它叫做 binding_key,所以我们这样使用key创建一个绑定:

$binding_key = 'black';
$channel->queue_bind($queue_name,$exchange_name,$binding_key);

binding key的意义取决于交换器的类型。我们之前使用过的fanout类型的交换器,会忽略这个值。

Direct 交换器

之前创建的日志系统分发所有消息到所有的消费者。我们打算扩展一下,使它可以过滤严重的消息。比如,我们只想在接收到严重错误的时候才写入到磁盘中,不在警告或普通的消息上浪费磁盘空间。

我们使用的是没有太多扩展性的fanout交换器,它仅能够简单的广播消息。

我们将要使用一个direct交换器代替fanout交换器。路由算法很简单-只有binding key完全匹配routing key的消息会进入队列。

为了说明,考虑如下的场景:

在这个场景中,我们可以看到direct类型的交换器X有两个队列,第一个队列使用orange作为binding key,第二个队列有两个绑定,一个是black另一个是green。

在这个场景中,当routing key为orange的消息发送到交换器,将会被路由到队列Q1。routing key为black或green的消息将会发送到Q2。其他的消息则会被丢弃。

多个绑定

使用相同的binding key绑定多个队列是合法的。在这个例子中,我们会使用black作为binding key为X和Q1之间添加一个绑定。这样一来,direct 交换器就表现得跟fanout交换器一样,分发消息到匹配的队列。routing key为black的消息就会被分发到Q1和Q2。

发送日志

我们将要对日志系统使用这个模型,我们将要发送消息到一个direct交换器。将日志级别作为routing key。这样一来接收端程序就可以选择它想要接收的消息了。首先来看看发送日志。

和以往一样,需要创建一个交换器:

$channel->exchange_declare('direct_logs','direct',false,false,false);

然后准备发送消息:

$channel->exchange_declare('direct_logs','direct',false,false,false);
$channel->basic_publish($msg,'direct_logs',$severity);

为了简化,我们可以假定’severity’的值可以是’info’,‘warning’,‘error’中的一个。

订阅

接收消息的脚本会跟之前一样正常工作,但是我们准备为每一个我们感兴趣的日志级别创建一个新的绑定。

foreach($severities as $severity){
	$channel->queue_bind($queue_name,'direct_logs',$severity);
}

整合

emit_log_direct.php类的代码为:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStremConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs','direct',false,false,false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ',array_slice($argv,2));
if(empty($data)) $data = “Hello World!”;

$msg = “[x] Sent ”,$severity,':',$data,” \n”;

$channel->close();
$connection->close();
?>

receive_logs_direct.php的代码为:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs','direct',false,false,false);

list($queue_name, ,) = $channel->queue_declare(“”,false,false,true,false);

$severities = array_slice($argv,1);
if(empty($severities)){
	file_put_contents('php://stderr',”Usage:$argv[0][info][warning][error]\n”);
	exit(1);
}

foreach($severities as $severity){
	$channel->queue_bind($queue_name,'direct_logs',$severity);
}

echo '[*]Waiting for logs.To exit press CTRL+C',”\n”;

$callback = function($msg){
	echo '[*]',$msg->delivery_info['routing_key'],':',$msg->body,”\n”;	
};

$channel->basic_consume($queue_name,'',false,true,false,false,$callback);

while(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

如果你想只保存’warning’或’error’(而不是’info’)级别的消息,只需要打开命令行输入:

php receive_logs_direct.php warning error > logs_from_rabbit.log

如果你想在屏幕上输出所有的消息,打开一个新的终端,输入:

php receive_logs_direct.php info warning error
[*]Waiting for logs.To exit press CTRL+C

例如,发送error消息,输入:

php emit_log_direct.php error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'

emit_log_direct.php源码 receive_logs_direct.php源码

转到第五节,查看如何监听基于模式的消息。

原文地址:Routing

PHP RabbitMQ 教程(三) - 发布/订阅

发布/订阅

我们在上一节创建了一个工作队列,并假定队列对应的任务传送给了某个客户端。在这一章节我们会做一些完全不一样的东西–我们会发送一条消息到多个消费者,也称之为“发布/订阅”模式。

为了说明这个模式,我们会创建一个简单的日志系统(logging system,以下简称日志系统),它由两个程序组成–第一个是发送日志信息,第二个是接收日志并打印。

日志系统的每一个运行的接收端程序都会接收信息,这样就可以运行一个接收端就把日志保存到硬盘里,同时运行另一个接收端去实时显示日志到屏幕。

本质上,日志内容是广播给所有的接收端的。

交换器

在之前的章节中我们从一个队列里发送和接收消息,现在该把完整的RabbitMQ消息模型介绍给大家了。

让我们快速的回看一遍在之前的章节中的内容:

>生产者是一个用来发送消息的程序

>队列是一个存储消息的缓冲区

>消费者是一个接收消息的程序

RabbitMQ消息模型的核心思想是,生产者永远不会直接发送给任何消息队列,实际上,生产者一般情况下甚至不知道消息应该发送给哪个队列。

生产者只能发送消息到交换器中,交换器非常简单。一方面从生产者接收消息,另一方面把消息推送到队列中。交换器必须知道如何处理接收到的消息,是推送到某个队列?推送到多个队列?还是丢弃这条消息。这个规则通过交换器类型(exchange type)来指定。

这里是交换器的几个类型:direct,topic,headers,fanout。这里我们主要关注最后一个–fanout,创建一个类型为 fanout 的交换器,命名为 logs。

$channel->exchange_declare('logs','fanout',false,false,false);

fanout交换器非常简单,你可以从名称中猜出它的功能,它把所有接收到的消息广播给所有它知道的队列,这也正是我们的日志系统需要的功能。

列出交换器

可以使用rabbitmqctl 命令列出服务器上的所有交换器:

sudo rabbitmqctl list_exchanges

Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

结果中有一些amq.*和一些未命名的交换器,这是一些默认创建的交换器,它们不太可能是现在需要用到的。

未命名交换器

在之前的章节中我们对交换器一无所知,直到可以发送消息给队列。大概是因为我们当时正在使用一个以空字符串“”定义的默认的交换器。

回想一下之前怎么发布消息:

$channel->basic_publish($msg,'','hello');

这里就是使用默认或者说未命名的交换器:消息被routing_key的值 Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish

现在,可以发布消息到这个队列。

$channel->exchange_declare('logs','fanout',false,false,false);
$channel->basic_publish($msg,'logs');

临时队列

也许你还记得在之前我们使用了一个指定的队列(还记得 hello 队列 和 task_queue 队列吗?)。可以命名一个队列是至关重要的–我们需要指定一个worker到同一个队列。当想让生产者和消费者使用同一个队列时给队列命名是非常重要的。

但是在我们的日志系统中情况不同了,我们想要接收所有的消息,不仅仅是其中的一部分,我们关心的是最新的消息而不是旧的,因此需要做两件事。

首先,当连接到RabbitMQ时,需要一个空的队列,可以手动创建一个名字随机的队列,或者,更好的办法是,让服务器为我们随机选一个队列名字。

其次,一旦与消费者失去连接,队列需要自动删除。

php-amqplib中,当我们创建了一个名字为空的队列时,实际上是创建了一个被生成了名字的非持久化的队列。

list($queue_name, ,) = $channel->queue_declare("");

方法执行后,$queue_name变量包含了一个RabbitMQ生成的字符串。比如也许是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

当连接被关闭的时候,队列也会被删掉,因为队列是独有的。

绑定(Bindlings)

我们已经创建了一个fanout类型的交换器和一个队列。现在需要让交换器发送消息给队列。交换器和队列之间的关系称之为绑定(binding)

$channel->queue_bind($queue_name,'logs');

现在开始,logs 交换器会把消息附加到队列中。

列出绑定(Listing bindings)

可以使用 rabbitmqctl list_bindings列出所有存在的正在使用的绑定。

整合

发送日志消息的生产者,与之前的代码看起来没什么不同,最重要的变化是现在想要发送消息到我们的 logs 交换器中,需要在发送时提供一个routing_key,但是在 fanout类型的交换器中这个值是可以忽略的。下边是emit_log.php的代码。

<?php

require_once __DIR__ .'/verdor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $channel->channel();

$channel->exchange_declare('logs','fanout',false,false,false);

$data = implode(' ',array_slice($argv,1));

if(empty($data)) $data = "info:Hello World";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg,'logs');

echo "[x]Sent ",$data,"\n";

$channel->close();
$connection->close();
?>

(emit_log.php)

如你所见,建立连接后声明了交换器,这一步是必须的,因为发送消息到一个不存在的交换器是被禁止的。

如果还没有队列绑定到交换器,信息会丢失,但是这对于我们是可以的,如果没有消费者监听,我们可以安全的丢弃消息。

receive_logs.php:

<?php

require_once __DIR__ .'/vendor/autoload.php';
use PhpAmqpLib\Connection\QMAPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_bind($queue_name,'logs');

echo '[*] Waiting for logs. To exit press CTRL+C',"\n";

$callback = function($msg){
	echo '[x]',$msg->body,"\n";
};

$channel->basic_consume($queue_name,'',false,true,false,false,$callback);

whild(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

(receive_logs.php)

如果想保存日志到文件中,可以在命令中输入

php receive_logs.php > logs_from_rabbit.log

如果想在屏幕上查看日志,新打开一个终端并运行:

php receive_logs.php

发送日志:

php emit_log.php

使用 rabbitmqctl list_bindings 可以确认代码确实创建了绑定和队列,当两个receive_logs.php在运行的时候会看到类似这样的:

sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

对于结果的解释很简单,logs交换器中的数据发送到两个服务器指定的队列,而这正是我们要实现的。

想要弄明白怎样去监听部分消息,转到第四部分。

原文地址:Publish/Subscribe

PHP RabbitMQ 教程(二) - 工作队列

工作队列

(使用php-amqplib库)

在本教程第一部分 我们已经写完了从一个指定队列发送和接收消息的程序。在这一章节中,我们会创建一个工作队列(Work Queue)来分发耗时的任务给多个工作者(worker)。

工作队列(也被称为 任务队列-task queue)主要是避免立即执行资源密集型任务并且还要等待它执行完毕。相反,需要让任务稍后执行,我们把一个任务当做一条信息发送给队列,后台运行的工作者(worker)会取出任务并执行,当运行多个worker时任务会在它们之间共享。

这个概念在web应用中非常有用,可以在短暂的HTTP请求期间处理一些复杂的任务。

准备工作

在前面的部分我们发送了一条内容为“Hello World”的信息,现在我们会发送一些字符串,把这些字符串当做复杂的任务,我们并没有一个实际的任务,像是图片缩放,或者转换PDF文件,所以我们使用sleep方法来假设任务很繁忙。我们会在字符串中加入一些“.”来表示复杂复杂程度;每一个“.”表示需要耗时1秒,比如,“Hello …”代表需要耗时3秒。

我们从上一节的基础上稍微改动了一下send.php,来允许消息可以从命令行发送,这个程序会发送任务到队列中,把它命名为new_task.php

$data = impllode(' ',array_slice($argv,1));
if(empty($data))$data = "Hello World";
$msg = new AMQPMessage($data,
	array('delivery_mode'=>2)#设置消息持久化,下边会讲到。
);
$channel->basic_publish($msg,'','task_queue');
echo "[x] Sent ",$data,"\n";

上一节的receive.php也需要一些改动:需要为消息中的每一个“.”模拟1秒的工作。它会从队列中取出消息并运行,把它命名为worker.php:

$callback = function($msg){
	echo "[x] Received ",$msg->body,"\n";
	sleep(substr_count($msg->body,'.'));
	echo "[x] Done","\n";
	$msg->delivery_info['channel]->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_gos(null,1,null);
$channel->basic_consume('task_queue','',false,false,false,false,$callback);

注意我们伪造的任务需要花费时间(即发送的字符串中要有一些”.“)

然后运行:

php new_task.php "A very hard task which takes two seconds.."
php wordker.php

轮询分发

使用工作队列的一个好处就是它能够并行的处理队列。如果有太多工作需要处理,只需要添加新的worker就可以了。

首先,我们试着同时运行两个worker.php,它们都会从队列接收到消息,但是到底是不是这样呢?我们看一下。

此时需要打开3个终端,其中两个运行worker.php,这两个就是我们的消费者 - C1和C2。

shell1

php worker.php
[*] Waiting for messages. To exit press CTRL+C

shell2

php worker.php
[*] Waiting for messages. To exit press CTRL+C

在第三个终端中我们会发送新的任务,消费者程序开始运行后就可以发送一些消息了。

shell3

php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

我们看一下发送给worker的是什么:

shell1

php worker.php
[*] Waiting for messages.To exit press CTRL+C
[x]Received 'First message.'
[x]Received 'Third message...'
[x]Received 'Fifth message.....'

shell2

php worker.php
[*] Waiting for messages.To exit press CTRL+C
[x]Received 'Second message.'
[x]Received 'Fourth message...'

RabbitMQ会默认按顺序把消息发送给下一个消费者,平均每个消费者都会得到一样多数量的消息,这种分发消息的方式叫做轮询。试着添加三个或更多个worker来运行。

消息响应

执行一个任务会消耗一定的时间,也许你想知道如果一个消费者在执行一个耗时较长的任务时但是在执行一部分的时候挂掉会发生什么。在我们当前的代码中,一旦RabbitMQ把消息分发给消费者便会立即从内存中移除。这种情况下,如果停止一个worker,它正在处理的消息就会丢失。同时其他所有发送给这个worker的还没有处理的消息也会丢失。

但是我们不想丢失任何任务,如果一个worker挂掉,需要把任务发送到另一个worker。

为了确保消息永不丢失,RabbitMQ支持消息响应(message acknowledgements),消费者会发送一个响应告诉RabbitMQ已经收到了某条消息,并且已经处理,这样RabbitMQ就可以删掉它了。

如果一个消费者程序在未发送响应之前挂掉了(频道关闭,链接关闭,或者TCP连接丢失),RabbitMQ会认为消息没有完全处理然后会重新推送到队列中。如果此时有其他的消费者程序在运行,RabbitMQ会很快把消息发送给另一个消费者。这样就可以确保消息不会丢失,即使worker偶尔挂掉。

消息是没有超时的概念的,当worker断开连接的时候,RabbitMQ会重新发送消息,这样在处理一个耗时较长的消息任务时就不会出现问题了。

消息响应默认是关闭的。可以通过设置basic_consume的第四个参数为false(true表示不开启应答),然后在处理完任务的时候从worker发送一个正确的响应内容。

$callback = function($msg){
	echo "[x] Received ",$msg->body,"\n";
	sleep(substr_count($msg->body,'.'));
	echo "[x] Done","\n";
	$msg->delivery_info['channel]->basic_ack($msg->delivery_info['delivery_tag]);
};
$channel->basic_consume('task_queue','',false,false,false,false,$callback);

这样我们就可以确保当你CTRL+C杀掉一个正在处理消息的worker的时候,消息并不会丢失。在这个worker挂掉之后,所有未响应的消息就会发送。

忘了响应

一个很容易犯的错误就是忘了basic_ack,后果很严重。消息会在程序退出后重新发送(可能看起来像是随机返还 原文:which may look like random redelivery),但是如果它不释放未响应的消息,RabbitMQ就会占用越来越多的内存。

为了排除这种错误可以使用rabbitmqctl来打印messages_unacknowledges字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Listing queues ...
hello    0       0
...done.

消息持久化

我们已经学习了确保即使消费者程序挂掉,任务也不会丢失。但是任务还是会在RabbitMQ服务停止的时候丢失。

当RabbitMQ退出或崩溃,它会丢失之前所有的队列和消息,除非你特意告诉它。所以我们必须把队列和消息设为持久化。

首先,为了队列不丢失,需要把它声明为持久化(durable),所以修改queue_declare的第三个参数为true:

$channel->queue_declare('hello',false,true,false,false);

尽管这行代码本身是正确的,但是仍然不会正确运行。因为在之前已经定义过一个非持久化的 hello 队列。RabbitMQ不允许使用不同参数重新定义一个已经存在的队列,它会返回一个错误。但是可以用一个快捷的方法去解决,定义一个不同名字的队列,比如 task_queue:

$channel->queue_declare('task_queue',false,true,false,false);

需要把生产者和消费者程序都设置为 true。

这时候,我们就可以确保在RabbitMQ重启之后task_queue队列不会丢失。现在需要设置消息持久化了 - 通过设置AMQPMessage的属性数组中消息属性 delivery_mode = 2来达到。

$msg = new AMQPMessage($data,
		array('delivery_mode'=>2) //设置消息持久化
	);

关于消息持久化的说明

设置消息持久化并不能完全保证消息不会丢失。这只是告诉让RabbitMQ要把消息保存到硬盘,但是从RabbitMQ接收到消息到保存完成仍然还有一个短暂的间隔时间。因为RabbitMQ并不是每一条消息都会使用fsync(2),可能只是保存到缓存中而不是真正的写到磁盘里。并不能保证消息真正的持久化,但是对于简单的工作队列已经足够了。如果你需要更健壮的持久化,可以使用publisher confirms机制。

公平分发

也许你注意到它仍没有像我们想的那样去派发任务,比如在两个worker的情况下,处理奇数消息的比较繁忙,处理偶数消息的比较轻松,一个worker不断的忙碌而另一个几乎不需要工作,但是RabbitmQ并不知道这些,并且继续一如既往的派发消息。

这是因为RabbitMQ在消息进入队列的时候只管去派发,并不管消费者未做出响应的消息数。它只是把每第n条消息发送给第n个消费者。

我们可以使用basic_qos方法,并设置prefetch_count = 1。这样是告诉RabbitMQ在同一时刻不要发送超过1条消息给一个worker,或者说,不要发送新的消息给worker直到它已处理完上一条消息并作出了响应。这样,它就会把消息发送给下一个空闲的worker了。

$channel->basic_qos(null,1,null);

注意队列长度

如果所有的worker都处于忙碌状态,队列就会填满,你需要留意,添加更多的worker,或者使用其他的策略。

整合

最终,new_task.php的代码如下:

<?php

require_once __DIR__ .'/verdor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue',false,true,false,false);

$data = implode(' ',array_slice($argv,1));

if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data,
	array('delivery_mode'=>2) // 消息持久化
);
$channel->basic_publish($msg,'','task_queue');

echo "[x]Sent ", $data, "\n";

$channel->close();
$connection->close();
?>

new_task.php源码

worker.php

<?php

require_once __DIR__ .'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue',false,true,false,false);

echo '[*] Waiting for messages.To exit press CTRL+C',"\n";

$callback = function($msg){
	echo "[x]Received ",$msg->body,"\n";
	sleep(substr_count($msg->body,'.'));
	echo "[x]Done","\n";
	$msg->delivery_info['chennel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null,1,null);
$channel->basic_consume('task_queue','',false,false,false,false,$callback);

while(count($channel->callbacks)){
	$channel->wait();
}

$channel->close();
$connection->close();
?>

worker.php

使用消息应答和prefetch_count=1后,就可以运行一个工作队列了,持久模式选项会在即使RabbitMQ重启的情况下保留任务。

现在我们可以继续学习第三部分的内容,学习如何发送相同的消息给多个消费者。

原文地址:Work queues

PHP RabbitMQ 教程(一) - 介绍

准备工作

先决条件

本教程先决条件是RabbitMQ已经安装并正在以5672端口运行在 localhost,如果你使用了不同的域,端口,用户,密码,连接配置需要适当改变。

获得帮助

如果在本教程中遇到问题,可以通过邮件列表进行联系。

介绍

RabbitMQ是一个消息代理,它的本质是,从producers(生产者)接收消息,然后发送给consumers(消费者),在这个过程中,可以根据自己的配置规则使用路由,缓冲区,保存消息。

通常的,RabbitMQ,信息传送(messaging),使用一些专业术语。(RabbitMQ, and messaging in general, uses some jargon.)

>生产(Producing)仅仅意味着发送,发送信息的程序叫做生产者(producers),以下图表示:

>队列就是一个信箱的名字,存在于RabbitMQ内部,虽然消息在RabbitMQ和你的应用之间传输,但是只能存在于队列里,队列没有大小限制,它可以存储尽可能多的消息,本质上它是一个无限大的缓冲区,多个producers(生产者)可以通过一个队列发送消息,多个consumers(消费者)也可以尝试从一个队列接收消息,队列以下图表示,队列的名字在图的上边:

>consumers(消费者)的意思与接收相似,消费者主要是等待接收消息的程序,以下图表示:

需要注意的是,生产者,消费者,和代理,不需要一定在一台机器上,事实上在大多数情况下他们确实不在一台机器上。

“Hello World”

(使用php-amqplib库)

在这一部分,我们使用PHP写两段程序,一个生产者发送一条消息,一个消费者接收消息并打印出来。我们会忽略一些php-amqplib API的细节,从简单的事情开始学习,这是一段内容为“Hello World”的消息。

在下边的示意图中,“P”是生产者,“C”是消费者,中间的盒子是队列 — 一个RabbitMQ代表消费者的消息缓冲区。

php-amqplib库

RabbitMQ支持很多协议,本教程包含AMQP 0-9-1,一个开放,通用信息协议,RabbitMQ支持Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等多种语言(详见这里),在本教程中我们使用php-amqplib,使用Composer 管理依赖。

添加一个composer.json文件到你的项目目录。

{
    "require": {
        "php-amqplib/php-amqplib": "2.5.*"
    }
}

如果你已经安装了 Composer ,可以运行如下的代码:

composer.phar install

这是一个Windows系统下的Composer安装文件。

现在我们已经安装了php-amqplib,可以写程序了。

发送

新建一个send.php作为发送端,receive.php作为接收端,发送端会连接RabbitMQ,发送一条信息,然后退出。

在send.php中,需要引用php-amqplib库,和使用其中的一些必要的类。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

接下来,建立到RabbitMQ服务器的连接:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

这里我们使用socket进行连接,处理协议和鉴定,这样就已经连接到了本机的代理,如果想要连接不同的主机,只要更改localhost为该主机的名称或IP地址即可。

下一步,建立频道,大部分API的工作都在这完成。

要想发送信息,需要声明一个队列,之后可以向这个队列里发布消息。

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

声明队列是幂等的 — 它仅在不存在的时候才会被创建,如果存在也不会受影响。消息内容是一个字节数组(byte array),所以可以发送任何内容。

最后,关闭频道和连接。

$channel->close();
$connection->close();

这是send.php类的完整内容。

发送失败

如果这是第一次使用RabbitMQ,并且没有看到“Sent”信息(即“ [x] Sent ‘Hello World!”),也许你抓耳挠腮的想知道为什么出错了,也许是代理没有足够的硬盘空间(默认情况下需要至少1G的空间)导致拒绝接收信息。检查日志文件,有必要的花调低限值。这个配置文件文档将会展示给你如何设置disk_free_limit。

接收

收件人,与发送者只发送一条消息不同,接收者会一直运行以监听信息并输出。

receive.php中与send.php中的 include和use 部分的代码一样。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

设置连接与send.php一样,打开连接和频道,命名一个队列,需要注意的是,队列名需要与send.php所发布的队列的名字一致。

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

注意,我们在此声明了一个队列,因为有可能会在send程序开启前先开启receive程序,我们想要确保在试着接收消息之前队列就已经存在了。

下一步,告诉服务器去从队列传送消息,我们会定义一个用于从服务器接收消息的函数,记住,消息会异步的从服务器发送到客户端。

$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

此处使用while方法,当收到消息时,会把收到的消息传入到$callback方法里。

这是receive.php类的全部内容。

现在我们可以运行两段脚本了,在命令行里,执行sender程序。

php send.php

然后,执行receiver程序

php receive.php

receiver程序会把通过sender程序发送的内容打印出来,receiver程序会一直运行,监听新消息(使用ctrl+c停止),所以试着运行sender程序从另一个命令行。

如果想查看队列,可以运行rabbitmqctl list_queues。

Hello World!

查看第二部分,建立一个简单的队列。

原文地址:“Hello World!”