最常见的计数限流
设置接口每秒请求阈值,一秒内每一个请求计数+1,超过阈值的请求可以随意处理。
<?php
$redis = new redis();
$redis->connect($host, $port);
$redis->auth($auth);
$key = "request:" . $requestUri . ":" . time();
$qps = $redis->incr($key);
$redis->expire($key, 3);
if ($qps > $qpsThreshold) {
// 处理超过阈值的请求
}
不过这样的限流策略有很大的局限性: 某接口设置100阈值,0.9~1.0秒中间进入100个请求,1.0~1.1秒中间进入100个请求,系统需要在0.9~1.1秒处理200个请求
在一个时间窗口内对请求进行限速
如果有一个时间窗口,那么就可以做到0.9~1.9秒中间只能进入100个请求
<?php
$redis = new redis();
$redis->connect($host, $port);
$redis->auth($auth);
// 计数器自增
$keyCount = "phpRateLimit_{$requestUri}_count";
$count = $redis->incr($keyCount);
// 处理“时间窗口key”和“请求唯一标记”
$keyMonitor = "phpRateLimit_{$requestUri}_monitor";
$uuid = md5(uniqid(mt_rand(), true));
// 处理“请求时间”和“限制时间”
$requestTime = floatval(sprintf('%.0f', array_sum(explode(' ', microtime())) * 1000));
$limiterTime = $limiterTime * 1000;
$result = $redis->multi()
// 删除“时间窗口集合”中,过期的数据
->zRemRangeByScore($keyMonitor, "-inf", ($requestTime - $limiterTime))
// 添加当前请求到“时间窗口集合”
->zAdd($keyMonitor, $requestTime, $uuid)
// 当前“接口集合”和“时间窗口集合”取交集,删除过期的数据
->zInter($requestUri, [$requestUri, $keyMonitor], [1.0, 0.0])
// 添加当前请求到“接口集合”
->zAdd($requestUri, $count, $uuid)
// 查询当前请求在“接口集合”中排名
->zRank($requestUri, $uuid)
->exec();
$rank = $result[count($result) - 1];
if ($rank >= $limit) {
$redis->multi()
->zrem($keyMonitor, $uuid)
->zrem($requestUri, $uuid)
->exec();
// 处理超过阈值的请求
}
并发测试
配置Redis限流每秒2个请求,通过Jmeter压测,效果如下:
每秒成功2个请求
PHP并发请求
该部分属于扩展部分,基于GuzzleHttp库,以及Laravel框架的Artisan,实现的PHP并发测试脚本
<?php
namespace App\Console;
use Illuminate\Console\Command;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use Psr\Http\Message\ResponseInterface;
class Mock extends Command
{
protected $signature = 'mock:concurrence {requestCount} {concurrencyCount}';
public function handle()
{
$requestCount = intval($this->argument('requestCount'));
$concurrencyCount = intval($this->argument('concurrencyCount'));
$client = new Client();
$requests = function ($requestCount) {
$uri = 'http://xxx/redis/rate_limiter';
for ($i = 0; $i < $requestCount; $i++) {
yield new Request('GET', $uri);
}
};
$sum = [];
$pool = new Pool($client, $requests($requestCount), [
'concurrency' => $concurrencyCount,
'fulfilled' => function ($response, $index) use (&$sum) {
/** @var ResponseInterface $response */
$resp = json_decode($response->getBody()->getContents(), true);
$currentTime = explode('.', $resp['results'][0])[0];
if (empty($sum[$currentTime])) {
$sum[$currentTime] = [
'total' => 0,
'success' => 0,
'failure' => 0
];
}
$sum[$currentTime]['total'] += 1;
if ($resp['errorCode'] == 0) {
$sum[$currentTime]['success'] += 1;
} else {
$sum[$currentTime]['failure'] += 1;
}
},
'rejected' => function ($reason, $index) {
$this->error('index: ' . $index . 'reason: ' . $reason);
},
]);
// Initiate the transfers and create a promise
$promise = $pool->promise();
// Force the pool of requests to complete.
$promise->wait();
foreach ($sum as $currentTime => $data) {
$this->info('time: ' . $currentTime);
$total = $data['total'];
$success = $data['success'];
$failure = $data['failure'];
$this->warn(sprintf('total: %d, success: %d, failure: %d', $total, $success, $failure));
}
}
}
配置2000个请求,concurrency设置成20,效果如下: