java 线程池 ThreadPoolExecutor 部分源码分析
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java 线程池 ThreadPoolExecutor 部分源码分析,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5671字,纯文字阅读大概需要9分钟。
内容图文
![java 线程池 ThreadPoolExecutor 部分源码分析](/upload/InfoBanner/zyjiaocheng/1165/7e48569d4dd640889d1dd99daf576ca4.jpg)
首先放上参考链接,博主分析比较细致: https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/
1. 首先必须清楚这么几个常量,主要思想是用一个int型表示线程池状态及数量,用int高三位表示状态,低29位表示线程数,所以,线程池最大线程数为 :1 << 29 - 1 而不是 1 << 32 - 1。
2. 基于以上基础,线程池状态可以用 当前线程所表示的数字 c & ~((1 << 29) - 1) 可得线程状态, 当前线程所表示的数字 c & (1 << 29) - 1 可得线程池中当前线程数,状态 rs | wc(线程数) 可获得用于表示线程池当前信息的数字,所对应的的方法是
private static int runStateOf(int c) { return c & ~CAPACITY; } privatestaticint workerCountOf(int c) { return c & CAPACITY; } privatestaticint ctlOf(int rs, int wc) { return rs | wc; }
/** 照抄源码注释,嘻嘻,状态说明,代码中的注释,可以说很详细了: RUNNING: Accept new tasks and process queued tasks SHUTDOWN: Don‘t accept new tasks, but process queued tasks STOP: Don‘t accept new tasks, don‘t process queued tasks, and interrupt in-progress tasks TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method TERMINATED: terminated() has completed 线程状态之间的变化,运行状态随时间单调增加,但不必达到每个状态 RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING When pool is empty TIDYING -> TERMINATED When the terminated() hook method has completed */ private static final int COUNT_BITS = Integer.SIZE - 3; privatestaticfinalint CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bitsprivatestaticfinalint RUNNING = -1 << COUNT_BITS; privatestaticfinalint SHUTDOWN = 0 << COUNT_BITS; privatestaticfinalint STOP = 1 << COUNT_BITS; privatestaticfinalint TIDYING = 2 << COUNT_BITS; privatestaticfinalint TERMINATED = 3 << COUNT_BITS;
execute 代码中也有注释,分布,写的也很清楚
public void execute(Runnable command) { if (command == null) thrownew NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn‘t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */// 获取线程池信息 数量 + 状态int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 当前执行线程数小于核心线程数,添加工作线程if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 线程池是运行中并往工作队列添加任务成功(大于核心线程数 小于工作队列数)int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 线程池状态不是运行中则丢弃任务 reject(command); elseif (workerCountOf(recheck) == 0) // 工作线程数为空则添加工作任务 addWorker(null, false); } elseif (!addWorker(command, false)) // 超过任务队列数,则当前工作线程增加至最大线程数,添加失败则拒绝 reject(command); }
addWorker 分析,主要做了两件事, 1: 改变表示线程池的数字, 2:增加执行线程
// 两个参数,firstTask表示需要跑的任务。boolean类型的core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小 // 返回值是boolean类型,true表示新任务被接收了,并且执行了。否则是false private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池当前状态 // 这个判断转换成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。 // 概括为3个条件: // 1. 线程池不在RUNNING状态并且状态是STOP、TIDYING或TERMINATED中的任意一种状态 // 2. 线程池不在RUNNING状态,线程池接受了新的任务 // 3. 线程池不在RUNNING状态,阻塞队列为空。 满足这3个条件中的任意一个的话,拒绝执行任务if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse; for (;;) { int wc = workerCountOf(c); // 线程池线程个数if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 如果线程池线程数量超过线程池最大容量或者线程数量超过了基本大小(core参数为true,core参数为false的话判断超过最大大小)returnfalse; // 超过直接返回falseif (compareAndIncrementWorkerCount(c)) // 没有超过各种大小的话,cas操作线程池线程数量+1,cas成功的话跳出循环break retry; c = ctl.get(); // 重新检查状态if (runStateOf(c) != rs) // 如果状态改变了,重新循环操作continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 走到这一步说明cas操作成功了,线程池线程数量+1boolean workerStarted = false; // 任务是否成功启动标识boolean workerAdded = false; // 任务是否添加成功标识 Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 得到线程池的可重入锁 w = new Worker(firstTask); // 基于任务firstTask构造workerfinal Thread t = w.thread; // 使用Worker的属性thread,这个thread是使用ThreadFactory构造出来的if (t != null) { // ThreadFactory构造出的Thread有可能是null,做个判断 mainLock.lock(); // 锁住,防止并发try { // 在锁住之后再重新检测一下状态int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果线程池在RUNNING状态或者线程池在SHUTDOWN状态并且任务是个nullif (t.isAlive()) // 判断线程是否还活着,也就是说线程已经启动并且还没死掉thrownew IllegalThreadStateException(); // 如果存在已经启动并且还没死的线程,抛出异常 workers.add(w); // worker添加到线程池的workers属性中,是个HashSetint s = workers.size(); // 得到目前线程池中的线程个数if (s > largestPoolSize) // 如果线程池中的线程个数超过了线程池中的最大线程数时,更新一下这个最大线程数 largestPoolSize = s; workerAdded = true; // 标识一下任务已经添加成功 } } finally { mainLock.unlock(); // 解锁 } if (workerAdded) { // 如果任务添加成功,运行任务,改变一下任务成功启动标识 t.start(); // 启动线程,这里的t是Worker中的thread属性,所以相当于就是调用了Worker的run方法 workerStarted = true; } } } finally { if (! workerStarted) // 如果任务启动失败,调用addWorkerFailed方法 addWorkerFailed(w); } return workerStarted; }
原文:https://www.cnblogs.com/wanglg629/p/11238948.html
内容总结
以上是互联网集市为您收集整理的java 线程池 ThreadPoolExecutor 部分源码分析全部内容,希望文章能够帮你解决java 线程池 ThreadPoolExecutor 部分源码分析所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。