redis与限流算法 由于 Web 应用在部署时,常常使用多网关,微服务的组织形式,因此经常需要在进程间进行数据交换,从而达到更好的限流效果。由于限流算法中的数据具有很强的时效性,并且不需要较高的安全性,使用 redis 组件来实现限流算法是一个很好的选择。
常见的限流算法一共有四种:固定窗口,滑动窗口,桶漏算法,令牌桶。这些算法具有一定的复杂度,需要借助 redis 中的 lua 脚本来实现。
固定窗口 每个请求到达时,检查是否具有窗口存在,如果无窗口存在,则将创建窗口并且设置过期时间、允许通过的流量。此后,每一个请求到达时,如果窗口存在,都将流量-1,如果不能够完成操作,则需要等待。优点是性能比较好,实现简单,缺点是精度不够高,这个适合作为每一个用户的限流次数。
1 2 3 4 5 6 7 8 9 10 local key = KEYS[1 ]if redis.call("exists" ,key) == 0 then redis.call("set" ,key,10 ,"EX" ,10 ) return 10 end local count = tonumber (redis.call("decr" ,key))return count
滑动窗口 滑动窗口是固定窗口的一种优化,它将窗口改变为一个链表,如将一分钟划分为 6 段,每一段代表十秒钟。每个请求到达时,先检查所有窗口是否过期,如果过期则删除窗口,并且检查最后一个窗口的创建时间,如果窗口超过 10s,那么在其后面写入多个窗口,并在最后一个窗口处写入值。滑动窗口同样适合作为每一个用户的限流算法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 local key = KEYS[1 ]local expire = tonumber (ARGV[1 ])local currentMs = tonumber (ARGV[2 ])local count = tonumber (ARGV[3 ])local windowStartMs = currentMs - expire * 1000 ;local current = redis.call('zcount' , key, windowStartMs, currentMs)if current and tonumber (current) >= count then return tonumber (current); end redis.call("ZREMRANGEBYSCORE" , key, 0 , windowStartMs); redis.call("zadd" , key, tostring (currentMs), currentMs); redis.call("expire" , key, expire); return tonumber (current)
桶漏算法 记录每一个请求,并且将其放入缓冲区中,缓冲区具有上限,如果超出缓冲区则直接拒绝。每隔设置的时间间隔,会从缓冲区中选取设置数量的请求进行处理。优点是请求处理非常平滑,缺点是不能够应对突发请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 local capacity = tonumber (ARGV[1 ])local rate = tonumber (ARGV[2 ])local now = tonumber (ARGV[3 ])local acquire = tonumber (ARGV[4 ])local water = tonumber (redis.call('hget' , KEYS[1 ] , KEYS[2 ]) or 0 ) local time = tonumber (redis.call('hget' , KEYS[1 ] , KEYS[3 ]) or now) water = math .max (0 , water - (now - time ) * rate) redis.call('hset' , KEYS[1 ] ,KEYS[3 ] , now) if (water + acquire <= capacity) then redis.call('hset' , KEYS[1 ] , KEYS[2 ] , water + acquire) return 1 else return 0 end
令牌桶 桶漏算法的相反操作。每隔一段时间,将名额放入缓冲区中,名额具有上限,每一个请求需要拿到一个名额,否则会被拒绝处理。令牌桶算法允许的瞬时压力是桶上限+ 定时刷新名额,持续压力是定时刷新的名额。允许一定突发压力出现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 local function acquire (key, permits, curr_mill_second) local local_key = key if tonumber (redis.pcall ("EXISTS" , local_key)) < 1 then return 0 end local rate_limit_info = redis.pcall ("HMGET" , local_key, "last_mill_second" , "curr_permits" , "max_permits" , "rate" ) local last_mill_second = rate_limit_info[1 ] local curr_permits = tonumber (rate_limit_info[2 ]) local max_permits = tonumber (rate_limit_info[3 ]) local rate = rate_limit_info[4 ] if type (max_permits) == 'boolean' or max_permits == nil then return 0 end if type (rate) == 'boolean' or rate == nil then return 0 end local local_curr_permits = max_permits; if (type (last_mill_second) ~= 'boolean' and last_mill_second ~= nil ) then if (curr_mill_second - last_mill_second < 0 ) then return -1 end local reverse_permits = math .floor (((curr_mill_second - last_mill_second) / 1000 ) * rate) local expect_curr_permits = reverse_permits + curr_permits; local_curr_permits = math .min (expect_curr_permits, max_permits); if (reverse_permits > 0 ) then redis.pcall ("HSET" , local_key, "last_mill_second" , curr_mill_second) end else redis.pcall ("HSET" , local_key, "last_mill_second" , curr_mill_second) end local result = -1 if (local_curr_permits - permits >= 0 ) then result = 1 redis.pcall ("HSET" , local_key, "curr_permits" , local_curr_permits - permits) else redis.pcall ("HSET" , local_key, "curr_permits" , local_curr_permits) end return result end
四种算法对比
固定窗口:不需要定时维护,实现非常简单,单个用户限流
滑动窗口:可以在用户请求线程中维护,精度需求较高的单个用户限流
桶漏算法:一般用于保护数据库等后端系统不会被突发压力击溃。
令牌桶:一般用于统一网关的限流,允许合理范围内应对突发压力。