`
songzi0206
  • 浏览: 156792 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
Group-logo
All are from ...
浏览量:33383
Group-logo
Programming w...
浏览量:19285
社区版块
存档分类
最新评论

ThreadPoolExecutor execute 方法分析

阅读更多

      分析完AbstractExecutorService异步任务提交之后,一直留着一个问题:就是任务提交之后的最终执行方法execute(Runnable)始终没有细究,只知道它会在将来某个时刻去执行任务,也就是所谓的异步执行。 现在可以揭开异步执行方法executor(Runnable command)的真面目了,回到线程池执行器ThreadPoolExecutor,乍看这个方法,蛮精干的:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
}

      作为异步任务执行的核心方法,看上去短短10行,其实蕴含了许多依赖的组件,这个可以参考文章“ThreadPoolExecutor 分析之类基础架构 ”。

      在分析之前,需要再解释几个概念,ThreadPoolExecutor是一个线程池执行器,它里面维护了一个线程池,以及一个等待执行的任务队列。

      所谓线程池其实就是一个Worker对象的集合:HashSet<Worker> workers = new HashSet<Worker>();至于这个Worker的基本类图如下:      


 

     从类图看(当然最好结合代码)Worker本身是一个Runnable,它自己维护了执行它的线程对象thead又维护了一个Runnable对象firstTask(这个对象就是ThreadPoolExecutor线程池的任务对象了),当ThreadPoolExecutor执行一个任务的时候,先获得 (最直接的方法就是new)一个可用Thread对象,然后再获得(最直接方法就是new)一个Worker对象,并把Thread对象包装进这Worker对象中,接着让这个thread对象start就开始执行这个Worker对象的run()方法,而run()方法中会去执行Worker.firstTask.run()方法。这就间接的的执行了目标任务,同时通过worker这个包(或者说代理)之后,可用做很多额外的工作,比如中断自身执行线程,记录在该线程上执行过的任务数量等。分析addIfUnderCorePoolSize(command)方法时还会分析Worker类。 

    线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()直接获得。            

      核心线程池数量corePoolSize可理解为工作线程Worker对象的集合workers的目标大小。如果poolSize > corePoolSize那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。

 

     最大线程池数量maxPoolSize就是工作线程Worker对象的集合workers的大小上限。假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize则继续创建新的线程来执行新任务,若不幸poolSize达到了上限maxPoolSize那不能再创建新的线程了,只能采取reject策略来拒绝新任务。

    所谓任务队列就是一个Runnable对象的阻塞队列BlockingQueue<Runnable> workQueue; 可根据不同需求设置不同的队列类型。

     下面分析execute(command)执行流程:

1.  第四行if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

 

          解释:  如果当前线程池中线程数量poolSize >=  核心线程数量corePoolSize 成立,那么逻辑或运算符后面的方法addIfUnderCorePoolSize(command)就忽略不做,而直接进花括号内部;如果poolSize >= corePoolSize不成立,尝试调用addIfUnderCorePoolSize(command)方法,该方法返回true就进花括号,否则整个execute方法就结束。

          理解:这很好理解,如果当前线程数量poolSize>=核心线程数量corePoolSize,那当然无法再把当前任务加入到核心线程池中执行了,于是进花括号选择其他的策略执行;如果poolSize没有达到corePoolSize那很自然是把当前任务放到核心线程池执行,也就是执行逻辑或运算符后的方法addIfUnderCorePoolSize(command)。“放到核心线程池执行”是什么意思呢?就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。

 addIfUnderCorePoolSize(command)方法做了什么事情?请看代码:

 

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

 

      主要就是在poolSize < corePoolSize并且当前状态runState == RUNNING时通过方法addThread(firstTask)返回一个线程t然后马上t.start()执行任务,返回true。这里有失败的可能,因为虽然在前面execute方法中已经保证了poolSize < corePoolSize进入该方法,但是当时并没有加锁,很有可能到了当前的位置poolSize已经改变了,所以这里必须再次检查并且必须加锁访问。顺便简单看下addThread方法:

 

private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

 

       直接new一个工作线程Worker对象,封装目标任务firstTask,用线程工厂创建一个新线程t,然后t又赋值给Workerthread属性,这下对Worker有了新理解吧?线程t 用来执行Worker对象,而Worker对象的thread属性的值=t ,他维护了执行它自己的那个线程。t.start()便开始执行worker.run(),而worker.run中会去执行封装目标任务firstTask.run()方法。当然Worker还没有完,后面还会分析到。

       addIfUnderCorePoolSize(command)会返回false吗?如果corePoolSize设置的足够大,基本就不会失败,那execute(command)方法做完这句if语句判断就结束了,压根不需要进入花括号继续。但是不幸的是,addIfUnderCorePoolSize还是经常会失败的。所以接下来还得看看if大括号里面的逻辑。

 

2. 第五行if (runState == RUNNING && workQueue.offer(command))

       解释:runState表示这个TheadPoolExecutor的状态,可以有4个状态

        a)      RUNNING可接收新任务并执行任务队列

        b)      SHUTDOWN不能接收新任务,但可以继续执行任务队列

        c)       STOP不能接收新任务,也不在处理任务队列,并且中断正在执行的任务

        d)      TERMINATEDSTOP基础上,所有线程都已终止

    程序若到了这一步,说明当前线程数量poolSize >=核心线程数量corePoolSize这里先判断是不是这个TheadPoolExecutor还是RUNNING状态,若是则试着加入到任务队列workQueue中,无法加入的唯一可能就是队列已经满了。先说没满的情况,就是任务加入到任务队列成功。按照常理,加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。所以判断一下if ( runState != RUNNING || poolSize == 0),在这个iftrue时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。这里有两种情况,情况一:runState != RUNNING这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,情况二poolSize == 0这种情况是new一个新线程加入到工作线程集合workers中。

 

3. 第九行else if (!addIfUnderMaximumPoolSize(command))

       解释:程序执行到这个分支,说明上面第五行if条件为false,也就是说当前状态runState = RUNNING或者任务队列workQueue已经满了。先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,所以当runState = RUNNING时新任务只能根据reject策略拒绝,而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING这种情况怎么处理新任务呢?很简单,若当前线程数量已经poolSize没有达到最大线程数量maxPoolSize则创建新的线程去执行这个无法加入任务队列的新任务,否则就根据reject策略拒绝,这里的拒绝逻辑就在这个else if条件成立的子句中做的,即第10reject(command);

 

      到此,任务异步执行的整个过程execute(Runnable command)分析完毕,顺便画个流程图作为本文的结尾,本来应该呼应一下前文,完成对工作线程Worker类的分析,包括选取执行任务队列、及在任务队列为空时将Worker对象回收,但是看来得放到下一篇了。


  • 大小: 9.9 KB
  • 大小: 43.1 KB
分享到:
评论
2 楼 秦时明月黑 2014-06-12  
三年了,距今三年了,三年前已经有人熟读源码,膜拜,邀想2012年刚毕业
1 楼 fengzhonghun102 2014-03-18  
太专业了! 非常感谢!

相关推荐

    ThreadPoolExecutor线程池原理及其execute方法(详解)

    下面小编就为大家带来一篇ThreadPoolExecutor线程池原理及其execute方法(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    线程池ThreadPoolExecutor

    void execute(Runnable command) :执行任务/命令,没有返回值,一般用来执行Runnable Future submit(Callable task):执行任务,有返回值,一般又来执行Callable void shutdown() :关闭线程池 ...

    线程池核心组件源码剖析.docx

    抽象类 AbstractExecutorService 主要对公共行为 submit()系列方法进行了实现,这些 submit()方法 的实现使用了 模板方法模式,其中调用的 execute()方法 是未实现的 来自 Executor 接口 的方法。实现类 ...

    java线程池概念.txt

    前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断 ...

    JAVA线程池的分析和使用

    合理利用线程池能够带来三个好处。第一:降低资源消耗。...创建一个线程池需要输入几个参数:我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务是否被线程池执行成功。通过以下代码

    在Android线程池里运行代码任务实例

    在ThreadPoolExecutor.execute()里传入 Runnable对象启动任务。这个方法会把任务添加到线程池工作队列。当有空闲线程时,管理器会取出等待最久的任务,在线程上运行。 代码如下: public class PhotoManager {  ...

    第7章-JUC多线程v1.1.pdf

    线程池的顶层接口是Executor, 这个接口定义了一个核心方法executor(Runnable command), 这个方法最后被ThreadPoolExecutor类实现, 这个方法用来传入任务, 并且该类是线程池的核心类, 构造方法如下 : public ...

    java线程池源码-cThreadPool:JAVA线程池源码分析与重写

    ThreadPoolExecutor类下部分方法和内部类介绍: 1、Worker类: 描述:Worker类实现Runnable接口、继承AbstractQueuedSynchronizer类 Thread thread : 工作线程,用于处理任务 Runnable firstTask : 第一个任务,当...

    天行线程池代理

    要在线程之间传播上下文,我们可能首先要增强Runnable,但这并不好,因为Runnable不仅用于线程中,因此应用ThreadPoolExecutor#execute的建议来包装Runnable参数是一个不错的选择,但是很难做到SkyWalking的Java...

    lasync:使执行程序服务更困难

    如果使用常规的则ThreadPoolExecutor调用不会阻塞的队列的“ ”方法:插入任务并返回true,或者在队列“容量受限”且达到其容量的情况下返回false。 尽管此行为很有用,但在某些情况下,我们确实需要阻止并等待,...

    Java并发编程(学习笔记).xmind

    afterExecute(Runnable r, Throwable t) beforeExecute(Thread t, Runnable r) terminated 递归算法的并行化 构建并发应用程序 任务执行 在线程中执行任务 清晰的任务边界以及明确的任务执行...

    java head space.txt

    at org.apache.shiro.subject.support.DelegatingSubject.execute(DelegatingSubject.java:387) at org.apache.shiro.web.servlet.AbstractShiroFilter.doFilterInternal(AbstractShiroFilter.java:362) at org....

Global site tag (gtag.js) - Google Analytics