RabbitMQ消费脚本注册信号

ctrl + c / kill pid

Consumer消费逻辑代码如下,精简了所有声明逻辑。

<?php

class AMQPLibSubscribe
{
    private static $channel = null;

    private static $flagInterrupted = false;

    private static $consumerTag = null;

    public static function subscribe($callUserFunc)
    {
        // 这里省略获取Channel、声明Exchange、声明Queue、Exchange绑定Queue逻辑
        // 所有Channel调用函数的参数,都有省略
        $channel = self::$channel;

        // 注册信号处理
        self::registSignal();

        // 处理消费逻辑
        $callback = function ($message) use ($callUserFunc, $channel) {
            try {
                call_user_func($callUserFunc, $message->body);
                // 不抛异常则认为是处理成功,ack后删除
                $channel->basic_ack($message->delivery_info['delivery_tag']);
            } catch (\Exception $e) {
                // 抛出异常,需要重新消费,reject后requeue消息包
                $channel->basic_reject($message->delivery_info['delivery_tag']);
            }
        };
        while (!self::$flagInterrupted) {
            self::$consumerTag = $channel->basic_consume($callback);
            while (isset($channel->callbacks[self::$consumerTag]) && $channel->getConnection()->select(null)) {
                $channel->wait();
            }
        }
    }

    private static function registSignal()
    {
        pcntl_async_signals(true);
        pcntl_signal(SIGINT, function ($signal) {
            self::handleSignal($signal);
        });
        pcntl_signal(SIGTERM, function ($signal) {
            self::handleSignal($signal);
        });
        pcntl_signal(SIGHUP, SIG_IGN);
    }

    private static function handleSignal($signal)
    {
        echo (sprintf('consumerTag[%s] got signal[%d]', self::$consumerTag, $signal)) . "\n";
        self::$flagInterrupted = true;
        self::$channel->basic_cancel(self::$consumerTag);
    }
}

注册了 SIGINT SIGTERM SIGHUP 信号处理函数,当触发 ctrl + c (SIGINT) 或者 kill pid (SIGTERM) 信号后,flagInterrupted 会被赋值成 true,Channel也会调用basic_cancel()取消consumer。

我们结合代码,详细分析下中断流程

// 1. 上面 subscribe 逻辑
public static function subscribe($callUserFunc)
{
    // ...

    while (!self::$flagInterrupted) {
        self::$consumerTag = $channel->basic_consume($callback);
        while (isset($channel->callbacks[self::$consumerTag]) && $channel->getConnection()->select(null)) {
            $channel->wait();
        }
    }
}

// 2. StreamIO.php Line:405
public function select($sec, $usec)
{
    // ...

    set_error_handler(array($this, 'error_handler'));
    try {
        $result = stream_select($read, $write, $except, $sec, $usec);
    } catch (\ErrorException $e) {
        restore_error_handler();
        throw $e;
    }
    restore_error_handler();

    return $result;
}

while 条件判断 $channel->getConnection()->select(null) 会执行到 $result = stream_select($read, $write, $except, $sec, $usec);。 当没有中断信号时,select(null) 会永久block住当前进程,直到有事件发生,或系统中断信号。

官方stream_select()文档,对于 sec 字段描述如下:

If tv_sec is NULL stream_select() can block indefinitely, 
returning only when an event on one of the watched streams occurs (or if a signal interrupts the system call).

对于返回值描述如下:

On error FALSE is returned and a warning raised (this can happen if the system call is interrupted by an incoming signal).

稍微总结一下
如果RabbitMQ没有发送content给消费脚本,或没有系统中断信号,进程会一直 block 在 $channel->wait() 之前。 当管理员使用 ctrl + c 或者 kill pid 发起系统中断信号时,会触发 StreamIO->error_handle() 函数,以及我们注册的 handleSignal($signal) 逻辑。
处理完注册函数后,$result = stream_select($read, $write, $except, $sec, $usec); 会返回false。 等到当前消息包处理完之后,回到while的循环判断这里,条件将就不再满足,脚本会优雅的退出。

kill -9 pid

kill -9 pid 会强制中断掉当前进程,但是此时可能还有消息包在消费中。如果我们选用的是ack模式,那么消息包并不会被删除,该消息包会被投递到下一个活跃的消费者。 对于业务逻辑来说,如果消息包被重复消费多次,或消费过程中被中断,我们需要使用事务以及幂等,来保证数据的最终一致性。

区别

kill -9 pid 不会等到当前消费的消息包结束,直接中断进程。由于消息包没有ack,同时consumer和RabbitMQ的连接断开。没有消费的消息包并不会被删除。
ctrl + ckill pid 会发起关闭进程信号,等待当前消息包处理完,并且ack后,才会退出进程。

效果展示

publish 循环20个消息包

kill pid

kill pid之后,第4个消息包执行完,才退出脚本。

ctrl + c

接着消费第5个消息包,ctrl + c之后,第7个消息包执行完,才退出脚本。

kill -9 pid

接着消费第8个消息包,kill -9 pid之后,第10个消息包没有执行完,进程就直接退出。

继续消费

接着消费kill -9中断的第10个消息包。