前言:
如今朋友们对“mysql多线程读写”大约比较着重,咱们都想要学习一些“mysql多线程读写”的相关内容。那么小编也在网上搜集了一些对于“mysql多线程读写””的相关文章,希望小伙伴们能喜欢,小伙伴们快快来学习一下吧!hi 大家好,我是田哥
最近,从去年到现在,我给小伙伴们做模拟面试已有100多场。有时候我也在想,现在真的很卷吗?大部分人第一次模拟面试结束,给我的感觉不像大家说的那么卷。
奇怪的现象
我在做模拟面试的过程中,无意中发现一个现象,就是如果问八股文,问知识点,在校生或刚刚毕业不久的,回答的斗殴挺好的,往往是工作三五年的在这一块非常欠缺。
我也私下问过很多人,为什么这种现象,主要原因差不多就是:
每天太忙,没时间学习年纪大了,记不住就是不要想学
工作三五年的也不是就真的没有优点,他们的优点就是有大量的项目经验。换着问项目业务和设计之类的,他们明显占优势,但,问他们稍微往深地问,就会懵逼。比如:你们项目中使用到了Redis,用来干嘛,他们能立马回答上来。
如果继续追问:如何保证Redis和数据库中的数据一致性?然后就会稀里糊涂地回答。还有就是问他们Redis的持久化方式使用的是哪种?“这个没注意,不是我安装的”,继续问:那你觉得哪种方式更好,答案各种各样的都有。
总结起来就亮点:
学生或新人,八股文占优势(也有一部分啥都不知道,啥也没去背的)。三五年有项目经验,但大部分都停留在用上面,稍微问题问题就容易暴露自己的家点(也有一小分部知道的比较多)
在模拟面试的时候,我问过很多人是否在项目中用过并发编程的相关技术,用了什么?
基本上都回答:用过线程池
好吧,接下来,那我们就以一个线程池的面试题来对比以上两类人的回答。
聊聊线程池线程池核心参数
学生或新人:基本上都是一口气就能把这些参数回答上来,另外有部分优秀的会对这些参数做一个解释。
三五年的:部分人能全部回答出来,一部分人能说出核心线程数、最大线程数,其他参数就吱吱呜呜地回答,还有一部分就是完全一脸懵逼。
我们来看看到底有哪些参数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {corePoolSize:核心线程数maximumPoolSize:最大线程数keepAliveTime:空闲时间unit:空闲时间单位workQueue:阻塞队列threadFactory:线程工厂handler:拒绝策略
问核心参数时,至少要回答corePoolSize、maximumPoolSize、workQueue、handler。还是建议全部回答吧,反正也没几个参数。
线程池原理
学生或新人:按照八股文来回答一番,甚至有的在回答核心参数的时候,顺带着就会把线程池的原理给说了(刚刚遇到能说的,就顺带着多说点)。
三五年的:有部分人也会按照八股文上的来回答,有部分人是吱吱呜呜的,不知道在说啥,还有一分部人就是瞎说咯。
关于线程池原理,我这里借用网上一张图:
如果看图记不住,我也有办法,我们可以使用生活案例来理解。
公司A:线程池公司A自己的员工:核心线程数公司A接到的订单:我们的业务线程公司A的仓库:阻塞队列公司B派的人:最大线程数
开始表演:
公司A接到订单,先给自己员工处理,如果自己员工处理不来了,就丢到公司A仓库里,如果仓库堆满了,这时候就去找公司B,公司B就派人(最大线程数)到公司A,订单持续爆棚,公司B派来的员工和公司A的员工都搞不来了,那就只能把后面来的订单拒绝掉(拒绝策略)。如果公司B的员工在公司A里吧任务做完了,闲着没事了,公司A也不会立马就让人家回公司B,毕竟人员来回还是有成本的,所以,可以适当的给点时间(keepAliveTime),是在没有什么任务了,那你们还是回公司B吧。
好了,按照这个故事去编就行了,也可以模仿着编其他故事,至少让面试官觉得你不是在背八股文。
核心线程数量设置
学生或新人:就算知道也是被八股文的,但很遗憾,问过十多个人,回答上来的应该占30%左右。
三五年的:问过几十个人,回答上来的寥寥无几,甚是遗憾。在项目中敢用线程池,却不知道如何设置核心线程数,这不是瞎搞吗?有的人能回答出CPU密集型和IO密集型,但问他哪些类型是CPU密集型、哪些是IO密集型?分别举两个例子,此时很多人都会慌的。
下面,我们来说说CPU密集型和IO密集型:
CPU密集型:这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
比如:像加解密,压缩、计算等一系列需要大量耗费 CPU 资源的任务,大部分场景下都是纯 CPU 计算。
IO密集型:这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占 用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 :
核心线程数=CPU核心数量*2。
比如:像 MySQL 数据库、文件的读写、网络通信等任务,这类任务不会特别消耗 CPU 资源,但是 IO 操作比较耗时,会占用比较多时间。
另外,线程的平均工作时间所占比例越高,就需要越少的线程;线程的平均等待时间所占比例越高,就需要越多的线程;
以上只是理论值,实际项目中建议在本地或者测试环境进行多次调优,找到相对理想的值大小。
线程是如何复用的
关于这个问题,目前我见过的,只有两个人能说个大概。
我们都知道,继承Thread类或者实现Runnable接口,然后调用其start()方法就可以启动线程了,但是如果调用run()方法就和调用普通方法一样。
我们来看看,线程池中,我们提交的线程实例(任务),在线程池中到底是怎么被执行的。
线程池中有个Worker的角色,我们调用execute(Runnable tak)时候,会创建一个Worker:
我们先来看看这个Worker是怎么定义的:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; // Worker 只有这一个构造方法,传入 firstTask Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; // 调用 ThreadFactory 来创建一个新的线程,这里创建的线程到时候用来执行任务 // 我们发现创建线程的时候传入的值是this,我们知道创建线程可以通过继承Runnable的方法, // Worker继承了Runnable,并且下面重写了run()方法 this.thread = getThreadFactory().newThread(this); } // 由上面创建线程时传入的this,上面的thread启动后,会执行这里的run()方法,并且此时runWorker传入的也是this public void run() { runWorker(this); }}
Worker继承了Runnable,并且下面重写了run()方法,这时候我们调用new Worker().start()方法后,就会调用Worker类中的run()方法。
此时的Worker和我们平时的Thread类就类似了
好了,我们再回到前面的说的execute()方法中来。
public void execute(Runnable command) { int c = ctl.get(); // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务, // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了 // 至于执行的结果,到时候会包装到 FutureTask 中。 // 这里的true代表当前线程数小于corePoolSize,表示以corePoolSize为线程数界限 if (addWorker(command, true)) return; c = ctl.get(); } // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了 // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果 workQueue 队列满了,那么进入到这个分支 // 这里的false代表当前线程数大于corePoolSize,表示以 maximumPoolSize 为界创建新的 worker // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略 else if (!addWorker(command, false)) reject(command);}
这段代码里我们看到了很多地方都在调用接着addWorker()方法,我们来看看这个方法(方法内容有点多):
private boolean addWorker(Runnable firstTask, boolean core) { //相当于goto,虽然不建议滥用,看看大神们是如何使用吧 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池已关闭,并满足以下条件之一,那么不创建新的 worker: // 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED // 2. firstTask != null // 3. workQueue.isEmpty() if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //这里就是通过core参数对当前线程数的判断 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* * 到这里,我们认为在当前这个时刻,可以开始创建线程来执行任务了, */ // worker 是否已经启动 boolean workerStarted = false; // 是否已将这个 worker 添加到 workers 这个 HashSet 中 boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 把 firstTask 传给 worker 的构造方法 w = new Worker(firstTask); // 取 worker 中的线程对象,之前说了,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程 final Thread t = w.thread; if (t != null) { // 这个是整个类的全局锁,因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 小于 SHUTTDOWN 那就是 RUNNING // 如果等于 SHUTDOWN,前面说了,不接受新的任务,但是会继续执行等待队列中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker 里面的 thread 可不能是已经启动的 if (t.isAlive()) throw new IllegalThreadStateException(); // 加到 workers 这个 HashSet 中 workers.add(w); int s = workers.size(); // largestPoolSize 用于记录 workers 中的个数的最大值 // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功的话,启动这个线程 if (workerAdded) { // 启动线程,最重要的就是这里,下面我们会讲解如何执行任务 t.start(); workerStarted = true; } } } finally { // 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉 if (! workerStarted) addWorkerFailed(w); } // 返回线程是否启动成功 return workerStarted;}
请注意:t.start();这里的t其实就是我们创建的Worker对象,就回到我们前面说的,调用start()方法后,会执行到他的run()方法中来,我们继续看Worker中的run()方法。
//Worker的run方法public void run() { runWorker(this);}final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //取出需要执行的任务, Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法 while (task != null || (task = getTask()) != null) { //这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程 w.lock(); //判断线程是否需要中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任务开始执行前的hook方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run();//这里就是直接调用run 方法 } catch (RuntimeException x) { thrown = x; throw x; } finally { ////任务开始执行后的hook方法 afterExecute(task, thrown); } } finally { task = null;//清空task w.completedTasks++;//完成数添加 w.unlock(); } } completedAbruptly = false; } finally { //Worker退出 processWorkerExit(w, completedAbruptly); }}
这里注意这行代码:
while (task != null || (task = getTask()) != null)
注释中已经说清楚了:如果task不是null,或者去队列中取任务,注意这里会阻塞。
另外,一个注意点:
task.run();
这个其实就是调用我们我们传入到execute(Runnable task)中的参数,也就是说,我们创建的Runnable对象,根本就不会去调用其start()方法,而是直接调用其run()方法。
我们在看看getTask()方法到底是做什么?
// 此方法有三种可能:// 1. 阻塞直到获取到任务返回。我们知道,默认 corePoolSize 之内的线程是不会被回收的,// 它们会一直等待任务// 2. 超时退出。keepAliveTime 起作用的时候,也就是如果这么多时间内都没有任务,那么应该执行关闭// 3. 如果发生了以下条件,此方法必须返回 null:// - 池中有大于 maximumPoolSize 个 workers 存在(通过调用 setMaximumPoolSize 进行设置)// - 线程池处于 SHUTDOWN,而且 workQueue 是空的,前面说了,这种不再接受新的任务// - 线程池处于 STOP,不仅不接受新的线程,连 workQueue 中的线程也不再执行private Runnable getTask() { boolean timedOut = false; retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 两种可能 // 1. rs == SHUTDOWN && workQueue.isEmpty() // 2. rs >= STOP if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操作,减少工作线程数 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // 允许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭 timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // compareAndDecrementWorkerCount(c) 失败,线程池中的线程数发生了改变 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } // wc <= maximumPoolSize 同时没有超时 try { // 到 workQueue 中获取任务 // 如果timed=wc > corePoolSize=false,我们知道核心线程数之内的线程永远不会销毁,则执行workQueue.take();我前面文章中讲过,take()方法是阻塞方法,如果队里中有任务则取到任务,如果没有任务,则一直阻塞在这里知道有任务被唤醒。 //如果timed=wc > corePoolSize=true,这里将执行超时策略,poll(keepAliveTime, TimeUnit.NANOSECONDS)会阻塞keepAliveTime这么长时间,没超时就返回任务,超时则返回null. Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 如果此 worker 发生了中断,采取的方案是重试 // 解释下为什么会发生中断,这个读者要去看 setMaximumPoolSize 方法, // 如果开发者将 maximumPoolSize 调小了,导致其小于当前的 workers 数量, // 那么意味着超出的部分线程要被关闭。重新进入 for 循环,自然会有部分线程会返回 null timedOut = false; } }}
到这里,大家应该都知道了,线程池中到底是如何复用线程的吧。
我来总结一下:
线程池中,维护了一个Worker的内部类,其中Worker也实现了Runnable接口,重写了run()方法,在调用这个run()时候,会采用类似于死循环的while方式重复使用这个worker去获取任务并执行我们传入execute(Runnable task)方法的参数task(task存放在阻塞队列里)。
标签: #mysql多线程读写