团队使用RabbitMQ几个场景

死信队列

Consumer在消费消息包的时候,如果出现一些异常,希望消息包不被直接丢弃,而是可以过段时间继续消费。

依赖Queue的x-dead-letter-*属性
  1. basic_consume设置ack模式
  2. 声明死信队列,设置x-message-ttl=30000x-dead-letter-routing-key=主队列名
  3. 声明主队列,设置x-dead-letter-routing-key=死信队列名
  4. 主队列通过RoutingKey绑定到Exchange
效果

如果消费逻辑出现异常,消费脚本会调用basic_reject(),消息包会被RabbitMQ Requeue到死信队列中。
30s超时后,消息包会重新进入主队列的队尾。

消息包重试次数上限

进入死信队列的消息包,通过配置,可以在指定时间之后,回到主队列。如果一个消息包被重复消费多次之后,希望能后丢弃掉,而不是无限重复消费。

依赖Message的Header中x-death属性

消息包如果从一个队列,requeue到另一个队列后,会带上x-death属性,x-death会记录该消息包在该队列存在的次数。

private static function needConsume(AMQPMessage $message, $suggestRetryThreshold)
{
    $needConsume = true;
    if (!empty($message->get_properties()['application_headers'])) {
        /**
         * @var AMQPTable $amqpTable
         */
        $amqpTable = $message->get_properties()['application_headers'];
        if (
            !empty($amqpTable->getNativeData()['x-death']) &&
            is_array($amqpTable->getNativeData()['x-death'])
        ) {
            $xDeathHeader = $amqpTable->getNativeData()['x-death'];
            foreach ($xDeathHeader as $xDeathDetail) {
                $queueName = !empty($xDeathDetail['queue']) ? $xDeathDetail['queue'] : '';
                $deathCount = !empty($xDeathDetail['count']) ? $xDeathDetail['count'] : 0;
                // 只需要统计死信队列(*_requeue_*) deathCount
                if (strpos($queueName, '_requeue_') === false) {
                    continue;
                }
                // 由Consumer代码控制重试次数
                if ($deathCount > $suggestRetryThreshold) {
                    $needConsume = false;
                    break;
                }
            }
        }
    }

    return $needConsume;
}
效果

消息包第一次消费失败,进入死信队列,30s超时后,进入主队列,开始第一次重试。
如果设置5次重试次数。那么该消息包会被重试6次(第一次正常消费,后面五次是重试消费)

延迟队列

产品会提出,用户触发一个事件多久之后,希望能够做什么事情。

依赖Queue的x-message-ttl和x-dead-letter-*属性

官方其实没有提供正式的延迟队列扩展。使用ttl配合dlx,可以做出延迟队列的效果。

  1. basic_consume设置ack模式
  2. 声明死信队列,设置x-message-ttl=30000x-dead-letter-routing-key=主队列名
  3. 声明主队列,设置x-dead-letter-routing-key=死信队列名
  4. 声明延迟队列,设置x-message-ttl=maxDelayTimex-dead-letter-routing-key=主队列名
  5. 主队列通过RoutingKey绑定到Exchange
  6. 消息包在publish的过程中,Message可以设置expiration属性
效果

消息包会优先延迟队列,过期后,会进入主队列,之后流程会和死信队列效果一样。

优先级队列

如果队列中堆积的消息比较多的情况下,希望优先处理某些核心用户数据。

依赖Queue的x-max-priority和Message的priority属性

待补充

效果

待补充

广播到所有消费者

RabbitMQ中Exchange有Fanout模式,意味着一个消息包可以复制到多个Queue中。 后台的服务部署可能是反向代理模式,服务部署在多台服务器上。可能会出现一个消息包广播到所有的服务场景。 前段时间,参与一个Websocket项目开发。Websocket连接保持在各个服务内部,也就是说每台机器上的服务,都可能有Websocket连接。
如果想要推送消息给所有用户,就需要广播到所有服务。

依赖Exchange的autoDelete和Queue的exclusive属性

待补充

效果

待补充