Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:
public interface QueueBlockingQueue类继承了如上的接口,定义如下:extends Collection { // 向队列中添加元素 boolean add(E e); boolean offer(E e); // 删除队列元素 E remove(); E poll(); // 检查队列元素 E element(); E peek(); }
public interface BlockingQueue这个接口中本身定义的方法,加上从Queue接口中继承的方法后,可以将BlockingQueue方法大概分为4种形式,如下:extends Queue { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection c); int drainTo(Collection c, int maxElements); }
vcbky/u1xM/fs8zKubbTwdDW2NDCseS1w7/Vz9DG8MC0o6zI57TTttPB0NbQ0saz/dK7uPa78tXftuC49tSqy9ijrLvy1d/N6sirx+W/1bbTwdChozxicj4Kz8LD5sC0xKPE4tK7uPbX6Mj7ttPB0LXEvPK1pcq1z9ajrMjnz8KjujwvcD4KPHA+PC9wPgo8cHJlIGNsYXNzPQ=="brush:java;">public class BlockingQueue { private List queue = new LinkedList(); private int limit = 10; public BlockingQueue(int limit) { this.limit = limit; } public synchronized void enqueue(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); // 通知所有的线程来取出,如果是加入线程则继续等待 } this.queue.add(item); } public synchronized Object dequeue() throws InterruptedException { while (this.queue.size() == 0) { wait(); } if (this.queue.size() == this.limit) { notifyAll(); // 通知所有的线程来加入,如果是取出线程则继续等待 } return this.queue.remove(0); } }必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。
Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。
下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
notEmpty表示锁的非空条件。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒之前通过notEmpty.await()进入等待状态的线程。同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。
1、添加元素
public boolean add(E e) {
return super.add(e);
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
}
(1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
还是