并发集合(二)
nsume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
五个队列所提供的各有不同:
ArrayBlockingQueue :一个由数组支持的有界队列。
LinkedBlockingQueue :一个由链接节点支持的可选有界队列。
PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。
DelayQueue :一个由优先级堆支持的、基于时间的调度队列。
SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。
前两个类 ArrayBlockingQueue 和 LinkedBlockingQueue 几乎相同,只是在后备存储器方面有所不同, LinkedBlockingQueue 并不总是有容量界限。无大小界限的 LinkedBlockingQueue 类在添加元素时永远不会有阻塞队列的等待(至少在其中有 Integer.MAX_VALUE元素之前不会)。
PriorityBlockingQueue 是具有无界限容量的队列,它利用所包含元素的 Comparable 排序顺序来以逻辑顺序维护元素。可以将它看作TreeSet 的可能替代物。例如,在队列中加入字符串 One、Two、Three 和 Four 会导致 Four 被第一个取出来。对于没有天然顺序的元素,可以为构造函数提供一个 Comparator 。不过对 PriorityBlockingQueue 有一个技巧。从 iterator() 返回的 Iterator 实例不需要以优先级顺序返回元素。如果必须以优先级顺序遍历所有元素,那么让它们都通过 toArray() 方法并自己对它们排序,像Arrays.sort(pq.toArray()) 。
新的 DelayQueue 实现可能是其中最有意思(也是最复杂)的一个。加入到队列中的元素必须实现新的 Delayed 接口(只有一个方法 —— long getDelay(java.util.concurrent.TimeUnit unit) )。因为队列的大小没有界限,使得添加可以立即返回,但是在延迟时间过去之前,不能从队列中取出元素。如果多个元素完成了延迟,那么最早失效/失效时间最长的元素将第一个取出。实际上没有听上去这样复杂。清单 3 演示了这种新的阻塞队列集合的使用:
清单 3. 使用 DelayQueue 实现
import java.util.*;
import java.util.concurrent.*;
public class Delay {
/**
* Delayed implementation that actually delays
*/
static class NanoDelay implements Delayed {
long trigger;
NanoDelay(long i) {
trigger = System.nanoTime() + i;
}
public int compareTo(Object y) {
long i = trigger;
long j = ((NanoDelay)y).trigger;
if (i < j) return -1;
if (i > j) return 1;
return 0;
}
public boolean equals(Object other) {
return ((NanoDelay)other).trigger == trigger;
}
public boolean equals(NanoDelay other) {
return ((NanoDelay)other).trigger == trigger;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.nanoTime();
return unit.convert(n, TimeUnit.NANOSECONDS);
}
public long getTriggerTime() {
return trigger;
}
public String toString() {
return String.valueOf(trigger);
}
}
public static void main(String args[]) throws InterruptedException {
Random random = new Random();
DelayQueue queue = new DelayQueue();
for (int i=0; i < 5; i++) {
queue.add(new NanoDelay(random.nextInt(1000)));
}
long last = 0;
for (int i=0; i < 5; i++) {
NanoDelay delay = (NanoDelay)(queue.take());
long tt = delay.getTriggerTime();
System.out.println("Trigger time: " + tt);
if (i != 0) {
System.out.println("Delta: " + (tt - last));
}
last = tt;
}
}
}
这个例子首先是一个内部类 NanoDelay ,它实质上将暂停给定的任意纳秒(nanosecond)数,这里利用了 System 的新 nanoTime()方法。然后 main() 方法只是将 NanoDelay 对象放到队列中并再次将它们取出来。如果希望队列项做一些其他事情,就需要在 Delayed对象的实现中加入方法,并在从队列中取出后调用这个新方法。(请随意扩展 NanoDelay 以试验加入其他方法做一些有趣的事情。)显示从队列中取出元素的两次调用之间的时间差。如果时间差是负数,可以视为一个错误,因为永远不会在延迟时间结束后,在一个更早的触发时间从队列中取得项。
SynchronousQueue 类是最简单的。它没有内部容量。它就像线程之间的手递手机制。在队列中加入一个元素的生产者会等待另一个线程的消费者。当这个消费者出现时,这个元素就直接在消费者和生产者之间传递,永远不会加入到阻塞队列中。
回页首
使用 ConcurrentMap 实现
新