前言:
今天我们对“c语言怎么表示10的n次方”都比较关怀,朋友们都需要知道一些“c语言怎么表示10的n次方”的相关文章。那么小编在网上收集了一些对于“c语言怎么表示10的n次方””的相关文章,希望各位老铁们能喜欢,你们一起来学习一下吧!#挑战30天在头条写日记#
ConcurrentHashMap 底层是基于数组 +链表组成的,JDK 1.7 和 JDK 1.8 中具体的实现稍有不同。
1 JDK 1.7 中的 ConcurrentHashMap1.JDK 1.7 中 ConcurrentHashMap 底层结构
JDK 1.7 的 ConcurrentHashMap 类所采用的是分段锁的思想,将 HashMap 进行切割,把 HashMap 中的哈希数组切分成小数组,每个小数组由 n 个 HashEntry 组成,其中小数组(Segment)继承自 ReentrantLock(可重入锁)。
JDK 1.7 中的数据结构:
如图所示,是由 Segment 数组、HashEntry 组成,与 HashMap 一样,仍然是数组 + 链表。
ConcurrentHashMap 由很多个 Segment 组合,而每一个 Segment 是一个类似于 HashMap 的结构(子哈希表)。所以每个 HashMap 的内部都可以进行扩容。但是 Segment 的个数是 16 个,也可以认为 ConcurrentHashMap 默认支持最多 16 个线程并发。其中,Segment 里维护了一个 HashEntry 数组,Segment 继承自 ReentrantLock,并发环境下,对于不同的 Segment 数据进行操作是不用考虑锁竞争的,因此不会像 Hashtable 那样不管是添加、删除、查询操作都需要同步处理。
扒一下 ConcurrentHashMap 类:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { /** * The segments, each of which is a specialized hash table. */ final Segment<K, V>[] segments; ...}
Segment类是 ConcurrentHashMap 的一个静态内部类,内部结构跟 HashMap 差不多:
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; // 1. 和 HashMap 中的 HashEntry 作⽤⼀样,是真正存放数据的数组位桶 transient volatile HashEntry<K,V>[] table; // 2. table 的数组容量 transient int count; // 3. 记录修改次数的变量 transient int modCount; // 4. 阈值⼤⼩,所能容纳的元素极限,用于扩容判断 transient int threshold; // 5. 负载因⼦,用于扩容 final float loadFactor;}
存放元素的HashEntry,也是一个静态内部类:
static final class HashEntry<K, V> { // hash 值 final int hash; // 键 final K key; // 值 volatile V value; // 下一个节点 volatile HashEntry<K, V> next; HashEntry(int hash, K key, V value, HashEntry<K, V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; }}
HashEntry 和 HashMap 中的 Entry 非常类似,唯一的区别就是其中的核心数据,比如 value、next 都使用了 volatile 关键字 修饰,这就保证了多线程环境下数据获取时的可见性。
volatile 关键字的特性:
保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这个新的值对其他线程来说是可见的(实时可见性);禁止进行指令重排序(实现有序性);volatile 只能保证对单次读写的原子性,像 i++ 这种操作不能保证原子性。
从类的定义上可以看到,Segment 这个静态内部类继承了 ReetrantLock 类
2.ConcurrentHashMap 的常量
// 初始初始容量static final int DEFAULT_INITIAL_CAPACITY = 16;// 默认负载因子static final float DEFAULT_LOAD_FACTOR = 0.75f;// 初始的并发等级static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 最大容量static final int MAXIMUM_CAPACITY = 1 << 30;// segment最小容量static final int MIN_SEGMENT_TABLE_CAPACITY = 2;// 一个segment最大容量static final int MAX_SEGMENTS = 1 << 16; // 锁之前重试次数static final int RETRIES_BEFORE_LOCK = 2;3.初始化
public ConcurrentHashMap() { // DEFAULT_INITIAL_CAPACITY 表示初始化容量,默认为 16 // DEFAULT_LOAD_FACTOR 表示负载因子,默认为 0.75 // DEFAULT_CONCURRENCY_LEVEL 表示 Segment[] 初始并发等级,默认为 16 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);} public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);} public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);}
this调用对应的构造方法:
// 通过指定的容量、负载因子和并发等级创建一个新的ConcurrentHashMappublic ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 1. 参数校验:对容量、负载因子和并发等级做限制 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 2. MAX_SEGMENTS = 1 << 16 = 65536,限制并发等级不可以大于最大等级,如果并发量大于 65536,concurrencyLevel = 65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 下面即通过并发等级来确定Segment的大小 // 3. sshift用来记录向左按位移动的次数,2的多少次方 int sshift = 0; // 4. ssize用来记录Segment数组的大小 int ssize = 1; // Segment的大小为大于等于concurrencyLevel的第一个2的n次方的数(也就是concurrencyLevel之上最近的2的次方值) while (ssize < concurrencyLevel) { // 新增移动次数,直到 ssize >= concurrencyLevel 为止,concurrencyLevel 为 16,循环之后 ssize = 16 ++sshift; // 位移动,向左移动 1 ssize <<= 1; } // 5. segmentShift、segmentMask 用于元素在Segment[]数组的定位 // 记录段偏移量 this.segmentShift = 32 - sshift; // segmentMask的值等于ssize - 1(这个值很重要) // 记录段掩码 this.segmentMask = ssize - 1; // 6. 传入初始化的容量值大于最大容量值,则默认为最大容量值 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 7. c记录每个Segment上要放置多少个元素,即Segment中HashEntry的数组长度,c也一定为 2 的 n 次方,这里的计算类似于 HashMap 的容量 int c = initialCapacity / ssize; // 假如有余数,则Segment数量加1 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; // Segment中的类似于HashMap的容量,至少是2或者2的倍数 while (cap < c) cap <<= 1; // 8. 创建第一个Segment对象,并放入Segment[]数组中,作为第一个Segment,默认数组长度为 2 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); // 9. 创建Segment[],指定segment数组的长度,默认数组长度为 16 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 10. 使用CAS方式,将上面创建的segment对象放入segment[]数组中 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss;}
总结一下在 JDK 1.7 中 ConcurrnetHashMap 的初始化逻辑。
必要参数校验;校验并发级别 concurrencyLevel 大小,如果大于最大值,重置为最大值。无参构造默认值是 16;寻找并发级别 concurrencyLevel 之上最近的 2 的幂次方值,作为初始化容量大小,默认是 16;记录 segmentShift 偏移量,这个值为【容量 = 2 的N次方】中的 N,在后面 Put 时计算位置时会用到。默认是 32 - sshift = 28;记录 segmentMask,默认是 ssize - 1 = 16 -1 = 15;初始化 segments[0],默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容。
从源码上可以看出,ConcurrentHashMap 初始化方法有三个参数:initialCapacity(初始化容量)为16、loadFactor(负载因子)为0.75、concurrentLevel(并发等级)为16,如果不指定则会使用默认值。
其中,值得注意的是 concurrentLevel 这个参数,虽然 Segment 数组大小 ssize 是由 concurrentLevel 来决定的,但是却不一定等于 concurrentLevel,ssize 通过位移动运算,一定是大于或者等于 concurrentLevel 的最小的 2 的次幂!
通过计算可以看出,按默认的 initialCapacity 初始容量为16,concurrentLevel 并发等级为16,理论上就允许 16 个线程并发执行,并且每一个线程独占一把锁访问 Segment,不影响其它的 Segment 操作。
4.put 操作
扒一下 put() 方法的源码:
public V put(K key, V value) { Segment<K,V> s; // 1. ConcurrentHashMap中key和value都不能为null if (value == null) throw new NullPointerException(); // 2. 计算key的哈希值 int hash = hash(key); // 3. 通过key的哈希值,定位ConcurrentHashMap中Segment[]的角标 // hash 值无符号右移28位(初始化时获得),然后与segmentMask=15做与运算 int j = (hash >>> segmentShift) & segmentMask; // 4. 使用CAS的方式,从Segment[]中获取j角标下的Segment对象,并判断是否存在 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 如果查到的Segment为空,初始化 s = ensureSegment(j); // 5. 底层使用了Segment对象的put()方法 return s.put(key, hash, value, false);}
从源码中可以看出,put 操作主要分为两步:
获取要 put 的 key 的位置,获取指定位置的 Segment;如果指定位置的 Segment 为空,则初始化这个 Segment;
调用 Segment 的 put() 方法。
扒一下初始化 Segment() 方法的源码:
@SuppressWarnings("unchecked")private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; // 判断 u 位置的 Segment 是否为null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype // 获取0号 segment 里的 HashEntry<K,V> 初始化长度 int cap = proto.table.length; // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的 float lf = proto.loadFactor; // 计算扩容阀值 int threshold = (int)(cap * lf); // 创建一个 cap 容量的 HashEntry 数组 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 自旋检查 u 位置的 Segment 是否为null while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用CAS 赋值,只会成功一次 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg;}
初始化 Segment 流程:
检查计算得到的位置的 Segment 是否为 null;为 null 继续初始化,使用 Segment[0] 的容量和负载因子创建一个 HashEntry 数组;再次检查计算得到的指定位置的 Segment 是否为 null;使用创建的 HashEntry 数组初始化这个 Segment;自旋判断计算得到的指定位置的 Segment 是否为null,使用 CAS 在这个位置赋值为 Segment。
真正插入元素的 put() 方法:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 1. tryLock是ReentrantLock类中的方法,表示尝试从CPU手上获取到锁。如果锁没有被另外一个线程持有,获得锁并返回true;否则返回false,并调用scanAndLockForPut()方法 // 这里是并发的关键,每一个Segment进行put时,都会加锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // 2. 获取Segment对象中的HashEntry[]数组 HashEntry<K,V>[] tab = table; // 3. 确定key的hash值所在HashEntry数组的索引位置 int index = (tab.length - 1) & hash; // 4. 根据索引获取HashEntry对象 HashEntry<K,V> first = entryAt(tab, index); // 5. 遍历当前HashEntry链 for (HashEntry<K,V> e = first;;) { // 判断逻辑与HashMap相似 // 如果链头不为null if (e != null) { K k; // 如果在该链中找到相同的key,则用新值替换旧值,并退出循环 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } // 如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else e = e.next; } else { // 如果key不存在,则把当前Entry插入到链头(头插法) if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); // 此时数量+1 int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 需要注意的地方:如果超出了HashEntry的阈值,就要对HashEntry[]进行扩容 rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 6. 操作完成后,释放对象锁 unlock(); } return oldValue;}
从 put() 源码中可以看出,真正的 put 操作主要分为以下几步:
尝试获取对象锁,如果获取到就返回 true,否则执行 scanAndLockForPut() 方法,这个方法也是尝试获取对象锁;获取到锁之后,类似于 hashMap 的 put() 方法,通过 key 计算所在 HashEntry 数组的下标,然后获取这个位置上的 HashEntry;获取到数组下标之后遍历链表内容,为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待。
① 如果这个位置上的 HashEntry 不存在:
a. 如果当前容量大于扩容阀值,小于最大容量,进行扩容。
b. 直接头插法插入。
② 如果这个位置上的 HashEntry 存在:
a. 判断链表当前元素 key 和 hash 值是否和要 put 的 key 和 hash 值一致。一致则替换值;
b. 不一致,获取链表下一个节点,直到发现相同进行值替换,或者链表表里没有相同的:
a) 如果当前容量大于扩容阀值,小于最大容量,进行扩容。 b) 直接链表头插法插入。
如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null;
最后操作完整之后,释放对象锁;
再来扒一下 scanAndLockForPut() 方法的源码:
private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) { // 1. 定位HashEntry数组位置,获取第一个节点 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; // 2. 重试次数 int retries = -1; // negative while locating node // 自旋获取锁 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) if(e == null) if (node == null) // speculatively create node // 3. 构造新节点 node = new HashEntry<K, V> (hash, key, value, null); retries = 0; } else if (key. equals(e. key)) retries = 0; else e = e.next; } else if (++retries > MAX_ SCAN _RETRIES) { // 4. 重试次数+1,如果大于最大次数,调用lock()方法获取锁,如果没有获取当前线程就被阻塞,直到获取并跳出循环 Lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { // 5. HashEntry存储内容发生变化,重置重试次数 e = first = f; // re-traverse if entry changed retries = -1; } } return node;}
scanAndLockForPut() 方法的操作也是分以下几步:
当前线程尝试去获得锁,查找 key 是否已经存在,如果不存在,就创建一个HashEntry 对象;如果重试次数大于最大次数,就调用 lock() 方法获取对象锁,如果依然没有获取到,当前线程就阻塞,直到获取之后退出循环;在这个过程中,key 可能被别的线程给插入,所以在第 5 步中,如果 HashEntry 存储内容发生变化,重置重试次数;
通过 scanAndLockForPut() 方法,当前线程就可以在即使获取不到 segment 锁的情况下,完成需要添加节点的实例化工作,当获取锁后,就可以直接将该节点插入链表即可。
这个方法还实现了类似于自旋锁的功能,循环式的判断对象锁是否能够被成功获取,直到获取到锁才会退出循环,防止执行 put 操作的线程频繁阻塞,这些优化都提升了 put 操作的性能。
5.get 操作
get() 方法因为不涉及增、删、改操作,所以不存在并发故障问题。
扒一下 get() 方法的源码:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; // 1. 计算key的hash值 int h = hash(key); // 2. 计算该hash值所属的Segment[]的角标 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE ; // 3. 获取Segment[]中u角标下的Segment对象,不存在直接返回 if ((s = (Segment<K, V> )UNSAFE. getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 4. 再根据hash值,从Segment对象中的HashEntry[]获取HashEntry对象,并对HashEntry对象进行链表遍历 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE. getObjectVolatile (tab, ((Long)(((tab. Length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; // 5. 在链表中找到对应元素,并返回 if ((k = e.key) === key || (e.hash == h && key.equals(k))) return e. value; } } return null;}
get() 方法只需要两步就可以实现:
计算得到 key 的存放位置;遍历指定位置查找相同 key 的 value 值。
由于 HashEntry 涉及到的共享变量都使用 volatile 修饰,volatile 可以保证内存可见性,所以不会读取到过期数据。
6.remove 操作
remove() 操作和 put() 方法差不多,都需要获取锁对象才能操作,通过 key 找到元素所在的 Segment 对象,然后移除即可。
扒一下 remove() 方法的源码:
public V remove(0bject key) { // 1. 计算key的hash值 int hash = hash( key); // 2. 计算该hash值所属的Segment[]的角标,再通过角标下的Segment对象 Segment<K,V> S = segmentForHash(hash); // 3. 执行移除方法 return s == null ? null : s. remove(key, hash, null);}
与 get() 方法类似,都是先获取 Segment 数组所在的 Segment 对象,然后再调用 Segment 对象的 remove() 方法区移除。
扒一下 Segment 对象的 remove() 方法:
final V remove(0bject key, int hash, 0bject value) { // 1. 尝试获取对象锁 if (!tryLock()) scanAndLock(key, hash); V oLdValue = null; try { HashEntry<K,V>[] tab = table; // 2. 计算key的hash值在HashEntry[]中的角标 int index = (tab. length - 1) & hash; // 3. 根据index角标获取HashEntry对象 HashEntry<K,V> e = entryAt (tab, index); HashEntry<K,V> pred = null; // 4. 循环遍历HashEntry对象,HashEntry为单向链表结构 while (e != null) { K k; HashEntry<K,V> next = e. next; // 5. 通过key和hash判断key是否存在 if ((k = e.key) == key | (e.hash == hash && key. equals(k))) { V v = e.value; // 6. 移除元素,将下节点向上移 if (value == null|| value == v H value. equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred. setNext(next); ++modCount ; count; oldValue = V; } break; } pred = e e = next; } } finally { // 7. 释放锁 unLock(); return oldValue;}
先获取对象锁,如果获取到之后执行移除操作,之后的操作类似于 HashMap 的移除方法,步骤如下:
先获取对象锁;
计算 key 的 hash 值在 HashEntry[] 中的角标;根据 index 角标获取 HashEntry 对象;循环遍历 HashEntry 对象,HashEntry 为单向链表结构;通过 key 和 hash 判断 key 是否存在,如果存在,就移除元素,并将需要移除的元素节点的下一个向上移动;最后就是释放对象锁,以便其他线程使用。7. 扩容 rehash
ConcurrentHashMap 的扩容只会扩容到原来的两倍。老数组里的数据移动到新的数组时,位置要么不变,要么变为 index+ oldSize,参数里的 node 会在扩容之后使用链表头插法插入到指定位置。
扒一下 rehash() 方法的源码:
private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; // 老容量 int oldCapacity = oldTable.length; // 新容量,扩大两倍 int newCapacity = oldCapacity << 1; // 新的扩容阀值 threshold = (int)(newCapacity * loadFactor); // 创建新的数组 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩码,默认2扩容后是4,-1是3,二进制就是11。 int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { // 遍历老数组 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 计算新的位置,新的位置只可能是不变或者是老的位置+老的容量。 int idx = e.hash & sizeMask; if (next == null) // Single node on list // 如果当前位置还不是链表,只是一个元素,直接赋值 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // 如果是链表了 HashEntry<K,V> lastRun = e; int lastIdx = idx; // 新的位置只可能是不变或者是老的位置+老的容量。 // 遍历结束后,lastRun 后面的元素位置都是相同的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。 newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { // 遍历剩余元素,头插法到指定 k 位置。 V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 头插法插入新的节点 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable;}
这里有两个 for 循环:第一个 for 是为了寻找这样一个节点,这个节点后面的所有 next 节点的新位置都是相同的。然后把这个作为一个链表赋值到新位置。第二个 for 循环是为了把剩余的元素通过头插法插入到指定位置链表。这样实现的原因可能是基于概率统计。
2 JDK 1.8 中的 ConcurrentHashMap1. ConcurrentHashMap 的关键属性
tablevolatile Node<K,V>[] table:
装载 Node 的数组,作为 ConcurrentHashMap 的数据容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为 2 的幂次方。
nextTablevolatile Node<K,V>[] nextTable;
扩容时使用,平时为 null,只有在扩容的时候才为非 null。
sizeCtlvolatile int sizeCtl;
该属性用来控制 table 数组的大小,根据是否初始化和是否正在扩容有几种情况:
当值为负数时: 如果为 -1 表示正在初始化,如果为 -N 则表示当前正有 N-1 个线程进行扩容操作;当值为正数时: 如果当前数组为 null 的话表示 table 在初始化过程中,sizeCtl 表示为需要新建数组的长度;若已经初始化了,表示当前数据容器(table 数组)可用容量也可以理解成临界值(插入节点数超过了该临界值就需要扩容),具体指为数组的长度 n 乘以加载因子 loadFactor;当值为0时,即数组长度为默认初始值。
sun.misc.Unsafe U
在 ConcurrentHashMapde 的实现中可以看到大量的 U.compareAndSwapXXXX 的方法去修改 ConcurrentHashMap 的一些属性。
这些方法实际上是利用了 CAS 算法保证了线程安全性,这是一种乐观策略,假设每一次操作都不会产生冲突,当且仅当冲突发生的时候再去尝试。
而 CAS 操作依赖于现代处理器指令集,通过底层 CMPXCHG 指令实现。CAS(V,O,N) 核心思想为:若当前变量实际值 V 与期望的旧值 O 相同,则表明该变量没被其他线程进行修改,因此可以安全的将新值 N 赋值给变量;若当前变量实际值 V 与期望的旧值 O 不相同,则表明该变量已经被其他线程做了处理,此时将新值 N 赋给变量操作就是不安全的,再进行重试。
而在大量的同步组件和并发容器的实现中使用 CAS 是通过 sun.misc.Unsafe 类实现的,该类提供了一些可以直接操控内存和线程的底层操作,可以理解为 java 中的“指针”。该成员变量的获取是在静态代码块中:
static { try { U = sun.misc.Unsafe.getUnsafe(); ....... } catch (Exception e) { throw new Error(e); }}2.ConcurrentHashMap 中关键内部类
Node
Node 实现了 Map.Entry 接口,主要存放 key-value 键值对,并且具有 next 域:
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } ...}
可以看出很多属性都是用 volatile 进行修饰的,也就是为了保证内存可见性。
TreeNode
树节点,继承于承载数据的 Node 类。而红黑树的操作是针对 TreeBin 类的,从该类的注释也可以看出,也就是 TreeBin 会将 TreeNode 进行再一次封装:
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; ......}
TreeBin
这个类并不负责包装用户的 key、value 信息,而是包装的很多 TreeNode 节点。实际的 ConcurrentHashMap “数组” 中,存放的是 TreeBin 对象,而不是 TreeNode 对象:
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock ......}
ForwardingNode
在扩容时才会出现的特殊节点,其 key、value、hash 全部为 null。并拥有 nextTable 指针引用新的 table 数组:
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } .....}3. JDK 1.8 中 ConcurrentHashMap 底层结构
虽然 JDK 1.7 中的 ConcurrentHashMap 解决了 HashMap 并发的安全性,但是当冲突的链表过长时,在查询遍历的时候依然很慢。
所以在 JDK 1.8 中,引入了 红黑树。当冲突的链表长度大于 8 时,会将链表转化成红黑树,红黑树又被称为平衡二叉树,在查询效率方面,又有所提升。
JDK 1.8 中的数据结构:
可以发现 JDK 1.8 的 ConcurrentHashMap 相对于 JDK 1.7 来说变化比较大,不再是之前的 Segment 数组 + HashEntry 数组 + 链表,而是 Node 数组 + 链表 / 红黑树。当冲突链表达到一定长度时,链表会转换成红黑树。
与 JDK1.7 中的 ConcurrentHashMap 相比, 它抛弃了原有的 Segment 分段锁实现,采用了 CAS + synchronized 来保证并发的安全性。
JDK 1.8 中的 ConcurrentHashMap 对节点 Node 类中的共享变量也使用了 volatile 关键字,保证多线程操作时变量的可见性。
扒一下 Node 类的源码:
static class Node<K,V> implements Map.Entry<K,V> { // hash 值 final int hash; // 键 final K key; // 值 volatile V val; // 下一个节点 volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } ...}4.实例构造器方法
在使用 ConcurrentHashMap 第一件事自然而然就是 new 出来一个 ConcurrentHashMap 对象,一共提供了几个构造器方法:
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16ConcurrentHashMap()// 2. 给定map的大小ConcurrentHashMap(int initialCapacity) // 3. 给定一个mapConcurrentHashMap(Map<? extends K, ? extends V> m)// 4. 给定map的大小以及加载因子ConcurrentHashMap(int initialCapacity, float loadFactor)// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
当传入了指定大小的 map 时,该构造器的源码为:
public ConcurrentHashMap(int initialCapacity) { // 1. 小于0直接抛异常 if (initialCapacity < 0) throw new IllegalArgumentException(); // 2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 3. 赋值给sizeCtl this.sizeCtl = cap;}
这段代码的逻辑很容易理解:如果小于 0 就直接抛出异常;如果指定值大于了所允许的最大值的话就取最大值;否则,再对指定值做进一步处理。最后将 cap 赋值给 sizeCtl,当调用构造器方法之后,sizeCtl 的大小应该就代表了 ConcurrentHashMap 的大小,即 table 数组长度。
扒一下其中 tableSizeFor() 方法的源码:
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;}
该方法会将调用构造器方法时指定的大小转换成一个 2 的幂次方数,也就是说 ConcurrentHashMap 的大小一定是 2 的幂次方。比如,当指定大小为 18 时,为了满足 2 的幂次方特性,实际上 concurrentHashMap 的大小为 2 的 5 次方(32)。
另外,需要注意的是,调用构造器方法的时候并未构造出 table 数组(可以理解为 ConcurrentHashMap 的数据容器),只是算出 table 数组的长度,当第一次向 ConcurrentHashMap 插入数据的时候才真正的完成初始化创建 table 数组的工作。
5.put 操作
扒一下 put() 方法的源码:
public V put(K key, V value) { return putVal(key, value, false);}/** Implementation for put and putIfAbsent */final V putVal(K key, V value, boolean onlyIfAbsent) { // 1. key、value不允许为空 if (key == null || value == null) throw new NullPointerException(); // 2. 通过 key 获取到hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { // f = 目标位置元素 Node<K,V> f; int n, i, fh; // 如果当前table还没有初始化,先调用initTable()方法将tab进行初始化 if (tab == null || (n = tab.length) == 0) // 3. 如果tab为空,就初始化node数组(自旋+CAS) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 4. 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject()方法插入Node节点 // 桶内为空,CAS放入,不加锁,成功了就直接break if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 当前正在扩容 else if ((fh = f.hash) == MOVED) // 5. MOVED等于-1,如果f.hash等于-1,说明当前f是ForwardingNode节点,意味着其他线程正在扩容,则一起进行扩容操作 tab = helpTransfer(tab, f); else { // 6. 其余情况都是把新的Node节点按链表或红黑树的方式插入到合适的位置 V oldVal = null; // 7. 采用同步内置锁实现并发控制 synchronized (f) { // 7.1 节点插入之前,再次利用tabAt(tab, i)==f判断,防止被其他线程修改 if (tabAt(tab, i) == f) { // 7.2 如果fh=f.hash >= 0,说明当前为链表,在链表中插入新的键值对 if (fh >= 0) { // 7.3 遍历链表,如果找到对应的node节点,则修改value;否则直接在链表尾部加入节点 binCount = 1; // 循环加入新的或者覆盖节点 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 7.4 如果f是TreeBin类型节点,说明f是红黑树根节点,则在树结构上遍历元素,更新或增加节点 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 插入完键值对后再根据实际大小看是否需要转换成红黑树 if (binCount != 0) { // 8. 如果链表中节点数binCount>=TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 9. 插入完成之后,扩容判断(如果超过了临界值:实际大小*加载因子,就需要扩容) addCount(1L, binCount); return null;}
当进行 put 操作时,流程大概可以分如下几个步骤:
首先对于每一个放入的值,首先利用 spread() 方法对 key 的 hashcode 进行一次 hash 计算,由此来确定这个值在 table 中的位置;如果当前 table 数组还未初始化,先将 table 数组进行初始化操作;如果这个位置是 null 的,那么使用 CAS 操作直接放入;如果这个位置存在结点,说明发生了 hash 碰撞,首先判断这个节点的类型。如果该节点 fh==MOVED(代表 forwardingNode,数组正在进行扩容)的话,说明正在进行扩容;如果是链表节点(fh>0),则得到的结点就是 hash 值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到 key 相同的节点,则只需要覆盖该结点的 value 值即可。否则依次向后遍历,直到链表尾插入这个结点;如果这个节点的类型是 TreeBin 的话,直接调用红黑树的插入方法进行插入新的节点;插入完节点之后再次检查链表长度,如果长度大于 8,就把这个链表转换成红黑树;对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容。
put 操作大致的流程,就是这样的,可以看的出,复杂程度比 JDK1.7 上了一个台阶。
5.1 initTable 初始化数组
put() 方法的第 3 步中调用了 initTable()方法进行初始化数组:
private final Node<K,V>[] initTable() { // 1. 创建临时变量tab、sc,tab表示Node数组,sc表示临时变量 Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 2. sizeCtl默认为0,用来控制table的初始化和扩容操作,使用了volatile关键字修饰保证并发的可见性 // 保证只有一个线程正在进行初始化操作 // 如果 sizeCtl < 0,说明另外的线程执行CAS操作成功,当前线程只需要让出CPU时间片 if ((sc = sizeCtl) < 0) // 让出 CPU 使用权 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 3. 通过CAS方法,将sizeCtl修改为-1,有且只有一个线程能够修改成功 try { // 4. 初始化Node数组 if ((tab = table) == null || tab.length == 0) { // 得出数组的大小 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") // 这里才真正的初始化数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 计算数组中可用的大小:实际大小 n*0.75 // n-(n>>2) = n-(1/4)n = (3/4)n = 0.75 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab;}
这里需要注意:计算数组中可用的大小(数组实际大小)时:n-(n>>2) = n-(1/4)n = (3/4)n = 0.75,位运算效率会高一些。
如果选择是无参的构造器的话,这里在 new Node 数组的时候会使用默认大小为 DEFAULT_CAPACITY(16),然后乘以加载因子 0.75 为 12,也就是说数组的可用大小为 12。
sizeCtl 是一个对象属性,使用了volatile关键字修饰保证并发的可见性,默认为 0,当第一次执行 put 操作时,通过 Unsafe.compareAndSwapInt() 方法,俗称CAS,将 sizeCtl修改为 -1,有且只有一个线程能够修改成功,接着执行 table 初始化任务。
如果别的线程发现 sizeCtl<0,意味着有另外的线程执行 CAS 操作成功,当前线程通过执行 Thread.yield() 让出 CPU 时间片等待 table 初始化完成。
从源码中可以发现 ConcurrentHashMap 的初始化是通过自旋和 CAS 操作完成的。里面的变量 sizeCtl,它的值决定着当前的初始化状态:
-1 说明正在初始化-N 说明有 N-1 个线程正在进行扩容表示 table 初始化大小,如果 table 没有初始化表示 table 容量,如果 table 已经初始化
5.2 helpTransfer() 帮组扩容
put() 方法中的第 5 步调用了 helpTransfer() 方法,如果 f.hash == -1,说明当前 f 是 ForwardingNode 节点,意味着有其他线程正在扩容,则一起进行扩容操作:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 1. 如果table不为空且node节点是转移类型,同时node节点的nextTable(新table)不为空,进行数据校验 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // 2. 满足以上条件之后,尝试帮助扩容 // 数据数组的length得到一个标识符号 int rs = resizeStamp(tab.length); // 3. 如果nextTable没有被并发修改且tab也没有被并发修改,同时sizeCtl<0,说明还在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // 4. 对sizeCtl参数值进行分析判断 // 判断1:如果sizeCtl无符号右移16不等于rs,则标识符变化了 // 判断2:如果sizeCtl == rs+1,标识扩容结束了,不再线程进行扩容 // 判断3:如果sizeCtl == rs+65535,表示达到最大帮助线程的数量,即65535 // 判断4:如果转移下标transferIndex <= 0,表示扩容结束 // 满足任何一个判断,结束循环,返回table if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 5. 如果以上都不是,将sizeCtl + 1,表示增加了一个线程帮助其扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 6. 对数组进行转移,执行完之后结束循环。 transfer(tab, nextTab); break; } } return nextTab; } return table;}
这个过程操作步骤如下:
第1步,对 table、node 节点、node 节点的 nextTable,进行数据校验;第2步,根据数组的 length 得到一个标识符号;第3步,进一步校验 nextTab、tab、sizeCtl 值,如果 nextTab 没有被并发修改并且 tab 也没有被并发修改,同时 sizeCtl < 0,说明还在扩容;第4步,对 sizeCtl 参数值进行分析判断,如果不满足任何一个判断,将 sizeCtl + 1, 增加了一个线程帮助其扩容。
5.3 addCount() 扩容判断
put() 方法的第 9 步调用了 addCount() 方法,插入完成之后进行扩容判断:
private final void addCount(long x, int check) { // 1. 从putVal传入的参数是 x=1、check=0,只有hash冲突且它的结构是链表的结构时,check才会大于1 CounterCell[] as; long b, s; // 2. 利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // 3. 检查是否需要扩容,默认check=1,需要检查 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 4. 如果map.size()大于等于sizeCtl(达到扩容阈值需要扩容)并且table不是空,同时table的长度小于最大容量,可以扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 5. 根据length得到一个标识 int rs = resizeStamp(n); // sc=sizeCtl,如果小于0,标识正在扩容,尝试帮助扩容 if (sc < 0) { // 6. 对sizeCtl参数值进行分析判断,与帮助扩容阶段的判断一样 // 判断1:如果sizeCtl无符号右移16不等于rs,则标识符变化了 // 判断2:如果sizeCtl == rs+1,标识扩容结束了,不再线程进行扩容 // 判断3:如果sizeCtl == rs+65535,表示达到最大帮助线程的数量,即65535 // 判断4:如果转移下标transferIndex <= 0,表示扩容结束 // 满足任何一个判断,结束循环,返回table if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 7. 如果不在扩容,将sizeCtl更新:标识符左移16位+2,也就是变成一个负数 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 8. 进行扩容处理 transfer(tab, null); s = sumCount(); } }}
这个过程操作步骤如下:
第1步,利用 CAS 将方法更新 baseCount 的值;第2步,检查是否需要扩容,默认 check = 1,需要检查;第3步,如果满足扩容条件,判断当前是否正在扩容,如果是正在扩容就一起扩容;第4步,如果不在扩容,将 sizeCtl 更新为负数,并进行扩容处理。
从 put 的流程中可以发现,里面大量的使用了 CAS 方法,CAS 表示比较与替换,里面有 3 个参数,分别是目标内存地址、旧值、新值,每次判断的时候,会将旧值与目标内存地址中的值进行比较,如果相等,就将新值更新到内存地址里,如果不相等,就继续循环,直到操作成功为止。
虽然使用的了CAS这种乐观锁方法,但是里面的细节设计的是很复杂的。
6.get 操作
get() 方法不涉及并发操作,直接查询就可以了:
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); // 1. 判断数组是否为空,通过key定位到数组下标是否为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 2. 判断node节点第一个元素是不是要找到,如果是直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 3. 如果是红黑树结构,就从红黑树里面查询 else if (eh < 0) // 头节点hash值小于0,说明正在扩容或者是红黑树,find查找 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 是链表,遍历查询 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null;}
get() 方法的步骤如下:
第1步,判断数组是否为空,通过 key 定位到数组下标是否为空;第2步,判断 node 节点第一个元素是不是要找到,如果是直接返回;第3步,如果是红黑树结构,就从红黑树里面查询;第4步,如果是链表结构,循环遍历判断;7.remove 操作
remove() 方法和 put() 方法类似,只是方向是反的。
扒一下 remove() 方法的源码:
public V remove(Object key) { return replaceNode(key, null, null);}/** * Implementation for the four public remove/replace methods: * Replaces node value with v, conditional upon match of cv if * non-null. If resulting value is null, delete. */final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); // 1. 循环遍历数组 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 2. 参数校验 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; else if ((fh = f.hash) == MOVED) // 3. 帮助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; // 4. 利用 synchronized 同步锁,保证并发时元素移除安全 synchronized (f) { if (tabAt(tab, i) == f) { // 4.1 判断当前冲突节点是否为链表结构,如果是循环遍历移除 if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } // 4.2 如果是红黑树结构,利用红黑二叉树特性进行查找并移除节点,最后调整红黑树结构 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) // 5. 因为check=-1,所以不会进行扩容操作,利用CAS操作修改baseCount值 addCount(-1L, -1); return oldVal; } break; } } } return null;}
remove() 操作的步骤如下:
第1步,循环遍历数组,接着校验参数;第2步,判断是否有别的线程正在扩容,如果是一起扩容;第3步,用 synchronized 同步锁,保证并发时元素移除安全;第4步,因为 check= -1,所以不会进行扩容操作,利用 CAS 操作修改 baseCount 值。8.transfer() 方法
当 ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。这个方法的基本思想跟 HashMap 是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。这样做的目的可能不仅仅是为了满足 concurrent 的要求,而是希望利用并发处理去减少扩容带来的时间影响。
扒一下 transfer() 方法的源码:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 1. 新建Node数组,容量为之前的两倍 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; // 2. 新建forwardingNode引用,在之后会用到 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 3. 确定遍历中的索引i while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 4.将原数组中的元素复制到新数组中去 // 4.5 for循环退出,扩容结束修改sizeCtl属性 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } // 4.1 当前数组中第i个元素为null,用CAS设置成特殊节点forwardingNode(可以理解成占位符) else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 4.2 如果遍历到ForwardingNode节点,说明这个点已经被处理过了,直接跳过 这里是控制并发扩容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { // 4.3 处理当前节点为链表的头结点的情况,构造两个链表:一个是原链表、另一个是原链表的反序排列 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); // 在nextTable的i+n的位置上插入另一个链表 setTabAt(nextTab, i + n, hn); // 在table的i位置上插入forwardNode节点,表示已经处理过该节点 setTabAt(tab, i, fwd); // 设置advance为true,返回到上面的while循环中就可以执行i--操作 advance = true; } // 4.4 处理当前节点是TreeBin时的情况,操作和上面的类似 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } }}
整个扩容操作分为两个部分:
第一部分是构建一个 nextTable,它的容量是原来的两倍,这个操作是单线程完成的。新建 table 数组的代码为:Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1],在原容量大小的基础上右移一位。
第二个部分就是将原来 table 中的元素复制到 nextTable 中,主要是遍历复制的过程。根据运算得到当前遍历的数组的位置 i,然后利用 tabAt() 方法获得 i 位置的元素再进行判断:
如果这个位置为空,就在原 table 中的 i 位置放入 forwardNode 节点,这个也是触发并发扩容的关键点;如果这个位置是 Node 节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上;如果这个位置是 TreeBin 节点(fh<0),也做一个反序处理,并且判断是否需要 untreefi,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上;遍历过所有的节点以后就完成了复制工作,这时让 nextTable 作为新的 table,并且更新 sizeCtl 为新容量的 0.75 倍 ,完成扩容。设置为新容量的 0.75 倍代码为 sizeCtl = (n << 1) - (n >>> 1),仔细体会下是不是很巧妙,n<<1 相当于 n 左移一位表示 n 的两倍即 2n,n>>>1 右一位相当于 n 除以 2 即 0.5n,然后两者相减为 2n-0.5n=1.5n,就刚好等于新容量的 0.75 倍即 2n*0.75=1.5n。9.与 size 相关的一些方法
对于 ConcurrentHashMap 来说,这个 table 里到底装了多少东西其实是个不确定的数量,因为不可能在调用 size() 方法的时候像 GC 的 “stop the world” 一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap 也是大费周章才计算出来的。
为了统计元素个数,ConcurrentHashMap 定义了一些变量和一个内部类:
/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */@sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; }}/******************************************/ /** * 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新 但它并不用返回当前hashmap的元素个数 */private transient volatile long baseCount;/** * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. */private transient volatile int cellsBusy;/** * Table of counter cells. When non-null, size is a power of 2. */private transient volatile CounterCell[] counterCells;
mappingCount 与 size() 方法:
mappingCount 与 size() 方法的类似,从给出的注释来看,应该使用 mappingCount 代替 size 方法,两个方法都没有直接返回 basecount ,而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);} /** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values} final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value;//所有counter的值求和 } } return sum;}3 CAS(乐观锁)
CAS 是乐观锁的一种实现方式,是一种轻量级锁,JUC 中很多工具类的实现就是基于 CAS 的。
CAS 操作的流程如下图所示。线程在读取数据时不进行加锁,在准备写回数据时,比较原值是否被修改,若未被其他线程修改则写回,若已被修改,则重新执行读取流程。
这是一种乐观策略,认为并发操作并不总会发生。
乐观锁在开发场景中非常常用。
举个例子,假如我们要修改数据库中的一条数据,修改之前要先拿到它原来的值,然后还要在 SQL 中加一个判断:原来的值 == 现在拿到的它的原来的值是否一样,一样的话就可以修改了,不一样就说明被别的线程修改了,我们直接 return 错误就好了。
# oldValue 就是我们执⾏前查询出来的值update a set value = newValue where value = #{oldValue}1.CAS 关键操作
tabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}
该方法用来获取 table 数组中索引为 i 的 Node 元素。
casTabAt
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);}
利用 CAS 操作设置 tab 数组中索引为 i 的元素。
setTabAt
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);}
该方法用来设置 table 数组中索引为 i 的元素。
2.CAS 是否一定能保证数据没有被别的线程修改过?
当然不能。比如很经典的 ABA 问题,CAS 就无法判断。其实这一点在快速失败机制中也遇到了类似的问题。
3.什么是 ABA?
就是说,一个线程把值改为了 B,另一个线程又把值改为了 A。这个时候判断线程是否被修改,发现它的值还是 A,所以它并不知道这个值是否被修改过。其实在大多数场景下只是追求最终结果,结果正确就可以。
但是在实际开发中,是需要记录每一步的修改过程的,比如银行转账,每次的修改都应该是有记录的,方便回溯。
4.如何解决 ABA 问题?
方法1:版本号验证
用版本号去验证(可自定义组合而成)。比如说,我在修改前去查询它原来的值所对应的版本号,每次判断都将值和版本号一起进行判断,判断成功就给版本号加 1,说明记录下了这一次的操作。
# 判断原来的值和版本号是否匹配,中间有别的线程修改,值可能相等,但是版本号一定不相同update a set value = newValue ,vision = vision + 1 where value = #{oldValue} and vision = #{vision}
所以这样就解决了ABA问题,保证了一个线程操作的数据不被别的线程所修改。
方法2:时间戳
其实时间戳和版本号作用是一样的,异曲同工.对一个数据进行操作的时候把时间戳也带上,我们知道时间戳每一刻都在变化,所以任何时候的操作只要记录下来时间戳,后续再对该数据进行操作时带上时间戳进行判断就解决了 ABA 问题。
5.CAS乐观锁效率很高,synchronized 效率不是很好,为什么 JDK 1.8 之后反而增加了 synchronized 同步锁的数量?
synchronized 之前一直都是重量级的锁,但是后来 Java 官方是对它进行过升级了,它现在采用的是锁升级的方式去做的。
针对 sychronized 获取锁的方式,JVM 使用了锁升级的优化方式,就是先使用偏向锁优先同一线程,然后再次获取锁:
如果失败,就会升级为 CAS 轻量级锁;如果失败就会短暂自旋,防止线程被系统挂起;最后如果上述都失败,就会升级为重量级锁。
所以 synchronized 升级后,刚开始不是直接就使用重量级锁,而是通过很多轻量级锁的方式一步步升级上去的。
4 总结1.JDK 1.7
JDK 1.7 中 ConcurrentHashMap 使用的是分段锁来减小锁粒度,分割成若干个 Segment,然后每一个 Segment 上同时只有一个线程可以操作,每一个 Segment 都是一个类似 HashMap 数组的结构,它可以扩容,它的冲突会转化为链表。但是 Segment 的个数一但初始化就不能改变。
在 put 的时候需要锁住 Segment;get 的时候不加锁,使用 volatile 来保证可见性。当要统计全局时(size()),首先会尝试多次计算 modcount 来确定:这几次尝试中,是否有其他线程进行了修改操作。如果没有,则直接返回 size;如果有,则需要依次锁住所有的 Segment 来计算。
2.JDK 1.8
JDK 1.8 中的 ConcurrentHashMap 使用的 Synchronized 锁加 CAS 的机制。结构也由 JDK 1.7 中的 Segment 数组 + HashEntry 数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表。
3.对比
1.8 之前 put 定位节点时要先定位到具体的 segment,然后再在 segment 中定位到具体的桶。而在 1.8 的时候摒弃了 segment 臃肿的设计,直接针对的是 Node[] tale数组中的每一个桶,进一步减小了锁粒度。并且防止拉链过长导致性能下降,当链表长度大于 8 的时候采用红黑树的设计。
主要设计上的变化有以下几点:
不采用 segment 而采用 node,锁住 node 来实现减小锁粒度。设计了 MOVED 状态,在 resize 的过程中线程 2 还在 put 数据的话,线程 2 会帮助 resize。使用 3 个 CAS 操作来确保 node 的一些操作的原子性,这种方式代替了锁。sizeCtl 的不同值来代表不同含义,起到了控制的作用。采用 synchronized 而不是 ReentrantLock。
标签: #c语言怎么表示10的n次方