前言:
现在兄弟们对“net40线程池场景”可能比较着重,姐妹们都需要了解一些“net40线程池场景”的相关内容。那么小编在网上搜集了一些有关“net40线程池场景””的相关知识,希望小伙伴们能喜欢,朋友们一起来学习一下吧!ants是一个高性能的 goroutine 池, 实现了对大规模 goroutine 的调度管理、goroutine 复用, 允许使用者在开发并发程序的时候限制 goroutine 数量, 复用资源, 达到更高效执行任务的效果。
github地址:
功能:
自动调度海量的 goroutines, 复用 goroutines
定期清理过期的 goroutines, 进一步节省资源
提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
优雅处理 panic, 防止程序崩溃
资源复用, 极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
非阻塞机制
使用 ants v1 版本:
go get -u github.com/panjf2000/ants
使用 ants v2 版本 (开启 GO111MODULE=on):
go get -u github.com/panjf2000/ants/v2
使用
写 go 并发程序的时候如果程序会启动大量的 goroutine , 势必会消耗大量的系统资源(内存, CPU), 通过使用 ants, 可以实例化一个 goroutine 池, 复用 goroutine , 节省资源, 提升性能:
通过在调用NewPool/NewPoolWithFunc之时使用各种 optional function, 可以设置ants.Options中各个配置项的值, 然后用它来定制化 goroutine pool.
自定义池
ants支持实例化使用者自己的一个 Pool, 指定具体的池容量;通过调用 NewPool 方法可以实例化一个新的带有指定容量的 Pool , 如下:
// Set 10000 the size of goroutine pool
p, _ := ants.NewPool(10000)
任务提交
提交任务通过调用 ants.Submit(func())方法:
ants.Submit(func(){})
动态调整 goroutine 池容量
需要动态调整 goroutine 池容量可以通过调用Tune(int):
pool.Tune(1000) // Tune its capacity to 1000
pool.Tune(100000) // Tune its capacity to 100000
该方法是线程安全的。
预先分配 goroutine 队列内存
ants允许你预先把整个池的容量分配内存, 这个功能可以在某些特定的场景下提高 goroutine 池的性能。比如, 有一个场景需要一个超大容量的池,而且每个 goroutine 里面的任务都是耗时任务,这种情况下,预先分配 goroutine 队列内存将会减少不必要的内存重新分配。
// ants will pre-malloc the whole capacity of pool when you invoke this function
p, _ := ants.NewPool(100000, ants.WithPreAlloc(true))
释放 Pool
pool.Release()
重启 Pool
// 只要调用 Reboot() 方法,就可以重新激活一个之前已经被销毁掉的池,并且投入使用。
pool.Reboot()
实例:
package mainimport ( "fmt" "sync" "sync/atomic" "time" "github.com/panjf2000/ants/v2")var sum int32func myFunc(i interface{}) { n := i.(int32) atomic.AddInt32(&sum, n) fmt.Printf("run with %d\n", n)}func demoFunc() { time.Sleep(10 * time.Millisecond) fmt.Println("Hello World!")}func main() { defer ants.Release() //释放 runTimes := 1000 // Use the common pool. var wg sync.WaitGroup // 维持一个计数器 syncCalculateSum := func() { demoFunc() wg.Done() //计数器 -1 }for i := 0; i < runTimes; i++ { wg.Add(1) //计数器 +1 _ = ants.Submit(syncCalculateSum) //提交任务}wg.Wait() //阻塞直到计数器变为0fmt.Printf("running goroutines: %d\n", ants.Running())fmt.Printf("finish all tasks.\n")// Use the pool with a function,// set 10 to the capacity of goroutine pool and 1 second for expired duration.p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { //协程池使用的第二种方法myFunc(i)wg.Done()})defer p.Release()// Submit tasks one by one.for i := 0; i < runTimes; i++ {wg.Add(1)_ = p.Invoke(int32(i)) //向协程池中提交任务}wg.Wait() //阻塞直到计数器变为0fmt.Printf("running goroutines: %d\n", p.Running()) //返回当前正在运行的goroutine的数量。fmt.Printf("finish all tasks, result is %d\n", sum)}
协程池应用场景:
处理大量并发是 Go 语言的一大优势。语言内置了方便的并发语法, 可以非常方便的创建很多个轻量级的 goroutine 并发处理任务。
相比于创建多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。但是受限于资源总量, 系统中能够创建的 goroutine 数量也是受限的。
默认每个 goroutine 占用 8KB 内存, 一台 8GB 内存的机器满打满算也只能创建 8GB/8KB = 1000000 个 goroutine, 更何况系统还需要保留一部分内存运行日常管理任务, go 运行时需要内存运行 gc、处理 goroutine 切换等。
使用的内存超过机器内存容量, 系统会使用交换区(swap), 导致性能急速下降。我们可以简单验证一下创建过多 goroutine 会发生什么:
package mainimport ("sync""time")func main() {var wg sync.WaitGroupwg.Add(10000000)for i := 0; i < 10000000; i++ {go func() {time.Sleep(1 * time.Minute)}()}wg.Wait()}
在我的机器上(8G内存)运行上面的程序会报errno 1455, 即Out of Memory错误, 这很好理解。谨慎运行。
另一方面, goroutine 的管理也是一个问题。goroutine 只能自己运行结束, 外部没有任何手段可以强制结束一个 goroutine。如果一个 goroutine 因为某种原因没有自行结束, 就会出现 goroutine 泄露。此外, 频繁创建 goroutine 也是一个开销。
鉴于上述原因, 自然出现了与线程池一样的需求, 即 goroutine 池。一般的 goroutine 池自动管理 goroutine 的生命周期, 可以按需创建, 动态缩容。向 goroutine 池提交一个任务, goroutine 池会自动安排某个 goroutine 来处理。
ants就是其中一个实现 goroutine 池的库。
执行流程如下:
初始化 goroutine 池;
提交任务给 goroutine 池, 检查是否有空闲的 goroutine:
有, 获取空闲 goroutine
无, 检查池中的 goroutine 数量是否已到池容量上限:
已到上限, 检查 goroutine 池是否是非阻塞的:
非阻塞, 直接返回nil表示执行失败
阻塞, 等待 goroutine 空闲
未到上限, 创建一个新的 goroutine 处理任务
任务处理完成, 将 goroutine 交还给池, 以待处理下一个任务
默认池
为了方便使用, 很多 Go 库都喜欢提供其核心功能类型的一个默认实现。可以直接通过库提供的接口调用。例如net/http, 例如ants。
ants库中定义了一个默认的池, 默认容量为MaxInt32。goroutine 池的各个方法都可以直接通过ants包直接访问
package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)time.Sleep(1 * time.Second)wg.Done() // 释放}}// 采用默认值的方法func main() {// DefaultAntsPoolSize = math.MaxInt32defer ants.Release()var wg sync.WaitGroupwg.Add(2)for i := 1; i <= 2; i++ {// 提交方法ants.Submit(wrapper(i, &wg))}wg.Wait()}输出结果:/*hello from task:1hello from task:2*/
最大等待队列长度
package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) time.Sleep(1 * time.Second) wg.Done() }}func main() { /* MaxBlockingTasks:最大阻塞任务数量。即池中 goroutine 数量已到池容量, 且所有 goroutine 都处理繁忙状态, 这时到来的任务会在阻塞列表等待。 这个选项设置的是列表的最大长度。阻塞的任务数量达到这个值后, 后续任务提交直接返回失败 */ p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2)) defer p.Release() var wg sync.WaitGroup wg.Add(8) for i := 1; i <= 8; i++ { go func(i int) { err := p.Submit(wrapper(i, &wg)) if err != nil { fmt.Printf("task:%d err:%v\n", i, err) wg.Done() } }(i) } wg.Wait()}输出结果:/*hello from task:8hello from task:1hello from task:4hello from task:6task:5 err:too many goroutines blocked on submit or Nonblocking is settask:7 err:too many goroutines blocked on submit or Nonblocking is sethello from task:2hello from task:3*/
非阻塞
package mainimport ("fmt""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) time.Sleep(1 * time.Second) wg.Done() }}// 自定义协程池的参数func main() { /* Nonblocking:池是否阻塞, 默认阻塞。提交任务时, 如果ants池中 goroutine 已到上限且全部繁忙, 阻塞的池会将任务添加的阻塞列表等待(当然受限于阻塞列表长度, 见上一个选项)。非阻塞的池直接返回失败 */ p, _ := ants.NewPool(2, ants.WithNonblocking(true)) defer p.Release() var wg sync.WaitGroup wg.Add(3) for i := 1; i <= 3; i++ { err := p.Submit(wrapper(i, &wg)) if err != nil { fmt.Printf("task:%d err:%v\n", i, err) wg.Done() } } wg.Wait()}输出结果:/*hello from task:2task:3 err:too many goroutines blocked on submit or Nonblocking is sethello from task:1*/
使用案例:
package mainimport ( "fmt" "math/rand" "sync" "github.com/panjf2000/ants/v2")type Task struct { index int nums []int sum int wg *sync.WaitGroup}func (t *Task) Do() { for _, num := range t.nums { t.sum += num } t.wg.Done()}func taskFunc(data interface{}) { task := data.(*Task) task.Do() fmt.Printf("task:%d sum:%d\n", task.index, task.sum)}const ( DataSize = 10000 DataPerTask = 100)func main() {// 创建 goroutine 池, 注意池使用完后需要手动关闭, 这里使用defer关闭/*ants.NewPoolWithFunc()创建了一个 goroutine 池。第一个参数是池容量, 即池中最多有 10 个 goroutine。第二个参数为每次执行任务的函数。当我们调用p.Invoke(data)的时候, ants池会在其管理的 goroutine 中找出一个空闲的, 让它执行函数taskFunc, 并将data作为参数。*/p, _ := ants.NewPoolWithFunc(10, taskFunc)defer p.Release()// 模拟数据, 做数据切分, 生成任务, 交给 ants 处理/*随机生成 10000 个整数, 将这些整数分为 100 份, 每份 100 个, 生成Task结构, 调用p.Invoke(task)处理。wg.Wait()等待处理完成, 然后输出ants正在运行的 goroutine 数量, 这时应该是 0。*/nums := make([]int, DataSize, DataSize)for i := range nums {nums[i] = rand.Intn(1000)}var wg sync.WaitGroupwg.Add(DataSize / DataPerTask)tasks := make([]*Task, 0, DataSize/DataPerTask)for i := 0; i < DataSize/DataPerTask; i++ {task := &Task{index: i + 1,nums: nums[i*DataPerTask : (i+1)*DataPerTask],wg: &wg,}tasks = append(tasks, task)p.Invoke(task) // 唤起}wg.Wait()fmt.Printf("running goroutines: %d\n", ants.Running()) // 运行的数量//将结果汇总,并验证一下结果,与直接相加得到的结果做一个比较var sum intfor _, task := range tasks {sum += task.sum}var expect intfor _, num := range nums {expect += num}/*确实, 任务完成之后, 正在运行的 goroutine 数量变为 0。而且我们验证了, 结果没有偏差。另外需要注意, goroutine 池中任务的执行顺序是随机的, 与提交任务的先后没有关系。由上面运行打印的任务标识我们也能发现这一点。*/fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)}
panic 处理器
一个鲁棒性强的库一定不会忽视错误的处理, 特别是宕机相关的错误。在 Go 语言中就是 panic, 也被称为运行时恐慌, 在程序运行的过程中产生的严重性错误, 例如索引越界, 空指针解引用等, 都会触发 panic。
如果不处理 panic, 程序会直接意外退出, 可能造成数据丢失的严重后果。
ants中如果 goroutine 在执行任务时发生panic, 会终止当前任务的执行, 将发生错误的堆栈输出到os.Stderr。注意, 该 goroutine 还是会被放回池中, 下次可以取出执行新的任务。
package mainimport ("fmt""os""sync""time""github.com/panjf2000/ants/v2")func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) /* 我们让偶数个任务触发panic。提交两个任务,第二个任务一定会触发panic。 触发panic之后,我们还可以继续提交任务 3、5。注 意这里没有 4,提交任务 4 还是会触发panic。 */ if i%2 == 0 { panic(fmt.Sprintf("panic from task:%d", i)) } wg.Done() }}/*除了ants提供的默认 panic 处理器,我们还可以使用WithPanicHandler(paincHandler func(interface{}))指定我们自己编写的 panic 处理器。处理器的参数就是传给panic的值:*/func panicHandler(err interface{}) { fmt.Fprintln(os.Stderr, err)}func main() { // p, _ := ants.NewPool(2) p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler)) defer p.Release() var wg sync.WaitGroup wg.Add(3) for i := 1; i <= 2; i++ { p.Submit(wrapper(i, &wg)) } time.Sleep(1 * time.Second) p.Submit(wrapper(3, &wg)) p.Submit(wrapper(5, &wg)) wg.Wait()}/*hello from task:2panic from task:2hello from task:1hello from task:3hello from task:5*/
标签: #net40线程池场景