双队列的一种实现(三)

2014-11-24 02:08:38 · 作者: · 浏览: 2
iteLock.lockInterruptibly(); if(itemsA == null || itemsB == null){ initArray(lines[0]); } try { for (;;) { if (writeCount + size <= writeArray.length) { insert(lines, size); if (writeCount >= spillSize) { awake.signalAll(); } return true; } // Time out if (nanoTime <= 0) { return false; } // keep waiting try { nanoTime = notFull.awaitNanos(nanoTime); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } } /** * Close the synchronized lock and one inner state. * */ public void close() { writeLock.lock(); try { closed = true; //System.out.println(this); awake.signalAll(); } finally { writeLock.unlock(); } } public boolean isClosed() { return closed; } /** * * * @param timeout * appointed limit time * * @param unit * time unit */ public T poll(long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = unit.toNanos(timeout); readLock.lockInterruptibly(); try { for (;;) { if (readCount > 0) { return extract(); } if (nanoTime <= 0) { return null; } nanoTime = queueSwitch(nanoTime, true); } } finally { readLock.unlock(); } } /** * * @param ea * line buffer * * * @param timeout * a appointed limit time * * @param unit * a time unit * * @return line number of data.if less or equal than 0, means fail. * * @throws InterruptedException * if being interrupted during the try limit time. */ public int poll(T[] ea, long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = unit.toNanos(timeout); readLock.lockInterruptibly(); try { for (;;) { if (readCount > 0) { return extract(ea); } if (nanoTime == -2) { return -1; } if (nanoTime <= 0) { return 0; } nanoTime = queueSwitch(nanoTime, false); } } finally { readLock.unlock(); } } public Iterator iterator() { return null; } /** * Get size of {@link Storage} in bytes. * * @return Storage size. * * */ @Override public int size() { return (writeCount + readCount); } @Override public int drainTo(Collection c) { return 0; } @Override public int drainTo(Collection c, int maxElements) { return 0; } /** * If exists write space, it will return true, and write one line to the * space.
* otherwise, it will try to do that in a appointed time(20 * milliseconds),when time out if still failed, return false. * * @param line * a Line. * * @see DoubleCachedQueue#offer(Line, long, TimeUnit) * */ @Override public boolean offer(T line) { try { return offer(line, 20, TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { log.debug(e1.getMessage(), e1); } return false; } @Override public void put(T e) throws InterruptedException { } @Override public int remainingCapacity() { return 0; } @Override public T take() throws InterruptedException { return null; } @Override public T peek() { return null; } @Override public T poll() { try { return poll(1*1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.debug(e.getMessage(), e); } return null; } }


(全文完)