龙空技术网

Client-go客户端源码解析——WorkQueue工作队列

队长100 157

前言:

如今同学们对“操作系统银行家算法work怎么算”可能比较珍视,你们都想要知道一些“操作系统银行家算法work怎么算”的相关资讯。那么小编同时在网络上收集了一些关于“操作系统银行家算法work怎么算””的相关资讯,希望大家能喜欢,兄弟们快快来学习一下吧!

在我们上一遍文章《Client-go客户端源码解析——Controller总体流程

》提供的示例代码中(我们可以称它为一个比较简陋的自定义控制器),我们将接受到的事件(资源对象)直接打印出来了,并没有经过任何处理。但是在正常的业务需求中,我们需要根据接收到的事件类型,最资源对象做各种各样的负责的计算和处理动作。所以在生产用的自定义控制器中,我们的EventHandler方法中接收到事件之后往往不会马上处理(或者只是简单地处理下数据),而是将事件资源对象的Key先放保存至一个队列,然后由自定义控制器提前启动好的多个gorouine并发的消费这个队列的数据,这样以来不仅可以提高自定义控制器的吞吐量,还可以利用队列的特性来达到限速事件消费的目的,最终使得自定义控制既稳定又高性能。

这个队列,我们一般会使用Kubernetes官方提高的WorkQueue,如官方github的sample-controller里面就使用了WorkQueue()。我们接下来对WorkQueue的核心代码做个分析。

WorkQueue支持3种队列,并提供了3种接口。

最基本的FIFO队列,支持提供了队列的基本操作方法,如:

//基本的FIFO队列接口定义type Interface interface {	Add(item interface{})	Len() int	Get() (item interface{}, shutdown bool) //获取队列头部的元素	Done(item interface{}) //标记该元素已被处理	ShutDown() //关闭队列	ShuttingDown() bool //队列是否正在关闭} //数据结构定义,实现了Interface的方法type Type struct {	// queue defines the order in which we will work on items. Every	// element of queue should be in the dirty set and not in the	// processing set.	queue []t	// dirty defines all of the items that need to be processed.	dirty set	// Things that are currently being processed are in the processing set.	// These things may be simultaneously in the dirty set. When we finish	// processing something and remove it from this set, we'll check if	// it's in the dirty set, and if so, add it to the queue.	processing set	cond *sync.Cond	shuttingDown bool	metrics queueMetrics //监控指标相关的,用于Prometheus监控	unfinishedWorkUpdatePeriod time.Duration	clock                      clock.Clock}
queue 实际存储元素的地方dirty 类型是set(使用Map的key来实现的,确保唯一),能保证去重;同时也保证了在并发情况下,一个元素在被处理之前买,哪怕被添加了多次,也只会被处理一次processing 用于标记一个元素正在被处理

workqueue

基础FIFO队列核心源码

// Add marks item as needing processing.func (q *Type) Add(item interface{}) {	q.cond.L.Lock()	defer q.cond.L.Unlock()	if q.shuttingDown {		return	}	if q.dirty.has(item) { //在dirty中如果存在该元素,就直接返回;确保了元素在队列中的唯一性		return	}	q.metrics.add(item)	q.dirty.insert(item) //先将元素插入dirty中	if q.processing.has(item) { //如果该元素正在被处理,则返回		return	}	q.queue = append(q.queue, item) //将元素加入队列尾部,等待消费	q.cond.Signal() //唤醒一个消费者goroutine去队列中消费元素(调用Get方法)}......func (q *Type) Get() (item interface{}, shutdown bool) {	q.cond.L.Lock()	defer q.cond.L.Unlock()	for len(q.queue) == 0 && !q.shuttingDown {		q.cond.Wait() //如果队列为空并且没有处于关闭中,则阻塞,等待被唤醒(调用了Add方法和ShutDown方法Done方法都会唤醒该阻塞)	}	if len(q.queue) == 0 {//如果是上面的阻塞被唤醒了,但是队列长度还是0,则表示该队列被关闭		// We must be shutting down.		return nil, true 	}	item, q.queue = q.queue[0], q.queue[1:] //从队列头部取一个元素	q.metrics.get(item)	q.processing.insert(item)//标记该元素正在被处理	q.dirty.delete(item) //正在处理的元素从dirty中删除	return item, false}// 表示某个元素被处理完成func (q *Type) Done(item interface{}) {	q.cond.L.Lock()	defer q.cond.L.Unlock()	q.metrics.done(item)	q.processing.delete(item) //从processing中移除该元素	if q.dirty.has(item) { //如果dirty中还有一个相同的元素存在,说明在该元素被处理的时候,又加入了相同的元素进来		q.queue = append(q.queue, item)//此时需要将元素添加至Queue中,等待被消费    q.cond.Signal() //唤醒消费goroutine(那些调用了Get方法而阻塞的goroutine,只会被唤醒一个)	}}//关闭WorkQueuefunc (q *Type) ShutDown() {	q.cond.L.Lock()	defer q.cond.L.Unlock()	q.shuttingDown = true  q.cond.Broadcast() //唤醒所有的消费者goroutine,让它们安全退出(哪些调用了Get方法而阻塞的goroutine)}
延迟队列

延迟队列,基于FIFO队列接口封装,延迟一段时间后再将元素插入FIFO队列,主要在原有的功能上增加了AddAfter方法。

type DelayingInterface interface {	Interface	// AddAfter adds an item to the workqueue after the indicated duration has passed	AddAfter(item interface{}, duration time.Duration)}type delayingType struct {	Interface	// clock tracks time for delayed firing	clock clock.Clock	// stopCh lets us signal a shutdown to the waiting loop	stopCh chan struct{}	// heartbeat ensures we wait no more than maxWait before firing	heartbeat clock.Ticker	// waitingForAddCh is a buffered channel that feeds waitingForAdd	waitingForAddCh chan *waitFor //初始化延迟队列的时候,channel长度为1000	// metrics counts the number of retries	metrics           retryMetrics	deprecatedMetrics retryMetrics}

延迟队列运行的原理

延迟队列

核心代码

// AddAfter adds the given item to the work queue after the given delayfunc (q *delayingType) AddAfter(item interface{}, duration time.Duration) {	// don't add if we're already shutting down	if q.ShuttingDown() {		return	}	q.metrics.retry()	q.deprecatedMetrics.retry()	// immediately add things with no delay	if duration <= 0 {		q.Add(item)		return	}	select {	case <-q.stopCh:		// unblock if ShutDown() is called	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:	}}.../ waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.func (q *delayingType) waitingLoop() {	defer utilruntime.HandleCrash()	// Make a placeholder channel to use when there are no items in our list	never := make(<-chan time.Time)	waitingForQueue := &waitForPriorityQueue{}	heap.Init(waitingForQueue)	waitingEntryByData := map[t]*waitFor{}	for {		if q.Interface.ShuttingDown() {			return		}		now := q.clock.Now()		// Add ready entries		for waitingForQueue.Len() > 0 {			entry := waitingForQueue.Peek().(*waitFor)			if entry.readyAt.After(now) {				break			}			entry = heap.Pop(waitingForQueue).(*waitFor)			q.Add(entry.data)			delete(waitingEntryByData, entry.data)		}		// Set up a wait for the first item's readyAt (if one exists)		nextReadyAt := never		if waitingForQueue.Len() > 0 {			entry := waitingForQueue.Peek().(*waitFor)			nextReadyAt = q.clock.After(entry.readyAt.Sub(now))		}		select {		case <-q.stopCh:			return		case <-q.heartbeat.C():			// continue the loop, which will add ready items		case <-nextReadyAt:			// continue the loop, which will add ready items		case waitEntry := <-q.waitingForAddCh:			if waitEntry.readyAt.After(q.clock.Now()) {				insert(waitingForQueue, waitingEntryByData, waitEntry)			} else {				q.Add(waitEntry.data)			}			drained := false			for !drained {				select {				case waitEntry := <-q.waitingForAddCh:					if waitEntry.readyAt.After(q.clock.Now()) {						insert(waitingForQueue, waitingEntryByData, waitEntry)					} else {						q.Add(waitEntry.data)					}				default:					drained = true				}			}		}	}}
限速队列

限速队列是基于延迟队列和FIFO队列接口封装的,限速队列的接口:

type RateLimitingInterface interface {	DelayingInterface //延迟队列	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok	AddRateLimited(item interface{}) //想队列对添加元素	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you	// still have to call `Done` on the queue.	Forget(item interface{})//当某个元素处理完成之后,调用Forget,清空元素的排队数,调用具体限速算法的同名方法	// NumRequeues returns back how many times the item was requeued	NumRequeues(item interface{}) int //获取指定元素的排队数,调用具体限速算法的同名方法}

限速队列的数据结构:

// rateLimitingType wraps an Interface and provides rateLimited re-enquingtype rateLimitingType struct {   DelayingInterface   rateLimiter RateLimiter //限速队列的接口定义是比较简单的,限速队列的重点就在于其提供的几种不同限速算法}

限速队列的原理,就是利用了延迟队列的特性,延迟某个元素的插入时间,达到限速的目的。RateLimiter接口定义如下:

type RateLimiter interface {   // When gets an item and gets to decide how long that item should wait   When(item interface{}) time.Duration //获取指定元素插入队列前应该等待的时间   // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing   // or for success, we'll stop tracking it   Forget(item interface{})//释放指定元素,清空该元素的排队数   // NumRequeues returns back how many failures the item has had   NumRequeues(item interface{}) int//返回指定元素的排队数}

Workqueue提供了4种限速算法:

令牌桶算法(BucketRateLimiter): 以固定的速率往桶里填充Token,直到填满为止(多余的会被丢弃);每个元素都会从桶里获取一个Token,只有拿到Token的才允许通过,否则该元素处理等待Token的状态;以此达到限速的目的。排队指数算法(ItemExponentialFailureRateLimiter): 将相同元素排队数作为指数,排队数增大,速率限制呈指数增长,但最大不会超过maxDelay。计数器算法(ItemFastFlowRateLimiter):限制一段时间内允许通过的元素数量。混合模式: 将多种限速算法混合使用。

我们主要看下排队指数算法下,如何添加元素到队列中。

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok//添加元素到队列中//首先要先获取某个元素需要等待的时间,q.rateLimiter.When(item) //然后调用延迟队列的Addfter添加元素到队列func (q *rateLimitingType) AddRateLimited(item interface{}) {   q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))}//排队指数算法的 When方法func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {	r.failuresLock.Lock()	defer r.failuresLock.Unlock()	exp := r.failures[item] //获取指定元素的排队数	r.failures[item] = r.failures[item] + 1	// The backoff is capped such that 'calculated' value never overflows.	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //指数计算需要等待的时间	if backoff > math.MaxInt64 {		return r.maxDelay	}	calculated := time.Duration(backoff)	if calculated > r.maxDelay {		return r.maxDelay //确保等待时间不会大于maxDelay,maxDelay为1000s	}	return calculated}

调用q.rateLimiter.When(item)拿到所需等待的时间后,就会执行延迟队列的AddAfter方法将元素添加进队列,后续具体的添加动作就是延迟队列的相关操作。

标签: #操作系统银行家算法work怎么算