嗯,今天其实在看HtttpProcessor的实现,但是突然想到了以前在看poller的时候看到了有闭锁,用于控制当前connector的连接数量,嗯,那就顺便把这部分来看了。。。
在Tomcat中,通过继承AbstractQueuedSynchronizer来实现了自己的同步工具,进而来实现了一个用于控制连接数量的闭锁。。LimitLatch。。
这里就需对AbstractQueuedSynchronizer有一些初步的了解。。。
首先它concurrent类库中提供的一个用于构建自己的同步工具的一个工具类。。可以通过继承他来快速的完成一个同步类的实现
(1)acquireSharedInterruptibly()方法,用于以共享的方式来获取锁,如果暂时无法获取,将会将线程挂起到队列,进行阻塞,对于这个方法是否最终能获取锁,是通过tryAcquireShared()方法的返回来定义的,这个方法需要自己实现。。。如果能获取锁,那么返回1,否则返回-1.。。
(2)releaseShared()方法。以共享的方法释放一个锁,这样前面提到的挂起的线程将会唤醒,进而重新尝试获取锁。。。
好啦,接下来就来看看LimitLatch的定义吧,直接上代码好了,。,。代码还是很简单的。。
//其实是通过AbstractQueuedSynchronizer来构建的
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
//构建Sync类型,实现基本的同步,以及阻塞。。
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
//用于增加计数,如果计数增加之后小于最大的,那么返回1,不会阻塞,否则将会返回-1阻塞
protected int tryAcquireShared(int ignored) { //调用acquiredShared方法的时候会调用这个方法来返回状态,如果返回1,那么表示获取成功,返回-1表示获取失败,将会阻塞
long newCount = count.incrementAndGet(); //先增加计数
if (!released && newCount > limit) { //如果当前已经超过了最大的限制
// Limit exceeded
count.decrementAndGet(); //减少计数
return -1; //返回-1,将阻塞当前线程
} else {
return 1;
}
}
@Override
//用于减少计数
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync; //同步对象
private final AtomicLong count; //计数器
private volatile long limit; //最大的数量
private volatile boolean released = false; //是否全部释放
/**
* Instantiates a LimitLatch object with an initial limit.
* @param limit - maximum number of concurrent acquisitions of this latch
*/
public LimitLatch(long limit) {
this.limit = limit; //最大限制
this.count = new AtomicLong(0);
this.sync = new Sync(); //sync 对象
}
/**
* Returns the current count for the latch
* @return the current count for latch
*/
public long getCount() {
return count.get();
}
/**
* Obtain the current limit.
*/
public long getLimit() {
return limit;
}
/**
* Sets a new limit. If the limit is decreased there may be a period where
* more shares of the latch are acquired than the limit. In this case no
* more shares of the latch will be issued until sufficient shares have been
* returned to reduce the number of acquired shares of the latch to below
* the new limit. If the limit is increased, threads currently in the queue
* may not be issued one of the newly available shares until the next
* request is made for a latch.
*
* @param limit The new limit
*/
public void setLimit(long limit) {
this.limit = limit;
}
/**
* Acquires a shared latch if one is available or waits for one if no shared
* latch is current available.
*/
//增加计数,如果太大,那么等等待
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
}
sync.acquireSharedInterruptibly(1);
}
/**
* Releases a shared latch, making it available for another thread to use.
* @return the previous counter value
*/
//减少计数
public long countDown() {
sync.releaseShared(0); //释放
long result = getCount();
if (log.isDebugEnabled()) {
log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
}
return result;
}
/**
* Releases all waiting threads and causes the {@link #limit} to be ignored
* until {@link #reset()} is c