然后,对于链表来说,肯定需要两个变量来标示头和尾:
Java代码
-
/** 头指针 */private transient Node
那么,对于入队和出队就很自然能理解了:
Java代码
-
private void enqueue(E x) {last = last.next = new Node
private E dequeue() {Node
另外,LinkedBlockingQueue相对于ArrayBlockingQueue还有不同是,有两个ReentrantLock,且队列现有元素的大小由一个AtomicInteger对象标示。
注:AtomicInteger类是以原子的方式操作整型变量。
Java代码
-
private final AtomicInteger count =new AtomicInteger(0);/** 用于读取的独占锁*/private final ReentrantLock takeLock =new ReentrantLock();/** 队列是否为空的条件 */private final Condition notEmpty = takeLock.newCondition();/** 用于写入的独占锁 */private final ReentrantLock putLock =new ReentrantLock();/** 队列是否已满的条件 */private final Condition notFull = putLock.newCondition();
有两个Condition很好理解,在ArrayBlockingQueue也是这样做的。但是为什么需要两个ReentrantLock呢?下面会慢慢道来。
让我们来看看offer和poll方法的代码:
Java代码
-
public boolean offer(E e) {if (e == null)throw new NullPointerException();final AtomicInteger count = this.count;if (count.get() == capacity)return false;int c = -1;final ReentrantLock putLock =this.putLock;//入队当然用putLockputLock.lock();try {if (count.get() < capacity) {enqueue(e); //入队c = count.getAndIncrement(); //队长度+1if (c + 1 < capacity)notFull.signal(); //队列没满,当然可以解锁了}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();//这个方法里发出了notEmpty.signal();return c >= 0;}
public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock =this.takeLock;出队当然用takeLocktakeLock.lock();try {if (count.get() > 0) {x = dequeue();//出队c = count.getAndDecrement();//队长度-1if (c > 1)notEmpty.signal();//队列没空,解锁}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();//这个方法里发出了notFull.signal();return x;}
看看源代码发现和上面ArrayBlockingQueue的很类似,关键的问题在于:为什么要用两个ReentrantLockputLock和takeLock?
另外,还有一点需要说明一下:await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的当前线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。所以可以看到在源码中这两个方法都出现在Lock的保护块中。
-------------------------------我是分割线--------------------------------------
下面再来说说ConcurrentLinkedQueue,它是一个无锁的并发线程安全的队列。
以下部分的内容参照了这个帖子http://yanxuxin.iteye.com/blog/586943
对比锁机制的实现,使用无锁机制的难点在于要充分考虑线程间的协调。简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够检测并帮助完成剩下的操作。这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况。
ConcurrentLinkedQueue有两个volatile的线程共享变量:head,tail。要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证。
下面通过offer方法的实现来看看在无锁情况下如何保证原子性:
Java代码
-
public boolean offer(E e) {if (e == null)throw new NullPointerException();Node
此方法的循环内首先获得尾指针和其next指向的对象,由于tail和Node的next均是volatile的,所以保证了获得的分别都是最新的值。
代码a:t==tail是最上层的协调,如果其他线程改变了tail的引用,则说明现在