ctrl + c / kill pid
Consumer消费逻辑代码如下,精简了所有声明逻辑。
<?php
class AMQPLibSubscribe
{
private static $channel = null;
private static $flagInterrupted = false;
private static $consumerTag = null;
public static function subscribe($callUserFunc)
{
// 这里省略获取Channel、声明Exchange、声明Queue、Exchange绑定Queue逻辑
// 所有Channel调用函数的参数,都有省略
$channel = self::$channel;
// 注册信号处理
self::registSignal();
// 处理消费逻辑
$callback = function ($message) use ($callUserFunc, $channel) {
try {
call_user_func($callUserFunc, $message->body);
// 不抛异常则认为是处理成功,ack后删除
$channel->basic_ack($message->delivery_info['delivery_tag']);
} catch (\Exception $e) {
// 抛出异常,需要重新消费,reject后requeue消息包
$channel->basic_reject($message->delivery_info['delivery_tag']);
}
};
while (!self::$flagInterrupted) {
self::$consumerTag = $channel->basic_consume($callback);
while (isset($channel->callbacks[self::$consumerTag]) && $channel->getConnection()->select(null)) {
$channel->wait();
}
}
}
private static function registSignal()
{
pcntl_async_signals(true);
pcntl_signal(SIGINT, function ($signal) {
self::handleSignal($signal);
});
pcntl_signal(SIGTERM, function ($signal) {
self::handleSignal($signal);
});
pcntl_signal(SIGHUP, SIG_IGN);
}
private static function handleSignal($signal)
{
echo (sprintf('consumerTag[%s] got signal[%d]', self::$consumerTag, $signal)) . "\n";
self::$flagInterrupted = true;
self::$channel->basic_cancel(self::$consumerTag);
}
}
注册了 SIGINT SIGTERM SIGHUP 信号处理函数,当触发 ctrl + c
(SIGINT) 或者 kill pid
(SIGTERM) 信号后,flagInterrupted
会被赋值成 true,Channel也会调用basic_cancel()
取消consumer。
我们结合代码,详细分析下中断流程
// 1. 上面 subscribe 逻辑
public static function subscribe($callUserFunc)
{
// ...
while (!self::$flagInterrupted) {
self::$consumerTag = $channel->basic_consume($callback);
while (isset($channel->callbacks[self::$consumerTag]) && $channel->getConnection()->select(null)) {
$channel->wait();
}
}
}
// 2. StreamIO.php Line:405
public function select($sec, $usec)
{
// ...
set_error_handler(array($this, 'error_handler'));
try {
$result = stream_select($read, $write, $except, $sec, $usec);
} catch (\ErrorException $e) {
restore_error_handler();
throw $e;
}
restore_error_handler();
return $result;
}
while 条件判断 $channel->getConnection()->select(null)
会执行到 $result = stream_select($read, $write, $except, $sec, $usec);
。
当没有中断信号时,select(null)
会永久block住当前进程,直到有事件发生,或系统中断信号。
官方stream_select()文档,对于 sec 字段描述如下:
If tv_sec is NULL stream_select() can block indefinitely,
returning only when an event on one of the watched streams occurs (or if a signal interrupts the system call).
对于返回值描述如下:
On error FALSE is returned and a warning raised (this can happen if the system call is interrupted by an incoming signal).
稍微总结一下
如果RabbitMQ没有发送content给消费脚本,或没有系统中断信号,进程会一直 block 在 $channel->wait()
之前。
当管理员使用 ctrl + c
或者 kill pid
发起系统中断信号时,会触发 StreamIO->error_handle()
函数,以及我们注册的 handleSignal($signal)
逻辑。
处理完注册函数后,$result = stream_select($read, $write, $except, $sec, $usec);
会返回false。
等到当前消息包处理完之后,回到while的循环判断这里,条件将就不再满足,脚本会优雅的退出。
kill -9 pid
kill -9 pid
会强制中断掉当前进程,但是此时可能还有消息包在消费中。如果我们选用的是ack模式,那么消息包并不会被删除,该消息包会被投递到下一个活跃的消费者。
对于业务逻辑来说,如果消息包被重复消费多次,或消费过程中被中断,我们需要使用事务以及幂等,来保证数据的最终一致性。
区别
kill -9 pid
不会等到当前消费的消息包结束,直接中断进程。由于消息包没有ack,同时consumer和RabbitMQ的连接断开。没有消费的消息包并不会被删除。
ctrl + c
或 kill pid
会发起关闭进程信号,等待当前消息包处理完,并且ack后,才会退出进程。
效果展示
publish 循环20个消息包
kill pid
kill pid
之后,第4个消息包执行完,才退出脚本。
ctrl + c
接着消费第5个消息包,ctrl + c
之后,第7个消息包执行完,才退出脚本。
kill -9 pid
接着消费第8个消息包,kill -9 pid
之后,第10个消息包没有执行完,进程就直接退出。
继续消费
接着消费kill -9
中断的第10个消息包。