Java 线程池之 ThreadPoolExecutor 源码分析
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Java 线程池之 ThreadPoolExecutor 源码分析,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含9928字,纯文字阅读大概需要15分钟。
内容图文
![Java 线程池之 ThreadPoolExecutor 源码分析](/upload/InfoBanner/zyjiaocheng/840/5a680ff5323043e78ae47e982489c2bc.jpg)
Java 线程池源码分析(基于JDK1.8):
ThreadPoolExecutor 是Java最常用的线程池,今天来分享下源码分析,以下是ThreadPoolExecutor具体类继承关系,以及方法详情
我们看到ExecutorService接口,提供了submit(Runnable)、submit(Runnable,T)、submit(Callable<T>)三个接口方法,以供调用,具体实现在AbstractExecutorService中,ThreadPoolExecutor并没有对其进一步封装,那么我们以提交一个Runable接口实例入手,看下如何进行submit的。
查看AbstractExecutorService.submit(Runable)方法实现:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
我们看下这里的newTaskFor实现
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
这里可以看到,把task包装成了一个RunnableFuture对象,这个对象的具体实现为FutureTask,下图为FutureTask的类继承关系
看下这里的构造方法
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看出,这里又将Runnable包装成callable了,这里具体包装成的对象为:
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
包装的call方法,直接运行task.run,并且返回预设的result值。
其实这个对象就是调用AbstractExecutorService.submit(Callable<T>)方法时,返回的Future结果,可以获取线程返回值,已经取消线程运行等操作。
OK,我们继续往下看,执行了execute(ftask),这个方法定义在最上层的Executor接口中,具体实现在ThreadPoolExecutor中,我们看下具体的源码实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
处理分为3步
1. 如果当前运行线程数,小于corePoolSize,则尝试启动一个新线程,否则执行下一步
2. 如果任务加入队列成功,则入队,否则执行下一步
3. 如果入队失败,则尝试启动一个新线程,如果启动失败,执行拒绝策略
*/
// ctl 是一个AtmoicInteger变量,最高3位,表示当前运行的状态分别为RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED,
// 后29为表示当前正在运行的线程数,所以线程数最多为2^29-1,而不是2^32-1
int c = ctl.get();
// 判断当前正在运行线程数是否小于corePoolSize, 成立则尝试新建线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// db-check 入队是否成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 入队失败,则尝试添加新线程运行任务
else if (!addWorker(command, false))
reject(command);
}
以上代码为ThreadPoolExecutor.execute(Runnable)具体实现,可以看出添加一个任务到线程池重的流程,首先判断corePoolSize,再判断入队列,最后尝试新启动线程,这三步都是上面一步成功,都不会继续往下执行。
这里有个重要方法addWork(Runnable,boolean),这是ThreadPoolExecutor的一个私有方法,我们来分析如何启动线程,并运行task任务的。具体实现如下:
//添加新任务firstTask, core表示是否为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
对于addWorker方法,我们看到retry标签,retry标签下的for循环,是用来循环检查是否满足启动新线程的条件,判断条件为当前为RUNNING状态,或者处于SHUTDOWN状态并且队列不为空,第一个条件可以理解,第二条件是当执行了shutdown方法时,继续运行未运行完成的线程。retry标签后,就是启动新线程的过程了,新线程为Worker的一个实例,构造一个Woker实例后,添加到workers集合中。我们可以在57行看到t.start(),这里就具体启动了Woker实例中的线程了
这里遇到了Worker类,这个具体是什么,我们现在分析下这个类,先看下这个类构造方法的定义:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
这个Worker类,其实就是对Runnable的包装,使用线程工程方法,构造新线程。主要看下这个Worker的run方法实现:
public void run() {
runWorker(this);
}
可以看出,这里的run方法,去调用了ThreadPoolExecutor.runWorker(Worker)方法,查看代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里可以看到while (task != null || (task = getTask()) != null)方法,这个循环可以看出,当然给定的任务task不为空,则运行,否则可以getTask, getTask方法,其实是从队列中取一个任务运行。进一步看下while循环内部做了什么事情,首先判断线程池是否还在运行状态,其次判断当前线程是否被中断。然后执行beforeExecute、task.run以及afterExecute方法,可以看出,如果我们想在任务执行前后执行预制或后续处理,可以扩展beforeExecute或afterExecute方法。这里执行了task.run方法,因为task就是我实际传递过来的对象,run方法就是我们具体写的逻辑,这里开始运行我们的任务了。
以上分析,是针对如果提交运行一个Runnable接口实例
OK,上述分析解决了如何运行没有返回值的Runnable对象,那么针对有返回值的Callable对象,又是怎么运行的呢?
上述在分析Runnable的时候,我们看到了一个针对Runnable的封装类FutureTask,其实当AbstractExecutorService.submit(Callable<T>)时,也会构造一个FutureTask任务
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
我们看下如何构造出RunnableFuture的,
// 这里是AbstractExecutorService类的方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
// 这里是FutureTask的构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
上述分析,这里会调用FutureTask的run方法,执行任务,所以FutureTask应该也会有run方法的实现:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
上面代码就是对run方法的实现,我们可以看到,方法中直接调用了c.call()方法,并且将返回值赋值给了result,最后set到FutureTask中,set方法当然不是简单的赋值操作,我们来看下实现:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
这里可以看到,设置返回值时,会将当前FutureTask的实例属性stateOffset从NEW状态,设置为COMPLETING状态。
这里,我们还会思考一个问题,在FutureTask调用get方法获取结果时,可以传递一个超时时间,我们看下具体代码:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
这里可以看出,其实get方法,一直在等待FutureTask的状态变成COMPLETING,并设置等待时间,当set方法设置COMPLETING时,get方法才能正确返回结果。
总结:
- ThreadPoolExecutor提交任务的方法submit,定义在ExecutorServer中,实现在AbstractExecutorService中
- 利用submit提交Runnable或Callable时,都会包装成FutureTask运行
- 提交任务Task时,并不一定会启动新线程,判断流程为,先判断coreSize是否达到,再判断入队是否成功,最后判断直接启动新线程是否成功(这里主要看是否达到maximumPoolSize)
- 运行任务时,会执行Worker工作线程,工作线程运行任务的run方法
- 运行任务时,可以通过扩展方法,设置beforeExecute和afterExecute
- 提交Callable任务时,会运行FutureTask.set方法,将result,设置会FutureTask实例中,并将状态设置为COMPLETING
- get方法会等待FutureTask任务状态变为COMPLETING
下一篇,我们继续分析ScheduledThreadPoolExecutor源码实现
内容总结
以上是互联网集市为您收集整理的Java 线程池之 ThreadPoolExecutor 源码分析全部内容,希望文章能够帮你解决Java 线程池之 ThreadPoolExecutor 源码分析所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。