问题背景
当前RabbitMQ集群架构如图所示,消费者consumer通过SLB连接到RabbitMQ集群。
但是SLB有连接超时限制,具体限制如下:
4. 负载均衡各监听连接超时时间是多少?
TCP监听: 900秒
UDP监听: 90秒
HTTP监听: 60秒
HTTPS监听: 60秒
Consumer与SLB之间的TCP连接,如果900s(15分钟)内,没有任何消息包传递,该TCP连接就会被SLB端主动关闭。
Consumer业务代码就会报错,报错日志如下:
报错代码如下:
class StreamIO extends AbstractIO
{
public function read(...)
{
...
if (!is_resource($this->sock) || feof($this->sock)) {
throw new AMQPRuntimeException('Broken pipe or closed connection');
}
...
}
}
解决思路
通过翻看 php-amqplib 源码,有2种方式,可以保证TCP连接不会因为空闲原因被关闭
- 开启tcp的keepalive开关,依赖操作系统tcp的keepalive能力
- 配置heartbeat参数,依赖sdk封装,在 read() select() 方法调用中,主动调用 check_heartbeat() 方法
keepalive
PHP有2套socket的封装:stream扩展和socket扩展。
- stream系列函数可以直接用fread/fwrite来读写,PHP做了一些封装,使用起来更方便。(当前团队选用stream)
- stream扩展是2层封装的,而socket扩展是原生socket函数的封装,性能更好。
在代码中开启tcp的keepalive开关,都是调用 socket_set_option() 函数。
class StreamIO extends AbstractIO
{
protected function enable_keepalive()
{
$socket = socket_import_stream($this->sock);
socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
}
}
class SocketIO extends AbstractIO
{
protected function enable_keepalive()
{
socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
}
}
开启keepalive开关后,tcp可以配置3个参数
/etc/sysctl.conf
/proc/sys/net/ipv4/tcp_keepalive*
tcp_keepalive_intvl (integer; default: 75; since Linux 2.4)
The number of seconds between TCP keep-alive probes.
tcp_keepalive_probes (integer; default: 9; since Linux 2.2)
The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no
response is obtained from the other end.
tcp_keepalive_time (integer; default: 7200; since Linux 2.2)
The number of seconds a connection needs to be idle before TCP begins sending out keep-alive probes. Keep-
alives are sent only when the SO_KEEPALIVE socket option is enabled. The default value is 7200 seconds (2
hours). An idle connection is terminated after approximately an additional 11 minutes (9 probes an interval
of 75 seconds apart) when keep-alive is enabled.
tcp_keepalive_time: KeepAlive的空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2小时)
tcp_keepalive_intvl: KeepAlive探测包的发送间隔,默认值为75s
tcp_keepalive_probes: 在tcp_keepalive_time之后,没有接收到对方确认,继续发送保活探测包次数,默认值为9(次)
需要将tcp_keepalive_time,修改成 < 900 的数值,才能保证TCP连接的活跃
heartbeat
如果配置了heartbeat参数 30s,那么每15s Client端就会发送心跳包到Server端。
public function check_heartbeat()
{
// ignore unless heartbeat interval is set
if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
$t = microtime(true);
$t_read = round($t - $this->last_read);
$t_write = round($t - $this->last_write);
// server has gone away
if (($this->heartbeat * 2) < $t_read) {
$this->reconnect();
}
// time for client to send a heartbeat
if (($this->heartbeat / 2) < $t_write) {
$this->write_heartbeat();
}
}
}
结果验证
keepalive
net.ipv4.tcp_keepalive_time = 901
启动Consumer,启动时间 2019-08-06 11:48:36,没有配置 heartbeat
截止 2019-08-06 12:03:36,刚好15分钟,TCP连接被关闭
// info 启动日志
2019-08-06 11:48:36.270514 web.h5:INFO T<amq.ctag-UFKq8ymnP2UwZhKEVDjrXg> consumer[amq.ctag-UFKq8ymnP2UwZhKEVDjrXg] started on queue[queue_test__dlx], procFunc[App\Console\MqConsumers\TestKeepAlive::logic] - MsgQHelper.php:494#startConsumer - 26694
// error 异常日志
2019-08-06 12:03:36.519344 web.h5:ERROR T<amq.ctag-UFKq8ymnP2UwZhKEVDjrXg> <0~Broken pipe or closed connection@StreamIO.php:214> - MsgQHelper.php:338#subscribeStr - 26694
net.ipv4.tcp_keepalive_time = 899
启动Consumer,启动时间 2019-08-06 12:07:47,没有配置 heartbeat
截止 2019-08-06 12:37:50,已超过30分钟,TCP连接仍然活跃
heartbeat
其中Consumer
- 启动时间 2019-08-06 11:34:45
- heartbeat 30s
通过图片分析
- 从黄线可以看出,client端每15s,发送一个消息包到server端
- 从蓝线可以看出,server端每30s,返回结果到client端