龙空技术网

golang高性能协程池--ants

寒笛过霜天 326

前言:

现在兄弟们对“net40线程池场景”可能比较着重,姐妹们都需要了解一些“net40线程池场景”的相关内容。那么小编在网上搜集了一些有关“net40线程池场景””的相关知识,希望小伙伴们能喜欢,朋友们一起来学习一下吧!

ants是一个高性能的 goroutine 池, 实现了对大规模 goroutine 的调度管理、goroutine 复用, 允许使用者在开发并发程序的时候限制 goroutine 数量, 复用资源, 达到更高效执行任务的效果。

github地址:

功能:

自动调度海量的 goroutines, 复用 goroutines

定期清理过期的 goroutines, 进一步节省资源

提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool

优雅处理 panic, 防止程序崩溃

资源复用, 极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能

非阻塞机制

使用 ants v1 版本:

go get -u github.com/panjf2000/ants

使用 ants v2 版本 (开启 GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

使用

写 go 并发程序的时候如果程序会启动大量的 goroutine , 势必会消耗大量的系统资源(内存, CPU), 通过使用 ants, 可以实例化一个 goroutine 池, 复用 goroutine , 节省资源, 提升性能:

通过在调用NewPool/NewPoolWithFunc之时使用各种 optional function, 可以设置ants.Options中各个配置项的值, 然后用它来定制化 goroutine pool.

自定义池

ants支持实例化使用者自己的一个 Pool, 指定具体的池容量;通过调用 NewPool 方法可以实例化一个新的带有指定容量的 Pool , 如下:

// Set 10000 the size of goroutine pool

p, _ := ants.NewPool(10000)

任务提交

提交任务通过调用 ants.Submit(func())方法:

ants.Submit(func(){})

动态调整 goroutine 池容量

需要动态调整 goroutine 池容量可以通过调用Tune(int):

pool.Tune(1000) // Tune its capacity to 1000

pool.Tune(100000) // Tune its capacity to 100000

该方法是线程安全的。

预先分配 goroutine 队列内存

ants允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。

// ants will pre-malloc the whole capacity of pool when you invoke this function

p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))

释放 Pool

pool.Release()

重启 Pool

// 只要调用 Reboot() 方法,就可以重新激活一个之前已经被销毁掉的池,并且投入使用。

pool.Reboot()

实例:

package mainimport (    "fmt"    "sync"    "sync/atomic"    "time"    "github.com/panjf2000/ants/v2")var sum int32func myFunc(i interface{}) {    n := i.(int32)    atomic.AddInt32(&sum, n)    fmt.Printf("run with %d\n", n)}func demoFunc() {    time.Sleep(10 * time.Millisecond)    fmt.Println("Hello World!")}func main() {    defer ants.Release() //释放    runTimes := 1000    // Use the common pool.    var wg sync.WaitGroup // 维持一个计数器    syncCalculateSum := func() {        demoFunc()        wg.Done() //计数器 -1		}for i := 0; i < runTimes; i++ {    wg.Add(1) //计数器 +1    _ = ants.Submit(syncCalculateSum) //提交任务}wg.Wait() //阻塞直到计数器变为0fmt.Printf("running goroutines: %d\n", ants.Running())fmt.Printf("finish all tasks.\n")// Use the pool with a function,// set 10 to the capacity of goroutine pool and 1 second for expired duration.p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { //协程池使用的第二种方法myFunc(i)wg.Done()})defer p.Release()// Submit tasks one by one.for i := 0; i < runTimes; i++ {wg.Add(1)_ = p.Invoke(int32(i)) //向协程池中提交任务}wg.Wait() //阻塞直到计数器变为0fmt.Printf("running goroutines: %d\n", p.Running()) //返回当前正在运行的goroutine的数量。fmt.Printf("finish all tasks, result is %d\n", sum)}

协程池应用场景:

处理大量并发是 Go 语言的一大优势。语言内置了方便的并发语法, 可以非常方便的创建很多个轻量级的 goroutine 并发处理任务。

相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量, 系统中能够创建的 goroutine 数量也是受限的。

默认每个 goroutine 占用 8KB 内存, 一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine, 更何况系统还需要保留一部分内存运行日常管理任务, go 运行时需要内存运行 gc、处理 goroutine 切换等。

使用的内存超过机器内存容量, 系统会使用交换区(swap), 导致性能急速下降。我们可以简单验证一下创建过多 goroutine 会发生什么:

package mainimport ("sync""time")func main() {var wg sync.WaitGroupwg.Add(10000000)for i := 0; i < 10000000; i++ {go func() {time.Sleep(1 * time.Minute)}()}wg.Wait()}

在我的机器上(8G内存)运行上面的程序会报errno 1455, 即Out of Memory错误, 这很好理解。谨慎运行。

另一方面, goroutine 的管理也是一个问题。goroutine 只能自己运行结束, 外部没有任何手段可以强制结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束, 就会出现 goroutine 泄露。此外, 频繁创建 goroutine 也是一个开销。

鉴于上述原因, 自然出现了与线程池一样的需求, 即 goroutine 池。一般的 goroutine 池自动管理 goroutine 的生命周期, 可以按需创建, 动态缩容。向 goroutine 池提交一个任务, goroutine 池会自动安排某个 goroutine 来处理。

ants就是其中一个实现 goroutine 池的库。

执行流程如下:

初始化 goroutine 池;

提交任务给 goroutine 池, 检查是否有空闲的 goroutine:

有, 获取空闲 goroutine

无, 检查池中的 goroutine 数量是否已到池容量上限:

已到上限, 检查 goroutine 池是否是非阻塞的:

非阻塞, 直接返回nil表示执行失败

阻塞, 等待 goroutine 空闲

未到上限, 创建一个新的 goroutine 处理任务

任务处理完成, 将 goroutine 交还给池, 以待处理下一个任务

默认池

为了方便使用, 很多 Go 库都喜欢提供其核心功能类型的一个默认实现。可以直接通过库提供的接口调用。例如net/http, 例如ants。

ants库中定义了一个默认的池, 默认容量为MaxInt32。goroutine 池的各个方法都可以直接通过ants包直接访问

package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done() // 释放}}// 采用默认值的方法func main() {// DefaultAntsPoolSize = math.MaxInt32defer ants.Release()var wg sync.WaitGroupwg.Add(2)for i := 1; i <= 2; i++ {// 提交方法ants.Submit(wrapper(i, &wg))}wg.Wait()}输出结果:/*hello from task:1hello from task:2*/

最大等待队列长度

package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() {    return func() {        fmt.Printf("hello from task:%d\n", i)        time.Sleep(1 * time.Second)        wg.Done()    }}func main() {    /*    MaxBlockingTasks:最大阻塞任务数量。即池中 goroutine 数量已到池容量, 且所有 goroutine 都处理繁忙状态, 这时到来的任务会在阻塞列表等待。    这个选项设置的是列表的最大长度。阻塞的任务数量达到这个值后, 后续任务提交直接返回失败    */    p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))    defer p.Release()    var wg sync.WaitGroup    wg.Add(8)    for i := 1; i <= 8; i++ {    go func(i int) {    err := p.Submit(wrapper(i, &wg))    if err != nil {    fmt.Printf("task:%d err:%v\n", i, err)    wg.Done()    }    }(i)    }    wg.Wait()}输出结果:/*hello from task:8hello from task:1hello from task:4hello from task:6task:5 err:too many goroutines blocked on submit or Nonblocking is settask:7 err:too many goroutines blocked on submit or Nonblocking is sethello from task:2hello from task:3*/

非阻塞

package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() {    return func() {    fmt.Printf("hello from task:%d\n", i)    time.Sleep(1 * time.Second)    wg.Done()    }}// 自定义协程池的参数func main() {    /*    Nonblocking:池是否阻塞, 默认阻塞。提交任务时, 如果ants池中 goroutine 已到上限且全部繁忙, 阻塞的池会将任务添加的阻塞列表等待(当然受限于阻塞列表长度, 见上一个选项)。非阻塞的池直接返回失败    */    p, _ := ants.NewPool(2, ants.WithNonblocking(true))    defer p.Release()    var wg sync.WaitGroup    wg.Add(3)    for i := 1; i <= 3; i++ {    err := p.Submit(wrapper(i, &wg))    if err != nil {    fmt.Printf("task:%d err:%v\n", i, err)    wg.Done()    }    }    wg.Wait()}输出结果:/*hello from task:2task:3 err:too many goroutines blocked on submit or Nonblocking is sethello from task:1*/

使用案例:

package mainimport (    "fmt"    "math/rand"    "sync"    "github.com/panjf2000/ants/v2")type Task struct {    index int    nums []int    sum int    wg *sync.WaitGroup}func (t *Task) Do() {    for _, num := range t.nums {    		t.sum += num    }    t.wg.Done()}func taskFunc(data interface{}) {    task := data.(*Task)    task.Do()    fmt.Printf("task:%d sum:%d\n", task.index, task.sum)}const (    DataSize = 10000    DataPerTask = 100)func main() {// 创建 goroutine 池, 注意池使用完后需要手动关闭, 这里使用defer关闭/*ants.NewPoolWithFunc()创建了一个 goroutine 池。第一个参数是池容量, 即池中最多有 10 个 goroutine。第二个参数为每次执行任务的函数。当我们调用p.Invoke(data)的时候, ants池会在其管理的 goroutine 中找出一个空闲的, 让它执行函数taskFunc, 并将data作为参数。*/p, _ := ants.NewPoolWithFunc(10, taskFunc)defer p.Release()// 模拟数据, 做数据切分, 生成任务, 交给 ants 处理/*随机生成 10000 个整数, 将这些整数分为 100 份, 每份 100 个, 生成Task结构, 调用p.Invoke(task)处理。wg.Wait()等待处理完成, 然后输出ants正在运行的 goroutine 数量, 这时应该是 0。*/nums := make([]int, DataSize, DataSize)for i := range nums {nums[i] = rand.Intn(1000)}var wg sync.WaitGroupwg.Add(DataSize / DataPerTask)tasks := make([]*Task, 0, DataSize/DataPerTask)for i := 0; i < DataSize/DataPerTask; i++ {task := &Task{index: i + 1,nums: nums[i*DataPerTask : (i+1)*DataPerTask],wg: &wg,}tasks = append(tasks, task)p.Invoke(task) // 唤起}wg.Wait()fmt.Printf("running goroutines: %d\n", ants.Running()) // 运行的数量//将结果汇总,并验证一下结果,与直接相加得到的结果做一个比较var sum intfor _, task := range tasks {sum += task.sum}var expect intfor _, num := range nums {expect += num}/*确实, 任务完成之后, 正在运行的 goroutine 数量变为 0。而且我们验证了, 结果没有偏差。另外需要注意, goroutine 池中任务的执行顺序是随机的, 与提交任务的先后没有关系。由上面运行打印的任务标识我们也能发现这一点。*/fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)}

panic 处理器

一个鲁棒性强的库一定不会忽视错误的处理, 特别是宕机相关的错误。在 Go 语言中就是 panic, 也被称为运行时恐慌, 在程序运行的过程中产生的严重性错误, 例如索引越界, 空指针解引用等, 都会触发 panic。

如果不处理 panic, 程序会直接意外退出, 可能造成数据丢失的严重后果。

ants中如果 goroutine 在执行任务时发生panic, 会终止当前任务的执行, 将发生错误的堆栈输出到os.Stderr。注意, 该 goroutine 还是会被放回池中, 下次可以取出执行新的任务。

package mainimport ("fmt""os""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() {    return func() {    		fmt.Printf("hello from task:%d\n", i)        /*        我们让偶数个任务触发panic。提交两个任务,第二个任务一定会触发panic。        触发panic之后,我们还可以继续提交任务 3、5。注 意这里没有 4,提交任务 4 还是会触发panic。        */        if i%2 == 0 {            panic(fmt.Sprintf("panic from task:%d", i))            }        wg.Done()    }}/*除了ants提供的默认 panic 处理器,我们还可以使用WithPanicHandler(paincHandler func(interface{}))指定我们自己编写的 panic 处理器。处理器的参数就是传给panic的值:*/func panicHandler(err interface{}) {    fmt.Fprintln(os.Stderr, err)}func main() {    // p, _ := ants.NewPool(2)    p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))    defer p.Release()    var wg sync.WaitGroup    wg.Add(3)    for i := 1; i <= 2; i++ {    p.Submit(wrapper(i, &wg))    }    time.Sleep(1 * time.Second)    p.Submit(wrapper(3, &wg))    p.Submit(wrapper(5, &wg))    wg.Wait()}/*hello from task:2panic from task:2hello from task:1hello from task:3hello from task:5*/

标签: #net40线程池场景