Java 7 Concurrent Containers: ConcurrentHashMap (Part 1)


Like HashMap, ConcurrentHashMap is a hash-based Map, but it uses lock striping to provide higher concurrency and scalability.

Lock striping decomposes locking further, over a set of independent objects. For example, the implementation of ConcurrentHashMap uses an array of 16 locks, each of which guards 1/16 of the hash buckets; that is, bucket N is guarded by lock (N mod 16). As a result, the collection can support up to 16 concurrent writers.

Let's start with an example:

public class StripedMap {
    // Synchronization policy: buckets[n] guarded by locks[n % N_LOCKS]
    private static final int N_LOCKS = 16;  // number of stripe locks
    private final Node[] buckets;           // hash buckets
    private final Object[] locks;           // lock array

    private static class Node {             // node in a bucket's linked list
        Node next;
        Object key;
        Object value;
    }

    public StripedMap(int numBuckets) {     // constructor
        buckets = new Node[numBuckets];
        locks = new Object[N_LOCKS];
        for (int i = 0; i < N_LOCKS; i++)
            locks[i] = new Object();
    }

    private final int hash(Object key) {    // hash function: maps a key to a bucket index
        return Math.abs(key.hashCode() % buckets.length);
    }

    public Object get(Object key) {
        int hash = hash(key);
        synchronized (locks[hash % N_LOCKS]) {                   // lock only the stripe that guards this bucket
            for (Node m = buckets[hash]; m != null; m = m.next)  // walk the bucket's list looking for the key
                if (m.key.equals(key))
                    return m.value;
        }
        return null;
    }

    public void clear() {
        for (int i = 0; i < buckets.length; i++) {
            synchronized (locks[i % N_LOCKS]) {                 // clear each bucket under its own stripe lock
                buckets[i] = null;
            }
        }
    }
}

The code above uses lock striping to implement a simple concurrent Map. The downside, compared with a single lock for exclusive access, is that acquiring exclusive access to the whole container becomes harder and more expensive: when ConcurrentHashMap needs to expand the map and rehash keys into a larger set of buckets, it has to acquire all of the locks in the stripe set.
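To make that cost concrete, here is a hypothetical size() method for the StripedMap above (it is not part of the original listing): to report a consistent count it must hold all N_LOCKS stripe locks at once, which with intrinsic locks means acquiring them one inside another.

    // Hypothetical addition to StripedMap, for illustration only:
    // an operation that needs a consistent view of the whole table must
    // acquire every stripe lock, which is harder and costlier than taking
    // one map-wide lock.
    public int size() {
        return sizeHoldingLocksFrom(0);
    }

    private int sizeHoldingLocksFrom(int lockIndex) {
        if (lockIndex == N_LOCKS) {          // all stripe locks are held: safe to count
            int count = 0;
            for (Node head : buckets)
                for (Node m = head; m != null; m = m.next)
                    count++;
            return count;
        }
        synchronized (locks[lockIndex]) {    // acquire the locks in a fixed order
            return sizeHoldingLocksFrom(lockIndex + 1);
        }
    }

With a single lock this would be one synchronized block; with 16 stripes it becomes 16 nested acquisitions, which is exactly the trade-off described above.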

Now let's look at the most important constructor of ConcurrentHashMap:

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // validate the arguments
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // concurrencyLevel determines the number of segments
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    int sshift = 0;
    int ssize = 1;
    // ssize is the smallest power of two that is >= concurrencyLevel
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // initialize segmentShift and segmentMask
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    // initial capacity of the whole table
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;        // entries to be held per segment
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;   // length of each segment's HashEntry array
    while (cap < c)
        cap <<= 1;
    // create the segments array and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
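To see what this sizing logic actually produces, the following snippet (mine, not part of the JDK; it simply replays the arithmetic above, assuming the JDK 7 constant MIN_SEGMENT_TABLE_CAPACITY = 2) runs it for the default arguments initialCapacity = 16, loadFactor = 0.75f, concurrencyLevel = 16:

public class SegmentSizingDemo {
    public static void main(String[] args) {
        int initialCapacity = 16, concurrencyLevel = 16;  // the defaults

        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) {   // smallest power of two >= concurrencyLevel
            ++sshift;
            ssize <<= 1;
        }
        int segmentShift = 32 - sshift;      // 28
        int segmentMask = ssize - 1;         // 15 (binary 1111)

        int c = initialCapacity / ssize;     // 1
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 2;                         // MIN_SEGMENT_TABLE_CAPACITY in JDK 7
        while (cap < c)
            cap <<= 1;                       // stays 2

        System.out.println("segments = " + ssize
                + ", segmentShift = " + segmentShift
                + ", segmentMask = " + segmentMask
                + ", per-segment table size = " + cap);
        // prints: segments = 16, segmentShift = 28, segmentMask = 15, per-segment table size = 2
    }
}

So with the defaults the map starts with 16 segments, and the top 4 bits of a key's hash select the segment.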
      
     
    
   
  
As shown above, ConcurrentHashMap uses Segment to represent a lock stripe. Here is the relevant part of that class's source:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    // the hash table for this segment
    transient volatile HashEntry<K,V>[] table;
    // threshold: how full the table may get before its capacity is automatically increased
    transient int threshold;
    // load factor for the segment's table
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
    // ... remaining code omitted
}
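Because Segment extends ReentrantLock, each segment is its own lock: guarding a segment's table is just a matter of calling lock()/tryLock()/unlock() on the segment object. A minimal sketch of that pattern (my own, not JDK code):

import java.util.concurrent.locks.ReentrantLock;

// Illustrative only (not JDK code): because a segment IS a ReentrantLock,
// guarding its state is just lock()/unlock() on the segment itself.
class MiniSegment<V> extends ReentrantLock {
    private V state;

    V update(V newValue) {
        lock();               // the segment object is its own lock
        try {
            V old = state;
            state = newValue;
            return old;
        } finally {
            unlock();
        }
    }
}

This is why the Segment.put method shown later starts with tryLock() and releases the lock with unlock() in a finally block.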
    
   
  
Within each Segment, the bucket chains are built from HashEntry nodes, which play the same role as the Node class in the earlier example. The main part of the class looks like this:

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;               // volatile so readers see updates without locking
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
    // ... remaining code omitted
}


1. Getting an element: get()


    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key.hashCode());
        // the high bits of the hash select the segment
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // volatile read of the bucket head, then walk the chain without locking
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
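The important point is that get never takes a segment lock: it locates the segment and the bucket head with UNSAFE.getObjectVolatile and then follows the volatile next and value fields, so readers see values published by writers. As a rough analogue without Unsafe (a simplification of mine, not the JDK's code), the same lock-free read path can be sketched with an AtomicReferenceArray:

import java.util.concurrent.atomic.AtomicReferenceArray;

// Illustrative only: a lock-free read path in the spirit of get() above,
// using AtomicReferenceArray where the JDK uses sun.misc.Unsafe.
class LockFreeBuckets<K, V> {
    static final class Node<K, V> {
        final int hash;
        final K key;
        volatile V value;
        volatile Node<K, V> next;
        Node(int hash, K key, V value, Node<K, V> next) {
            this.hash = hash; this.key = key; this.value = value; this.next = next;
        }
    }

    private final AtomicReferenceArray<Node<K, V>> table =
            new AtomicReferenceArray<Node<K, V>>(16);

    V get(K key, int hash) {
        // volatile read of the bucket head, then walk the chain;
        // because value and next are volatile, readers need no lock
        for (Node<K, V> e = table.get(hash & (table.length() - 1));
             e != null; e = e.next) {
            if (e.hash == hash && key.equals(e.key))
                return e.value;
        }
        return null;
    }
}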
      
     
    
   
  


2. Adding an element: put()


    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();       // null values are not allowed
        int hash = hash(key.hashCode());
        // as in get, the high bits of the hash select the segment
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
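One consequence worth noting: as the explicit check above shows, ConcurrentHashMap does not accept null values (nor null keys, since key.hashCode() would throw). A small usage snippet of my own to illustrate:

import java.util.concurrent.ConcurrentHashMap;

public class PutDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        map.put("a", 1);                      // normal insert
        map.put("a", 2);                      // replaces the existing value
        System.out.println(map.get("a"));     // prints 2

        try {
            map.put("b", null);               // null value -> NullPointerException
        } catch (NullPointerException expected) {
            System.out.println("null values are rejected");
        }
    }
}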
   
  
put then delegates to the put method of the selected Segment, which links the entry into the bucket's list:

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // try to take the segment lock; if it is unavailable, scanAndLockForPut
        // spins (possibly pre-creating the node) before blocking on lock()
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        // key already present: replace the value unless onlyIfAbsent
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    // key not found: link a new node at the head of the bucket
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);                // resize this segment if needed
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();                                // always release the segment lock
        }
        return oldValue;
    }