Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用

前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率。

Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便。但Fork Join不是那么容易用好的,我们先来看几个例子(反例)。

0. 反例错误分析

我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL)

这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来说这篇文章前面分析得还是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子)没有正确地对Fork Join进行应用。为了方便分析,还是贴下这个例子中具体的的代码吧。

    public class Calculator extends RecursiveTask {

        private static final int THRESHOLD = 100;
        private int start;
        private int end;

        public Calculator(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            if((start - end) < THRESHOLD){
                for(int i = start; i< end;i++){
                    sum += i;
                }
            }else{
                int middle = (start + end) /2;
                Calculator left = new Calculator(start, middle);
                Calculator right = new Calculator(middle + 1, end);
                left.fork();
                right.fork();

                sum = left.join() + right.join();
            }
            return sum;
        }

    }

我们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。

其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时候,由于第一个子任务不在队列尾而不能通过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂起阻塞,等待通知。而Fork Join本来的做法是想通过子任务的合理划分,避免过多的阻塞情况出现。这样,这个例子中的操作就违背了Fork Join的初衷,每次子任务的迭代,线程都会因为第一个子任务的join()而阻塞,加大了代码运行的成本,提高了资源开销,不利于提高程序性能。

除此之外,这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()。

由此可见,Fork Join的使用还是要注意对其本身的理解和对开发过程中细节的把握的。我们看下JDK中RecursiveAction和RecursiveTask这两个类。

1. RecursiveAction分析及应用实例

这两个类都是继承了ForkJoinTask,本身给出的实现逻辑并不多不复杂,在JDK的类文件中,它的注释比源码还要多。我们可以看下它的实现代码。

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

    protected abstract void compute();

    public final Void getRawResult() { return null; }

    protected final void setRawResult(Void mustBeNull) { }

    protected final boolean exec() {
        compute();
        return true;
    }
}

我们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是我们使用Fork Join时需要自己实现的逻辑。

我们可以看下API中给出的一个最简单最具体的例子:

class IncrementTask extends RecursiveAction {
   final long[] array; final int lo; final int hi;
   IncrementTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   protected void compute() {
     if (hi - lo < THRESHOLD) {
       for (int i = lo; i < hi; ++i)
         array[i]++;
     }
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new IncrementTask(array, lo, mid),
                 new IncrementTask(array, mid, hi));
     }
   }
 }

大致的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操作。我们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。我们来看下继承自FutureTask的invokeAll()方法实现。很简单:

    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        t2.fork();
        t1.invoke();
        t2.join();
    }

对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,然后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(当然如果不能执行,就需要阻塞等待了)。

其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是一样的,我们拿出一个通用一点的来看一下:

    public static void invokeAll(ForkJoinTask<?>... tasks) {
        Throwable ex = null;
        int last = tasks.length - 1;
        for (int i = last; i >= 0; --i) {
            ForkJoinTask<?> t = tasks[i];
            if (t == null) {
                if (ex == null)
                    ex = new NullPointerException();
            }
            else if (i != 0)
                t.fork();
            else if (t.doInvoke() < NORMAL && ex == null)
                ex = t.getException();
        }
        for (int i = 1; i <= last; ++i) {
            ForkJoinTask<?> t = tasks[i];
            if (t != null) {
                if (ex != null)
                    t.cancel(false);
                else if (t.doJoin() < NORMAL && ex == null)
                    ex = t.getException();
            }
        }
        if (ex != null)
            UNSAFE.throwException(ex);
    }

我们发现第一个子任务(i==0的情况)没有进行fork,而是直接执行,其余的统统先调用fork()放入任务队列,之后再逐一join()。其实我们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会造成阻塞,而不能充分利用Fork Join的特点,也就不能保证任务执行的性能。

Oracle的JavaSE7 API中在RecursiveAction里还有一个更复杂的例子,是计算double数组平方和的,由于代码较长,就不列在这里了。总体思路和上面是一样的,额外增加了动态阈值的判断,感兴趣的想深入理解的可以到这里去参考一下。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

2. RecursiveTask简要说明

其实说完了RecursiveAction,RecursiveTask可以用“同理”来解释。实现代码也很简单:

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    V result;

    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        return true;
    }

}

我们看到唯一不同的是返回结果的处理,其余都可以和RecursiveAction一样使用。

3. Fork Join应用小结

Fork Join是为我们提供了一个非常好的“分而治之”思想的实现平台,并且在一定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不完全是通用的,对于可很好分解成子任务的场景,我们可以对其进行应用,更多时候要考虑需求和应用场景,并且注意其使用要点才行。

此条目发表在 Java, Java语言, 并发, 开发, 计算机技术 分类目录,贴了 , , , , 标签。将固定链接加入收藏夹。

Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用》有 18 条评论

  1. 黑云压城 说:

    天啊,我对Fork/join的好奇已经很强烈了,在看到此文之前,确实参考了文中起始处的反例,低级错误我也又发现,因而自己手写了一个快排例子,但是fork和jion的顺序问题,从未发现,实实在在就按照那篇文章的笔者思路写,您的文章,真是帮我大忙了,我想,这和ForkJoinTask管理的本线程中的任务队列是LIFO特点相呼应的吧。

  2. 不泪之城 说:

    这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()不明白这一段,整个过程应该是多个线程可以同时执行不同段(比如1-10)数据的累加,然后再把所有段的和累加起来吗?如果上面的例子不对,能给出一个正确的累加程序例子吗?

  3. 你就飞吧 说:

    work-stealing算法是从队列尾部窃取的吧,另外当工作线程由于调用join()操作阻塞时,会触发work-stealing算法的。你理解的是不是有偏差啊?

    • ForkJoin的队列是双端队列,WorkStealing是从其他线程的队列头部窃取,采用FIFO原则,请参见我关于ForkJoin文章的(上)篇(http://www.molotang.com/articles/694.html),或者直接参看Doug Lea的论文原文http://gee.cs.oswego.edu/dl/papers/fj.pdf,这一点上面写的清清楚楚;第二点也请参看(中)篇,或直接看JDK中的源码实现,欢迎讨论,谢谢

    • 闫新院 说:

      可以参照invokeAll方法的代码实现,fork和join的顺序需要倒置,这样才能满足效率最大化

  4. 最励志网:http://www.zuilizhi.net/? 前来拜访,欢迎互访!

  5. 闫新院 说:

    收获蛮多,多谢楼主

  6. 不错不错,来看看。。

  7. invokeAll(ForkJoinTask t1, ForkJoinTask t2) 这个方法解释我觉得有点问题。
    1:t1先join,如果未执行会调用joinTask,这个方法就是请求帮助,所谓steal;不一定会阻塞,或者说阻塞时间很小(状态判断代码流程)。
    2:t1的执行依然会阻塞t2的。这是在同一个线程里的任务队列。这个invokeall只是t2先加入队列,保证t1先执行。
    invokeAll(ForkJoinTask… tasks) 这个方法解释有问题。。
    倒序遍历只是保证lifo顺序。同样i=0任务先执行,后面那段 t.cancel(false);i=0已经执行完毕,后续的任务不能取消(中断)

    看源码后的个人理解

  8. 游客 说:

    代码有误,Calculator 类的compute()方法永远不会进入else判断,而且else里面的代码也有错误,楼主你代码都没测好就放上来了。。。。

  9. 游客 说:

    囧,看错了,没看到楼主在分析这段错误代码,我就是在iteye上被这段代码坑了,见谅哈

发表评论

电子邮件地址不会被公开。 必填项已用 * 标注

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>