//基本的FIFO队列接口定义type Interface interface {	Add(item interface{})	Len() int	Get() (item interface{}, shutdown bool) //获取队列头部的元素	Done(item interface{}) //标记该元素已被处理	ShutDown() //关闭队列	ShuttingDown() bool //队列是否正在关闭} //数据结构定义,实现了Interface的方法type Type struct {	// queue defines the order in which we will work on items. Every	// element of queue should be in the dirty set and not in the	// processing set.	queue []t	// dirty defines all of the items that need to be processed.	dirty set	// Things that are currently being processed are in the processing set.	// These things may be simultaneously in the dirty set. When we finish	// processing something and remove it from this set, we'll check if	// it's in the dirty set, and if so, add it to the queue.	processing set	cond *sync.Cond	shuttingDown bool	metrics queueMetrics //监控指标相关的,用于Prometheus监控	unfinishedWorkUpdatePeriod time.Duration	clock                      clock.Clock}
queue 实际存储元素的地方dirty 类型是set(使用Map的key来实现的,确保唯一),能保证去重;同时也保证了在并发情况下,一个元素在被处理之前买,哪怕被添加了多次,也只会被处理一次processing 用于标记一个元素正在被处理



// Add marks item as needing processing.func (q *Type) Add(item interface{}) {	q.cond.L.Lock()	defer q.cond.L.Unlock()	if q.shuttingDown {		return	}	if q.dirty.has(item) { //在dirty中如果存在该元素,就直接返回;确保了元素在队列中的唯一性		return	}	q.metrics.add(item)	q.dirty.insert(item) //先将元素插入dirty中	if q.processing.has(item) { //如果该元素正在被处理,则返回		return	}	q.queue = append(q.queue, item) //将元素加入队列尾部,等待消费	q.cond.Signal() //唤醒一个消费者goroutine去队列中消费元素(调用Get方法)}......func (q *Type) Get() (item interface{}, shutdown bool) {	q.cond.L.Lock()	defer q.cond.L.Unlock()	for len(q.queue) == 0 && !q.shuttingDown {		q.cond.Wait() //如果队列为空并且没有处于关闭中,则阻塞,等待被唤醒(调用了Add方法和ShutDown方法Done方法都会唤醒该阻塞)	}	if len(q.queue) == 0 {//如果是上面的阻塞被唤醒了,但是队列长度还是0,则表示该队列被关闭		// We must be shutting down.		return nil, true 	}	item, q.queue = q.queue[0], q.queue[1:] //从队列头部取一个元素	q.metrics.get(item)	q.processing.insert(item)//标记该元素正在被处理	q.dirty.delete(item) //正在处理的元素从dirty中删除	return item, false}// 表示某个元素被处理完成func (q *Type) Done(item interface{}) {	q.cond.L.Lock()	defer q.cond.L.Unlock()	q.metrics.done(item)	q.processing.delete(item) //从processing中移除该元素	if q.dirty.has(item) { //如果dirty中还有一个相同的元素存在,说明在该元素被处理的时候,又加入了相同的元素进来		q.queue = append(q.queue, item)//此时需要将元素添加至Queue中,等待被消费    q.cond.Signal() //唤醒消费goroutine(那些调用了Get方法而阻塞的goroutine,只会被唤醒一个)	}}//关闭WorkQueuefunc (q *Type) ShutDown() {	q.cond.L.Lock()	defer q.cond.L.Unlock()	q.shuttingDown = true  q.cond.Broadcast() //唤醒所有的消费者goroutine,让它们安全退出(哪些调用了Get方法而阻塞的goroutine)}


type DelayingInterface interface {	Interface	// AddAfter adds an item to the workqueue after the indicated duration has passed	AddAfter(item interface{}, duration time.Duration)}type delayingType struct {	Interface	// clock tracks time for delayed firing	clock clock.Clock	// stopCh lets us signal a shutdown to the waiting loop	stopCh chan struct{}	// heartbeat ensures we wait no more than maxWait before firing	heartbeat clock.Ticker	// waitingForAddCh is a buffered channel that feeds waitingForAdd	waitingForAddCh chan *waitFor //初始化延迟队列的时候,channel长度为1000	// metrics counts the number of retries	metrics           retryMetrics	deprecatedMetrics retryMetrics}




// AddAfter adds the given item to the work queue after the given delayfunc (q *delayingType) AddAfter(item interface{}, duration time.Duration) {	// don't add if we're already shutting down	if q.ShuttingDown() {		return	}	q.metrics.retry()	q.deprecatedMetrics.retry()	// immediately add things with no delay	if duration <= 0 {		q.Add(item)		return	}	select {	case <-q.stopCh:		// unblock if ShutDown() is called	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:	}}.../ waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.func (q *delayingType) waitingLoop() {	defer utilruntime.HandleCrash()	// Make a placeholder channel to use when there are no items in our list	never := make(<-chan time.Time)	waitingForQueue := &waitForPriorityQueue{}	heap.Init(waitingForQueue)	waitingEntryByData := map[t]*waitFor{}	for {		if q.Interface.ShuttingDown() {			return		}		now := q.clock.Now()		// Add ready entries		for waitingForQueue.Len() > 0 {			entry := waitingForQueue.Peek().(*waitFor)			if entry.readyAt.After(now) {				break			}			entry = heap.Pop(waitingForQueue).(*waitFor)			q.Add(entry.data)			delete(waitingEntryByData, entry.data)		}		// Set up a wait for the first item's readyAt (if one exists)		nextReadyAt := never		if waitingForQueue.Len() > 0 {			entry := waitingForQueue.Peek().(*waitFor)			nextReadyAt = q.clock.After(entry.readyAt.Sub(now))		}		select {		case <-q.stopCh:			return		case <-q.heartbeat.C():			// continue the loop, which will add ready items		case <-nextReadyAt:			// continue the loop, which will add ready items		case waitEntry := <-q.waitingForAddCh:			if waitEntry.readyAt.After(q.clock.Now()) {				insert(waitingForQueue, waitingEntryByData, waitEntry)			} else {				q.Add(waitEntry.data)			}			drained := false			for !drained {				select {				case waitEntry := <-q.waitingForAddCh:					if waitEntry.readyAt.After(q.clock.Now()) {						insert(waitingForQueue, waitingEntryByData, waitEntry)					} else {						q.Add(waitEntry.data)					}				default:					drained = true				}			}		}	}}


type RateLimitingInterface interface {	DelayingInterface //延迟队列	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok	AddRateLimited(item interface{}) //想队列对添加元素	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you	// still have to call `Done` on the queue.	Forget(item interface{})//当某个元素处理完成之后,调用Forget,清空元素的排队数,调用具体限速算法的同名方法	// NumRequeues returns back how many times the item was requeued	NumRequeues(item interface{}) int //获取指定元素的排队数,调用具体限速算法的同名方法}


// rateLimitingType wraps an Interface and provides rateLimited re-enquingtype rateLimitingType struct {   DelayingInterface   rateLimiter RateLimiter //限速队列的接口定义是比较简单的,限速队列的重点就在于其提供的几种不同限速算法}


type RateLimiter interface {   // When gets an item and gets to decide how long that item should wait   When(item interface{}) time.Duration //获取指定元素插入队列前应该等待的时间   // Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing   // or for success, we'll stop tracking it   Forget(item interface{})//释放指定元素,清空该元素的排队数   // NumRequeues returns back how many failures the item has had   NumRequeues(item interface{}) int//返回指定元素的排队数}


令牌桶算法(BucketRateLimiter): 以固定的速率往桶里填充Token,直到填满为止(多余的会被丢弃);每个元素都会从桶里获取一个Token,只有拿到Token的才允许通过,否则该元素处理等待Token的状态;以此达到限速的目的。排队指数算法(ItemExponentialFailureRateLimiter): 将相同元素排队数作为指数,排队数增大,速率限制呈指数增长,但最大不会超过maxDelay。计数器算法(ItemFastFlowRateLimiter):限制一段时间内允许通过的元素数量。混合模式: 将多种限速算法混合使用。


// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok//添加元素到队列中//首先要先获取某个元素需要等待的时间,q.rateLimiter.When(item) //然后调用延迟队列的Addfter添加元素到队列func (q *rateLimitingType) AddRateLimited(item interface{}) {   q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))}//排队指数算法的 When方法func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {	r.failuresLock.Lock()	defer r.failuresLock.Unlock()	exp := r.failures[item] //获取指定元素的排队数	r.failures[item] = r.failures[item] + 1	// The backoff is capped such that 'calculated' value never overflows.	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp)) //指数计算需要等待的时间	if backoff > math.MaxInt64 {		return r.maxDelay	}	calculated := time.Duration(backoff)	if calculated > r.maxDelay {		return r.maxDelay //确保等待时间不会大于maxDelay,maxDelay为1000s	}	return calculated}


