基于Redis的频率限制

最常见的计数限流

设置接口每秒请求阈值,一秒内每一个请求计数+1,超过阈值的请求可以随意处理。

<?php

$redis = new redis();
$redis->connect($host, $port);
$redis->auth($auth);

$key = "request:" . $requestUri . ":" . time();
$qps = $redis->incr($key);
$redis->expire($key, 3);

if ($qps > $qpsThreshold) {
    // 处理超过阈值的请求
}

不过这样的限流策略有很大的局限性: 某接口设置100阈值,0.9~1.0秒中间进入100个请求,1.0~1.1秒中间进入100个请求,系统需要在0.9~1.1秒处理200个请求

在一个时间窗口内对请求进行限速

如果有一个时间窗口,那么就可以做到0.9~1.9秒中间只能进入100个请求

<?php

$redis = new redis();
$redis->connect($host, $port);
$redis->auth($auth);

// 计数器自增
$keyCount = "phpRateLimit_{$requestUri}_count";
$count = $redis->incr($keyCount);

// 处理“时间窗口key”和“请求唯一标记”
$keyMonitor = "phpRateLimit_{$requestUri}_monitor";
$uuid = md5(uniqid(mt_rand(), true));

// 处理“请求时间”和“限制时间”
$requestTime = floatval(sprintf('%.0f', array_sum(explode(' ', microtime())) * 1000));
$limiterTime = $limiterTime * 1000;

$result = $redis->multi()
    // 删除“时间窗口集合”中,过期的数据
    ->zRemRangeByScore($keyMonitor, "-inf", ($requestTime - $limiterTime))
    // 添加当前请求到“时间窗口集合”
    ->zAdd($keyMonitor, $requestTime, $uuid)
    // 当前“接口集合”和“时间窗口集合”取交集,删除过期的数据
    ->zInter($requestUri, [$requestUri, $keyMonitor], [1.0, 0.0])
    // 添加当前请求到“接口集合”
    ->zAdd($requestUri, $count, $uuid)
    // 查询当前请求在“接口集合”中排名
    ->zRank($requestUri, $uuid)
    ->exec();
$rank = $result[count($result) - 1];

if ($rank >= $limit) {
    $redis->multi()
        ->zrem($keyMonitor, $uuid)
        ->zrem($requestUri, $uuid)
        ->exec();
    // 处理超过阈值的请求
}

并发测试

配置Redis限流每秒2个请求,通过Jmeter压测,效果如下:

每秒成功2个请求

PHP并发请求

该部分属于扩展部分,基于GuzzleHttp库,以及Laravel框架的Artisan,实现的PHP并发测试脚本

<?php

namespace App\Console;

use Illuminate\Console\Command;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use Psr\Http\Message\ResponseInterface;

class Mock extends Command
{
    protected $signature = 'mock:concurrence {requestCount} {concurrencyCount}';

    public function handle()
    {
        $requestCount = intval($this->argument('requestCount'));
        $concurrencyCount = intval($this->argument('concurrencyCount'));

        $client = new Client();

        $requests = function ($requestCount) {
            $uri = 'http://xxx/redis/rate_limiter';
            for ($i = 0; $i < $requestCount; $i++) {
                yield new Request('GET', $uri);
            }
        };

        $sum = [];
        $pool = new Pool($client, $requests($requestCount), [
            'concurrency' => $concurrencyCount,
            'fulfilled' => function ($response, $index) use (&$sum) {
                /** @var ResponseInterface $response */
                $resp = json_decode($response->getBody()->getContents(), true);
                $currentTime = explode('.', $resp['results'][0])[0];
                if (empty($sum[$currentTime])) {
                    $sum[$currentTime] = [
                        'total' => 0,
                        'success' => 0,
                        'failure' => 0
                    ];
                }
                $sum[$currentTime]['total'] += 1;
                if ($resp['errorCode'] == 0) {
                    $sum[$currentTime]['success'] += 1;
                } else {
                    $sum[$currentTime]['failure'] += 1;
                }
            },
            'rejected' => function ($reason, $index) {
                $this->error('index: ' . $index . 'reason: ' . $reason);
            },
        ]);

        // Initiate the transfers and create a promise
        $promise = $pool->promise();

        // Force the pool of requests to complete.
        $promise->wait();

        foreach ($sum as $currentTime => $data) {
            $this->info('time: ' . $currentTime);
            $total = $data['total'];
            $success = $data['success'];
            $failure = $data['failure'];
            $this->warn(sprintf('total: %d, success: %d, failure: %d', $total, $success, $failure));
        }
    }
}

配置2000个请求,concurrency设置成20,效果如下: