client-go 中指数退避 backoff
Lapin Gris Lv3

指数退避算法

退避简单来说,就是客户端的重试策略。常用的退避策略有,

  • 固定退避,特点是每次重试间隔固定,简单可预测,大量客户端同时重试,可能造成更大的重试风暴。
  • 线性退避,特点是重试间隔线性增加固定,比如间隔为 1,2,3,4,5…
  • 指数退避,特点是重试间隔以指数倍增长,重试间隔增长快,比如 1,2,4,8,16…

指数退避的核心思想有两点,

  • 重试间隔以指数倍增长,减少无效请求减少服务端瞬时压力
  • 添加抖动 jitter,避免多个客户端同一时间同时重试,避免重试风暴(惊群)

基础公式

其中 初始的延迟时间, 是指数因子,通常取
代表当前的重试次数。 代表下轮重试前的延迟时间。

带上限的指数退避

指数退避延迟时间是指数级增长,重试越多延迟时间也越长。为了避免延迟时间过长,通常会添加上限 cap。到达上限后,延迟时间变为固定值。

全抖动 Full Jitter

基础的退避算法,拉长了重试间隔, 但大量客户端依然会在同一时间集中重试(重试风暴)。因此,为退避算法引入了抖动 jitter。避免大量客户端在同一时间点集中重试。

全抖动(Full Jitter)抖动的取值范围 ,最大化随机性,彻底打散请求。

等量抖动 Equal Jitter

等量抖动 (Equal Jitter) 抖动取值范围为 ,一半固定一半随机,请求比全抖动更加集中,也会导致无效重试次数增加。

去相关抖动 Decorrelated Jitter

使用上一次延迟 来生成当前延迟,

并更新:

通常初始化:

不同退避算法效果模拟

AWS 网站上有一篇关于不同退避算法在重试次数和完成时间的模拟,每轮只有一个客户端可以成功
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

重试次数(越少越好)

分别对比不同算法在重试次数表现,可以发现什么也不做的重试次数是增长最快的,而全抖动和等量抖动的重试次数几乎相同。
重试次数(越少越好)

完成时间(越短越好)

分别对比不同算法在完成时间表现,可以发现什么也不做的完成时间是最快的,全抖动和去相关抖动的完成时间接近,等量抖动的完成时间是最长的。
完成时间(越短越好)

kubelet中如何使用 backoff

在 Kubernetes 的 client-go 库中,flowcontrol 包实现了一个基于键(key)的指数退避机制。
其中,key 是一个稳定的实体标识符,例如 pod 的 UID、特定容器名称或镜像 digest,用于记录不同实体的重试状态,

稳定的实体标识符

容器, 使用 PODNAME_PODNAMESPACE_PODUID_CONTAINERNAME_CONTAINERHASH 作为容器的稳定标识符

func GetStableKey(pod *v1.Pod, container *v1.Container) string {
hash := strconv.FormatUint(kubecontainer.HashContainer(container), 16)
return fmt.Sprintf("%s_%s_%s_%s_%s", pod.Name, pod.Namespace, string(pod.UID), container.Name, hash)
}

镜像, 使用 PODUID_IMGREF 作为镜像的稳定标识符(imgref 示例 quay.io/cilium/cilium:v1.18.1)

backOffKey := fmt.Sprintf("%s_%s", pod.UID, imgRef)

典型用法, 在 kubelet 中分别初始化容器和镜像两个 backoff 管理器实现管理容器和镜像重试

klet.backOff = flowcontrol.NewBackOff(base, boMax) // 10s 5m0s
imageBackOff := flowcontrol.NewBackOff(imageBackOffPeriod, MaxImageBackOff) // 10s 5m0s

SyncPod

如果在下次 syncpod 时候,依然处于 backoff 窗口内,会出现 CrashLoopBackOff/ImagePullBackOff 的 backoff 错误提示。

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
// ...
sctx := context.WithoutCancel(ctx)
result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
for _, r := range result.SyncResults {
if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff { // rashLoopBackOff ImagePullBackOff
return false, err
}
}

return false, nil
}

return false, nil
}

源码分析

client-go 中实现了 backoff 思想和理论的 backoff 算法相同的,但是具体细节略有差异,比如,

  • 上限时间设置为参数(max)的 2 倍
  • 到达上限时间后,重置 backoff,而不是退化为固定间隔退避
  • jitter 抖动不是固定模式,可通过参数实现抖动放大或者缩小

Backoff

client-go backoff 实现在 staging/src/k8s.io/client-go/util/flowcontrol/backoff.go

package flowcontrol

type backoffEntry struct {
backoff time.Duration // 延迟时间
lastUpdate time.Time // 上一次更新 backoff 时间
}

type Backoff struct {
sync.RWMutex
Clock clock.Clock
HasExpiredFunc func(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool // 目前未使用
defaultDuration time.Duration // base delay
maxDuration time.Duration // delay 上限
perItemBackoff map[string]*backoffEntry // 保存不同实体的 backoff 信息
rand *rand.Rand // jitter 生成抖动相关
maxJitterFactor float64 // jitter 生成抖动相关
}

NewBackOff & NewBackOffWithJitter

构造 BackOff 实例提供了两个方法,其中 NewBackOff 为不支持抖动,NewBackOffWithJitter 支持抖动

func NewBackOff(initial, max time.Duration) *Backoff {
return NewBackOffWithJitter(initial, max, 0.0)
}

func NewBackOffWithJitter(initial, max time.Duration, maxJitterFactor float64) *Backoff {
clock := clock.RealClock{}
return newBackoff(clock, initial, max, maxJitterFactor)
}
func newBackoff(clock clock.Clock, initial, max time.Duration, maxJitterFactor float64) *Backoff {
var random *rand.Rand
if maxJitterFactor > 0 {
random = rand.New(rand.NewSource(clock.Now().UnixNano()))
}
return &Backoff{
perItemBackoff: map[string]*backoffEntry{},
Clock: clock,
defaultDuration: initial,
maxDuration: max,
maxJitterFactor: maxJitterFactor,
rand: random,
}
}

Next

设置 key 下一轮的延迟时间 (backoff)

// move backoff to the next mark, capping at maxDuration
func (p *Backoff) Next(id string, eventTime time.Time) {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok || p.hasExpired(eventTime, entry.lastUpdate, p.maxDuration) { // 初次加入 or 延迟时间到达上限
entry = p.initEntryUnsafe(id) // 重新初始化 backoff
entry.backoff += p.jitter(entry.backoff). // 添加抖动 jitter
} else {
delay := entry.backoff * 2 // backoff 指数增长
delay += p.jitter(entry.backoff) // 添加抖动 jitter
entry.backoff = min(delay, p.maxDuration) // 取 backoff 和上限的较小值
}
entry.lastUpdate = p.Clock.Now() // 更新 backoff update时间戳
}

jitter 是为 backoff 添加抖动,当未传递 maxJitterFactor 时候, 代表不添加抖动。

抖动的取值范围为 [0, maxJitterFactor * delay],

  • maxJitterFactor 取值为 0,为基本指数退避
  • maxJitterFactor 为 0.5,为等量抖动
  • maxJitterFactor 为 1,为全抖动
  • maxJitterFactor > 1,会放大抖动,抖动出现在更大的范围内

// Take a lock on *Backoff, before calling initEntryUnsafe
func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
entry := &backoffEntry{backoff: p.defaultDuration} // 设置初始延迟时间 backoff
p.perItemBackoff[id] = entry
return entry
}

func (p *Backoff) jitter(delay time.Duration) time.Duration {
if p.rand == nil { // 未开启抖动 jitter
return 0
}

return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay))
}

IsInBackOffSince & IsInBackOffSinceUpdate

判定当前是否还在 backoff 窗口内,提供了两种比较方式,

  • IsInBackOffSince 提供了到当前时间的判定
  • IsInBackOffSinceUpdate 提供了到上次 backoff 时间的判定

// Returns True if the elapsed time since eventTime is smaller than the current backoff window
func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
p.RLock()
defer p.RUnlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if p.hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return p.Clock.Since(eventTime) < entry.backoff
}

// Returns True if time since lastupdate is less than the current backoff window.
func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
p.RLock()
defer p.RUnlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if p.hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return eventTime.Sub(entry.lastUpdate) < entry.backoff
}

hasExpired 检查当前到达延迟时间的上限,

  • 默认的检查是,距离上次 backoff 时间超过 2 倍 cap
  • 提供了一个自定义 HasExpiredFunc,可以自定义实现,目前没有接口可以传递参数…
// Unless an alternate function is provided, after 2*maxDuration we restart the backoff factor to the beginning
func (p *Backoff) hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
if p.HasExpiredFunc != nil {
return p.HasExpiredFunc(eventTime, lastUpdate, maxDuration)
}
return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
}

其他

Reset

将 key 从 map 中删除,实现重置key 的 backoff

// Reset forces clearing of all backoff data for a given key.
func (p *Backoff) Reset(id string) {
p.Lock()
defer p.Unlock()
delete(p.perItemBackoff, id)
}

GC

定期清理已经过期的 Expired 对象。

// Garbage collect records that have aged past their expiration, which defaults
// to 2*maxDuration (see hasExpired godoc). Backoff users are expected to invoke
// this periodically.
func (p *Backoff) GC() {
p.Lock()
defer p.Unlock()
now := p.Clock.Now()
for id, entry := range p.perItemBackoff {
if p.hasExpired(now, entry.lastUpdate, p.maxDuration) {
delete(p.perItemBackoff, id)
}
}
}

Get

查询 key 的延迟时间(backoff)

// Get the current backoff Duration
func (p *Backoff) Get(id string) time.Duration {
p.RLock()
defer p.RUnlock()
var delay time.Duration
entry, ok := p.perItemBackoff[id]
if ok {
delay = entry.backoff
}
return delay
}

DeleteEntry

从 map 中删除 key 对应的 backoff。

func (p *Backoff) DeleteEntry(id string) {
p.Lock()
defer p.Unlock()
delete(p.perItemBackoff, id)
}