龙空技术网

带你深入理解一个Linux下C线程池的实现(图文详解)

Linux特训营 176

前言:

现在咱们对“nginxpooldestroy”都比较重视,朋友们都需要分析一些“nginxpooldestroy”的相关文章。那么小编也在网上汇集了一些有关“nginxpooldestroy””的相关文章,希望小伙伴们能喜欢,看官们一起来学习一下吧!

关于线程和线程池的学习,我们可以从以下几个方面入手:

第一,什么是线程,线程和进程的区别是什么

第二,线程中的基本概念,线程的生命周期

第三,单线程和多线程

第四,线程池的原理解析

第五,常见的几种线程池的特点以及各自的应用场景

一、什么是线程,线程和进程的区别是什么

线程,程序从行流的最小执行单位,是行程中的实际运作单位,经常容易和进程这个概念混淆。那么,线程和进程究竟有什么区别呢?首先,进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程,而线程,是运行中的实际的任务执行者。可以说,进程中包含了多个可以同时运行的线程。

二、线程中的基本概念,线程的生命周期

线程的生命周期,线程的生命周期可以利用以下的图解来更好的理解:

三、什么是单线程和多线程?

一般来说实现一个线程池主要包括以下几个组成部分:

1)线程管理器:用于创建并管理线程池。

2)工作线程:线程池中实际执行任务的线程。在初始化线程时会预先创建好固定数目的线程在池中,这些初始化的线程一般处于空闲状态,一般不占用CPU,占用较小的内存空间。

3)任务接口:每个任务必须实现的接口,当线程池的任务队列中有可执行任务时,被空闲的工作线程调去执行,把任务抽象出来形成接口,可以做到线程池与具体的任务无关。

4)任务队列:用来存放没有处理的任务,提供一种缓冲机制,实现这种结构有好几种方法,常用的是队列,主要运用先进先出原理,另外一种是链表之类的数据结构,

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了。如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了。

下面是Linux系统下用C语言创建的一个线程池。线程池会维护一个任务链表(每个CThread_worker结构就是一个任务)。

pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函数。该函数中

while (pool->cur_queue_size == 0){      pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));}

示如果任务链表中没有任务,则该线程处于阻塞等待状态。否则从队列中取出任务并执行。

pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后通过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个处于阻塞状态的线程(如果有的话)。

pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出。

#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <sys/types.h>#include <pthread.h>#include <assert.h>/**线程池里所有运行和等待的任务都是一个CThread_worker*由于所有任务都在链表里,所以是一个链表结构*/typedef struct worker{/*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/void *(*process) (void *arg);void *arg;/*回调函数的参数*/struct worker *next;} CThread_worker;/*线程池结构*/typedef struct{pthread_mutex_t queue_lock;pthread_cond_t queue_ready;/*链表结构,线程池中所有等待任务*/CThread_worker *queue_head;/*是否销毁线程池*/int shutdown;pthread_t *threadid;/*线程池中允许的活动线程数目*/int max_thread_num;/*当前等待队列的任务数目*/int cur_queue_size;} CThread_pool;int pool_add_worker (void *(*process) (void *arg), void *arg);void *thread_routine (void *arg);//share resourcestatic CThread_pool *pool = NULL;voidpool_init (int max_thread_num){pool = (CThread_pool *) malloc (sizeof (CThread_pool));pthread_mutex_init (&(pool->queue_lock), NULL);pthread_cond_init (&(pool->queue_ready), NULL);pool->queue_head = NULL;pool->max_thread_num = max_thread_num;pool->cur_queue_size = 0;pool->shutdown = 0;pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));int i = 0;for (i = 0; i < max_thread_num; i++){pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);}}/*向线程池中加入任务*/intpool_add_worker (void *(*process) (void *arg), void *arg){/*构造一个新任务*/CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));newworker->process = process;newworker->arg = arg;newworker->next = NULL;/*别忘置空*/pthread_mutex_lock (&(pool->queue_lock));/*将任务加入到等待队列中*/CThread_worker *member = pool->queue_head;if (member != NULL){while (member->next != NULL)member = member->next;member->next = newworker;}else{pool->queue_head = newworker;}assert (pool->queue_head != NULL);pool->cur_queue_size++;pthread_mutex_unlock (&(pool->queue_lock));/*好了,等待队列中有任务了,唤醒一个等待线程;注意如果所有线程都在忙碌,这句没有任何作用*/pthread_cond_signal (&(pool->queue_ready));return 0;}/*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出*/intpool_destroy (){if (pool->shutdown)return -1;/*防止两次调用*/pool->shutdown = 1;/*唤醒所有等待线程,线程池要销毁了*/pthread_cond_broadcast (&(pool->queue_ready));/*阻塞等待线程退出,否则就成僵尸了*/int i;for (i = 0; i < pool->max_thread_num; i++)pthread_join (pool->threadid[i], NULL);free (pool->threadid);/*销毁等待队列*/CThread_worker *head = NULL;while (pool->queue_head != NULL){head = pool->queue_head;pool->queue_head = pool->queue_head->next;free (head);}/*条件变量和互斥量也别忘了销毁*/pthread_mutex_destroy(&(pool->queue_lock));pthread_cond_destroy(&(pool->queue_ready));free (pool);/*销毁后指针置空是个好习惯*/pool=NULL;return 0;}void *thread_routine (void *arg){printf ("starting thread 0x%x\n", pthread_self ());while (1){pthread_mutex_lock (&(pool->queue_lock));/*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/while (pool->cur_queue_size == 0 && !pool->shutdown){printf ("thread 0x%x is waiting\n", pthread_self ());pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));}/*线程池要销毁了*/if (pool->shutdown){/*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/pthread_mutex_unlock (&(pool->queue_lock));printf ("thread 0x%x will exit\n", pthread_self ());pthread_exit (NULL);}printf ("thread 0x%x is starting to work\n", pthread_self ());/*assert是调试的好帮手*/assert (pool->cur_queue_size != 0);assert (pool->queue_head != NULL);/*等待队列长度减去1,并取出链表中的头元素*/pool->cur_queue_size--;CThread_worker *worker = pool->queue_head;pool->queue_head = worker->next;pthread_mutex_unlock (&(pool->queue_lock));/*调用回调函数,执行任务*/(*(worker->process)) (worker->arg);free (worker);worker = NULL;}/*这一句应该是不可达的*/pthread_exit (NULL);}// 下面是测试代码void *myprocess (void *arg){printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);sleep (1);/*休息一秒,延长任务的执行时间*/return NULL;}intmain (int argc, char **argv){pool_init (3);/*线程池中最多三个活动线程*//*连续向池中投入10个任务*/int *workingnum = (int *) malloc (sizeof (int) * 10);int i;for (i = 0; i < 10; i++){workingnum[i] = i;pool_add_worker (myprocess, &workingnum[i]);}/*等待所有任务完成*/sleep (5);/*销毁线程池*/pool_destroy ();free (workingnum);return 0;}

将上述所有代码放入threadpool.c文件中,

在Linux输入编译命令

$ gcc -o threadpool threadpool.c -lpthread

以下是运行结果

starting thread 0xb7df6b90thread 0xb7df6b90 is waitingstarting thread 0xb75f5b90thread 0xb75f5b90 is waitingstarting thread 0xb6df4b90thread 0xb6df4b90 is waitingthread 0xb7df6b90 is starting to workthreadid is 0xb7df6b90, working on task 0thread 0xb75f5b90 is starting to workthreadid is 0xb75f5b90, working on task 1thread 0xb6df4b90 is starting to workthreadid is 0xb6df4b90, working on task 2thread 0xb7df6b90 is starting to workthreadid is 0xb7df6b90, working on task 3thread 0xb75f5b90 is starting to workthreadid is 0xb75f5b90, working on task 4thread 0xb6df4b90 is starting to workthreadid is 0xb6df4b90, working on task 5thread 0xb7df6b90 is starting to workthreadid is 0xb7df6b90, working on task 6thread 0xb75f5b90 is starting to workthreadid is 0xb75f5b90, working on task 7thread 0xb6df4b90 is starting to workthreadid is 0xb6df4b90, working on task 8thread 0xb7df6b90 is starting to workthreadid is 0xb7df6b90, working on task 9thread 0xb75f5b90 is waitingthread 0xb6df4b90 is waitingthread 0xb7df6b90 is waitingthread 0xb75f5b90 will exitthread 0xb6df4b90 will exitthread 0xb7df6b90 will exit
总结;为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制。他们都在java.util.concurrent包中,是JDK并发包的核心。其中一个比较重要的类:Executors,他扮演着线程工厂的角色,我们通过Executors可以创建特定功能的线程池。

不积跬步无以至千里,学习C/C++,Linux,Nginx,golang,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,ffmpeg,流媒体, 音视频,CDN,P2P,K8S,Docker,Golang,TCP/IP,协程,嵌入式,ARM,DPDK等等。。。

可以后台私信‘资料’即可领取相关学习资料

标签: #nginxpooldestroy