Tomcat源码阅读之闭锁的实现与连接数量的控制(二)

2014-11-24 11:18:58 · 作者: · 浏览: 1
alled. */ //通过将released设置为true,将会释放所有的线程,知道reset了 public boolean releaseAll() { released = true; return sync.releaseShared(0); } /** * Resets the latch and initializes the shared acquisition counter to zero. * @see #releaseAll() */ //重制 public void reset() { this.count.set(0); released = false; } /** * Returns true if there is at least one thread waiting to * acquire the shared lock, otherwise returns false. */ //当前是否有线程等待 public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * Provide access to the list of threads waiting to acquire this limited * shared latch. */ //获取所有等待的线程 public Collection getQueuedThreads() { return sync.getQueuedThreads(); } }

代码应该还是很简单的吧,而且注释也算是说的比较清楚。。。其实是构建了一个继承自AbstractQueuedSynchronizer的Sync对象,通过它来进行真正的同步功能。。。然后通过一个原子的整数计数器,和一个最大值,来判断当前是否可以获取锁


好啦,这里来看看Tomcat是如何通过LimitLatch来控制连接数量的吧,先来看看NioEndpoint的启动方法:

    //启动当前的endpoint
    public void startInternal() throws Exception {

        if (!running) {
            running = true;  //设置表示为,表示已经看是运行了
            paused = false;  //没有暂停

            // Create worker collection
            if ( getExecutor() == null ) {  //如果没有executor,那么创建
                createExecutor();   //创建executor
            }

            initializeConnectionLatch();   //初始化闭锁,用于控制连接的数量

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];   //根据设置的poller数量来创建poller对象的数组
            for (int i=0; i
  
   

这里调用了initializeConnectionLatch方法来初始化闭锁,来看看吧:

    //初始化闭锁,用于控制连接的数量
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;  //这个是无限的链接数量
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());  //根据最大的链接数量来创建
        }
        return connectionLimitLatch;
    }

我们知道在Connector的配置中可以设置最大的链接数量,其实这里也就是通过这个数量来构建LimitLatch对象的。。。

嗯,Tomcat是从哪里获取连接呢,这个就要从Accecptor看了。。。

 public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {  //如果暂停了
                    state = AcceptorState.PAUSED;  //更改当前acceptor的状态
                    try {
                        Thread.sleep(50);  
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {  //如果没有运行,那么这里直接跳过
                    break;
                }
                state = AcceptorState.RUNNING;  //设置当前acceptor的状态是running

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();  //增减闭锁的计数,如果connection数量已经达到了最大,那么暂停一下,这里用到的是connectionLimitLatch锁,可以理解为一个闭锁吧

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();  //调用serversocket的accept方法
                    } catch (IOException ioe) {
                        //we didn't get a socket
                        countDownConnection();  //出了异常,并没有获取链接,那么这里减少闭锁的计数
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // setSocketOptions() will add channel to the poller
                    // if successful
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {  //这里主要是将socket加入到poller对象上面去,而且还要设置参数
                            countDownConnection();  //加入poller对象失败了的话,那么将闭锁的计数减低
                            closeSocket(socket);  //关闭刚刚 创建的这个socket
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (SocketTimeoutException sx) {
                    // Ignore: Normal condition
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (OutOfMemoryError oom) {
                    try {
                        oomParachuteData = null;
                        releaseCaches();
                        lo