龙空技术网

面试官:你在项目中用过 多线程 吗?

面试辅导 414

前言:

如今朋友们对“mysql多线程读写”大约比较着重,咱们都想要学习一些“mysql多线程读写”的相关内容。那么小编也在网上搜集了一些对于“mysql多线程读写””的相关文章,希望小伙伴们能喜欢,小伙伴们快快来学习一下吧!

hi 大家好,我是田哥

最近,从去年到现在,我给小伙伴们做模拟面试已有100多场。有时候我也在想,现在真的很卷吗?大部分人第一次模拟面试结束,给我的感觉不像大家说的那么卷。

奇怪的现象

我在做模拟面试的过程中,无意中发现一个现象,就是如果问八股文,问知识点,在校生或刚刚毕业不久的,回答的斗殴挺好的,往往是工作三五年的在这一块非常欠缺。

我也私下问过很多人,为什么这种现象,主要原因差不多就是:

每天太忙,没时间学习年纪大了,记不住就是不要想学

工作三五年的也不是就真的没有优点,他们的优点就是有大量的项目经验。换着问项目业务和设计之类的,他们明显占优势,但,问他们稍微往深地问,就会懵逼。比如:你们项目中使用到了Redis,用来干嘛,他们能立马回答上来。

如果继续追问:如何保证Redis和数据库中的数据一致性?然后就会稀里糊涂地回答。还有就是问他们Redis的持久化方式使用的是哪种?“这个没注意,不是我安装的”,继续问:那你觉得哪种方式更好,答案各种各样的都有。

总结起来就亮点:

学生或新人,八股文占优势(也有一部分啥都不知道,啥也没去背的)。三五年有项目经验,但大部分都停留在用上面,稍微问题问题就容易暴露自己的家点(也有一小分部知道的比较多)

在模拟面试的时候,我问过很多人是否在项目中用过并发编程的相关技术,用了什么?

基本上都回答:用过线程池

好吧,接下来,那我们就以一个线程池的面试题来对比以上两类人的回答。

聊聊线程池线程池核心参数

学生或新人:基本上都是一口气就能把这些参数回答上来,另外有部分优秀的会对这些参数做一个解释。

三五年的:部分人能全部回答出来,一部分人能说出核心线程数、最大线程数,其他参数就吱吱呜呜地回答,还有一部分就是完全一脸懵逼。

我们来看看到底有哪些参数:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {
corePoolSize:核心线程数maximumPoolSize:最大线程数keepAliveTime:空闲时间unit:空闲时间单位workQueue:阻塞队列threadFactory:线程工厂handler:拒绝策略

问核心参数时,至少要回答corePoolSizemaximumPoolSizeworkQueuehandler。还是建议全部回答吧,反正也没几个参数。

线程池原理

学生或新人:按照八股文来回答一番,甚至有的在回答核心参数的时候,顺带着就会把线程池的原理给说了(刚刚遇到能说的,就顺带着多说点)。

三五年的:有部分人也会按照八股文上的来回答,有部分人是吱吱呜呜的,不知道在说啥,还有一分部人就是瞎说咯。

关于线程池原理,我这里借用网上一张图:

如果看图记不住,我也有办法,我们可以使用生活案例来理解。

公司A:线程池公司A自己的员工:核心线程数公司A接到的订单:我们的业务线程公司A的仓库:阻塞队列公司B派的人:最大线程数

开始表演:

公司A接到订单,先给自己员工处理,如果自己员工处理不来了,就丢到公司A仓库里,如果仓库堆满了,这时候就去找公司B,公司B就派人(最大线程数)到公司A,订单持续爆棚,公司B派来的员工和公司A的员工都搞不来了,那就只能把后面来的订单拒绝掉(拒绝策略)。如果公司B的员工在公司A里吧任务做完了,闲着没事了,公司A也不会立马就让人家回公司B,毕竟人员来回还是有成本的,所以,可以适当的给点时间(keepAliveTime),是在没有什么任务了,那你们还是回公司B吧。

好了,按照这个故事去编就行了,也可以模仿着编其他故事,至少让面试官觉得你不是在背八股文。

核心线程数量设置

学生或新人:就算知道也是被八股文的,但很遗憾,问过十多个人,回答上来的应该占30%左右。

三五年的:问过几十个人,回答上来的寥寥无几,甚是遗憾。在项目中敢用线程池,却不知道如何设置核心线程数,这不是瞎搞吗?有的人能回答出CPU密集型和IO密集型,但问他哪些类型是CPU密集型、哪些是IO密集型?分别举两个例子,此时很多人都会慌的。

下面,我们来说说CPU密集型和IO密集型:

CPU密集型:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

比如:像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。

IO密集型:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占 用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 :

核心线程数=CPU核心数量*2

比如:像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。

另外,线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程;

以上只是理论值,实际项目中建议在本地或者测试环境进行多次调优,找到相对理想的值大小。

线程是如何复用的

关于这个问题,目前我见过的,只有两个人能说个大概。

我们都知道,继承Thread类或者实现Runnable接口,然后调用其start()方法就可以启动线程了,但是如果调用run()方法就和调用普通方法一样。

我们来看看,线程池中,我们提交的线程实例(任务),在线程池中到底是怎么被执行的。

线程池中有个Worker的角色,我们调用execute(Runnable tak)时候,会创建一个Worker:

我们先来看看这个Worker是怎么定义的:

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{        private static final long serialVersionUID = 6138294804551838833L;        final Thread thread;    Runnable firstTask;    volatile long completedTasks;    // Worker 只有这一个构造方法,传入 firstTask    Worker(Runnable firstTask) {        setState(-1);         this.firstTask = firstTask;        // 调用 ThreadFactory 来创建一个新的线程,这里创建的线程到时候用来执行任务        // 我们发现创建线程的时候传入的值是this,我们知道创建线程可以通过继承Runnable的方法,        // Worker继承了Runnable,并且下面重写了run()方法        this.thread = getThreadFactory().newThread(this);    }    // 由上面创建线程时传入的this,上面的thread启动后,会执行这里的run()方法,并且此时runWorker传入的也是this    public void run() {        runWorker(this);    }}

Worker继承了Runnable,并且下面重写了run()方法,这时候我们调用new Worker().start()方法后,就会调用Worker类中的run()方法。

此时的Worker和我们平时的Thread类就类似了

好了,我们再回到前面的说的execute()方法中来。

public void execute(Runnable command) {     int c = ctl.get();    // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,    // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)    if (workerCountOf(c) < corePoolSize) {        // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了        // 至于执行的结果,到时候会包装到 FutureTask 中。        // 这里的true代表当前线程数小于corePoolSize,表示以corePoolSize为线程数界限        if (addWorker(command, true))            return;        c = ctl.get();    }    // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了    // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    // 如果 workQueue 队列满了,那么进入到这个分支    // 这里的false代表当前线程数大于corePoolSize,表示以 maximumPoolSize 为界创建新的 worker    // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略    else if (!addWorker(command, false))        reject(command);}

这段代码里我们看到了很多地方都在调用接着addWorker()方法,我们来看看这个方法(方法内容有点多):

private boolean addWorker(Runnable firstTask, boolean core) {    //相当于goto,虽然不建议滥用,看看大神们是如何使用吧    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker:        // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED        // 2. firstTask != null        // 3. workQueue.isEmpty()        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            return false;        for (;;) {            int wc = workerCountOf(c);            //这里就是通过core参数对当前线程数的判断            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            if (compareAndIncrementWorkerCount(c))                break retry;            c = ctl.get();            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    /*     * 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了,     */    // worker 是否已经启动    boolean workerStarted = false;    // 是否已将这个 worker 添加到 workers 这个 HashSet 中    boolean workerAdded = false;    Worker w = null;    try {        final ReentrantLock mainLock = this.mainLock;        // 把 firstTask 传给 worker 的构造方法        w = new Worker(firstTask);        // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程        final Thread t = w.thread;        if (t != null) {            // 这个是整个类的全局锁,因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭            mainLock.lock();            try {                int c = ctl.get();                int rs = runStateOf(c);                // 小于 SHUTTDOWN 那就是 RUNNING                // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    // worker 里面的 thread 可不能是已经启动的                    if (t.isAlive())                        throw new IllegalThreadStateException();                    // 加到 workers 这个 HashSet 中                    workers.add(w);                    int s = workers.size();                    // largestPoolSize 用于记录 workers 中的个数的最大值                    // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值                    if (s > largestPoolSize)                        largestPoolSize = s;                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            // 添加成功的话,启动这个线程            if (workerAdded) {                // 启动线程,最重要的就是这里,下面我们会讲解如何执行任务                t.start();                workerStarted = true;            }        }    } finally {        // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉        if (! workerStarted)            addWorkerFailed(w);    }    // 返回线程是否启动成功    return workerStarted;}

请注意:t.start();这里的t其实就是我们创建的Worker对象,就回到我们前面说的,调用start()方法后,会执行到他的run()方法中来,我们继续看Worker中的run()方法。

//Worker的run方法public void run() {      runWorker(this);}final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        //取出需要执行的任务,        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            //如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法            while (task != null || (task = getTask()) != null) {               //这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程                w.lock();                //判断线程是否需要中断                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                   //任务开始执行前的hook方法                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                                                task.run();//这里就是直接调用run 方法                                            } catch (RuntimeException x) {                        thrown = x; throw x;                    } finally {                       ////任务开始执行后的hook方法                        afterExecute(task, thrown);                    }                } finally {                    task = null;//清空task                    w.completedTasks++;//完成数添加                    w.unlock();                }            }            completedAbruptly = false;        } finally {           //Worker退出           processWorkerExit(w, completedAbruptly);    }}

这里注意这行代码:

while (task != null || (task = getTask()) != null)

注释中已经说清楚了:如果task不是null,或者去队列中取任务,注意这里会阻塞。

另外,一个注意点:

task.run();

这个其实就是调用我们我们传入到execute(Runnable task)中的参数,也就是说,我们创建的Runnable对象,根本就不会去调用其start()方法,而是直接调用其run()方法。

我们在看看getTask()方法到底是做什么?

// 此方法有三种可能:// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,//      它们会一直等待任务// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭// 3. 如果发生了以下条件,此方法必须返回 null://    - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)//    - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务//    - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行private Runnable getTask() {    boolean timedOut = false;     retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // 两种可能        // 1. rs == SHUTDOWN && workQueue.isEmpty()        // 2. rs >= STOP        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            // CAS 操作,减少工作线程数            decrementWorkerCount();            return null;        }        boolean timed;      // Are workers subject to culling?        for (;;) {            int wc = workerCountOf(c);            // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭            timed = allowCoreThreadTimeOut || wc > corePoolSize;            if (wc <= maximumPoolSize && ! (timedOut && timed))                break;            if (compareAndDecrementWorkerCount(c))                return null;            c = ctl.get();  // Re-read ctl            // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }        // wc <= maximumPoolSize 同时没有超时        try {            // 到 workQueue 中获取任务            // 如果timed=wc > corePoolSize=false,我们知道核心线程数之内的线程永远不会销毁,则执行workQueue.take();我前面文章中讲过,take()方法是阻塞方法,如果队里中有任务则取到任务,如果没有任务,则一直阻塞在这里知道有任务被唤醒。            //如果timed=wc > corePoolSize=true,这里将执行超时策略,poll(keepAliveTime, TimeUnit.NANOSECONDS)会阻塞keepAliveTime这么长时间,没超时就返回任务,超时则返回null.            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            // 如果此 worker 发生了中断,采取的方案是重试            // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法,            // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量,            // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null            timedOut = false;        }    }}

到这里,大家应该都知道了,线程池中到底是如何复用线程的吧。

我来总结一下:

线程池中,维护了一个Worker的内部类,其中Worker也实现了Runnable接口,重写了run()方法,在调用这个run()时候,会采用类似于死循环的while方式重复使用这个worker去获取任务并执行我们传入execute(Runnable task)方法的参数task(task存放在阻塞队列里)。

标签: #mysql多线程读写