与HashMap一样,ConcurrentHashMap也是一个基于散列的Map,但是它使用了锁分段的技术来提供更高的并发性和伸缩性。
锁分段就是进一步对一组独立的对象进行分解。例如,在ConcurrentHashMap的实现中使用了一个包含16个锁的数组,每个锁保护所有散列桶的1/16,其实第N个散列桶由第(N mod 16)个锁来保护。所以这个并发集合可以支持多达16个并发的写入器。
首先举个例子:
public class StripedMap {
// Synchronization policy: buckets[n] guarded by locks[n%N_LOCKS]
private static final int N_LOCKS = 16; // 并发锁的数量
private final Node[] buckets; // 散列桶
private final Object[] locks; // 锁数组
private static class Node { // 链表中的节点
Node next;
Object key;
Object value;
}
public StripedMap(int numBuckets) { // 构造函数
buckets = new Node[numBuckets];
locks = new Object[N_LOCKS];
for (int i = 0; i < N_LOCKS; i++)
locks[i] = new Object();
}
private final int hash(Object key) { // 计算值的存储位置,相当于散列函数
return Math.abs(key.hashCode() % buckets.length);
}
public Object get(Object key) {
int hash = hash(key);
synchronized (locks[hash % N_LOCKS]) { // 计算出由哪个锁来保护这个散列桶
for (Node m = buckets[hash]; m != null; m = m.next) // 遍历这个散列桶,找到需要的值
if (m.key.equals(key))
return m.value;
}
return null;
}
public void clear() {
for (int i = 0; i < buckets.length; i++) {
synchronized (locks[i % N_LOCKS]) { // 将锁分段中的值清空
buckets[i] = null;
}
}
}
}
如上使用了锁分段技术简单实现了一个Map并发容器,但是与采用单个锁来实现独占访问相比,要获取多个锁来实现独占访问将更加困难并且开销更高。如当ConcurrentHashMap需要扩展映射范围,以及重新计算键值的散列值要分布到更大的桶集合中时,就需要获取分段锁集合中所有的锁。
首先来看一下ConcurrentHashMap中最主要的一个构造函数,如下:
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 参数有效性判断
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// concurrencyLevel是用来计算segments的容量
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
// ssize是大于或等于concurrencyLevel的最小的2的N次方值
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 初始化segmentShift和segmentMask
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
// 哈希表的初始容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; // 计算哈希表的实际容量
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY; // segments中的HashEntry数组的长度
while (cap < c)
cap <<= 1;
// segments
Segment
s0 = new Segment
(loadFactor, (int)(cap * loadFactor), (HashEntry
[])new HashEntry[cap]); Segment
[] ss = (Segment
[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
如上使用了Segment来表示锁分段,来看一下这个类源代码,如下:
static final class Segment在每个Segment中通过HashEntry来表示链结构,类似于前面例子中的Node节点,主要的代码如下:extends ReentrantLock implements Serializable { transient volatile HashEntry [] table; // threshold阈,是哈希表在其容量自动增加之前可以达到多满的一种尺度。 transient int threshold; // loadFactor是加载因子 final float loadFactor; Segment(float lf, int threshold, HashEntry [] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } // 省略部分代码 }
static final class HashEntry{ final int hash; final K key; volatile V value; volatile HashEntry next; HashEntry(int hash, K key, V value, HashEntry next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } // 省略部分代码 }
1、获取元素
public V get(Object key) {
Segment
s; // manually integrate access methods to reduce overhead
HashEntry
[] tab; int h = hash(key.hashCode()); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment
)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) { for (HashEntry
e = (HashEntry
) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
2、添加元素
public V put(K key, V value) {
Segment
s;
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment
)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
接着调用了Segment类中的put方法将元素添加到链表中,如下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry
node = tryLock() null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry
[] tab = table; int index = (tab.length - 1) & hash; HashEntry
first = entryAt(tab, index); for (HashEntry
e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry