golang限流实践

前言 #

在编写服务的过程中,我们会遇到需要对服务接入请求进行限流的情况。通过对不同用户、访问内容的限流,我们可以减轻单个服务收到较大流量时对其他系统内服务的影响,防止系统因为单个来源的请求而无法及时对其他来源的请求响应,同时也可以减轻后续服务处理的压力。

当前系统限流方式 #

当前的服务中主要使用了令牌桶对用户和 token api 接入的请求进行了区分限流。令牌桶是一个比较均衡的限流方式,在能保证均匀恢复限流阈值的同时,还允许按照设定的容量值进行动态增加容量和速率。

起始时令牌桶中填满了可用的令牌,当有新的请求进入后,会从令牌桶取走令牌,同时令牌桶按照指定的速率恢复令牌。当令牌桶的令牌被取完后,新的请求因无法获取到令牌而被拒绝,直至令牌恢复到最小单个请求所需的令牌数量后,新的请求才可通过。也就是说请求到来的速度如果始终大于令牌产生的速度时,在消耗完桶中已存有的令牌后,超出部分的请求会被拒绝;桶内存有的令牌允许短时间内接收等于令牌数量的请求,即允许按照配置的数量(容量)处理突发的请求。

系统中设置了两个令牌桶限流器组,分别是用户令牌桶组和 api 令牌桶组。以 api 令牌桶组为例,配置令牌桶添加令牌的速率为 20 / s,桶的容量为 30(即在桶中的令牌未被取走前最大允许同时有 30 个请求进入)。桶容量一般设置为速率的 0.6~1.5 倍之间。

var (
	userLimiters  = NewLimiters("user", 10, 15)   // 用户限流
	tokenLimiters = NewLimiters("token", 20, 30) // api 限流
)

func NewLimiters(_type string, r float64, size int) *Limiters {
	return &Limiters{
		_type: _type,
		r:     rate.Limit(r),
		b:     size,
		l:     sync.RWMutex{},
		m:     make(map[int64]*rate.Limiter),
	}
}

type Limiters struct {
	_type string     // 限流类型
	r     rate.Limit // 每秒补充令牌速度
	b     int        // 令牌桶大小

	l sync.RWMutex
	m map[int64]*rate.Limiter
}

// GetOrSet 获取(或创建并获取)对应 id 的限流器
func (l *Limiters) GetOrSet(id int64) (rl *rate.Limiter) {
	var ok bool
	l.l.RLock()
	rl, ok = l.m[id]
	if ok {
		l.l.RUnlock()
		return
	}
	l.l.RUnlock()

	l.l.Lock()
	defer l.l.Unlock()
	rl, ok = l.m[id]
	if ok {
		return
	}
	rl = rate.NewLimiter(l.r, l.b)
	l.m[id] = rl
	return
}

// Allow 允许当前 id 通过
func (l *Limiters) Allow(id int64) (ok bool) {
	return l.GetOrSet(id).Allow()
}

// UserRateLimiter 用户请求限流
func UserRateLimiter(c *gin.Context) {
	var ok bool
	ur, uErr := serviceUser.GetCurrentUser(c)
	if uErr == nil {
		ok = userLimiters.Allow(ur.Id)
	} else {
		ok = userLimiters.Allow(0)
	}
	if ok {
		c.Next()
		return
	}
	resp.AbortWithError(c, http.StatusTooManyRequests, fmt.Errorf("用户 %d 请求超限,请稍候再试", ur.Id))
}

// TokenRateLimiter api 请求限流
func TokenRateLimiter(c *gin.Context) {
	var ok bool
	token, tErr := serviceToken.GetCurrentToken(c)
	if tErr == nil {
		ok = tokenLimiters.Allow(token.Id)
	} else {
		ok = tokenLimiters.Allow(0)
	}
	if ok {
		c.Next()
		return
	}
	resp.AbortWithError(c, http.StatusTooManyRequests, fmt.Errorf("api%d 请求超限,请稍候再试", token.Id))
}

当用户的请求进入时,首先请求会通过注册的 Gin 中间件调用UserRateLimiter方法,该方法从请求中获取鉴权得到的用户 id,使用该 id 从 map 中获取对应的令牌桶。当令牌桶中还存有令牌时,允许该请求通过;没有剩余的令牌时,拒绝该请求,并返回429 请求过多给对应的请求者。令牌桶会按照设定的速率(20 / s)按单个数量恢复可用的令牌直至桶的容量(30)恢复。

下面介绍一下令牌桶基础的实现:

type TokenBucket struct {
	rate       float64
	capacity   float64
	tokens     float64
	lastFilled time.Time
	mutex      sync.Mutex
}

func NewTokenBucket(rate float64, capacity float64) *TokenBucket {
	return &TokenBucket{
		rate:       rate,       // 恢复令牌速率
		capacity:   capacity,   // 桶容量
		tokens:     capacity,   // 可用令牌数量
		lastFilled: time.Now(), // 最后填充时间
	}
}

func (tb *TokenBucket) fillTokens() {
	now := time.Now()
	delta := now.Sub(tb.lastFilled).Seconds() // 获取当前时间和上一次填充时间差值
	tb.tokens = tb.tokens + tb.rate*delta     // 使用插值秒和速率相乘得到可恢复的数量
	if tb.tokens > tb.capacity {              // 如果桶满则不恢复
		tb.tokens = tb.capacity
	}
	tb.lastFilled = now
}

func (tb *TokenBucket) Allow() (ok bool) {
	tb.mutex.Lock()
	defer tb.mutex.Unlock()

	tb.fillTokens() // 首先恢复令牌

	if tb.tokens >= 1 {
		tb.tokens = tb.tokens - 1
		return true
	}
	return
}

var tb = NewTokenBucket(10, 20)

通过调用NewTokenBucket方法,我们设置了令牌恢复速率为 10,容量为 20。新的请求获取令牌时会调用Allow方法,该方法首先会根据时间差值恢复令牌。恢复完成后,尝试从桶中取出令牌,当剩余令牌大于 1 时,令牌数量减一,并返回取出成功;令牌不足时返回失败。

此外,在查询相关资料的时候我有看到在一些高流量的情况下会需要进一步地调整流量使限制值和实际的速率匹配,如果每个请求只消耗一个令牌的话,可能最终的速率(250K/s)会和期望的值(300K/s)有偏差。因此,我们可以设定单个请求消耗的令牌为多个(1000),而令牌桶对应的数值也为原来的 1000 倍,在注意数值放大后可能会溢出的前提下,可以对限制的精度进行进一步的提高。

测试 #

这里我们使用apache benchmark进行测试,测试命令如下

ab -n 100 -c 10 -H 'token:xxx' http://yyy:5001/token/v1/proxy/services/micro_idcard_service/idVerify?id=zzz

其中-n表示测试次数,-c表示同时请求数量,返回结果如下

可以看到在没有其他请求的情况下(即令牌桶满),发送成功的请求数量为 33 个,相当于是 桶容量(30)+测试周期内恢复的令牌数量(3),符合设定的限流。

其他限流方法优缺点介绍 #

其他常用的限流方法有 计数器,滑动窗口和漏桶,上面主要对当前系统中使用的令牌桶限流方式的原理进行了说明,下面再简单介绍一下其他限流方式的优缺点。

  1. 计数器

计数器的算法实现是给一段时间内的请求数设定一个阈值,丢弃所有超过阈值的请求,并在下一个时间段开始时重置计数值。这个方法又叫固定窗口。

type Counter struct {
	rate  int           // 计数周期内最多允许的请求数
	begin time.Time     // 计数开始时间
	cycle time.Duration // 计数周期
	count int           // 计数周期内累计收到的请求数
	lock  sync.Mutex
}

func (l *Counter) Allow() bool {
	l.lock.Lock()
	defer l.lock.Unlock()

	if l.count == l.rate-1 {
		now := time.Now()
		if now.Sub(l.begin) >= l.cycle {
			//速度允许范围内, 重置计数器
			l.Reset(now)
			return true
		} else {
			return false
		}
	} else {
		//没有达到速率限制,计数加1
		l.count++
		return true
	}
}

func (l *Counter) Set(r int, cycle time.Duration) {
	l.rate = r
	l.begin = time.Now()
	l.cycle = cycle
	l.count = 0
}

func (l *Counter) Reset(t time.Time) {
	l.begin = t
	l.count = 0
}

func main() {
	var lr Counter
	lr.Set(3, time.Second) // 1s内最多请求3次

	var wg sync.WaitGroup
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(i int) {
			if lr.Allow() {
				log.Println("ok:", i)
			} else {
				log.Println("fail:", i)
			}
			wg.Done()
		}(i)
		time.Sleep(200 * time.Millisecond)
	}
	wg.Wait()
}

运行结果如下,超过 3 次 /s 的调用返回了失败:

由于计数器重置的逻辑,例如计数器每 10 s 重置一次,从 50 秒开始计数,当时间快要到达下一分钟时,用户发送数量等于阈值的请求,并在接下来的计数器重置后,又发送了数量等于阈值的请求,那么在这个瞬间系统接收到了 2 倍于阈值的请求,和实际上设定的限制不符,可能会导致临界点时系统过载。

  1. 滑动窗口

相比较于计数器的方法,滑动窗口将时间段花费为了更加小的时间块,因此可以解决计数器在重置计数的临界时遇到的超量的问题。但由于每个小窗口的划分大小影响实际计数的精度,因此精度要求越高,对空间占用越大,同时滑动窗口仅支持少量的流量徒增的情况。当不进行小窗口划分时,滑动窗口方法也就变成了计数器(固定窗口)。

  1. 漏桶

漏桶和令牌桶一样都带有桶字,形象地来讲,两者都隐含着缓存了部分内容的含义。漏桶保证请求处理时是按照匀速处理,对接收请求时超过桶容量的丢弃。他具备了一定的缓存能力,但是由于处理速度始终是匀速的,对于突然较多的请求接入时,会出现系统处理速率与实际可承受的情况不匹配,在桶满之后,其他请求依然会被丢弃。

总结 #

在系统中引入限流的过程中,我们对令牌桶的实现和具体的应用方式有了更多的了解。令牌桶允许一定程度的突发流量的特点相较于其他方法可以较为准确的对流量进行控制。同时,令牌桶本身还支持动态增减速率和桶容量。在需要更精细的控制的情况下,还可以使单个请求消耗的令牌数量为多个,即消耗的速率和恢复的速率以及桶容量都为原来的多倍,从而增加精度。