前言:
此刻兄弟们对“netty对比akka”都比较珍视,大家都想要知道一些“netty对比akka”的相关内容。那么小编也在网摘上搜集了一些有关“netty对比akka””的相关文章,希望看官们能喜欢,朋友们快快来了解一下吧!7.线程交互
7.1交互方式
线程交互也就是线程直接的通信,最直接的办法就是线程直接直接通信传值,而间接方式则是通过共享变量来达到彼此的交互。
等待:释放对象锁,允许其他线程进入同步块通知:重新获取对象锁,继续执行中断:状态交互,通知其他线程进入中断织入:合并线程,多个线程合并为一个
7.2线程安全
我们最关注的还是通过共享变量来达到交互的方式。线程如果都各自干活互不搭理的话自然相安无事,但多数情况下线程直接需要打交道,而且需要分享共享资源,那么这个时候最核心的就是线程安全了。
什么是线程安全?
当多个线程访问同一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替运行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获取正确的结果,那这个对象是线程安全的。(摘自《深入Java虚拟机》)
如何保证线程安全?
我们最早接触线程安全可能是JDK提供的一些号称线程安全的容器,比如Vetor较ArrayList是线程安全,HashTable较HashMap是线程安全?其实线程安全类并不代表也不等同线程安全的程序,而线程不安全的类同样可以完成线程安全的程序。我们关注的也就是写出线程安全的程序,那么如何写出线程安全的代码呢?下面列举了线程安全的主要设计技术:
无状态
这个有点函数式编程的味道,下文并发模式会介绍到,总之就是线程只有入参和局部变量,如果变量是引用的话,确保变量的创建和调用生命周期都发生在线程栈内,就可以确保线程安全。
无共享状态
完全要求线程无状态比较难实现,必要的状态是无法避免的,那么我们就必须维护不同线程之间的不同状态,这可是个麻烦事。幸好我们有ThreadLocal这个神器,该对象跟当前线程绑定,而且只对当前线程可见,完美解决了无共享状态的问题。
不可变状态
最后实在没办法避免状态共享,在线程之间共享状态,最怕的就是无法确保能维护好正确的读写顺序,而且多线程确实也无法正确维护好这个共享变量。那么我们索性粗暴点,把共享的状态定位不可变,比如价格final修饰一下,这样就达到安全状态共享。
消息传递
一个线程通常也不是所有步骤都需要共享状态,而是部分环节才需要的,那么我们把共享状态的代码拆开,无共享状态的那部分自然不用关心,而共享状态的小段代码,则通过加入消息组件来传递状态。这个设计到并发模式的流水线编程模式,下文并发模式会重点介绍。
线程安全容器
JUC里面提供大量的并发容器,涉及到线程交互的时候,使用安全容器可以避免大部分的错误,而且大大降低了代码的复杂度。
通过synchronized给方法加上内置锁来实现线程安全的类如Vector,HashTable,StringBufferAtomicXXX如AtomicIntegerConcurrentXXX如ConcurrentHashMapBlockingQueue/BlockingDequeCopyOnWriteArrayList/CopyOnWriteArraySetThreadPoolExecutor
synchronized同步
该关键字确保代码块同一时间只被一个线程执行,在这个前提下再设计符合线程安全的逻辑
其作用域为
对象:对象加锁,进入同步代码块之前获取对象锁实例方法:对象加锁,执行实例方法前获取对象实例锁类方法:类加锁,执行类方法前获取类锁
volatile约束
volatile确保每次操作都能强制同步CPU缓存和主存直接的变量。而且在编译期间能阻止指令重排。读写并发情况下volatile也不能确保线程安全,上文解析内存模型的时候有提到过。
这节我们论述了编写线程安全程序的指导思想,其中我们提到了JDK提供的JUC工具包,下一节将重点介绍并发编程常用的趁手工具。
8.线程工具
前文我们介绍了内存理论和线程的一些特征,大家都知道并发编程容易出错,而且出了错还不好调试排查,幸好JDK里面集成了大量实用的API工具,我们能熟悉这些工具,写起并发程序来也事半功倍。
工具篇其实就是对锁的不断变种,适应更多的开发场景,提高性能,提供更方便的工具,从最粗暴的同步修饰符,到灵活的可重入锁,到宽松的条件,接着到允许多个线程访问的信号量,最后到读写分离锁。
8.1同步控制
由于大多数的并发场景都是需要访问到共享资源的,为了保证线程安全,我们不得已采用锁的技术来做同步控制,这节我们介绍的是适用不同场景各种锁技术。
ReentrantLock
可重入互斥锁具有与使用synchronized的隐式监视器锁具有相同的行为和语义,但具有更好扩展功能。
ReentrantLock由最后成功锁定的线程拥有,而且还未解锁。当锁未被其他线程占有时,线程调用lock()将返回并且成功获取锁。如果当前线程已拥有锁,则该方法将立即返回。这可以使用方法isHeldByCurrentThread()和getHoldCount()来检查。
构造函数接受可选的fairness参数。当设置为true时,在竞争条件下,锁定有利于赋予等待时间最长线程的访问权限。否则,锁将不保证特定的访问顺序。在多线程访问的情况,使用公平锁比默认设置,有着更低的吞吐量,但是获得锁的时间比较小而且可以避免等待锁导致的饥饿。但是,锁的公平性并不能保证线程调度的公平性。因此,使用公平锁的许多线程中的一个可以连续多次获得它,而其他活动线程没有进展并且当前没有持有锁。不定时的tryLock()方法不遵循公平性设置。即使其他线程正在等待,如果锁可用,它也会成功。
任意指定锁的起始位置中断响应锁申请等待限时tryLock()公平锁
Condition
Condition从拥有监控方法(wait,notify,notifyAll)的Object对象中抽离出来成为独特的对象,高效的让每个对象拥有更多的等待线程。和锁对比起来,如果说用Lock代替synchronized,那么Condition就是用来代替Object本身的监控方法。
Condition实例跟Object本身的监控相似,同样提供wait()方法让调用的线程暂时挂起让出资源,知道其他线程通知该对象转态变化,才可能继续执行。Condition实例来源于Lock实例,通过Lock调用newCondition()即可。Condition较Object原生监控方法,可以保证通知顺序。
Semaphore
锁和同步块同时只能允许单个线程访问共享资源,这个明显有些单调,部分场景其实可以允许多个线程访问,这个时候信号量实例就派上用场了。信号量逻辑上维持了一组许可证, 线程调用acquire()阻塞直到许可证可用后才能执行。 执行release()意味着释放许可证,实际上信号量并没有真正的许可证,只是采用了计数功能来实现这个功能。
ReadWriteLock
顾名思义读写锁将读写分离,细化了锁的粒度,照顾到性能的优化。
CountDownLatch
这个锁有点“关门放狗”的意思,尤其在我们压测的时候模拟实时并行请求,该实例将线程积累到指定数量后,调用countDown()方法让所有线程同时执行。
CyclicBarrier
CyclicBarrier是加强版的CountDownLatch,上面讲的是一次性“关门放狗”,而循环栅栏则是集齐了指定数量的线程,在资源都允许的情况下同时执行,然后下一批同样的操作,周而复始。
LockSupport
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。 LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。
8.2线程池
线程池总览
线程多起来的话就需要管理,不然就会乱成一锅。我们知道线程在物理上对应的就是栈里面的一段内存,存放着局部变量的空间和待执行指令集。如果每次执行都要从头初始化这段内存,然后再交给CPU执行,效率就有点低了。假如我们知道该段栈内存会被经常用到,那我们就不要回收,创建完就让它在栈里面呆着,要用的时候取出来,用完换回去,是不是就省了初始化线程空间的时间,这样是我们搞出线程池的初衷。
其实线程池很简单,就是搞了个池子放了一堆线程。既然我们搞线程池是为了提高效率,那就要考虑线程池放多少个线程比较合适,太多了或者太少了有什么问题,怎么拒绝多余的请求,除了异常怎么处理。首先我们来看跟线程池有关的一张类图。
线程池归结起来就是这几个类的使用技巧了,重点关注ThreadPoolExecutor和Executors即可。
创建线程池
万变不离其宗,创建线程池的各种马甲方法最后都是调用到这方法里面,包含核心线程数,最大线程数,线程工厂,拒绝策略等参数。其中线程工厂则可以实现自定义创建线程的逻辑。
public interface ThreadFactory { Thread newThread(Runnable r);}
创建的核心构造方法ThreadPoolExecutor.java 1301
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
拒绝策略包含:
/** 实际上并未真正丢弃任务,但是线程池性能会下降 * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */ public static class CallerRunsPolicy implements RejectedExecutionHandler /** 粗暴停止抛异常 * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */ public static class AbortPolicy implements RejectedExecutionHandler /** 悄无声息的丢弃拒绝的任务 * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler /** 丢弃最老的请求 * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ public static class DiscardOldestPolicy implements RejectedExecutionHandler
包括Executors.java中的创建线程池的方法,具体实现也是通过ThreadPoolExecutor来创建的。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));}
调用线程池
ThreadPoolExecutor.java 1342
/** 同步执行线程,出现异常打印堆栈信息 * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */public void execute(Runnable command)/*** 异步提交线程任务,出现异常无法同步追踪堆栈,本质上也是调用execute()方法*/public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask;}
线程池优化
线程池已经是我们使用线程的一个优化成果了,而线程池本身的优化其实就是根据实际业务选择好不同类型的线程池,预估并发线程数量,控制好线程池预留线程数(最大线程数一般设为2N+1最好,N是CPU核数),这些涉及CPU数量,核数还有具体业务。
另外我们还注意到ForkJoinPool继承了AbstractExecutorService,这是在JDK7才加上去的,目的就是提高任务派生出来更多任务的执行效率,由上图的继承关系我们可以知道跟普通线程池最大的差异是执行的任务类型不同。
public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task);}public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.RunnableExecuteAction(task); externalPush(job); }
8.3并发容器
其实我们日常开发大多数并发场景直接用JDK 提供的线程安全数据结构足矣,下面列举了常用的列表,集合等容器,具体就不展开讲,相信大家都用得很熟悉了。
ConcurrentHashMapCopyOnWriteArrayListConcurrentLinkedQueueBlockingQueueConcurrentSkipListMapVectorHashTable…9.线程调优
9.1性能指标
回想一下,当我们在谈性能优化的时候,我们可能指的是数据库的读写次数,也可能指网站的响应时间。通常我们会用QPS,TPS,RT,并发数,吞吐量,更进一步的还会对比CPU负载来衡量一个系统的性能。
当然我们知道一个系统的吞吐量和响应时间跟外部网络,分布式架构等都存在强关联,性能优化也跟各级缓存设计,数据冗余等架构有很大关系,假设其他方面我们都已经完成了,聚焦到本文我们暂时关心的是单节点的性能优化。毕竟一屋不扫何以扫天下,整体系统的优化也有赖于各个节点的调优。从感官上来谈,当请求量很少的时候,我们可以很轻松的通过各种缓存优化来提高响应时间。但是随着用户激增,请求次数的增加,我们的服务也对应着需要并发模型来支撑。但是一个节点的并发量有个上限,当达到这个上限后,响应时间就会变长,所以我们需要探索并发到什么程度才是最优的,才能保证最高的并发数,同时响应时间又能保持在理想情况。由于我们暂时不关注节点以外的网络情况,那么下文我们特指的RT是指服务接收到请求后,完成计算,返回计算结果经历的时间。
单线程
单线程情况下,服务接收到请求后开始初始化,资源准备,计算,返回结果,时间主要花在CPU计算和CPU外的IO等待时间,多个请求来也只能排队一个一个来,那么RT计算如下
RT = T(cpu) + T(io)
QPS = 1000ms / RT
多线程
单线程情况很好计算,多线程情况就复杂了,我们目标是计算出最佳并发量,也就是线程数N
单核情况:N = [T(cpu) + T(io)] / T(cpu)
M核情况:N = [T(cpu) + T(io)] / T(cpu) * M
由于多核情况CPU未必能全部使用,存在一个资源利用百分比P
那么并发的最佳线程数 N = [T(cpu) + T(io)] / T(cpu) M P
吞吐量
我们知道单线程的QPS很容易算出来,那么多线程的QPS
QPS = 1000ms / RT N = 1000ms / T(cpu) + T(io) [T(cpu) + T(io)] / T(cpu) M P= 1000ms / T(cpu) M P
在机器核数固定情况下,也即是并发模式下最大的吞吐量跟服务的CPU处理时间和CPU利用率有关。CPU利用率不高,就是通常我们听到最多的抱怨,压测时候qps都打满了,但是cpu的load就是上不去。并发模型中多半个共享资源有关,而共享资源又跟锁息息相关,那么大部分时候我们想对节点服务做性能调优时就是对锁的优化,这个下一节会提到。
前面我们是假设机器核数固定的情况下做优化的,那假如我们把缓存,IO,锁都优化了,剩下的还有啥空间去突破呢?回想一下我们谈基础理论的时候提到的Amdahl定律,公式之前已经给出,该定律想表达的结论是随着核数或者处理器个数的增加,可以增加优化加速比,但是会达到上限,而且增加趋势愈发不明显。
9.2锁优化
说真的,我们并不喜欢锁的,只不过由于临界资源的存在不得已为之。如果业务上设计能避免出现临界资源,那就没有锁优化什么事了。但是,锁优化的一些原则还是要说一说的。
时间
既然我们并不喜欢锁,那么就按需索取,只在核心的同步块加锁,用完立马释放,减少锁定临界区的时间,这样就可以把资源竞争的风险降到最低。
粒度
进一步看,有时候我们核心同步块可以进一步分离,比如只读的情况下并不需要加锁,这时候就可以用读写锁各自的读写功能。
还有一种情况,有时候我们反而会小心翼翼的到处加锁来防止意外出现,可能出现三个同步块加了三个锁,这也造成CPU的过多停顿,根据业务其实可以把相关逻辑合并起来,也就是锁粗化。
锁的分离和粗化具体还得看业务如何操作。
尺度
除了锁暂用时间和粒度外,还有就是锁的尺度,还是根据业务来,能用共享锁定的情况就不要用独享锁。
死锁
这个不用说都知道,死锁防不胜防,我们前面也介绍很多现成的工具,比如可重入锁,还有线程本地变量等方式,都可以一定程度避免死锁。
9.3JVM锁机制
我们在代码层面把锁的应用都按照安全法则做到最好了,那接下来要做的就是下钻到JVM级别的锁优化。具体实现原理我们暂不展开,后续有机会再搞个专题写写JVM锁实现。
自旋锁(Spin Lock)
自旋锁的原理非常简单。如果持有锁的线程可以在短时间内释放锁资源,那么等待竞争锁的那些线程不需要在内核状态和用户状态之间进行切换。 它只需要等待,并且锁可以在释放锁之后立即获得锁。这可以避免消耗用户线程和内核切换。
但是,自旋锁让CPU空等着什么也不干也是一种浪费。 如果自旋锁的对象一直无法获得临界资源,则线程也无法在没有执行实际计算的情况下一致进行CPU空转,因此需要设置自旋锁的最大等待时间。如果持有锁的线程在旋转等待的最大时间没有释放锁,则自旋锁线程将停止旋转进入阻塞状态。
JDK1.6开启自旋锁 -XX:+UseSpinning,1.7之后控制器收回到JVM自主控制
偏向锁(Biased Lock)
偏向锁偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不需要触发同步的,这种情况下,就会给线程加一个偏向锁。如果在运行过程中,遇到了其他线程抢占锁,则持有偏向锁的线程会被挂起,JVM会消除它身上的偏向锁,将锁恢复到标准的轻量级锁。
JDK1.6开启自旋锁 -XX:+UseBiasedLocking,1.7之后控制器收回到JVM自主控制
轻量级锁(Lightweight Lock)
轻量级锁是由偏向锁升级来的,偏向锁运行在一个线程进入同步块的情况下,当第二个线程加入锁竞争的时候,偏向锁就会升级为轻量级锁。
重量级锁(Heavyweight Lock)
如果锁检测到与另一个线程的争用,则锁定会膨胀至重量级锁。也就是我们常规用的同步修饰产生的同步作用。
9.4无锁
最后其实我想说的是,虽然锁很符合我们人类的逻辑思维,设计起来也相对简单,但是摆脱不了临界区的限制。那么我们不妨换个思路,进入无锁的时间,也就是我们可能会增加业务复杂度的情况下,来消除锁的存在。
CAS策略
著名的CAS(Compare And Swap),是多线程中用于实现同步的原子指令。 它将内存位置的内容与给定值进行比较,并且只有它们相同时,才将该内存位置的内容修改为新的给定值。 这是作为单个原子操作完成的。 原子性保证了新值是根据最新信息计算出来的; 如果在此期间该值已被另一个线程更新,则写入将失败。 操作的结果必须表明它是否进行了替换; 这可以通过简单的Boolean来响应,或通过返回从内存位置读取的值(而不是写入它的值)来完成。
也就是一个原子操作包含了要操作的数据和给定认为正确的值进行对比,一致的话就继续,不一致则会重试。这样就在没有锁的情况下完成并发操作。
我们知道原子类 AtomicInteger内部实现的原理就是采用了CAS策略来完成的。
AtomicInteger.java 132
/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}
类似的还有AtomicReference.java 115
/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update);}
有兴趣的同学可以再了解一下Unsafe的实现,进一步可以了解Distuptor无锁框架。
10.并发模型
前面我们大费周章的从并发的基础概念到多线程的使用方法和优化技巧。但都是战术层面的,本节我们试着从战略的高度来扩展一下并发编程的世界。可能大多数情况下我们谈并发都会想到多线程,但是本节我们要打破这种思维,在完全不用搞多线程那一套的情况下实现并发。
首先我们用”多线程模式“来回顾前文所讲的所有关于Thread衍生出来的定义,开发和优化的技术。
多线程模式
单位线程完成完整的任务,也即是一条龙服务线程。
优势:映射现实单一任务,便于理解和编码劣势:
有状态多线程共享资源,导致资源竞争,死锁问题,线程等待阻塞,失去并发意义有状态多线程非阻塞算法,有利减少竞争,提升性能,但难以实现多线程执行顺序无法预知
流水线模型
介绍完传统多线程工作模式后,我们来学习另外一种并发模式,传统的多线程工作模式,理解起来很直观,接下来我们要介绍的另外一种并发模式看起来就不那么直观了。
流水线模型,特点是无状态线程,无状态也意味着无需竞争共享资源,无需等待,也就是非阻塞模型。流水线模型顾名思义就是流水线上有多个环节,每个环节完成自己的工作后就交给下一个环节,无需等待上游,周而复始的完成自己岗位上的一亩三分地就行。各个环节之间交付无需等待,完成即可交付。
而工厂的流水线也不止一条,所以有多条流水线同时工作。
不同岗位的生产效率是不一样的,所以不同流水线之间也可以发生协同。
我们说流水线模型也称为响应式模型或者事件驱动模型,其实就是流水线上上游岗位完成生产就通知下游岗位,所以完成了一个事件的通知,每完成一次就通知一下,就是响应式的意思。
流水线模型总体的思想就是纵向切分任务,把任务里面耗时过久的环节单独隔离出来,避免完成一个任务需要耗费等待的时间。在实现上又分为Actors和Channels模型
Actors该模型跟我们讲述的流水线模型基本一致,可以理解为响应式模型
Channels
由于各个环节直接不直接交互,所以上下游之间并不知道对方是谁,好比不同环节直接用的是几条公共的传送带来接收物品,各自只需要把完成后的半成品扔到传送带,即使后面流水线优化了,去掉中间的环节,对于个体岗位来说也是无感知的,它只是周而复始的从传送带拿物品来加工。
流水线的优缺点:
优势:
无共享状态:无需考虑资源抢占,死锁等问题独享内存:worker可以持有内存,合并多次操作到内存后再持久化,提升效率贴合底层:单线程模式贴合硬件运行流程,便于代码维护任务顺序可预知
劣势:
不够直观:一个任务被拆分为流水线上多个环节,代码层面难以直观理解业务逻辑
由于流水线模式跟人类的顺序执行思维不一样,比较费解,那么有没有办法让我们编码的时候像写传统的多线程代码一样,而运行起来又是流水线模式呢?答案是肯定的,比如基于Java的Akka/Reator/Vert.x/Play/Qbit框架,或者golang就是为流水线模式而生的并发语言,还有nodeJS等等。
流水线模型的开发实践可以参考流水线模型实践。
其实流水线模型背后用的也还是多线程来实现,只不过对于传统多线程模式下我们需要小心翼翼来处理跟踪资源共享问题,而流水线模式把以前一个线程做的事情拆成多个,每一个环节再用一条线程来完成,避免共享,线程直接通过管道传输消息。
这一块展开也是一个专题,主要设计NIO,Netty和Akka的编程实践,先占坑后面补上。
函数式模型
函数式并行模型类似流水线模型,单一的函数是无状态的,所以避免了资源竞争的复杂度,同时每个函数类似流水线里面的单一环境,彼此直接通过函数调用传递参数副本,函数之外的数据不会被修改。函数式模式跟流水线模式相辅相成逐渐成为更为主流的并发架构。具体的思想和编程实践也是个大专题,篇幅限制本文就先不展开,拟在下个专题中详细介绍《函数式编程演化》。
11.总结
由于CPU和I/O天然存在的矛盾,传统顺序的同步工作模式导致任务阻塞,CPU空等着没有执行,浪费资源。多线程为突破了同步工作模式的情况下浪费CPU资源,即使单核情况下也能将时间片拆分成单位给更多的线程来轮询享用。多线程在不同享状态的情况下非常高效,不管协同式还是抢占式都能在单位时间内执行更多的任务,从而更好的榨取CPU资源。
但是多数情况下线程之间是需要通信的,这一核心场景导致了一系列的问题,也就是线程安全。内存被共享的单位由于被不同线程轮番读取写入操作,这种操作带来的后果往往是写代码的人类没想到的,也就是并发带来的脏数据等问题。解决了资源使用效率问题,又带来了新的安全问题,如何解决?悲观方式就是对于存在共享内存的场景,无论如何只同意同一时刻一个线程操作,也就是同步操作方法或者代码段或者显示加锁。或者volatile来使共享的主存跟每条线程的工作内存同步(每次读都从主存刷新,每次写完都刷到主存)
要保证线程安全:
1、不要使用多线程,2、多线程各干各的不要共享内存,3、共享的内存空间是不可变的(常量,final),4、实在要变每次变完要同步到主存volatile(依赖当前值的逻辑除外),5、原子变量,6、根据具体业务,避免脏数据(这块就是多线程最容易犯错的地方)
线程安全后,要考虑的就是效率问题,如果不解决效率问题,那还干嘛要多线程。。。
如果所有线程都很自觉,快速执行完就跑路,那就是我们的理想情况了。但是,部分线程又臭又长(I/O阻塞),不能让一直赖在CPU不走,就把他上下文(线程号,变量,执行到哪等数值的快照)保存到内存,然后让它滚蛋下一个线程来。但是切换太快的话也不合适,毕竟每次保存线程的作案现场也要花不少时间的,单位时间执行线程数要控制在一个适当的个数。创建线程也是一项很吃力的工作,一个线程就是在栈内存里面开辟一段内存空间,根据字节码分配临时变量空间,不同操作系统通常不一样。不能频繁的创建销毁线程。那就搞个线程池出来,用的时候拿出来,用完扔回去,简单省事。但是线程池的创建也有门道,不能无限创建不然就失去意义了。操作系统有一定上限,线程池太多线程内存爆了,系统奔溃,所以需要一个机制。容纳1024个线程,多了排队再多了扔掉。回到线程切换,由于创建线程耗费资源,切换也花费,有时候切换线程的时间甚至比让线程待在cpu无所事事更长,那就给加个自旋锁,就是让它自己再cpu打滚啥事不干,一会儿轮到它里面就能干活。
既然多线程同步又得加锁耗资源,不同步又有共享安全问题。那能不能把这些锁,共享,同步,要注意的问题封装起来。搞出一个异步的工作机制,不用管底层的同步问题,只管业务问题。传统是工匠干活一根筋干完,事件驱动是流水线,把一件事拆分成多个环节,每个环节有唯一标识,各个环节批量生产,在流水线对接。这样在CPU单独干,不共享,不阻塞,干完自己的通知管工,高效封装了内部线程的运行规则,把业务关系暴露给管理者。
本文主要将的数基于JAVA的传统多线程并发模型,下面例牌给出知识体系图。
视频资料领取☟☟☟
原创: 编程原理林振华
标签: #netty对比akka