Java线程池ThreadPoolExecutor源码分析
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Java线程池ThreadPoolExecutor源码分析,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含8557字,纯文字阅读大概需要13分钟。
内容图文
ThreadPoolExecutor是jdk内置线程池的一个实现,基本上大部分情况都会使用这个线程池完成各项操作。
本文分析ThreadPoolExecutor的实现原理。
ThreadPoolExecutor的状态和属性
ThreadPoolExecutor的属性在之前的一篇java内置的线程池笔记文章中解释过了,本文不再解释。
ThreadPoolExecutor线程池有5个状态,分别是:
RUNNING:可以接受新的任务,也可以处理阻塞队列里的任务
SHUTDOWN:不接受新的任务,但是可以处理阻塞队列里的任务
STOP:不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务
TIDYING:过渡状态,也就是说所有的任务都执行完了,当前线程池已经没有有效的线程,这个时候线程池的状态将会TIDYING,并且将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成以后的状态
状态之间可以进行转换:
RUNNING -> SHUTDOWN:手动调用shutdown方法,或者ThreadPoolExecutor要被GC回收的时候调用finalize方法,finalize方法内部也会调用shutdown方法
(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow方法
SHUTDOWN -> TIDYING:当队列和线程池都为空的时候
STOP -> TIDYING:当线程池为空的时候
TIDYING -> TERMINATED:terminated方法调用完成之后
ThreadPoolExecutor内部还保存着线程池的有效线程个数。
状态和线程数在ThreadPoolExecutor内部使用一个整型变量保存,没错,一个变量表示两种含义。
为什么一个整型变量既可以保存状态,又可以保存数量? 分析一下:
首先,我们知道java中1个整型占4个字节,也就是32位,所以1个整型有32位。
所以整型1用二进制表示就是:00000000000000000000000000000001
整型-1用二进制表示就是:11111111111111111111111111111111(这个是补码,不懂的同学可以看下原码,反码,补码的知识)
在ThreadPoolExecutor,整型中32位的前3位用来表示线程池状态,后3位表示线程池中有效的线程数。
// 前3位表示状态,所有线程数占29位
private static final int COUNT_BITS = Integer.SIZE - 3;
线程池容量大小为 1 << 29 - 1 = 00011111111111111111111111111111(二进制),代码如下
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
RUNNING状态 -1 << 29 = 11111111111111111111111111111111 << 29 = 11100000000000000000000000000000(前3位为111):
private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN状态 0 << 29 = 00000000000000000000000000000000 << 29 = 00000000000000000000000000000000(前3位为000)
private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP状态 1 << 29 = 00000000000000000000000000000001 << 29 = 00100000000000000000000000000000(前3位为001):
private static final int STOP = 1 << COUNT_BITS;
TIDYING状态 2 << 29 = 00000000000000000000000000000010 << 29 = 01000000000000000000000000000000(前3位为010):
private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED状态 3 << 29 = 00000000000000000000000000000011 << 29 = 01100000000000000000000000000000(前3位为011):
private static final int TERMINATED = 3 << COUNT_BITS;
清楚状态位之后,下面是获得状态和线程数的内部方法:
// 得到线程数,也就是后29位的数字。 直接跟CAPACITY做一个与操作即可,CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 与操作的话前面3位肯定为0,相当于直接取后29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 得到状态,CAPACITY的非操作得到的二进制位11100000000000000000000000000000,然后做在一个与操作,相当于直接取前3位的的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 或操作。相当于更新数量和状态两个操作
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池初始化状态线程数变量:
// 初始化状态和数量,状态为RUNNING,线程数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ThreadPoolExecutor执行任务
使用ThreadPoolExecutor执行任务的时候,可以使用execute或submit方法,submit方法如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
很明显地看到,submit方法内部使用了execute方法,而且submit方法是有返回值的。在调用execute方法之前,使用FutureTask包装一个Runnable,这个FutureTask就是返回值。
由于submit方法内部调用execute方法,所以execute方法就是执行任务的方法,来看一下execute方法,execute方法内部分3个步骤进行处理。
如果当前正在执行的Worker数量比corePoolSize(基本大小)要小。直接创建一个新的Worker执行任务,会调用addWorker方法
如果当前正在执行的Worker数量大于等于corePoolSize(基本大小)。将任务放到阻塞队列里,如果阻塞队列没满并且状态是RUNNING的话,直接丢到阻塞队列,否则执行第3步。丢到阻塞队列之后,还需要再做一次验证(丢到阻塞队列之后可能另外一个线程关闭了线程池或者刚刚加入到队列的线程死了)。如果这个时候线程池不在RUNNING状态,把刚刚丢入队列的任务remove掉,调用reject方法,否则查看Worker数量,如果Worker数量为0,起一个新的Worker去阻塞队列里拿任务执行
丢到阻塞失败的话,会调用addWorker方法尝试起一个新的Worker去阻塞队列拿任务并执行任务,如果这个新的Worker创建失败,调用reject方法
上面说的Worker可以暂时理解为一个执行任务的线程。
execute方法源码如下,上面提到的3个步骤对应源码中的3个注释:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf© < corePoolSize) { // 第一个步骤,满足线程池中的线程大小比基本大小要小
if (addWorker(command, true)) // addWorker方法第二个参数true表示使用基本大小
return;
c = ctl.get();
}
if (isRunning© && workQueue.offer(command)) { // 第二个步骤,线程池的线程大小比基本大小要大,并且线程池还在RUNNING状态,阻塞队列也没满的情况,加到阻塞队列里
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 虽然满足了第二个步骤,但是这个时候可能突然线程池关闭了,所以再做一层判断
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 第三个步骤,直接使用线程池最大大小。addWorker方法第二个参数false表示使用最大大小
reject(command);
}
addWorker关系着如何起一个线程,再看addWorker方法之前,先看一下ThreadPoolExecutor的一个内部类Worker, Worker是一个AQS的实现类(为何设计成一个AQS在闲置Worker里会说明),同时也是一个实现Runnable的类,使用独占锁,它的构造函数只接受一个Runnable参数,内部保存着这个Runnable属性,还有一个thread线程属性用于包装这个Runnable(这个thread属性使用ThreadFactory构造,在构造函数内完成thread线程的构造),另外还有一个completedTasks计数器表示这个Worker完成的任务数。Worker类复写了run方法,使用ThreadPoolExecutor的runWorker方法(在addWorker方法里调用),直接启动Worker的话,会调用ThreadPoolExecutor的runWork方法。需要特别注意的是这个Worker是实现了Runnable接口的,thread线程属性使用ThreadFactory构造Thread的时候,构造的Thread中使用的Runnable其实就是Worker。下面的Worker的源码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 使用ThreadFactory构造Thread,这个构造的Thread内部的Runnable就是本身,也就是Worker。所以得到Worker的thread并start的时候,会执行Worker的run方法,也就是执行ThreadPoolExecutor的runWorker方法
setState(-1); 把状态位设置成-1,这样任何线程都不能得到Worker的锁,除非调用了unlock方法。这个unlock方法会在runWorker方法中一开始就调用,这是为了确保Worker构造出来之后,没有任何线程能够得到它的锁,除非调用了runWorker之后,其他线程才能获得Worker的锁
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
接下来看一下addWorker源码:
// 两个参数,firstTask表示需要跑的任务。boolean类型的core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
// 返回值是boolean类型,true表示新任务被接收了,并且执行了。否则是false
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;
内容总结
以上是互联网集市为您收集整理的Java线程池ThreadPoolExecutor源码分析全部内容,希望文章能够帮你解决Java线程池ThreadPoolExecutor源码分析所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。