java.io包中的字节流(上)—— 基本的InputStream和OutputStream

前面在Java IO概述的文章中已经提过,JDK1.0中就有的传统的IO包括字节流和字符流。我们这篇就说下java.io中的字节流。说Java IO字节流也就是InputStream和OutputStream,而从这两个类的继承类层次结构来看,从FilterInputStream和FilterOutputStream开始,上下可分为原始的字节流和“装饰”过的功能封装字节流这篇先说说前半部分——原始字节流的各个类——包括部分源码分析。

0. InputStream和OutputStream

先看下类的声明:

public abstract class InputStream implements Closeable
public abstract class OutputStream implements Closeable, Flushable

可见此二者都是抽象类,而非接口。也就是说除了分别满足java.io.Closeable和java.io.Flushable,提供了close()和flush()方法的默认实现外,还给出了其它实现,像InputStream就提供了skip()方法实现等。

我们更关心的是InputStream中的几个read()方法和OutputStream的几个write()方法。而实际上最核心的read()和write()方法,InputStream和OutputStream并未给出直接实现,这正是InputStream和OutputStream抽象的地方,我们来看下。

InputStream的read()方法:

  • public abstract int read() throws IOException; 核心read()方法,留给子类实现。
  • public int read(byte b[])
  • public int read(byte b[], int off, int len)

后两者是调用第一个方法读特定长度的数据放入byte数组中。

OutputStream的write()方法:

  • public abstract void write(int b) throws IOException; 核心write()方法,留给子类实现。
  • public void write(byte b[])
  •  public void write(byte b[], int off, int len)

和read()类似,把字节数组中的特定数据挨个调用第一个write()方法写出。

再说read()和wirte()实现,那么就要看一下InputStream和OutputStream的子类。我们先来看看InputStream和OutputStream的直接子类。

在java.io中InputStream的直接子类有:

  • java.io.ByteArrayInputStream
  • java.io.FileInputStream
  • java.io.FilterInputStream
  • java.io.ObjectInputStream
  • java.io.PipedInputStream
  • java.io.SequenceInputStream
  • java.io.StringBufferInputStream

而java.io中OutputStream的直接子类有:

  • java.io.ByteArrayOutputStream
  • java.io.FileOutputStream
  • java.io.FilterOutputStream
  • java.io.ObjectOutputStream
  • java.io.PipedOutputStream

这些当中,FileInputStream和FileOutputStream是与外部IO直接有关系的,而FilterInputStream和FilterOutputStream是“装饰者”设计实现的基类,其它各类都是特定场景下InputStream和OutputStream的实现,我们来具体看看。

1. FileInputStream/FileOutputStream

顾名思义,是文件的输入流和输出流。文件存在于哪里呢?通常存在于外部设备上,这个是这些实现类中比较特殊的。我们如果做过C语言开发,知道我们通常会open操作系统中的文件,并保留其文件描述符fd。其实,在Java中我们也有类似的东西。首先,就有fd属性,它是FileDescriptor类,和C等底层语言中一样,这实际上是对底层文件的一个描述符,和底层文件进行交互的时候少不了它。实际上在FileInputStream/FileOutputStream对象构造方法中,我们初始化了FileDescriptor的fd对象,并调用了open()方法。

遗憾的是,像open()、read()、write()、skip()、available()这些FileInputStream/FileOutputStream中的具体操作方法,在(Sun)JDK中都冠以native,即本地实现,这样做的好处就是上层开发使用者不必关心,统统交由JVM等底层实现进行处理,实现了平台无关性。

在FileInputStream还有这样几个属性:

  • private final FileDescriptor fd;
  • private FileChannel channel = null;   JDK1.4之后为了支持NIO的Channel操作
  • private final Object closeLock = new Object();  关闭时的并发同步锁
  • private volatile boolean closed = false; 关闭标志
  • private static final ThreadLocal<Boolean> runningFinalize =
    new ThreadLocal<>();  finalize是否运行的标志

此外,再简要看下FileInputStream构造方法的一个具体实现:

    public FileInputStream(File file) throws FileNotFoundException {
        String name = (file != null ? file.getPath() : null);
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkRead(name);
        }
        if (name == null) {
            throw new NullPointerException();
        }
        fd = new FileDescriptor();
        fd.incrementAndGetUseCount();
        open(name);
    }

其它重载要么是调用了这个方法,要么实质上是一致的。FileOutputStream也是类似,出了open()调用的时候有一个append属性参数,标志是否为文件追加。

2. ByteArrayInputStream/ByteArrayOutputStream

名称也很直接,实际上就是字节数组,在构造类对象的时候给出一个byte数组。对于ByteArrayInputStream来说,这个byte数组就是读取的源头,而ByteArrayOutputStream则是write()的目的地。

此外,构造方法中还可以指定这个byte数组的有效起点和长度。

    public ByteArrayInputStream(byte buf[], int offset, int length) {
        this.buf = buf;
        this.pos = offset;
        this.count = Math.min(offset + length, buf.length);
        this.mark = offset;
    }

而对于ByteArrayOutputStream,给出byte数组长度参数即可。甚至还有默认实现,长度32。

    public ByteArrayOutputStream(int size) {
        if (size < 0) {
            throw new IllegalArgumentException("Negative initial size: "
                                               + size);
        }
        buf = new byte[size];
    }

3. PipedInputStream/PipedOutputStream

从名字理解也没错,就是管道输入输出(字节)流。它们的存在需要彼此,即PipedInputStream对象和PipedOutputStream对象只有互相存在才真正有意义,当然可以先构建后连接(connect)。

总体来说就是PipedInputStream维护了一个PipedOutputStream对象的属性,而PipedOutputStream也维护了一个PipedInputStream对象属性。而PipedInputStream额外维护了一个缓冲区数组。当PipedOutputStream执行write()的时候,如果PipedInputStream对象未就绪,会发生一个异常。就绪情况下会调用PipedInputStream对象的receive()方法进行接收,也就是写入缓冲数组buf[]。而最终的PipedInputStream的读取就从这个buf[]缓冲数组中来。

4. SequenceInputStream

这个SequenceInputStream是InputStream的另一个实现,而且没有OutputStream与其对应(至少在java.io中没有)。这个类实际上所做的,也是一个对多个其它InputStream的“包装”,而这个所谓的“包装”的作用就是把这些依次(sequently)连接起来,当一个read()到头了(返回-1)就接着读下一个。

下面是read()方法实现:

    public int read() throws IOException {
        if (in == null) {
            return -1;
        }
        int c = in.read();
        if (c == -1) {
            nextStream();
            return read();
        }
        return c;
    }

5. StringBufferInputStream

这个类的对象是通过一个String字符串对象来构造,read()实际上就是读取其中的字节(读取到char再和0xFF与操作)。遗憾的是这个类已经被@Deprecated,即不建议使用。不多说了。

6. ObjectInputStream/ObjectOutputStream

ObjectInputStream除了extends了InputStream,还实现了ObjectOutput, ObjectStreamConstants两个接口。ObjectOutputStream也类似。

这两个类与序列化有关,后面序列化文章统一说明。

7. FilterInputStream/FilterOutputStream

从文章开头就提到了,这两个类实际上是Java IO字节流另外一部分的开头,就是以“装饰者模式”实现的,提供了更多功能的封装类别字节流,下篇文章详细介绍整理。

作者原创,难免有错误,欢迎读者热心评论留言指出,以免误导他人,谢谢!

发表在 IO, Java, Java语言, 开发, 计算机技术 | 标签为 , , , , , | 10 条评论

Java IO整理概要

随着上一篇介绍Fork Join的文章结束,本阶段对Java并发的整理就先告一段落。其实关于Java并发开发还有很多很多内容,像Disruptor这种优秀的工具等,但不可能一一赘述。

从这篇开始,我会对Java的又一重要的基础点进行复习整理,那就是Java IO。

IO实际上就是input & output,解释起来就是输入和输出。对于一个完整的计算机系统,IO绝对是不可缺少的一部分。没有了IO,计算机也就没有了数据的源头和目标。IO在计算机系统中是复杂多样的,磁盘、网络、扫描、显示、打印等都是IO,对于一个特定的IO设备,通常我们的操作系统会有一个程序模块,就是驱动。也多亏了操作系统,封装了驱动等偏底层的内容,使得应用者还开发者可以以更通用的方式来看待复杂多样的输入输出。而无论是和操作系统层面关联更密切的C,还是有了JVM的高级语言Java,对IO的部分都进行了封装,屏蔽了底层的繁琐细节,而建立了抽象的模型,为上层开发提供了方便。

下面简要整理下,本篇以及后面一系列文章所要整理到的内容。

  • 字节流。java.io包中的InputStreamOutputStream两个类及各具体子类,这是Java IO发展过程当中最早就有的内容。
  • 字符流。java.io包中的ReaderWriter类以及具体子类。在字节流处理的基础上,封装了编码等内容,提供了字符级别的处理,更方便开发。
  • 文件处理。输入输出最基本的就是通过磁盘文件进行,Java IO中的文件处理也很重要。这其中包括File类的使用、FileInputStream/FileOutputStream、RandomAccessFile以及文件的映射和锁定等。
  • Socket相关。除了最基本的文件处理,企业级开发中网络的数据传输在Java IO中有很多要点,而对于网络的输入输出我们都是通过Socket以及其他java.net包中的类进行的。
  • NIO。这一内容是从JDK1.4开始正式新增的,包括java.nio和子包。所谓NIO,很多朋友的理解是non-blocking IO,但貌似更正式的解释是New IO,当然其中也具有了一些对非阻塞(non-blocking)的支持。在NIO中,Buffer和Channel是主要的角色。
  • Selector和IO复用。和NIO紧密结合,很多应用开发,尤其是对Socket数据通信相关的处理中,Selector发挥了很重要的作用,使得同一个线程可以处理多个连接发来的请求,节省了资源。
  • 序列化。其实在java.io包中还包含了像Serializable和Externalizable,目的就是方便进行和输入输出相关的序列化和反序列化操作。
  • Java 7 AIO。在JavaSE7的API和JDK1.7中,在java.nio.channels包中增加了一些和AIO相关的内容。
Java NIO

《Java NIO》封面

Java的JDK对IO的支持从1.0到1.1,再到1.4和1.7都有着很大的变化,目的就在于提高IO的性能和IO工具使用的方便性。可见Java IO的重要性,NIO同时成为了大量连接处理所采用的主要策略,这也成为了很多大公司面试官面试的重要题目。

要说的大概就这些,后面的文章会对这些一一整理。

作者原创,难免有错误,欢迎读者热心评论留言指出,以免误导他人,谢谢!

发表在 IO, Java, Java语言, 开发, 计算机技术 | 标签为 , , , | 留下评论

Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用

前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率。

Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便。但Fork Join不是那么容易用好的,我们先来看几个例子(反例)。

0. 反例错误分析

我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL)

这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来说这篇文章前面分析得还是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子)没有正确地对Fork Join进行应用。为了方便分析,还是贴下这个例子中具体的的代码吧。

    public class Calculator extends RecursiveTask {

        private static final int THRESHOLD = 100;
        private int start;
        private int end;

        public Calculator(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            if((start - end) < THRESHOLD){
                for(int i = start; i< end;i++){
                    sum += i;
                }
            }else{
                int middle = (start + end) /2;
                Calculator left = new Calculator(start, middle);
                Calculator right = new Calculator(middle + 1, end);
                left.fork();
                right.fork();

                sum = left.join() + right.join();
            }
            return sum;
        }

    }

我们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。

其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时候,由于第一个子任务不在队列尾而不能通过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂起阻塞,等待通知。而Fork Join本来的做法是想通过子任务的合理划分,避免过多的阻塞情况出现。这样,这个例子中的操作就违背了Fork Join的初衷,每次子任务的迭代,线程都会因为第一个子任务的join()而阻塞,加大了代码运行的成本,提高了资源开销,不利于提高程序性能。

除此之外,这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()。

由此可见,Fork Join的使用还是要注意对其本身的理解和对开发过程中细节的把握的。我们看下JDK中RecursiveAction和RecursiveTask这两个类。

1. RecursiveAction分析及应用实例

这两个类都是继承了ForkJoinTask,本身给出的实现逻辑并不多不复杂,在JDK的类文件中,它的注释比源码还要多。我们可以看下它的实现代码。

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

    protected abstract void compute();

    public final Void getRawResult() { return null; }

    protected final void setRawResult(Void mustBeNull) { }

    protected final boolean exec() {
        compute();
        return true;
    }
}

我们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是我们使用Fork Join时需要自己实现的逻辑。

我们可以看下API中给出的一个最简单最具体的例子:

class IncrementTask extends RecursiveAction {
   final long[] array; final int lo; final int hi;
   IncrementTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   protected void compute() {
     if (hi - lo < THRESHOLD) {
       for (int i = lo; i < hi; ++i)
         array[i]++;
     }
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new IncrementTask(array, lo, mid),
                 new IncrementTask(array, mid, hi));
     }
   }
 }

大致的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操作。我们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。我们来看下继承自FutureTask的invokeAll()方法实现。很简单:

    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        t2.fork();
        t1.invoke();
        t2.join();
    }

对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,然后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(当然如果不能执行,就需要阻塞等待了)。

其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是一样的,我们拿出一个通用一点的来看一下:

    public static void invokeAll(ForkJoinTask<?>... tasks) {
        Throwable ex = null;
        int last = tasks.length - 1;
        for (int i = last; i >= 0; --i) {
            ForkJoinTask<?> t = tasks[i];
            if (t == null) {
                if (ex == null)
                    ex = new NullPointerException();
            }
            else if (i != 0)
                t.fork();
            else if (t.doInvoke() < NORMAL && ex == null)
                ex = t.getException();
        }
        for (int i = 1; i <= last; ++i) {
            ForkJoinTask<?> t = tasks[i];
            if (t != null) {
                if (ex != null)
                    t.cancel(false);
                else if (t.doJoin() < NORMAL && ex == null)
                    ex = t.getException();
            }
        }
        if (ex != null)
            UNSAFE.throwException(ex);
    }

我们发现第一个子任务(i==0的情况)没有进行fork,而是直接执行,其余的统统先调用fork()放入任务队列,之后再逐一join()。其实我们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会造成阻塞,而不能充分利用Fork Join的特点,也就不能保证任务执行的性能。

Oracle的JavaSE7 API中在RecursiveAction里还有一个更复杂的例子,是计算double数组平方和的,由于代码较长,就不列在这里了。总体思路和上面是一样的,额外增加了动态阈值的判断,感兴趣的想深入理解的可以到这里去参考一下。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

2. RecursiveTask简要说明

其实说完了RecursiveAction,RecursiveTask可以用“同理”来解释。实现代码也很简单:

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    V result;

    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        return true;
    }

}

我们看到唯一不同的是返回结果的处理,其余都可以和RecursiveAction一样使用。

3. Fork Join应用小结

Fork Join是为我们提供了一个非常好的“分而治之”思想的实现平台,并且在一定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不完全是通用的,对于可很好分解成子任务的场景,我们可以对其进行应用,更多时候要考虑需求和应用场景,并且注意其使用要点才行。

发表在 Java, Java语言, 并发, 开发, 计算机技术 | 标签为 , , , , | 25 条评论

Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析

根据前文描述的Doug Lea的理论基础,在JDK1.7中已经给出了Fork Join的实现。在Java SE 7的API中,多了ForkJoinTask、ForkJoinPool、ForkJoinWorkerThread、RecursiveAction、RecursiveTask这样5个类。本文就对JDK1.7中增加这5个工具类实现做简要分析。

0. JDK中ForkJoin实现概述

在JavaSE7的API和JDK1.7中,分别集成了支持ForkJoin的五个类:

  • ForkJoinPool 实现ForkJoin的线程池
  • ForkJoinWorkerThread  实现ForkJoin的线程
  • ForkJoinTask<V> 一个描述ForkJoin的抽象类
  • RecursiveAction 无返回结果的ForkJoinTask实现
  • RecursiveTask<V> 有返回结果的ForkJoinTask实现

ForkJoinPool维护了多个线程构成的数组,维护了任务提交队列,给出了多个线程之间工作窃取的实现。给出了任务类型适配,和提交任务逻辑的实现。需要和线程紧密配合。

而ForkJoinWorkerThread则继承了java.lang.Thread类,维护了线程自己的队列,同一个任务fork()操作原则上会添加到同一个线程队列中。而这个线程类需要和ForkJoinPool紧密合作,有指向对应ForkJoinPool对象的引用。

ForkJoinTask则实现了Future接口,除了对接口的实现外,主要是fork()和join()操作。注意,貌似fork()只有ForkJoinWorkerThread 中才能执行。

两个子类RecursiveAction和RecursiveTask则实现比较简单,区别就在于返回值的处理不同。

1. ForkJoinPool

ForkJoinPool是实现了 Fork Join 的线程池。看JDK源码我们知道ForkJoinPool是extends AbstractExecutorService的,也就是说间接地实现了Executor和ExecutorService接口。实际上也就意味着ForkJoinPool是继ThreadPoolExecutor后的又一个Executor(Service)的具体实现。

1.1. 构建初始化

我们先看ForkJoinPool的构造方法,一共有3个重载的实现。有一个单参数的默认实现,通常我们使用这个就足够了,这最终会以默认的参数调用3参数的构造方法。我们再来看3个参数的构造方法实现。其中:

  • int parallelism 第一个参数是并行度,这个参数简介影响着(会额外做一些运算)这个ForkJoinPool的ForkJoinWorkerThread 线程数。默认情况下,这个参数是任务运行环境的处理器个数,比如系统提供的处理器数目为4,初始化线程池会开启16个线程。
  • ForkJoinWorkerThreadFactory factory 这个是ForkJoinPool构建新线程ForkJoinWorkerThread 对象的工厂,类似于ThreadPoolExecutor中用到的ThreadFactory。
  • Thread.UncaughtExceptionHandler handler 这个前面并发的文章页提到过,是线程异常处理器,这里不多说了。

1.2. 任务提交

前面已经提到,ForkJoinPool也是Executor(Service)的实现,那么execute()和submit()这样向ThreadPoolExecutor提交任务的方法对于ForkJoinPool来说也是一样有效的。

需要说明的是,除了增加支持ForkJoinTask对象参数的重载实现外,还在Runnable和Callable参数的方法中对原始的Runnable和Callable对象做了到ForkJoinTask的适配,使用的分别是ForkJoinTask的静态内部类AdaptedRunnable和AdaptedCallable的对象。而这两个类型参数对应的方法最终都会调用ForkJoinTask参数的方法:

    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
        return task;
    }

我们接下来再看下任务提交中被调用到的forkOrSubmit()方法:

    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }

逻辑很容易理解,先判断ForkJoinPool的状态,若已停止,则抛异常返回。之后如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。

1.3. 线程的启动和工作

前面已经强调过,ForkJoinPool和ForkJoinWorkerThread是紧密相关,耦合在一起的。Thread的start()会调用run(),而ForkJoinWorkerThread类重写了run()方法,会调用对应的线程池ForkJoinPool对象的work()方法。

我们来看一下work()方法的实现。

    final void work(ForkJoinWorkerThread w) {
        boolean swept = false;                // true on empty scans
        long c;
        while (!w.terminate && (int)(c = ctl) >= 0) {
            int a;                            // active count
            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
                swept = scan(w, a);
            else if (tryAwaitWork(w, c))
                swept = false;
        }
    }

里面主要是一个while循环体,只要当前的线程和线程池不是处于终止状态,则这个循环一直执行。执行的内容则是这样的,如果能够根据scan()方法得到任务,并执行,否则进入阻塞状态。

我们来看一下scan()方法的实现。

    private boolean scan(ForkJoinWorkerThread w, int a) {
        int g = scanGuard; // mask 0 avoids useless scans if only one active
        int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
        ForkJoinWorkerThread[] ws = workers;
        if (ws == null || ws.length <= m)         // staleness check
            return false;
        for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[k & m];
            if (v != null && (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    int d = (v.queueBase = b + 1) - v.queueTop;
                    v.stealHint = w.poolIndex;
                    if (d != 0)
                        signalWork();             // propagate if nonempty
                    w.execTask(t);
                }
                r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                return false;                     // store next seed
            }
            else if (j < 0) {                     // xorshift
                r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
            }
            else
                ++k;
        }
        if (scanGuard != g)                       // staleness check
            return false;
        else {                                    // try to take submission
            ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
            if ((b = queueBase) != queueTop &&
                (q = submissionQueue) != null &&
                (i = (q.length - 1) & b) >= 0) {
                long u = (i << ASHIFT) + ABASE;
                if ((t = q[i]) != null && queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueBase = b + 1;
                    w.execTask(t);
                }
                return false;
            }
            return true;                         // all queues empty
        }
    }

看起来很复杂,实际的原理则很简单,就是先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

  • 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread的execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。
  • 之后会尝试做任务窃取,尝试从其他线程中获取任务
  • 任务窃取条件不满足时,到提交队列中获取提交的任务

1.4. ForkJoinPool的其它属性

除了上述提到的操作,ForkJoin中还维护了

  • 线程数组和提交任务的队列,这是最基本的
  • 操作相关的锁和条件对象
  • volatile long ctl; 等线程池ForkJoinPool状态的属性
  • static final Random workerSeedGenerator; 等和任务窃取策略相关的一系列属性
  •  private volatile long stealCount; 等数据统计相关属性

等数据属性。

2. ForkJoinWorkerThread

ForkJoinWorkerThread扩展于Thread类,但提供了很多支持ForkJoin的特性。

上文在介绍ForkJoinPool的时候已经对这个类做了很多描述,也强调过线程类ForkJoinWorkerThread和ForkJoinPool相互依赖,放在一起才有意义。实际上,还要提到描述Fork Join任务的类ForkJoinTask。

除了上面提到的以外,对于ForkJoinWorkerThread这个类,再稍微提一下这样几个点:

  • ForkJoinTask<?>[] queue; 这是维护和ForkJoin相关的(子)任务队列,还有queueTop和queueBase属性,分别标记队列的尾部和头部
  • final ForkJoinPool pool; 指向线程池的引用,需要注意的是,这个属性被final修饰
  • 和ForkJoinTask的fork()和join()方法相关的方法——pushTask()和unpushTask(),分别负责在当前ForkJoinWorkerThread对象维护的队列中新增和取回任务
  • 其它与状态和统计相关的属性

3. ForkJoinTask及两个抽象子类

ForkJoinTask是ForkJoin框架中的主体,是ForkJoin中任务的体现。这个类实现了Future和Serializable接口。除了Futrue接口要满足的方法外,我想有这样3个方法是有必要知道的,分别是fork()、join()和exec()。

对于fork(),这个也许大家都很熟悉了,在这里也就是分解出子任务的执行。这个在实现上很简单那,就是在当前线程ForkJoinWorkerThread对象维护的队列中加入新的子任务。实现如下:

    public final ForkJoinTask fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

需要注意的是fork()方法的调用是在当前线程对象为ForkJoinWorkerThread的条件下。

我们再来看看对应的join()实现:

    public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

显然,它有调用了doJoin()方法,我们再来深入了解下。

    private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

大概的逻辑是这样的,在当前线程对象为ForkJoinWorkerThread的条件下,从队列中取回当前任务ForkJoinTask对象,并尝试在调用线程对其直接执行,否则当前线程调用wait()阻塞等待。更深入的理解可续继续查阅源码。

最后,我们再来看看exec()方法,这个是在ForkJoinTask中是没有给出实现的。

在JDK中,有ForkJoinTask的两个抽象子类RecursiveAction和RecursiveTask,他们分别给出了exec()的实现,这也是这两个子类主要做的事情,实际上是调用了各自的compute()方法,而在RecursiveAction和RecursiveTask中compute()又是未给出实现的。

实际上,compute()方法就是Fork Join要执行的内容,是Fork Join任务的实质,需要开发者给出。

而RecursiveAction和RecursiveTask就是方便开发者使用Fork Join的,RecursiveAction和RecursiveTask这两个类的区别仅仅是返回结果的情况不同。而这个compute()方法就是留给开发者继承扩展使用的。这个会在下篇文章详细讲述。

发表在 Java, Java语言, 并发, 开发, 计算机技术 | 标签为 , , , , , , | 11 条评论

Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理

最近事情较多,好久没发文章了。前面关于Java并发的文章中主要介绍了并发的概念、思想、JavaSE5中java.util.concurrent包中的工具类的使用和实现源码的分析。这篇我们来简要了解一下JavaSE7中提供的一个新特性 —— Fork Join 框架

0. 处理器发展和需求背景

回想一下并发开发的初衷,其实可以说是有两点,或者说可以从两个方面看。

  • 对于单核的处理器来说,在进行IO操作等比较费时的操作进行时,如果执行任务的方式是单任务的,那么CPU将会“空转”,知道IO操作结束。如果有多任务的调度机制,则在一个任务不需要CPU支持的时候,CPU可以被调度处理其他任务。简单地讲,并发可以提高CPU计算资源的利用率。
  • 对于多核,或者多个计算资源的情况下,并发可以在某种程度上达到“并行”,即同时运行,缩短了任务完成的时间,提高了任务完成的效率。

我们再来看一下处理器计算能力的发展(讲并发或者并行基本都要提到),Intel的创始人之一Gordon Moore曾经说过一句话,大概意思是:

当价格不变时,集成电路上可容纳的晶体管数目,约每隔18个月便会增加一倍,性能也将提升一倍。

我们可以这样理解,处理器的计算能力在一定意义上和芯片上集成的晶体管数量有关,而这项继承技术的发展史飞快的。但是,什么事情都是有一个极限的,提升计算性能仅仅靠增加晶体管数量提高处理器主频是不现实的,于是多核处理器的概念就出来了。

随着在硬件上多核处理器的发展和广泛使用,软件开发上的变革也在进行。简单来想,对于多个不相关的小任务来讲,可以分派到不同的处理器核心来进行处理。然而,对于一个比较大的任务,如何能够充分利用多核计算资源就是一个值得考虑的问题。

解决这个问题的办法就是“分而治之”,而Fork Join正式这样一种思路的产物。

1. Fork Join 的设计简介

看过《Introduction to Algorithms》(《算法导论》)的朋友们应该还记得,在讲到归并排序(Merge Sort)和快速排序的时候,有一种很简单又很有效率的思路就是“分而治之”,即“分治法”。而Fork Join的思路也是同理,只不过划分之后的任务更适合分派给不同的计算资源,可以并行的完成任务。

ForkJoin的任务分解和合并

ForkJoin的任务分解和合并

当计算分别完成之后,最后再合并回来。

简单来看,就是一个递归的分解和合并,直到任务小到可以接受的程度。

2. Fork Join 设计要点

Fork Join设计出来就是为了提高任务完成的效率,围绕这个目标,有一些要点是设计中需要考虑的,下面就给出一些要点。

  • 线程的管理和线程的单纯性。基于如上的设计思路,我们可以看到子任务之间的相关性是相对比较简单的,可以并行处理。为了提高效率,并不需要重量级的线程结构和对应的线程维护,线程实现简单就好,满足需求即可,降低维护成本。
  • 队列机制,硬件支持一定是比较有限的,那么分解的任务应该用队列维护起来,一个好的队列设计是很有必要的。
  • “工作窃取”,也就是设计论文原文中提到的 Work Stealing 。对于负载比较轻的线程,可以帮助负载较重的执行线程分担任务。

对于使用Fork Join的开发者来讲,需要注意:

  • 可用线程数和硬件支持。线程这东西,也是有开销的东西,绝对不是越多越好,尤其在硬件基础有限的情况下。
  • 任务分解的粒度。和前者有关系,就是分解的任务,“小”到什么程度是可以接受的,不可再分。

3. Fork Join数据结构支持

按照如上设计,分解执行一个大的任务,Fork Join至少需要考虑如下一些数据结构。

  • 轻量级的线程结构。
  • 维护线程的线程池,负责线程的创建,数量维护和任务管理。
  • 维护任务,并支持Work Stealing的双端队列。如下图。
ForkJoin队列

支持ForkJoin任务维护的双端队列Deque

对于子任务的分解,可以从后端取出分解再放入,而对于WorkStealing则可以从头部取出,放入其他队列的尾部。

到此,本文仅仅是对Fork Join的大致设计思路做一个描述、勾勒。下一篇文章中会对JDK1.7中给出的实现作出分析。

4. Fork Join其它参考

Doug Lea的文章可参见这里:DougLea关于ForkJoin设计、实现和性能分析的文章原文

发表在 Java, Java语言, 并发, 开发, 计算机技术 | 标签为 , , , , , | 5 条评论
第 15 页,共 23 页« 最新...369...1314151617...21...最旧 »