死信队列
Consumer在消费消息包的时候,如果出现一些异常,希望消息包不被直接丢弃,而是可以过段时间继续消费。
依赖Queue的x-dead-letter-*属性
basic_consume
设置ack模式- 声明死信队列,设置
x-message-ttl=30000
,x-dead-letter-routing-key=主队列名
- 声明主队列,设置
x-dead-letter-routing-key=死信队列名
- 主队列通过
RoutingKey
绑定到Exchange
效果
如果消费逻辑出现异常,消费脚本会调用basic_reject(),消息包会被RabbitMQ Requeue到死信队列
中。
30s超时后,消息包会重新进入主队列的队尾。
消息包重试次数上限
进入死信队列的消息包,通过配置,可以在指定时间之后,回到主队列。如果一个消息包被重复消费多次之后,希望能后丢弃掉,而不是无限重复消费。
依赖Message的Header中x-death属性
消息包如果从一个队列,requeue到另一个队列后,会带上x-death
属性,x-death
会记录该消息包在该队列存在的次数。
private static function needConsume(AMQPMessage $message, $suggestRetryThreshold)
{
$needConsume = true;
if (!empty($message->get_properties()['application_headers'])) {
/**
* @var AMQPTable $amqpTable
*/
$amqpTable = $message->get_properties()['application_headers'];
if (
!empty($amqpTable->getNativeData()['x-death']) &&
is_array($amqpTable->getNativeData()['x-death'])
) {
$xDeathHeader = $amqpTable->getNativeData()['x-death'];
foreach ($xDeathHeader as $xDeathDetail) {
$queueName = !empty($xDeathDetail['queue']) ? $xDeathDetail['queue'] : '';
$deathCount = !empty($xDeathDetail['count']) ? $xDeathDetail['count'] : 0;
// 只需要统计死信队列(*_requeue_*) deathCount
if (strpos($queueName, '_requeue_') === false) {
continue;
}
// 由Consumer代码控制重试次数
if ($deathCount > $suggestRetryThreshold) {
$needConsume = false;
break;
}
}
}
}
return $needConsume;
}
效果
消息包第一次消费失败,进入死信队列,30s超时后,进入主队列,开始第一次重试。
如果设置5次重试次数。那么该消息包会被重试6次(第一次正常消费,后面五次是重试消费)
延迟队列
产品会提出,用户触发一个事件多久之后,希望能够做什么事情。
依赖Queue的x-message-ttl和x-dead-letter-*属性
官方其实没有提供正式的延迟队列扩展。使用ttl配合dlx,可以做出延迟队列的效果。
basic_consume
设置ack模式- 声明死信队列,设置
x-message-ttl=30000
,x-dead-letter-routing-key=主队列名
- 声明主队列,设置
x-dead-letter-routing-key=死信队列名
- 声明延迟队列,设置
x-message-ttl=maxDelayTime
,x-dead-letter-routing-key=主队列名
- 主队列通过
RoutingKey
绑定到Exchange
- 消息包在publish的过程中,Message可以设置
expiration
属性
效果
消息包会优先延迟队列,过期后,会进入主队列,之后流程会和死信队列效果一样。
优先级队列
如果队列中堆积的消息比较多的情况下,希望优先处理某些核心用户数据。
依赖Queue的x-max-priority和Message的priority属性
待补充
效果
待补充
广播到所有消费者
RabbitMQ中Exchange有Fanout模式,意味着一个消息包可以复制到多个Queue中。
后台的服务部署可能是反向代理模式,服务部署在多台服务器上。可能会出现一个消息包广播到所有的服务场景。
前段时间,参与一个Websocket项目开发。Websocket连接保持在各个服务内部,也就是说每台机器上的服务,都可能有Websocket连接。
如果想要推送消息给所有用户,就需要广播到所有服务。
依赖Exchange的autoDelete和Queue的exclusive属性
待补充
效果
待补充