Java Executor 框架

背景

大多数并发应用程序都是围绕“任务执行(Task Execution)”来构造的:任务通常是些抽象且离散的工作单元。任务执行时应该选择清晰的任务边界以及明确的任务执行策略。执行策略有如下几种:

  • 串行执行
    在多任务中执行性能太差,只能一个个顺序处理,无法提高吞吐率或快速响应性
  • 显示创建线程
    new Thread(task).start():显示创建。但是大量创建会有如下问题:线程生命周期开销非常高;资源消耗:尤其是内存,当可运行线程数量多于处理器数量时,会出现大量闲置进程,会占用很多内存,线程竞争 CPU 资源时也会产生性能消耗;稳定性;
  • 使用并发框架
    如:Executor 框架

Executor 框架简介

该框架基于生产者-消费者模式,支持多种不同类型的任务执行策略,提供了标准方法将任务的提交和执行解耦,并用 Runnable 来表示任务。提供了对生命周期的支持,以及统计信息收集,程序管理和性能监视等。

接口定义

1
2
3
public interface Executor {
void execute(Runnable command);
}

执行策略

在执行策略中定义了任务执行的“What, Where, When, How”等方面:

  • 在什么(What)线程中执行任务?
  • 任务按照什么(What)顺序执行(FIFO, LIFO,优先级)?
  • 有多少个(How Many)任务能并发执行?
  • 在队列中有多少个(How Many)任务在等待执行?
  • 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(Which)任务?另外,如何(How)通知应用程序有任务被拒绝?

执行策略是资源管理工具,最佳策略取决于可用的计算资源以及服务质量的需求。通过限制并发任务的数量,可用确保应用程序不会由于资源耗尽而失败,或者由于资源竞争影响性能。

Executor 框架类图结构

0007-java-executor-Executor-framework.png

  • Executor
    接口定义了一个接收 Runnable 对象的方法 execute
  • ExecutorService
    Executor 使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回 Future 的方法
  • AbstractExecutorService
    抽象类,ExecutorService 执行方法的默认实现
  • ScheduledExecutorService
    接口,可定时调度任务
  • ThreadPoolExecutor
    线程池的核心实现类,用来执行被提交的任务
  • ScheduledThreadPoolExecutor
    继承 ThreadPoolExecutor并实现ScheduledExecutorService ,一个可定时调度任务的线程池
  • Executors
    提供了一系列静态工厂方法和共用方法,创建 Executor 框架的相关类:ExecutorService, ScheduledExecuorSevice, ThreadFactor, Callable...

初识线程池

线程池是指管理一组同构工作线程的资源池,其中工作队列(Work Queue)保存了所有等待执行的任务,工作者线程(Worker Thread)的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。通过重用线程池中现有线程而不是创建新线程,可以在处理多个请求时分摊线程创建和销毁的巨大开销;另外任务请求到达时,工作线程已经存在,提高了响应性;调整线程池大小,可以防止过多线程相互竞争资源导致耗尽内存

基本接口

Runnable 接口定义

1
2
3
public interface Runnable {
public abstract void run();
}

可以看到 Runnable 是没有返回值的

Callable 接口定义

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

在多线程并发中,我们需要获取任务的执行结果,显然 Runnable 无法实现。所以重新定义了 Callable 返回任务结果 V 或者抛出异常。

Future 接口定义

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future 表示异步执行的结果。可以对具体的 Runnable 或者 Callable 任务进行取消、查询是否取消/完成、获取结果。通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

RunnableFuture 接口定义

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

RunnableFuture 可被执行并能异步获取结果。

FutureTask

RunnableAdapter

1
2
3
4
5
6
7
8
9
10
11
12
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

作用:将 Runnableresult 封装成 Callable
这个适配器很简单,就是简单的实现了 Callable 接口,在 call() 实现中调用Runnable.run() 方法,然后把传入的 result 作为任务的结果返回。

FutureTask 类定义

1
2
3
4
public class FutureTask<V> implements RunnableFuture<V> {
public FutureTask(Callable<V> callable) {...}
public FutureTask(Runnable runnable, V result) {...}
}
  • 可以通过构造函数注入 RunnableCallable。如果传入的是 Runnableresult, 将通过 RunnableAdapter 封装成 Callable
  • FutureTaskRunnableFuture 的实现类,既能当做一个 Runnable 直接被 Thread 执行,也能作为 Future 用来得到执行结果(构造函数传入的 Callable 的结果)或者抛出异常。通常作为任务提交后的异步返回的结果,也可以用来取消任务等。

0007-java-executor-future-task-class.png

任务运行的状态及转换

FutureTask 在任务执行过程中,记录了如下 7 种状态:

  • NEW:新建
  • COMPLETING
    任务已经执行完成,但是任务的结果还在保存中
  • NORMAL
    任务执行完后的结果正常
  • EXCEPTIONAL
    任务执行完后的结果异常
  • CANCELLED:取消
    任务还没开始执行或者已经开始执行但是还没有执行完成的时候被取消
  • INTERRUPTING:中断中
  • INTERRUPTED:已经被中断

可能出现的状态转换:

1
2
3
4
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

任务在构造函数中初始化的状态都是 NEW,所有流程都是以此为起点。

任务执行

任务执行是执行者调用的,不用主动去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

可以从源码中得到这几个信息:

  • 任务运行前,状态一定是 NEW
  • 调用传入的Callable.call 执行任务
  • 正确执行,保存执行结果 Callableresult
  • 抛出异常时,保存异常 Throwable

ExecutorService

生命周期

ExecutorService 的生命周期有三种状态:运行,关闭和终止。

生命周期管理

ExecutorService 提供了管理 Executor 生命周期的方法:

1
2
3
4
5
6
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
  • ExecutorService 在初始创建时处于运行状态
  • shutdown 方法将会执行平缓的关闭过程:不再接受新的任务,同时等待提交的任务执行完成(包含那些还未开始执行的任务)
  • shutdownNow 方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,而且不再启动队列中尚未开始执行的任务。

提交任务

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

不管是 Runnable 还是 Callable 都可以作为任务提交,在执行 submit 时都会将任务封装成 Future 的实现类实例,调用 Executor.execute 来执行,并同时作为结果返回。

线程池

类定义及构造方法

1
2
3
4
5
6
7
8
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
...
}

参数说明:

  • corePoolSize
    核心池线程数大小
  • maximumPoolSize
    线程池允许创建线程数最大值
  • keepAliveTime
    空闲线程存活时间:如果工作线程数多于 corePoolSize,则这些多的线程的空闲时间超过 keepAliveTime 时将被终止
  • unit
    keepAliveTime 参数的时间单位
  • workQueue
    缓存任务的阻塞队列
  • threadFactory
    使用 ThreadFactory 创建新线程,默认使用 defaultThreadFactory 创建线程
  • handle
    定义处理被拒绝任务的策略,默认使用 ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出 RejectExecutorException

几个重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 核心线程池大小
private volatile int corePoolSize;
// 线程池最大值
private volatile int maximumPoolSize;
// 超过 corePoolSize外的线程空闲存活之间
private volatile long keepAliveTime;
// 任务缓存队列,用来保存等待中的任务,等待 worker 线程空闲时执行任务
private final BlockingQueue<Runnable> workQueue;
// 线程工厂,用来新建线程
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;

// 更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 时需要持有这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// 用来保存工作中的执行线程
private final HashSet<Worker> workers = new HashSet<Worker>();
// 是否对 corePoolSize 内的线程设置空闲存活时间
private volatile boolean allowCoreThreadTimeOut;
// 记录线程池中出现过的最大线程数大小
private int largestPoolSize;
// 已经执行完的线程数
private long completedTaskCount;

线程池的优势

合理利用线程池能够带来三个好处:

  • 降低资源消耗
    通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度
    当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性
    线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的大小

一般需要根据任务的类型来配置线程池大小:

  • CPU 密集型任务
    就需要尽量压榨 CPU,参考值可以设为 NCPU + 1
  • IO 密集型任务
    参考值可以设置为 2*NCPU

可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

线程池计数及状态

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

计数及状态解析

使用 AtomicInteger ctl 一个值,来同时表示当前线程数和线程池的状态。其中:

  • runState: 高 3 位,表示线程池状态
  • workerCount:剩下的 29 位,表示当前线程池的工作线程数
  • COUNT_BITS:表示线程数计数的位数为 29
  • CAPACITY:表示最大的工作线程数计数容量
  • ctlOf:表示将状态和计数组合成一个值
  • runStateOf:从 ctl 中解析状态
  • workerCountOf:从 ctl 中解析线程计数

计数增减

使用 CAS: Compare And Set 算法(无锁算法)实现共享变量 ctl 的自增自减(复合)操作,尽量确保多线程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

五种状态

  • RUNNING
    接受新任务并处理队列中的任务
  • SHUTDOWN
    不接受新任务但仍然处理队列中的任务
  • STOP
    不接受新任务,也不处理队列中的任务,并且设置正在进行中任务的中断标志位
  • TIDYING
    所有任务都结束了,wokrCount 设置为 0,并调用钩子方法 terminated()
  • TERMINATED
    钩子方法 terminated() 执行完毕

状态转移及其含义

  • RUNNING -> SHUTDOWN
    调用 shutdown(),可能出现在 finalize 中调用
  • (RUNNING or SHUTDOWN) -> STOP
    调用 shutdownNow()
  • STOP -> TIDYING
    当线程池为空时
  • TIDYING -> TERMINATED
    当钩子方法 terminated() 执行完毕时

线程池中线程创建及任务执行

Worker 工作线程类

类定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
}
  • AQS: AbstractQueuedSynchronizer: 用来做锁控制的,分为:独享功能和共享功能
  • Worker 实现了 AQS 独占锁功能,主要维护任务执行线程的中断控制状态
  • Worker 实现了 Runnable 并封装了任务(Runnable)和线程
  • 线程池的所有线程都是在 Worker 的构造方法中通过线程工厂新建的
  • Worker.run 直接调用了线程池的 runWorker(this),可以参考后面的分析

任务提交及线程创建策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

任务执行 execute 源码很短,流程也很清晰:异步执行一个指定的任务,可以新开一个线程或者线程池中已有的线程来执行。如果线程池已经处于关闭状态(不是RUNNING 状态,只有该状态才会接受新任务)或者已经达到最大容量(maximumPoolSize),将会拒绝该任务。源码分析:

  • 如果任务为空,直接抛出空指针异常
  • workerCount/wc = workerCountOf(ctl.get()):线程池中的当前线程数
  • 如果 workerCount < corePoolSize,每次都会新建一个 Worker 线程去执行这个任务
  • 如果 workerCount >= corePoolSize,则每来一个任务会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行
  • 如果队列添加失败(一般来说是任务缓存队列已满),直接尝试新建线程(前提条件:workerCount < maximumPoolSize)执行该任务,如果新建失败拒绝服务

线程创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private boolean addWorker(Runnable firstTask, boolean core) {
...
// 状态判断 RUNNING 才能接受新任务
// 线程数计数判断 workerCount, wc >= (core ? corePoolSize : maximumPoolSize)


boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建 Worker 工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到 set 中
workers.add(w);
int s = workers.size();
// 线程池大小存在动态调整,记录largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 添加成功后,开启线程准备异步执行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
add

源码中可以看到,在新建的 Worker 工作线程开始执行前,会反复确认线程池状态(RUNNING),并且加锁二次判断后,添加到 HashSet 中保存所有的执行线程,添加成功则开始执行任务,同时记录线程池曾经出现的最大线程数 largestPoolSize

任务运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果任务不为空,执行当前任务;
// 如果任务为空,线程复用,从缓冲队列中拿出任务执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子方法,任务执行前调用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务正式开始运行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,任务执行后调用
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
}

源码可以看出:

  • Worker 线程先执行当前任务,如果执行完毕会复用该线程,从阻塞队列中获取排队的任务继续执行
  • 任务执行前会将当前 Worker 加锁,避免在任务执行过程中被线程池中断
  • 任务执行前后有钩子方法可以调用 beforeExecute/afterExecute

从缓冲队列中获取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
// 状态检查
...

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 线程数检查
...

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

源码分析:

  • 取任务前先对线程池状态和当前线程数做判断,是否符合要求
  • 如果允许空闲线程存活 allowCoreThreadTimeOut || wc > corePoolSize,则缓存队列最多等待 keepAliveTime 时间任务入队;否则队列中阻塞等待取出任务

阻塞队列

在线程池中的作用

队列大小和最大池大小可能需要相互折衷:

  • 大型队列 + 小型池
    可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。
  • 小型队列 + 大型池
    CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

常见队列及特性

在前面我们多次提到了任务缓存队列,即 workQueue,它用来存放等待执行的任务。
workQueue 的类型为 BlockingQueue<Runnable>,通常可以取下面三种类型:

  • ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
  • LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE,是一个无界队列
  • synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务

拒绝策略

在线程池中的作用

四个策略都在线程池中都是内部类。

常见策略及特性

1
2
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  • ThreadPoolExecutor.AbortPolicy
    丢弃任务并抛出 RejectedExecutionException 异常。
  • ThreadPoolExecutor.DiscardPolicy
    不能执行的任务被丢弃,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy
    丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
  • ThreadPoolExecutor.CallerRunsPolicy
    调用线程处理该任务,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

使用方式

原理

通信机制

Executors

创建常见线程池

  • 固定长度线程池
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }

创建一个固定长度的线程池,每提交一个任务就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(除非某个线程异常结束后会补充一个新线程)。特点:可以重用固定数量线程的线程池,处理一个共享的无边界队列 。任何时间点,最多有 nThreads 个线程会处于活动状态执行任务。如果当所有线程都是活动时,有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到显式的执行 ExecutorService.shutdown() 关闭。
LinkedBlockingQueue 作为线程池的工作队列,是一个无界队列,当线程池的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池的线程数量不会超过 corePoolSize,同时 maxiumPoolSize 也就变成了一个无效的参数,并且运行中的线程池并不会拒绝任务。

  • 可缓存无限容量线程池
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>(),
    threadFactory);
    }

corePoolSize = 0; maxiumPoolSize = Integer.MAX_VALUE,即线程池的规模不做限制是无界的。在线程可用时,重用之前构造好的池中线程。这个线程池在执行大量短生命周期的异步任务时(many short-lived asynchronous task),可以显著提高程序性能。调用 execute 时,可以重用之前已构造的可用线程,如果不存在可用线程,那么会重新创建一个新的线程并将其加入到线程池中。如果线程超过 60 秒还未被使用,就会被中止并从缓存中移除。因此线程池在长时间空闲后不会消耗任何资源。

  • 单线程池
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory));
    }

特殊的固定长度线程池,只有一个线程来执行任务,可以确保任务按照队列中的顺序串行执行。使用单个工作线程来执行一个无边界的队列。(如果单个线程在执行过程中因为某些错误中止,新的线程会替代它执行后续线程)。它可以保证认为是按顺序执行的,任何时候都不会有多于一个的任务处于活动状态。和 newFixedThreadPool(1) 的区别在于,如果线程遇到错误中止,它是无法使用替代线程的。

  • 延时线程池
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1));
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
    (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

延迟线程池的构造函数中:maxiumPoolSize = Integer.MAX_VALUE,表明线程池最大容量是无界的。以延迟或定时的方式来执行任务。

示例

先简单总结下 Runnable, Callable, Future, FutureTaskExecutorService.submit 的关系和概念:

  • Runnable:作为 submit 的参数,不需要返回值
  • Callable:作为 submit 的参数,并定义返回值类型
  • Future
    表示异步执行的返回值。作为 submit 的返回值,实际是对 Callable 返回值的一个再次封装。
  • FutureTask
    作为 submit 的参数,但是因为实现了 Future 所以同时包含了 submit 的返回值。查看源码发现上面的 Runnable, Callable 都会先被封装成 FutureTask 再去执行 submit

Runnable

并不关心返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestRunnable {
public static void main(String[] args) {
ExecutorService single = Executors.newSingleThreadExecutor();
single.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("TestRunnable::run...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

single.shutdown();
System.out.println("TestRunnable::main exit.");
}
}

// 运行结果
TestRunnable::main exit.
TestRunnable::run...

Callable + Future

Callable 作为参数传入,Future 作为异步结果返回。通过 Future.get() 来同步等待获取线程执行的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TestCallableFuture {

public static void main(String[] args)
throws ExecutionException, InterruptedException {
ExecutorService single = Executors.newSingleThreadExecutor();
Future<String> stringFuture = single.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(200);
System.out.println("TestCallableFuture::Callable::call");
return "TestCallableFuture::Callable::Call::result";
}
});

single.shutdown();
System.out.println("TestCallableFuture::main, wait");
String result = stringFuture.get();
System.out.println("TestCallableFuture:: result = " + result);
System.out.println("TestCallableFuture::main, exit.");
}
}

// 执行结果
TestCallableFuture::main, wait
TestCallableFuture::Callable::call
TestCallableFuture:: result = TestCallableFuture::Callable::Call::result
TestCallableFuture::main, exit.

Callable 转换为 FutureTask

Callable 先转换为 FutureTask,作为参数传入,等待执行完毕后能够通过 FutureTask.get() 同步等待获取结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TestCallableFutureTask {
public static void main(String[] args)
throws ExecutionException, InterruptedException {
ExecutorService single = Executors.newSingleThreadExecutor();
FutureTask<String> stringFutureTask = new FutureTask<String>(
new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(200);
System.out.println("*::FutureTask::Callable::call");
return "*::FutureTask::Callable::call:result";
}
});
single.submit(stringFutureTask);
single.shutdown();
System.out.println("TestCallableFutureTask::wait...");
String result = stringFutureTask.get();
System.out.println("TestCallableFutureTask::main,result =" + result);
System.out.println("TestCallableFutureTask::main, exit.");
}
}

// 执行结果
TestCallableFutureTask::wait...
TestCallableFutureTask::FutureTask::Callable::call
TestCallableFutureTask::main, result = *::FutureTask::Callable::call:result
TestCallableFutureTask::main, exit.

参考文档

  1. Java线程池(ThreadPool)详解
  2. ThreadPool用法与优势
  3. java并发编程–Executor框架
  4. Executor框架的详解
  5. 深入学习FutureTask
  6. Java 并发
0%