设为首页 加入收藏

TOP

源码 | 批量执行invokeAll() && 多选一invokeAny()(二)
2017-12-07 14:22:10 】 浏览:790
Tags:源码 批量 执行 invokeAll invokeAny
)

invokeAny()在任意一个任务成功(或ExecutorService被中断/超时)后就会返回。也分为不限时和限时版本,但为了进一步保障性能,invokeAny()的实现思路与invokeAll()略有不同。

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

内部调用了doInvokeAny()。

学习5-8行的写法,代码自解释。

doInvokeAny()

简化如下:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
...
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);
...
    try {
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();
        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;
        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (...) {
                    ee = ...;
                }
            }
        }
...
        throw ee;
    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

要点:

  • ntasks维护未提交的任务数,active维护已提交未结束的任务数。
  • 内部使用ExecutorCompletionService维护已完成的任务。
  • 如果没有任务成功结束,则返回捕获的最后一个异常。
  • 第一个任务是必将被执行的,其他任务按照迭代器顺序增量提交。

14行先向线程池提交一个任务(迭代器第一个),ntasks–,active=1:

futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

这里是真“提交”了,不是“执行”。
然后18-45行循环检查是否有任务成功结束。

首先,19行通过及时返回的poll()方法,尝试取出一个已完成的任务:

Future<T> f = ecs.poll();

根据f的结果,分成两种情况讨论。

ExecutorCompletionService默认使用LinkedBlockingQueue作为任务队列。对LinkedBlockingQueue不熟悉的可参照源码|并发一枝花之BlockingQueue

case1:如果有任务完成

如果有任务完成,则f不为null,进入40-49行,active–,并尝试取出任务结果:

if (f != null) {
    --active;
    try {
        return f.get();
    } catch (...) {
        ee = ...;
    }
}
  • 如果能够成功取出,即当前任务已成功结束,直接返回。
  • 如果抛出异常,则当前任务异常结束,使用ee记录异常。

显然,如果已完成的任务是异常结束的,invokeAny()不会退出,而是继续查看其它任务。

FutureTask#get()的用法参照源码|使用FutureTask的正确姿势

case2:如果没有任务完成

如果没有任务完成,则f为null,进入23-39行,判断是继续提交任务、退出还是等待任务结果:

if (f == null) {
    if (ntasks > 0) { // check1
        --ntasks;
        futures.add(ecs.submit(it.next()));
        ++active;
    }
    else if (active == 0) // check2
        break;
    else if (timed) { // check3
        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
        if (f == null)
            throw new TimeoutException();
        nanos = deadline - System.nanoTime();
    }
    else // check4
        f = ecs.take();
}
  • check1:如果还有剩余任务(ntasks > 0),那就继续提交,同时ntasks–,active++。
  • check2:如果没有剩余任务了,且也没有已提交未结束的任务(active == 0),则表示全部任务均已执行结束,但没有一个任务是成功的,可以退出循环。退出循环后,将在47行抛出ee记录的最后一个异常。
  • check3:如果可以没有剩余任务,但还
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Redis 分布式锁的正确实现方式( .. 下一篇使用 Spock 框架进行单元测试

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目