深入浅出Java Concurrency : 线程池 part 8 线程池的实现及原理 ((二)

2014-11-24 01:22:38 · 作者: · 浏览: 3
否中断其运行,如果不中断那么任务将继续运行直到结束。
此方法返回后任务要么处于运行结束状态,要么处于取消状态。isDone()将永远返回true,如果cancel()方法返回true,isCancelled()始终返回true。
来看看Future接口的实现类java.util.concurrent.FutureTask具体是如何操作的。

在FutureTask中使用了一个AQS数据结构来完成各种状态以及加锁、阻塞的实现。

在此AQS类java.util.concurrent.FutureTask.Sync中一个任务用4中状态:

ThreadPoolExecutor-FutureTask-state

初始情况下任务状态state=0,任务执行(innerRun)后状态变为运行状态RUNNING(state=1),执行完毕后变成运行结束状态RAN(state=2)。任务在初始状态或者执行状态被取消后就变为状态CANCELLED(state=4)。AQS最擅长无锁情况下处理几种简单的状态变更的。

void innerRun() {
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
执行一个任务有四步:设置运行状态、设置当前线程(AQS需要)、执行任务(Runnable#run或者Callable#call)、设置执行结果。这里也可以看到,一个任务只能执行一次,因为执行完毕后它的状态不在为初始值0,要么为CANCELLED,要么为RAN。

取消一个任务(cancel)又是怎样进行的呢?对比下前面取消任务的描述是不是很简单,这里无非利用AQS的状态来改变任务的执行状态,最终达到放弃未启动或者正在执行的任务的目的。

boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
return true;
}
到目前为止我们依然没有说明到底是如何阻塞获取一个结果的。下面四段代码描述了这个过程。

1 V innerGet() throws InterruptedException, ExecutionException {
2 acquireSharedInterruptibly(0);
3 if (getState() == CANCELLED)
4 throw new CancellationException();
5 if (exception != null)
6 throw new ExecutionException(exception);
7 return result;
8 }
9 //AQS#acquireSharedInterruptibly
10 public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
11 if (Thread.interrupted())
12 throw new InterruptedException();
13 if (tryAcquireShared(arg) < 0)
14 doAcquireSharedInterruptibly(arg); //park current Thread for result
15 }
16 protected int tryAcquireShared(int ignore) {
17 return innerIsDone() 1 : -1;
18 }
19
20 boolean innerIsDone() {
21 return ranOrCancelled(getState()) && runner == null;
22 }
当调用Future#get()的时候尝试去获取一个共享变量。这就涉及到AQS的使用方式了。这里获取一个共享变量的状态是任务是否结束(innerIsDone()),也就是任务是否执行完毕或者被取消。如果不满足条件,那么在AQS中就会doAcquireSharedInterruptibly(arg)挂起当前线程,直到满足条件。AQS前面讲过,挂起线程使用的是LockSupport的park方式,因此性能消耗是很低的。

至于将Runnable接口转换成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一个简单实现。

static final class RunnableAdapter implements Callable {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
延迟、周期性任务调度的实现

java.util.concurrent.ScheduledThreadPoolExecutor是默认的延迟、周期性任务调度的实现。

有了整个线程池的实现,再回头来看延迟、周期性任务调度的实现应该就很简单了,因为所谓的延迟、周期性任务调度,无非添加一系列有序的任务队列,然后按照执行顺序的先后来处理整个任务队列。如果是周期性任务,那么在执行完毕的时候加入下一个时间点的任务即可。