Golang控流

概述 #

流控(Rate limiting)是构建可扩展弹性系统的重要技术之一,目的是通过限制指定时间内允许通过的请求数量来控制流量。在 Go 中实施流控可以确保最佳的资源利用率,并保护应用不被过多的流量或滥用行为所冲垮。

流控包括定义一套规则,确定客户端在给定时间窗口内可以发出多少请求,从而确保系统能够处理负载,防止滥用或拒绝服务攻击。两种常见的流控方法是:

拒绝服务攻击(Denial of Service, DoS)是一种恶意行为,旨在剥夺合法用户访问网络服务或资源的能力。在拒绝服务攻击中,攻击者通过采取各种手段使目标系统或网络资源过载或不可用,从而阻止合法用户访问。

拒绝服务攻击的目标可以是各种网络服务,例如网站、服务器、路由器、域名系统(DNS)等。攻击者可能利用系统或网络的弱点,通过发送大量请求、占用资源、耗尽带宽或利用其他漏洞来导致服务不可用。

  • 固定窗口控流:在这种方法中,在一个固定时间窗口内执行控流。例如,如果流控设置为每分钟 100 个请求,则系统在任何给定的 60 秒窗口内最多允许 100 个请求,超过此限制的请求将被拒绝或延迟到下一个时间窗口。
  • 令牌桶控流:令牌桶控流基于令牌从桶中消失的概念。令牌桶最初装满固定数量的令牌,每个令牌代表一个请求。当客户端要发出请求时,必须从桶中获取一个令牌。如果令牌桶是空的,客户端必须等待,直到有令牌可用。

Go 提供了一个名为 golang.org/x/time/rate 的内置软件包,实现了流控功能。接下来我们看看如何使用固定窗口和令牌桶两种方法来实现流控。

固定窗口控流 #

package main

import (
 "fmt"
 "golang.org/x/time/rate"
 "time"
)

func main() {
 limiter := rate.NewLimiter(rate.Limit(100), 1) // Allow 100 requests per second

 for i := 0; i < 200; i++ {
  if !limiter.Allow() {
   fmt.Println("Rate limit exceeded. Request rejected.")
   continue
  }
  // Process the request
  fmt.Println("Request processed successfully.")
  time.Sleep(time.Millisecond * 100) // Simulate request processing time
 }
}

在上面的代码片段中,我们用 rate.NewLimiter 创建了一个限制器,其速率限制为每秒 100 个请求。每个请求都会调用 limiter.Allow() 方法,如果允许请求,则返回 true,如果超过速率限制,则返回 false,超过速率限制的请求将被拒绝。

令牌桶控流 #

package main

import (
 "fmt"
 "golang.org/x/time/rate"
 "time"
)

func main() {
 limiter := rate.NewLimiter(rate.Limit(10), 5) // Allow 10 requests per second with a burst of 5

 for i := 0; i < 15; i++ {
  if err := limiter.Wait(context.TODO()); err != nil { // 使用 context.TODO() 作为临时占位符
   fmt.Println("Rate limit exceeded. Request rejected.")
   continue
  }
  // Process the request
  fmt.Println("Request processed successfully.")
  time.Sleep(time.Millisecond * 100) // Simulate request processing time
 }
}

在上述代码中,我们用 rate.NewLimiter 创建了一个限制器,其速率限制为每秒 10 个请求,允许 5 个并发请求。每个请求都会调用 limiter.Wait() 方法,该方法会阻塞直到有令牌可用。如果令牌桶是空的,没有可用令牌,请求就会被拒绝。

动态控流 #

动态流控是指根据客户端行为、系统负载或业务规则等动态因素调整速率限制。这种技术允许我们实时调整流控,以优化资源利用率并提供更好的用户体验。让我们看看 Go 中动态流控的示例:

package main

import (
 "fmt"
 "golang.org/x/time/rate"
 "time"
)

func main() {
 limiter := rate.NewLimiter(rate.Limit(100), 1) // Initial rate limit of 100 requests per second

 // Dynamic rate adjustment
 go func() {
  time.Sleep(time.Minute) // Adjust rate every minute
  limiter.SetLimit(rate.Limit(200)) // Increase rate limit to 200 requests per second
 }()

 for i := 0; i < 300; i++ {
  if !limiter.Allow() {
   fmt.Println("Rate limit exceeded. Request rejected.")
   continue
  }
  // Process the request
  fmt.Println("Request processed successfully.")
  time.Sleep(time.Millisecond * 100) // Simulate request processing time
 }
}

在上面的代码片段中,我们创建了一个限制器,初始速率限制为每秒 100 个请求。然后,启动一个 goroutine,在一分钟后将速率限制调整为每秒 200 个请求。这样,我们就能根据不断变化的情况动态调整流控。

自适应控流 #

自适应流控可根据之前请求的响应时间或错误率动态调整速率限制,从而允许系统自动适应不同的流量条件,确保获得最佳性能和资源利用率。让我们看看 Go 中自适应流控示例:

package main

import (
 "fmt"
 "golang.org/x/time/rate"
 "time"
)

func main() {
 limiter := rate.NewLimiter(rate.Limit(100), 1) // Initial rate limit of 100 requests per second

 // Adaptive rate adjustment
 go func() {
  for {
   responseTime := measureResponseTime() // Measure the response time of previous requests
   if responseTime > 500*time.Millisecond {
    limiter.SetLimit(rate.Limit(50)) // Decrease rate limit to 50 requests per second
   } else {
    limiter.SetLimit(rate.Limit(100)) // Increase rate limit to 100 requests per second
   }
   time.Sleep(time.Minute) // Adjust rate every minute
  }
 }()

 for i := 0; i < 200; i++ {
  if !limiter.Allow() {
   fmt.Println("Rate limit exceeded. Request rejected.")
   continue
  }
  // Process the request
  fmt.Println("Request processed successfully.")
  time.Sleep(time.Millisecond * 100) // Simulate request processing time
 }
}

func measureResponseTime() time.Duration {
 // Measure the response time of previous requests
 // Implement your own logic to measure the response time
 return time.Millisecond * 200
}

在上述代码片段中,我们用 measureResponseTime 函数模拟测量之前请求的响应时间。根据测量到的响应时间,通过 limiter.SetLimit 设置不同的值来动态调整速率限制。这样,系统就能根据观察到的响应时间调整其流控策略。

分布式控流算法 #

流控似乎很简单: 只允许给定的客户端每分钟执行X个调用。在单个服务器实例上实现流控非常容易,可以很容易找到相关的库来实现。但问题是我们的API托管在6个数据中心(欧洲、北美和亚洲),每个数据中心都有多个实例,这意味着我们需要某种分布式流控系统。

流控不仅与调用次数有关,还需要和客户端同步当前被限制的状态(例如,使用专用的报头和状态码)。但是本文将主要关注用于流控的算法和系统。

利用负载均衡 #

在尝试开发自己的系统之前,更重要的是查看现有的基础设施是否能够提供想要的特性。

那么,部署在数据中心所有实例之前,并且已经在负责检查、路由流量的是什么?负载均衡器。大多数负载均衡器都提供了流控特性或某种可用于实现流控的抽象。例如,HAProxy有现成的可用于设置流控的stick tables,可以在实例之间同步状态,并且工作得很好。

不幸的是,负载均衡不支持我们需要的某些特性(动态限制、令牌自省token introspection、……),因此我们需要自己实现这些特定的需求。

初级方案 #

会话粘连(Sticky sessions) #

会话粘连(Session Stickiness)是指在负载均衡器(如反向代理服务器或负载均衡器)上将一次会话的所有请求都发送到同一台服务器的现象。这种现象通常发生在使用基于会话的应用程序(如 Web 应用程序)并且负载均衡器未正确配置时。

说到负载均衡,如果给定客户端的负载并不均衡,并且总是与单个实例交互🤓,那么就不需要分布式流控系统。大多数客户端访问距离最近的数据中心(通过geo-DNS),如果在负载均衡器上启用“stickiness”,客户端应该总是访问相同的实例,这种情况下我们可以使用简单的“本地”速率限制。

这在理论上可行,但在实践中行不通。Criteo系统面临的负载不是恒定的,例如,黑色星期五/Cyber Week是一年中最重要的时段。在此期间,团队随时处于戒备状态,准备扩大基础设施,以应对客户不断增长的需求。但是会话粘连和可伸缩性不能很好的配合(如果所有客户端都粘连在旧实例上,那么创建新实例又有什么用呢?)

使用更智能的会话粘连(在扩展时重新分配令牌)会有所帮助,但这意味着每次扩展时,客户端都可能切换到另一个实例上,而且并不知道客户端在前一个实例上执行了多少调用。本质上说,这将使我们的流控在每次伸缩时不一致,客户端可能在每次系统面临压力时会进行更多调用。

Chatty服务器 #

如果客户端可以访问任何实例,意味着“调用计数”必须在实例之间共享。一种方案是让每个实例调用所有其他实例,请求给定客户端的当前计数并相加。另一种方案反过来,每个服务器向其他服务器广播“计数更新”。

这会造成两个主要问题:

  • 实例越多,需要进行的调用就越多。
  • 每个实例都需要知道其他实例的地址,并且每次服务扩缩容时都必须更新地址。

虽然可以实现这个解决方案(本质上是一个点对点环,许多系统已经实现了),但成本较高。

Kafka #

如果不想让每个实例都和其他实例通信,可以利用Kafka同步所有实例中的计数器。

例如,当某个实例被调用时,就将一个事件推到对应的topic。这些事件会被滑动窗口聚合(Kafka Stream在这方面做得很好),每个客户端最后一分钟的最新计数会被发布到另一个topic上。然后,每个实例通过此topic获得所有客户端的共享计数。

问题在于Kafka本质上是异步的,虽然通常延迟很小,但当API负载增加时,也会增加延迟。如果实例使用了过时的计数器,那么可能会漏过那些本应被阻止的调用。


这些解决方案都有一个共同点: 当一切正常时,可以很好的工作,但当负载过重时,就会退化。我们的大部分系统都是这样设计的,通常情况下没有问题,但流控并不是典型组件,其目标就是保护系统的其他部分免受这种过重负载的影响。

流控系统的目标是在系统负载较重时工作良好,其构建目标是为最差的1%而不是好的99%的情况服务。

分布式算法 #

我们需要一个中心化的同步存储系统,以及为每个客户端计算当前速率的算法。内存缓存(如Memcached或Redis)是理想的系统,但并不是所有的流控算法都可以在缓存系统中实现。下面我们看看有什么样的算法。

简化起见,我们考虑尝试实现“每分钟100次调用”的流控。

基于事件日志的滑动窗口(Sliding window via event log) #

如果想知道某个客户端在过去一分钟内进行了多少次调用,可以在缓存中为每个客户端存储一个时间戳列表。每次调用时,相应的时间戳都会添加到列表中。然后循环遍历列表中的每一项,丢弃超过一分钟的旧项,并计算新项。

👍优点:

  • 非常精确
  • 简单

👎缺点:

  • 需要强大的事务支持(处理同一客户端的两个实例需要更新相同的列表)。
  • 根据不同的调用限制和次数,存储对象(列表)的大小可能相当大。
  • 性能不稳定(更多的调用意味着需要处理更多的时间戳)。
固定窗口(Fixed window) #

大多数分布式缓存系统都有特定的、高性能的“计数器”抽象(一个可以自动增加的整数值,附加在一个字符串键上)。

以“{clientId}”为key为每个客户端维护一个计数器非常容易,但只会计算自计数器创建以来客户端调用的次数,而不是最后一分钟的次数。以“{clientId}_{yyyyMMddHHmm}”为key可以每分钟都为客户端维护一个计数器(换句话说: 以1分钟为固定窗口),查找与当前时间相对应的计数器可以告诉我们这一分钟客户端执行的调用数量,如果这个值超过上限,就可以阻止调用。

请注意,这与“最近一分钟”不是一回事。如果在上午07:10:23有一次调用,固定窗口计数器会显示在上午07:10:00到07:10:23之间调用的数量。但我们真正想要的是早上07:09:23到07:10:23之间的调用数量。

在某种程度上,固定窗口算法每过一分钟都会“忘记”之前有多少次呼叫,因此客户端理论上可以在07:09:59执行100次调用,然后在07:10:00执行100次额外的调用。

👍优点:

  • 非常快(单个原子增量+读取操作)
  • 只需要非常基本的事务支持(原子计数器)
  • 性能稳定
  • 简单

👎缺点:

  • 不准确(最多会允许2倍调用)
令牌桶(Token bucket) #

回到1994年,父母把你送到游戏厅和朋友们一起玩《超级街霸2》。他们给你一个装了5美元硬币的小桶,然后去了街对面的酒吧,并且每个小时都会过来往桶里扔5美元硬币。因此你基本上被限制每小时玩5美元(希望你在《街头霸王》中表现出色)。

这就是“令牌桶”算法背后的主要思想: 与简单计数器不同,“桶”存储在每个客户端缓存中。桶是由两个属性组成的对象:

  • 剩余“令牌”的数量(或剩余可以进行的调用的数量)
  • 最后一次调用的时间戳。

当API被调用时,检索桶,根据当前调用和最后一次调用之间的时间间隔,向桶中添加新的令牌,如果有多余令牌,递减并允许调用。

所以,和“街头霸王”的例子相反,没有“父母”帮我们每分钟重新装满桶。桶在与令牌消耗相同的操作中被有效的重新填充(令牌的数量对应于上次调用之后的时间间隔)。如果最后一次调用是在半分钟之前,那么每分钟100次调用的限制意味着将向桶中添加50个令牌。如果桶太“旧”(最后一次调用超过1分钟),令牌计数将重置为100。

事实上,可以在初始化的时候填充超过100个令牌(但补充速度为100令牌/分钟): 这类似于“burst”功能,允许客户端在一小段时间内超过流控的限制,但不能长期维持。

注意: 正确计算要添加的令牌数很重要,否则有可能错误的填充桶。

该算法提供了完美的准确性,同时提供了稳定的性能,主要问题是需要事务(不希望两个实例同时更新缓存对象)。

👍优点:

  • 非常精确
  • 快速
  • 性能稳定
  • 优化初始令牌数量可以允许客户端“burst”调用

👎缺点:

  • 更复杂
  • 需要强大的事务支持

漏桶(Leaky bucket): 该算法的另一个版本。在这个版本中,调用堆积在bucket中,并以恒定的速率(匹配速率限制)处理。如果桶溢出,则拒绝调用。这实现起来比较复杂,但可以平滑服务负载(这可能是您想要的,也可能不是)。

🏆最好的算法? #

比较这三种算法,令牌桶似乎在性能和准确性方面提供了最好的折衷。但只有当系统提供良好的事务支持时,才有可能实现。如果有Redis集群,这是完美方案(甚至可以实现基于Lua的算法,直接运行在Redis集群上,以提高性能),但Memcached只支持原子计数器,而不是事务。

可以基于Memcached实现令牌桶的乐观并发(optimistic concurrent)版本[3],但这更加复杂,而且乐观并发的性能在负载较重的情况下会下降。


用固定窗口近似模拟滑动窗口 #

如果没有强大的事务支持,是否注定要使用不准确的固定窗口算法?

算是吧,但还有改进的空间。请记住,固定窗口的主要问题是它每过一分钟都会“忘记”之前发生的事情,但我们仍然可以访问相关信息(在前一分钟的计数器中),所以可以通过计算加权平均值来估计前一分钟的调用次数。

例如: 如果在00:01:43进行了一次调用,递增得到“00:01”计数器的值。由于这是当前的日历分钟,现在包含00:01:00至00:01:43之间的调用数(最后17秒还没有发生)。 但我们想要的是60s滑动窗口中的调用数,意味着我们错过了00:00:43到00:01:00这段时间的计数。为此我们可以读取“00:00”计数器,并以17/60的因子进行调整,从而说明我们只对最后17秒感兴趣。

如果负载不变,这一近似是完美的。但如果大多数调用都集中在前一分钟,那就会获得一个高估的值。而当大多数调用都集中在前一分钟结束后,这个数字就会被低估。

比较 #

为了更准确的了解精度差异,最好是在相同的条件下模拟两种算法。

下面的图显示了“固定计数器”算法在输入随机流量时将返回什么。灰色的线是一个“完美”的滑动窗口输出,该窗口在任何时间点对应于过去60秒内的呼叫次数,这是我们的目标。橙色虚线表示固定窗口算法对相同流量的“计数”。

它们在第一分钟的输出是相同的,但很快就可以看到固定窗口版本在每分钟标记处有很大的下降。固定窗口算法很少会超过100个调用的限制,这意味着会允许很多本应被阻止的调用。

下面的图表示相同的场景,具有相同的流量,但使用了近似的滑动窗口。同样,灰色线代表“完美”滑动窗口。橙色虚线表示近似算法。

在每分钟标记附近不再看到下降,可以看到新版本算法与完美算法更接近,它有时略高,有时略低,但总体上是巨大的进步。

收益递减 #

但我们能做得更好吗?

我们的近似算法只使用当前和以前的60秒固定窗口。但是,也可以使用几个更小的子窗口,一种极端方法是使用60个1s窗口来重建最后一分钟的流量。显然这意味着为每个调用读取60个计数器,这将极大增加性能成本。不过我们可以选择任意固定窗口时间,从中拟合近似值。窗口越小,需要的计数器就越多,近似值也就越精确。

我们看看组合5个15秒窗口会有什么效果:

正如预期的那样,准确率有所提高,但仍然不够完美。

我们处在一个经典的更好的准确性=更差的性能的情况下。没有绝对的最佳方案,必须平衡对于准确性和性能的要求,找到最适合的解决方案。如果你只关心保护自己的服务不被过度使用,而不需要持续控制,那么甚至最简单的固定窗口就可能是可行的解决方案。