Java 7之多线程第9篇 - 显式的Condition对象(一)

2014-11-24 02:40:33 · 作者: · 浏览: 0

下面这个例子使用synchronized关键字和wait() 、notifyAll()方法实现同步。

public abstract class BaseBoundedBuffer
  
    {
	private final V[] buf;
	private int tail;
	private int head;
	private int count;

	protected BaseBoundedBuffer(int capacity) {
		this.buf = (V[]) new Object[capacity];
	}

	protected synchronized final void doPut(V v) {
		buf[tail] = v;
		if (++tail == buf.length)
			tail = 0;
		++count;
	}

	protected synchronized final V doTake() {
		V v = buf[head];
		buf[head] = null;
		if (++head == buf.length)
			head = 0;
		--count;
		return v;
	}

	public synchronized final boolean isFull() {
		return count == buf.length;
	}

	public synchronized final boolean isEmpty() {
		return count == 0;
	}
}
  


public class BoundedBuffer 
  
    extends BaseBoundedBuffer
   
     { // CONDITION PREDICATE: not-full (!isFull()) // CONDITION PREDICATE: not-empty (!isEmpty()) public BoundedBuffer() { this(100); } public BoundedBuffer(int size) { super(size); } // BLOCKS-UNTIL: not-full public synchronized void put(V v) throws InterruptedException { while (isFull()) wait(); doPut(v); notifyAll(); } // BLOCKS-UNTIL: not-empty public synchronized V take() throws InterruptedException { while (isEmpty()) wait(); V v = doTake(); notifyAll(); return v; } // BLOCKS-UNTIL: not-full // Alternate form of put() using conditional notification public synchronized void alternatePut(V v) throws InterruptedException { while (isFull()) wait(); boolean wasEmpty = isEmpty(); doPut(v); if (wasEmpty) notifyAll(); } }
   
  
下面来使用Condition来实现。

Condition接口中定义的方法如下所示:

public interface Condition {
	// 造成当前线程在接到信号或被中断之前一直处于等待状态。
	void await();
	// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
	boolean await(long time, TimeUnit unit);
	// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
	long awaitNanos(long nanosTimeout);
	// 造成当前线程在接到信号之前一直处于等待状态。
	void awaitUninterruptibly();
	// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
	boolean awaitUntil(Date deadline);
	void signal();    // 唤醒一个等待线程
	void signalAll(); // 唤醒所有等待线程
}
在Condition对象中,与wait()、notifyAll()和notify()方法相对应的分别为await、signalAll和signal。

举个具体的例子,如下:

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); // 不满
    final Condition notEmpty = lock.newCondition(); // 不空

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();               // 获取锁
        try {
            // 如果缓冲已满,则等待;直到缓冲不是满的,才将x添加到缓冲中
            while (count == items.length)
                   notFull.await();
            items[putptr] = x;     // 将x添加到缓冲中
            // 将put统计数putptr+1;如果缓冲已满,则设putptr为0。
            if (++putptr == items.length) putptr = 0;
            ++count;               // 将缓冲数量+1
            notEmpty.signal();     // 唤醒take线程,因为take线程通过notEmpty.await()等待
            // 打印写入的数据
            System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
        } finally {
            lock.unlock();         // 释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();           // 获取锁
        try {
            while (count == 0) // 如果缓冲为空,则等待;直到缓冲不为空,才将x从缓冲中取出
                notEmpty.await();
            Object x = items[takeptr]; // 将x从缓冲中取出
            // 将take统计数takeptr+1;如果缓冲为空,则设takeptr为0。
            if (++takeptr == items.length) 
            	  takeptr = 0;
            --count;            // 将缓冲数量-1
            notFull.signal();   // 唤醒put线程,因为put线程通过notFull.await()等待
            // 打印取出的数据
            System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
            return x;
        } finally {
            lock.unlock();      // 释放锁
        }
    } 
}

如上使用了ReentrantLock独占锁和Condition对象给出了有界缓存的实现,即