前言:
如今同学们对“操作系统银行家算法work怎么算”可能比较珍视,你们都想要知道一些“操作系统银行家算法work怎么算”的相关资讯。那么小编同时在网络上收集了一些关于“操作系统银行家算法work怎么算””的相关资讯,希望大家能喜欢,兄弟们快快来学习一下吧!在我们上一遍文章《Client-go客户端源码解析——Controller总体流程
》提供的示例代码中(我们可以称它为一个比较简陋的自定义控制器),我们将接受到的事件(资源对象)直接打印出来了,并没有经过任何处理。但是在正常的业务需求中,我们需要根据接收到的事件类型,最资源对象做各种各样的负责的计算和处理动作。所以在生产用的自定义控制器中,我们的EventHandler方法中接收到事件之后往往不会马上处理(或者只是简单地处理下数据),而是将事件资源对象的Key先放保存至一个队列,然后由自定义控制器提前启动好的多个gorouine并发的消费这个队列的数据,这样以来不仅可以提高自定义控制器的吞吐量,还可以利用队列的特性来达到限速事件消费的目的,最终使得自定义控制既稳定又高性能。
这个队列,我们一般会使用Kubernetes官方提高的WorkQueue,如官方github的sample-controller里面就使用了WorkQueue()。我们接下来对WorkQueue的核心代码做个分析。
WorkQueue支持3种队列,并提供了3种接口。
最基本的FIFO队列,支持提供了队列的基本操作方法,如:
//基本的FIFO队列接口定义type Interface interface { Add(item interface{}) Len() int Get() (item interface{}, shutdown bool) //获取队列头部的元素 Done(item interface{}) //标记该元素已被处理 ShutDown() //关闭队列 ShuttingDown() bool //队列是否正在关闭} //数据结构定义,实现了Interface的方法type Type struct { // queue defines the order in which we will work on items. Every // element of queue should be in the dirty set and not in the // processing set. queue []t // dirty defines all of the items that need to be processed. dirty set // Things that are currently being processed are in the processing set. // These things may be simultaneously in the dirty set. When we finish // processing something and remove it from this set, we'll check if // it's in the dirty set, and if so, add it to the queue. processing set cond *sync.Cond shuttingDown bool metrics queueMetrics //监控指标相关的,用于Prometheus监控 unfinishedWorkUpdatePeriod time.Duration clock clock.Clock}queue 实际存储元素的地方dirty 类型是set(使用Map的key来实现的,确保唯一),能保证去重;同时也保证了在并发情况下,一个元素在被处理之前买,哪怕被添加了多次,也只会被处理一次processing 用于标记一个元素正在被处理
基础FIFO队列核心源码
// Add marks item as needing processing.func (q *Type) Add(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() if q.shuttingDown { return } if q.dirty.has(item) { //在dirty中如果存在该元素,就直接返回;确保了元素在队列中的唯一性 return } q.metrics.add(item) q.dirty.insert(item) //先将元素插入dirty中 if q.processing.has(item) { //如果该元素正在被处理,则返回 return } q.queue = append(q.queue, item) //将元素加入队列尾部,等待消费 q.cond.Signal() //唤醒一个消费者goroutine去队列中消费元素(调用Get方法)}......func (q *Type) Get() (item interface{}, shutdown bool) { q.cond.L.Lock() defer q.cond.L.Unlock() for len(q.queue) == 0 && !q.shuttingDown { q.cond.Wait() //如果队列为空并且没有处于关闭中,则阻塞,等待被唤醒(调用了Add方法和ShutDown方法Done方法都会唤醒该阻塞) } if len(q.queue) == 0 {//如果是上面的阻塞被唤醒了,但是队列长度还是0,则表示该队列被关闭 // We must be shutting down. return nil, true } item, q.queue = q.queue[0], q.queue[1:] //从队列头部取一个元素 q.metrics.get(item) q.processing.insert(item)//标记该元素正在被处理 q.dirty.delete(item) //正在处理的元素从dirty中删除 return item, false}// 表示某个元素被处理完成func (q *Type) Done(item interface{}) { q.cond.L.Lock() defer q.cond.L.Unlock() q.metrics.done(item) q.processing.delete(item) //从processing中移除该元素 if q.dirty.has(item) { //如果dirty中还有一个相同的元素存在,说明在该元素被处理的时候,又加入了相同的元素进来 q.queue = append(q.queue, item)//此时需要将元素添加至Queue中,等待被消费 q.cond.Signal() //唤醒消费goroutine(那些调用了Get方法而阻塞的goroutine,只会被唤醒一个) }}//关闭WorkQueuefunc (q *Type) ShutDown() { q.cond.L.Lock() defer q.cond.L.Unlock() q.shuttingDown = true q.cond.Broadcast() //唤醒所有的消费者goroutine,让它们安全退出(哪些调用了Get方法而阻塞的goroutine)}延迟队列
延迟队列,基于FIFO队列接口封装,延迟一段时间后再将元素插入FIFO队列,主要在原有的功能上增加了AddAfter方法。
type DelayingInterface interface { Interface // AddAfter adds an item to the workqueue after the indicated duration has passed AddAfter(item interface{}, duration time.Duration)}type delayingType struct { Interface // clock tracks time for delayed firing clock clock.Clock // stopCh lets us signal a shutdown to the waiting loop stopCh chan struct{} // heartbeat ensures we wait no more than maxWait before firing heartbeat clock.Ticker // waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan *waitFor //初始化延迟队列的时候,channel长度为1000 // metrics counts the number of retries metrics retryMetrics deprecatedMetrics retryMetrics}
延迟队列运行的原理
核心代码
// AddAfter adds the given item to the work queue after the given delayfunc (q *delayingType) AddAfter(item interface{}, duration time.Duration) { // don't add if we're already shutting down if q.ShuttingDown() { return } q.metrics.retry() q.deprecatedMetrics.retry() // immediately add things with no delay if duration <= 0 { q.Add(item) return } select { case <-q.stopCh: // unblock if ShutDown() is called case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: }}.../ waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.func (q *delayingType) waitingLoop() { defer utilruntime.HandleCrash() // Make a placeholder channel to use when there are no items in our list never := make(<-chan time.Time) waitingForQueue := &waitForPriorityQueue{} heap.Init(waitingForQueue) waitingEntryByData := map[t]*waitFor{} for { if q.Interface.ShuttingDown() { return } now := q.clock.Now() // Add ready entries for waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) if entry.readyAt.After(now) { break } entry = heap.Pop(waitingForQueue).(*waitFor) q.Add(entry.data) delete(waitingEntryByData, entry.data) } // Set up a wait for the first item's readyAt (if one exists) nextReadyAt := never if waitingForQueue.Len() > 0 { entry := waitingForQueue.Peek().(*waitFor) nextReadyAt = q.clock.After(entry.readyAt.Sub(now)) } select { case <-q.stopCh: return case <-q.heartbeat.C(): // continue the loop, which will add ready items case <-nextReadyAt: // continue the loop, which will add ready items case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } drained := false for !drained { select { case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) } default: drained = true } } } }}限速队列
限速队列是基于延迟队列和FIFO队列接口封装的,限速队列的接口:
type RateLimitingInterface interface { DelayingInterface //延迟队列 // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok AddRateLimited(item interface{}) //想队列对添加元素 // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{})//当某个元素处理完成之后,调用Forget,清空元素的排队数,调用具体限速算法的同名方法 // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int //获取指定元素的排队数,调用具体限速算法的同名方法}
限速队列的数据结构:
// rateLimitingType wraps an Interface and provides rateLimited re-enquingtype rateLimitingType struct { DelayingInterface rateLimiter RateLimiter //限速队列的接口定义是比较简单的,限速队列的重点就在于其提供的几种不同限速算法}
限速队列的原理,就是利用了延迟队列的特性,延迟某个元素的插入时间,达到限速的目的。RateLimiter接口定义如下:
type RateLimiter interface { // When gets an item and gets to decide how long that item should wait When(item interface{}) time.Duration //获取指定元素插入队列前应该等待的时间 // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing // or for success, we'll stop tracking it Forget(item interface{})//释放指定元素,清空该元素的排队数 // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int//返回指定元素的排队数}
Workqueue提供了4种限速算法:
令牌桶算法(BucketRateLimiter): 以固定的速率往桶里填充Token,直到填满为止(多余的会被丢弃);每个元素都会从桶里获取一个Token,只有拿到Token的才允许通过,否则该元素处理等待Token的状态;以此达到限速的目的。排队指数算法(ItemExponentialFailureRateLimiter): 将相同元素排队数作为指数,排队数增大,速率限制呈指数增长,但最大不会超过maxDelay。计数器算法(ItemFastFlowRateLimiter):限制一段时间内允许通过的元素数量。混合模式: 将多种限速算法混合使用。
我们主要看下排队指数算法下,如何添加元素到队列中。
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok//添加元素到队列中//首先要先获取某个元素需要等待的时间,q.rateLimiter.When(item) //然后调用延迟队列的Addfter添加元素到队列func (q *rateLimitingType) AddRateLimited(item interface{}) { q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))}//排队指数算法的 When方法func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration { r.failuresLock.Lock() defer r.failuresLock.Unlock() exp := r.failures[item] //获取指定元素的排队数 r.failures[item] = r.failures[item] + 1 // The backoff is capped such that 'calculated' value never overflows. backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //指数计算需要等待的时间 if backoff > math.MaxInt64 { return r.maxDelay } calculated := time.Duration(backoff) if calculated > r.maxDelay { return r.maxDelay //确保等待时间不会大于maxDelay,maxDelay为1000s } return calculated}
调用q.rateLimiter.When(item)拿到所需等待的时间后,就会执行延迟队列的AddAfter方法将元素添加进队列,后续具体的添加动作就是延迟队列的相关操作。
标签: #操作系统银行家算法work怎么算