介绍
双队列是一种高效的内存数据结构,在多线程 编程中,能保证生产者线程的写入和消费者的读出尽量做到最低的影响,避免了共享队列的锁开销。本文将介绍一种双队列的设计,并给出实现代码,然后会举例使用的场景。该双队列在项目中使用,性能也得到了验证。设计
接下来具体介绍双队列的设计,并且会粘贴少量方法代码,帮助介绍。 本文中讲述的双队列,本质上是两个数组保存写入的Object,一个数组负责写入,另一个被消费者读出,两个数组都对应一个重入锁。数组内写入的数据会被计数。public class DoubleCachedQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { private static final long serialVersionUID = 1L; private static int default_line_limit = 1000; private static long max_cache_size = 67108864L; private int lineLimit; private long cacheSize; private T[] itemsA; private T[] itemsB; private ReentrantLock readLock, writeLock; private Condition notFull; private Condition awake; /** * writeArray : in reader's eyes, reader get data from data source and write * data to this line array. readArray : in writer's eyes, writer put data to * data destination from this line array. * * Because of this is doubleQueue mechanism, the two line will exchange when * time is suitable. * */ private T[] writeArray, readArray; private volatile int writeCount, readCount; private int writeArrayTP, readArrayHP; private volatile boolean closed = false; private int spillSize = 0; private long lineRx = 0; private long lineTx = 0;
队列实现了阻塞队列的接口,所以在向队列offer数据的时候是阻塞的,同样,取出操作poll也会阻塞。两个数组会在适当的时候进行queueSwitch操作。queueSwitch的条件就是当读者把queue读空了之后,且写入的queue此时不为空的时候,两个queue就会进行交换。在交换的时候,写入queue会被上锁,此时生产者不能让队列里写入数据。一般情况下,queue互换其实就是两个数组的引用互换,将相应的计数器也重置,写队列的计数器此时就清零了,因为queue交换是因为读队列已经被读空。
private long queueSwitch(long timeout, boolean isInfinite)
throws InterruptedException {
System.out.println("queue switch");
writeLock.lock();
try {
if (writeCount <= 0) {
if (closed) {
return -2;
}
try {
if (isInfinite && timeout <= 0) {
awake.await();
return -1;
} else {
return awake.awaitNanos(timeout);
}
} catch (InterruptedException ie) {
awake.signal();
throw ie;
}
} else {
T[] tmpArray = readArray;
readArray = writeArray;
writeArray = tmpArray;
readCount = writeCount;
readArrayHP = 0;
writeCount = 0;
writeArrayTP = 0;
notFull.signal();
// logger.debug("Queue switch successfully!");
return -1;
}
} finally {
writeLock.unlock();
}
}
上面queue交换的时候,可以看到当要被交换的写队列也已经为空的时候,会做一次检查。如果此时queue已经被显示地关闭了,那么poll操作就会返回空,读者此时应该检查queue是否已经被closed了,若已经closed了,那么读者已经把queue里的数据读完了。这里的显示close是我们给双队列加的一个状态,close这件事的作用是为了让读者知道:生产者已经停止往queue里写新数据了,但是queue里其实可能还有未取完的数据(在写queue里,此时还差一次queue switch),你往queue poll取数据的时候,如果取到空了,那么应该做一次check,如果queue已经关闭了,那么读者就知道本次读的任务完全结束了。反过来,close状态其实不影响写,生产者如果还想写的话,其实也是可以的,但是我不推荐这么做。
public void close() {
writeLock.lock();
try {
closed = true;
//System.out.println(this);
awake.signalAll();
} finally {
writeLock.unlock();
}
}
一对多
上面已经大致介绍了双队列的读写。在实际项目中,一对多的场景需要注意的地方有两: 单个生产者需要在结束的时候关闭queue多个消费者需要知道任务结束(知道其他线程已经完成任务) 第一点很简单,比如读文件的话,当生产者readLine()最后为空的时候,就认为数据源已经读完,调用方法把queue close()。而消费者在读queue的时候,有时候可能会由于延迟、queue交换等原因取到空数据,此时就如上面一节所说,消费者线程拿到空数据后应该检查queue的状态,如果queue没有关闭,那么应该等待一小会儿后继续poll数据;如果queue关闭了,那么其实说明该线程已经完成了任务。同理,其他消费者线程也应该在取到空的时候做这样的操作。
vcfpv/a1xLuwo6zO0rXE1/a3qMrHyMPJ+rL61d+3xcjr0ru49kVPRqOstbHEs8/fs8zIobW9RU9GtcTKsbryo6zL+9aqtcDX1Ly6yse12tK7uPbT9rW9vqHNt7XEyMujrMv7u+HWw9K7uPayvLb7o6y2+Mbky/vP37PM1NrIobW9v9W1xMqxuvK74bzssum4w7K8tvsmIzIwNTQwO6Os1eLR+b7NxNzWqrXAyse38dLRvq3T0NChu++w6dLRvq3Ew7W9RU9GwcujrMTHw7TV4sqxuvK+zb/J0tRjb3VudERvd27By6OstvjEw7W9RU9GtcTP37PMvfizzGNvdW50RG93brrzvs1hd2FpdCgpo6zX7rrzzcuz9qGjCs/Cw+bKx87S19S8utXrttTV4tbWs6G+sKOsyrnTw8urttPB0LXEt73KvaOsxuTW0LXEZnJv