龙空技术网

schedule函数

咸鱼养成记 73

前言:

此时你们对“c语言scheduler函数”大约比较关心,同学们都需要了解一些“c语言scheduler函数”的相关内容。那么小编也在网摘上汇集了一些有关“c语言scheduler函数””的相关资讯,希望同学们能喜欢,大家一起来了解一下吧!

在讨论main goroutine的调度时我们已经见过schedule函数,因为当时我们的主要关注点在于main goroutine是如何被调度到CPU上运行的,所以并未对schedule函数如何挑选下一个goroutine出来运行做深入的分析,现在是重新回到schedule函数详细分析其调度策略的时候了。

runtime/proc.go : 2467

// One round of scheduler: find a runnable goroutine and execute it.// Never returns.func schedule() {    _g_ := getg()   //_g_ = m.g0    ......    var gp *g    ......       if gp == nil {    // Check the global runnable queue once in a while to ensure fairness.    // Otherwise two goroutines can completely occupy the local runqueue    // by constantly respawning each other.       //为了保证调度的公平性,每个工作线程每进行61次调度就需要优先从全局运行队列中获取goroutine出来运行,       //因为如果只调度本地运行队列中的goroutine,则全局运行队列中的goroutine有可能得不到运行        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {            lock(&sched.lock) //所有工作线程都能访问全局运行队列,所以需要加锁            gp = globrunqget(_g_.m.p.ptr(), 1) //从全局运行队列中获取1个goroutine            unlock(&sched.lock)        }    }    if gp == nil {        //从与m关联的p的本地运行队列中获取goroutine        gp, inheritTime = runqget(_g_.m.p.ptr())        if gp != nil && _g_.m.spinning {            throw("schedule: spinning with local work")        }    }    if gp == nil {        //如果从本地运行队列和全局运行队列都没有找到需要运行的goroutine,        //则调用findrunnable函数从其它工作线程的运行队列中偷取,如果偷取不到,则当前工作线程进入睡眠,        //直到获取到需要运行的goroutine之后findrunnable函数才会返回。        gp, inheritTime = findrunnable() // blocks until work is available    }    ......    //当前运行的是runtime的代码,函数调用栈使用的是g0的栈空间    //调用execte切换到gp的代码和栈空间去运行    execute(gp, inheritTime)  }

schedule函数分三步分别从各运行队列中寻找可运行的goroutine:

第一步,从全局运行队列中寻找goroutine。为了保证调度的公平性,每个工作线程每经过61次调度就需要优先尝试从全局运行队列中找出一个goroutine来运行,这样才能保证位于全局运行队列中的goroutine得到调度的机会。全局运行队列是所有工作线程都可以访问的,所以在访问它之前需要加锁。第二步,从工作线程本地运行队列中寻找goroutine。如果不需要或不能从全局运行队列中获取到goroutine则从本地运行队列中获取。第三步,从其它工作线程的运行队列中偷取goroutine。如果上一步也没有找到需要运行的goroutine,则调用findrunnable从其他工作线程的运行队列中偷取goroutine,findrunnable函数在偷取之前会再次尝试从全局运行队列和当前线程的本地运行队列中查找需要运行的goroutine。

下面我们先来看如何从全局运行队列中获取goroutine。

从全局运行队列中获取goroutine

从全局运行队列中获取可运行的goroutine是通过globrunqget函数来完成的,该函数的第一个参数是与当前工作线程绑定的p,第二个参数max表示最多可以从全局队列中拿多少个g到当前工作线程的本地运行队列中来。

runtime/proc.go : 4663

// Try get a batch of G's from the global runnable queue.// Sched must be locked.func globrunqget(_p_ *p, max int32) *g {    if sched.runqsize == 0 {  //全局运行队列为空        return nil    }    //根据p的数量平分全局运行队列中的goroutines    n := sched.runqsize / gomaxprocs + 1    if n > sched.runqsize { //上面计算n的方法可能导致n大于全局运行队列中的goroutine数量        n = sched.runqsize    }    if max > 0 && n > max {        n = max   //最多取max个goroutine    }    if n > int32(len(_p_.runq)) / 2 {        n = int32(len(_p_.runq)) / 2  //最多只能取本地队列容量的一半    }    sched.runqsize -= n    //直接通过函数返回gp,其它的goroutines通过runqput放入本地运行队列    gp := sched.runq.pop()  //pop从全局运行队列的队列头取    n--    for ; n > 0; n-- {        gp1 := sched.runq.pop()  //从全局运行队列中取出一个goroutine        runqput(_p_, gp1, false)  //放入本地运行队列    }    return gp}

globrunqget函数首先会根据全局运行队列中goroutine的数量,函数参数max以及_p_的本地队列的容量计算出到底应该拿多少个goroutine,然后把第一个g结构体对象通过返回值的方式返回给调用函数,其它的则通过runqput函数放入当前工作线程的本地运行队列。这段代码值得一提的是,计算应该从全局运行队列中拿走多少个goroutine时根据p的数量(gomaxprocs)做了负载均衡。

如果没有从全局运行队列中获取到goroutine,那么接下来就在工作线程的本地运行队列中寻找需要运行的goroutine。

从工作线程本地运行队列中获取goroutine

从代码上来看,工作线程的本地运行队列其实分为两个部分,一部分是由p的runq、runqhead和runqtail这三个成员组成的一个无锁循环队列,该队列最多可包含256个goroutine;另一部分是p的runnext成员,它是一个指向g结构体对象的指针,它最多只包含一个goroutine。

从本地运行队列中寻找goroutine是通过runqget函数完成的,寻找时,代码首先查看runnext成员是否为空,如果不为空则返回runnext所指的goroutine,并把runnext成员清零,如果runnext为空,则继续从循环队列中查找goroutine。

runtime/proc.go : 4825

// Get g from local runnable queue.// If inheritTime is true, gp should inherit the remaining time in the// current time slice. Otherwise, it should start a new time slice.// Executed only by the owner P.func runqget(_p_ *p) (gp *g, inheritTime bool) {    // If there's a runnext, it's the next G to run.    //从runnext成员中获取goroutine    for {        //查看runnext成员是否为空,不为空则返回该goroutine        next := _p_.runnext          if next == 0 {            break        }        if _p_.runnext.cas(next, 0) {            return next.ptr(), true        }    }    //从循环队列中获取goroutine    for {        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers        t := _p_.runqtail        if t == h {            return nil, false        }        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume            return gp, false        }    }}

这里首先需要注意的是不管是从runnext还是从循环队列中拿取goroutine都使用了cas操作,这里的cas操作是必需的,因为可能有其他工作线程此时此刻也正在访问这两个成员,从这里偷取可运行的goroutine。

其次,代码中对runqhead的操作使用了atomic.LoadAcq和atomic.CasRel,它们分别提供了load-acquire和cas-release语义。

对于atomic.LoadAcq来说,其语义主要包含如下几条:

原子读取,也就是说不管代码运行在哪种平台,保证在读取过程中不会有其它线程对该变量进行写入;位于atomic.LoadAcq之后的代码,对内存的读取和写入必须在atomic.LoadAcq读取完成后才能执行,编译器和CPU都不能打乱这个顺序;当前线程执行atomic.LoadAcq时可以读取到其它线程最近一次通过atomic.CasRel对同一个变量写入的值,与此同时,位于atomic.LoadAcq之后的代码,不管读取哪个内存地址中的值,都可以读取到其它线程中位于atomic.CasRel(对同一个变量操作)之前的代码最近一次对内存的写入。

对于atomic.CasRel来说,其语义主要包含如下几条:

原子的执行比较并交换的操作;位于atomic.CasRel之前的代码,对内存的读取和写入必须在atomic.CasRel对内存的写入之前完成,编译器和CPU都不能打乱这个顺序;线程执行atomic.CasRel完成后其它线程通过atomic.LoadAcq读取同一个变量可以读到最新的值,与此同时,位于atomic.CasRel之前的代码对内存写入的值,可以被其它线程中位于atomic.LoadAcq(对同一个变量操作)之后的代码读取到。

因为可能有多个线程会并发的修改和读取runqhead,以及需要依靠runqhead的值来读取runq数组的元素,所以需要使用atomic.LoadAcq和atomic.CasRel来保证上述语义。

我们可能会问,为什么读取p的runqtail成员不需要使用atomic.LoadAcq或atomic.load?因为runqtail不会被其它线程修改,只会被当前工作线程修改,此时没有人修改它,所以也就不需要使用原子相关的操作。

最后,由p的runq、runqhead和runqtail这三个成员组成的这个无锁循环队列非常精妙

标签: #c语言scheduler函数