龙空技术网

在多线程下如何计数

乐天派爱思考的考拉 133

前言:

现在小伙伴们对“c语言怎么计数有多少个”大约比较关怀,小伙伴们都需要分析一些“c语言怎么计数有多少个”的相关知识。那么小编同时在网络上网罗了一些关于“c语言怎么计数有多少个””的相关内容,希望朋友们能喜欢,朋友们快快来学习一下吧!

通过共享变量计数

我们先来看一下这个简单的demo

public class CountDemo {    private static int count;    public static void main(String[] args) throws InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        for (int i = 0; i < 1000; i++) {            executorService.execute(CountDemo::insrease);        }        Thread.sleep(1000);        System.out.println(count);        executorService.shutdown();    }    public static void insrease() {        count++;    }}

这段代码是通过线程池来计算共享变量的值,我们希望最终输出的结果是1000,但是,无论我们执行多少次,每次的数据结果都是<=1000(大部分时候,都小于1000)

这是因此count是一个共享变量,直接使用count++时,在多线程环境下,无法保证原子性,因此,我们可以通过synchronized来保证原子性

public class CountDemo {    private static int count;    public static void main(String[] args) throws InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        for (int i = 0; i < 1000; i++) {            executorService.execute(CountDemo::insrease);        }        Thread.sleep(1000);        System.out.println(count);        executorService.shutdown();    }    public static synchronized void insrease() {        count++;    }}

添加了synchronized 关键字之后,保证了原子性,因此,每次输出的结果都是1000;但是我们都知道

synchronized 是一个重量级锁,那么是否可以通过其他方式来计数呢?

AtomicLong

在jdk1.5的时候增加了atomic相关的类,来保证线程安全,我们先来看一下这个例子

public class CountDemo {    private static AtomicLong atomicLong = new AtomicLong();    public static void main(String[] args) throws InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        for (int i = 0; i < 1000; i++) {            executorService.execute(CountDemo::insrease);        }        Thread.sleep(1000);        System.out.println(atomicLong.get());        executorService.shutdown();    }    public static void insrease() {        atomicLong.addAndGet(1);    }}

通过AtomicLong ,我们便能实现准备的计数,那么它究竟是什么原理呢?

public final long addAndGet(long delta) {    return unsafe.getAndAddLong(this, valueOffset, delta) + delta;}public final long getAndAddLong(Object var1, long var2, long var4) {    long var6;    do {        var6 = this.getLongVolatile(var1, var2);    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));    return var6;}

这个源码实际上很简单,就是调用了Unsafe的两个方法,getLongVolatile是直接通过native的方法,获取元素的当前值(可以保证可见性),compareAndSwapLong这是比较当前内存中的值和获取的值是否相当,如果相等,则替换,不相等,则通过for循环自旋,直到替换成功,这个实际就是CAS(比较并交换),通过这种形式来保证原子性。那么这种方式有什么问题呢?

当在高并发的时候,需要通过大量的自旋操作来保证原子性,这样会浪费CPU的性能,那么我们能够通过什么手段去优化呢

LongAddr

LongAddr是JDK1.8新增的一个线程安全计数类,我们先来看一下代码示例

public class CountDemo {    private static LongAdder longAdder = new LongAdder();    public static void main(String[] args) throws InterruptedException {        ExecutorService executorService = Executors.newFixedThreadPool(10);        for (int i = 0; i < 1000; i++) {            executorService.execute(CountDemo::insrease);        }        Thread.sleep(1000);        System.out.println(longAdder.longValue());        executorService.shutdown();    }    public static void insrease() {        longAdder.add(1);    }}

通过LongAddr,我们同样可以实现线程安全的计数,那么它和AtomicLong 有什么区别呢?我们来通过源码分析一下

public void add(long x) {    Cell[] as; long b, v; int m; Cell a;    if ((as = cells) != null || !casBase(b = base, b + x)) {        boolean uncontended = true;        if (as == null || (m = as.length - 1) < 0 ||            (a = as[getProbe() & m]) == null ||            !(uncontended = a.cas(v = a.value, v + x)))            longAccumulate(x, null, uncontended);    }}final void longAccumulate(long x, LongBinaryOperator fn,                          boolean wasUncontended) {    int h;    if ((h = getProbe()) == 0) {        ThreadLocalRandom.current(); // force initialization        h = getProbe();        wasUncontended = true;    }    boolean collide = false;                // True if last slot nonempty    for (;;) {        Cell[] as; Cell a; int n; long v;        if ((as = cells) != null && (n = as.length) > 0) {            if ((a = as[(n - 1) & h]) == null) {                if (cellsBusy == 0) {       // Try to attach new Cell                    Cell r = new Cell(x);   // Optimistically create                    if (cellsBusy == 0 && casCellsBusy()) {                        boolean created = false;                        try {               // Recheck under lock                            Cell[] rs; int m, j;                            if ((rs = cells) != null &&                                (m = rs.length) > 0 &&                                rs[j = (m - 1) & h] == null) {                                rs[j] = r;                                created = true;                            }                        } finally {                            cellsBusy = 0;                        }                        if (created)                            break;                        continue;           // Slot is now non-empty                    }                }                collide = false;            }            else if (!wasUncontended)       // CAS already known to fail                wasUncontended = true;      // Continue after rehash            else if (a.cas(v = a.value, ((fn == null) ? v + x :                                         fn.applyAsLong(v, x))))                break;            else if (n >= NCPU || cells != as)                collide = false;            // At max size or stale            else if (!collide)                collide = true;            else if (cellsBusy == 0 && casCellsBusy()) {                try {                    if (cells == as) {      // Expand table unless stale                        Cell[] rs = new Cell[n << 1];                        for (int i = 0; i < n; ++i)                            rs[i] = as[i];                        cells = rs;                    }                } finally {                    cellsBusy = 0;                }                collide = false;                continue;                   // Retry with expanded table            }            h = advanceProbe(h);        }        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {            boolean init = false;            try {                           // Initialize table                if (cells == as) {                    Cell[] rs = new Cell[2];                    rs[h & 1] = new Cell(x);                    cells = rs;                    init = true;                }            } finally {                cellsBusy = 0;            }            if (init)                break;        }        else if (casBase(v = base, ((fn == null) ? v + x :                                    fn.applyAsLong(v, x))))            break;                          // Fall back on using base    }}

这段代码比较多,实际上,它的核心就是,当没有竞争的时候,直接通过base来进行计数,当线程出现竞争的时候,则会创建一个cell数据,通过数据的每一个节点来分别计数,我们在获取值的时候,则通过base和cell的每个元素的和,通过这种性能,则可以讲每个线程分散到cell数组的不同节点,一次来减少并发冲突,减少自旋的次数

标签: #c语言怎么计数有多少个