)
invokeAny()在任意一个任务成功(或ExecutorService被中断/超时)后就会返回。也分为不限时和限时版本,但为了进一步保障性能,invokeAny()的实现思路与invokeAll()略有不同。
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
内部调用了doInvokeAny()。
学习5-8行的写法,代码自解释。
doInvokeAny()
简化如下:
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
...
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
...
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (...) {
ee = ...;
}
}
}
...
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
要点:
- ntasks维护未提交的任务数,active维护已提交未结束的任务数。
- 内部使用ExecutorCompletionService维护已完成的任务。
- 如果没有任务成功结束,则返回捕获的最后一个异常。
- 第一个任务是必将被执行的,其他任务按照迭代器顺序增量提交。
14行先向线程池提交一个任务(迭代器第一个),ntasks–,active=1:
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
这里是真“提交”了,不是“执行”。
然后18-45行循环检查是否有任务成功结束。
首先,19行通过及时返回的poll()方法,尝试取出一个已完成的任务:
Future<T> f = ecs.poll();
根据f的结果,分成两种情况讨论。
ExecutorCompletionService默认使用LinkedBlockingQueue作为任务队列。对LinkedBlockingQueue不熟悉的可参照源码|并发一枝花之BlockingQueue。
case1:如果有任务完成
如果有任务完成,则f不为null,进入40-49行,active–,并尝试取出任务结果:
if (f != null) {
--active;
try {
return f.get();
} catch (...) {
ee = ...;
}
}
- 如果能够成功取出,即当前任务已成功结束,直接返回。
- 如果抛出异常,则当前任务异常结束,使用ee记录异常。
显然,如果已完成的任务是异常结束的,invokeAny()不会退出,而是继续查看其它任务。
FutureTask#get()的用法参照源码|使用FutureTask的正确姿势。
case2:如果没有任务完成
如果没有任务完成,则f为null,进入23-39行,判断是继续提交任务、退出还是等待任务结果:
if (f == null) {
if (ntasks > 0) { // check1
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0) // check2
break;
else if (timed) { // check3
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else // check4
f = ecs.take();
}
- check1:如果还有剩余任务(ntasks > 0),那就继续提交,同时ntasks–,active++。
- check2:如果没有剩余任务了,且也没有已提交未结束的任务(active == 0),则表示全部任务均已执行结束,但没有一个任务是成功的,可以退出循环。退出循环后,将在47行抛出ee记录的最后一个异常。
- check3:如果可以没有剩余任务,但还