ThreadPoolExecutor的应用和实现分析(续)—— 任务饱和丢弃策略

在前面三篇文章中,我们已经对ThreadPoolExecutor的应用以及任务处理和生命周期相关的源码实现做了整理分析。这篇我们简要整理下java.util.concurrent包中的RejectedExecutionHandler这个接口和对应的实现类。

0. RejectedExecutionHandler接口

当ThreadPoolExecutor执行任务的时候,如果线程池的线程已经饱和,并且任务队列也已满。那么就会做丢弃处理,这也是execute()方法实现中的操作,源码如下:

    else if (!addWorker(command, false))
            reject(command);

这个reject()方法很简单,直接调用丢弃处理的handler方法的rejectedExecution()。

在java.util.concurrent中,专门为此定义了一个接口,是RejectedExecutionHandler


public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

其中只有rejectedExecution()一个方法。返回为void,而参数一个是具体的Runnable任务,另一个则是被提交任务的ThreadPoolExecutor。

凡是实现了这个方法的类都可以作为丢弃处理器在ThreadPoolExecutor对象构造的时候作为参数传入,这个前面的文章已经提到过了。其中ThreadPoolExecutor给出了4种基本策略的实现。分别是:

  • CallerRunsPolicy
  • AbortPolicy
  • DiscardPolicy
  • DiscardOldestPolicy

下面分别详细说明。

1. 直接丢弃

这个也是实现最简单的类,其中的rejectedExecution()方法是空实现,即什么也不做,那么提交的任务将会被丢弃,而不做任何处理。

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

这个策略使用的时候要小心,要明确需求。不然不知不觉的任务就丢了。

2. 丢弃最老

和上面的有些类似,也是会丢弃掉一个任务,但是是队列中最早的。

实现如下:

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

注意,会先判断ThreadPoolExecutor对象是否已经进入SHUTDOWN以后的状态。之后取出队列头的任务并不做任何处理,即丢弃,再重新调用execute()方法提交新任务。

3. 废弃终止

这个RejectedExecutionHandler类和直接丢弃不同的是,不是默默地处理,而是抛出java.util.concurrent.RejectedExecutionException异常,这个异常是RuntimeException的子类。这个策略实现如下:

    public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

注意,处理这个异常的线程是执行execute()的调用者线程。

4. 调用者执行策略

在这个策略实现中,任务还是会被执行,但线程池中不会开辟新线程,而是提交任务的线程来负责维护任务。

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

注意,和DiscardOldestPolicy同样,也会先判断ThreadPoolExecutor对象的状态,之后执行任务。这样处理的一个好处,是让caller线程运行任务,以推迟该线程进一步提交新任务,有效的缓解了线程池对象饱和的情况。

上面只是SunJDK中提供的4种最基本策略,开发者可以根据具体需求定制。

此外,前文提到ThreadPoolExecutor也可以进行扩展。在java.util.concurrent包中有ScheduledThreadPoolExecutor这样一个类,其扩展了ThreadPoolExecutor,实现了一些时间和任务调度相关的方法。这里我就不具体整理了。

此条目发表在 Java, Java语言, 并发, 开发, 计算机技术 分类目录,贴了 , , , , , , , 标签。将固定链接加入收藏夹。

ThreadPoolExecutor的应用和实现分析(续)—— 任务饱和丢弃策略》有 2 条评论

  1. shane 说:

    我把你的文章转到我的博客上行吧

发表评论

电子邮件地址不会被公开。 必填项已用 * 标注

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>