Java多线程总结之线程安全队列Queue(二)

2014-11-24 02:03:44 · 作者: · 浏览: 1
atile to ensure barrier separating write and read */volatile E item;Node next;Node(E x) { item = x; }}
然后,对于链表来说,肯定需要两个变量来标示头和尾:
Java代码 复制代码 收藏代码
    /** 头指针 */private transient Node head;//head.next是队列的头元素/** 尾指针 */private transient Node last;//last.next是null
    那么,对于入队和出队就很自然能理解了:
    Java代码 复制代码 收藏代码
      private void enqueue(E x) {last = last.next = new Node (x);//入队是为last再找个下家}
      private E dequeue() {Node first = head.next; //出队是把head.next取出来,然后将head向后移一位head = first;E x = first.item;first.item = null;return x;}
      另外,LinkedBlockingQueue相对于ArrayBlockingQueue还有不同是,有两个ReentrantLock,且队列现有元素的大小由一个AtomicInteger对象标示。
      注:AtomicInteger类是以原子的方式操作整型变量。
      Java代码 复制代码 收藏代码
        private final AtomicInteger count =new AtomicInteger(0);/** 用于读取的独占锁*/private final ReentrantLock takeLock =new ReentrantLock();/** 队列是否为空的条件 */private final Condition notEmpty = takeLock.newCondition();/** 用于写入的独占锁 */private final ReentrantLock putLock =new ReentrantLock();/** 队列是否已满的条件 */private final Condition notFull = putLock.newCondition();
        有两个Condition很好理解,在ArrayBlockingQueue也是这样做的。但是为什么需要两个ReentrantLock呢?下面会慢慢道来。
        让我们来看看offer和poll方法的代码:
        Java代码 复制代码 收藏代码
          public boolean offer(E e) {if (e == null)throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;final ReentrantLock putLock =this.putLock;//入队当然用putLockputLock.lock();try {if (count.get() < capacity) {enqueue(e); //入队c = count.getAndIncrement(); //队长度+1if (c + 1 < capacity)notFull.signal(); //队列没满,当然可以解锁了}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();//这个方法里发出了notEmpty.signal();return c >= 0;}
          public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock =this.takeLock;出队当然用takeLocktakeLock.lock();try {if (count.get() > 0) {x = dequeue();//出队c = count.getAndDecrement();//队长度-1if (c > 1)notEmpty.signal();//队列没空,解锁}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();//这个方法里发出了notFull.signal();return x;}
          看看源代码发现和上面ArrayBlockingQueue的很类似,关键的问题在于:为什么要用两个ReentrantLockputLock和takeLock?
          我们仔细想一下,入队操作其实操作的只有队尾引用last,并且没有牵涉到head。而出队操作其实只针对head,和last没有关系。那么就是说入队和出队的操作完全不需要公用一把锁,所以就设计了两个锁,这样就实现了多个不同任务的线程入队的同时可以进行出队的操作,另一方面由于两个操作所共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。
          另外,还有一点需要说明一下:await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的当前线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。所以可以看到在源码中这两个方法都出现在Lock的保护块中。

          -------------------------------我是分割线--------------------------------------

          下面再来说说ConcurrentLinkedQueue,它是一个无锁的并发线程安全的队列。
          以下部分的内容参照了这个帖子http://yanxuxin.iteye.com/blog/586943
          对比锁机制的实现,使用无锁机制的难点在于要充分考虑线程间的协调。简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够检测并帮助完成剩下的操作。这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况。
          ConcurrentLinkedQueue有两个volatile的线程共享变量:head,tail。要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证。
          下面通过offer方法的实现来看看在无锁情况下如何保证原子性:
          Java代码 复制代码 收藏代码
            public boolean offer(E e) {if (e == null)throw new NullPointerException();Node n = new Node (e, null);for (;;) {Node t = tail;Node s = t.getNext();if (t == tail) { //------------------------------aif (s == null) {//---------------------------bif (t.casNext(s, n)) { //-------------------ccasTail(t, n); //------------------------dreturn true;}} else {casTail(t, s); //----------------------------e}}}}
            此方法的循环内首先获得尾指针和其next指向的对象,由于tail和Node的next均是volatile的,所以保证了获得的分别都是最新的值。
            代码a:t==tail是最上层的协调,如果其他线程改变了tail的引用,则说明现在