true if there is at least one thread waiting to
* acquire the shared lock, otherwise returns false.
*/
//当前是否有线程等待
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Provide access to the list of threads waiting to acquire this limited
* shared latch.
*/
//获取所有等待的线程
public Collection
代码应该还是很简单的吧,而且注释也算是说的比较清楚。。。其实是构建了一个继承自AbstractQueuedSynchronizer的Sync对象,通过它来进行真正的同步功能。。。然后通过一个原子的整数计数器,和一个最大值,来判断当前是否可以获取锁
好啦,这里来看看Tomcat是如何通过LimitLatch来控制连接数量的吧,先来看看NioEndpoint的启动方法:
//启动当前的endpoint
public void startInternal() throws Exception {
if (!running) {
running = true; //设置表示为,表示已经看是运行了
paused = false; //没有暂停
// Create worker collection
if ( getExecutor() == null ) { //如果没有executor,那么创建
createExecutor(); //创建executor
}
initializeConnectionLatch(); //初始化闭锁,用于控制连接的数量
// Start poller threads
pollers = new Poller[getPollerThreadCount()]; //根据设置的poller数量来创建poller对象的数组
for (int i=0; i
这里调用了initializeConnectionLatch方法来初始化闭锁,来看看吧:
//初始化闭锁,用于控制连接的数量
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null; //这个是无限的链接数量
if (connectionLimitLatch==null) {
connectionLimitLatch = new LimitLatch(getMaxConnections()); //根据最大的链接数量来创建
}
return connectionLimitLatch;
}
我们知道在Connector的配置中可以设置最大的链接数量,其实这里也就是通过这个数量来构建LimitLatch对象的。。。
嗯,Tomcat是从哪里获取连接呢,这个就要从Accecptor看了。。。
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) { //如果暂停了
state = AcceptorState.PAUSED; //更改当前acceptor的状态
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) { //如果没有运行,那么这里直接跳过
break;
}
state = AcceptorState.RUNNING; //设置当前acceptor的状态是running
try {
//if we have reached max connections, wait
countUpOrAwaitConnection(); //增减闭锁的计数,如果connection数量已经达到了最大,那么暂停一下,这里用到的是connectionLimitLatch锁,可以理解为一个闭锁吧
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept(); //调用serversocket的accept方法
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection(); //出了异常,并没有获取链接,那么这里减少闭锁的计数
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;
// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
if (!setSocketOptions(socket)) { //这里主要是将socket加入到poller对象上面去,而且还要设置参数
countDownConnection(); //加入poller对象失败了的话,那么将闭锁的计数减低
closeSocket(socket); //关闭刚刚 创建的这个socket
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
lo