Java线程池介绍
线程池
为什么要使用线程池?
池化技术 (Pool) 是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁建连的资源开销。我们日常工作中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。
普通情况下,我们需要使用线程的时候就直接去创建一个线程,这样子操作的方式十分简单,但是在高并发的情况下就会产生问题:
并发的线程数量过多,而每个线程完成其任务后便会被销毁,往往其执行的时间很短,这样频繁地创建线程就会大大降低系统的效率,因为频繁创建和销毁线程时间的开销很大。
线程池提供了一种限制和管理资源的Pool。每个线程池还会维护一些基本信息,例如已经完成任务的数量。线程池可以使得线程可以复用,就是执行完一个任务但不被销毁,而是可以继续执行其他的任务。
《Java 并发编程的艺术》中关于使用线程池的优点:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
实现Runnable接口和Callable接口的区别
两者最大的区别,实现Callable
接口的任务线程能返回执行结果,而实现Runnable
接口的任务线程不能返回执行结果。Runnable
自Java 1.0以来就一直存在,而Callable
在Java 1.5后才加入,目的就是为了处理Runnable
不支持的场景。
工具类
Executors
可以实现将Runnable
对象转换成Callable
对象。(Executors.callable(Runnable task)
或Executors.callable(Runnable task, Object result)
)
Runnable.java
1 |
|
Callable.java
1 |
|
执行execute()方法和submit()方法的区别
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit()
方法用于提交需要返回值的任务。线程池会返回一个Future
类型的对象。通过这个Future
对象可以判断任务是否执行成功,并且可以通过Future
对象的get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后直接返回,这时候有可能任务还没有执行完。接下来看一下
AbstractExecutorService
接口下的sumbit
方法:
1 | public Future<?> submit(Runnable task) { |
上面的方法中调用了newTaskFor
方法并且返回了一个FutureTask
对象:
1 | protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { |
而excute()
方法没有返回值:
1 | public void execute(Runnable command) { |
示例一:使用get()
方法获取返回值
1 | ExecutorService executorService = Executors.newFixedThreadPool(3); |
输出:
1 | abc |
示例2:使用get(long timeout,TimeUnit unit)
方法获取返回值
1 | ExecutorService executorService = Executors.newFixedThreadPool(3); |
输出:
1 | Exception in thread "main" java.util.concurrent.TimeoutException |
如何创建线程池
方法一:通过构造方法
ThreadPoolExecutor
类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
1 | public ThreadPoolExecutor(int corePoolSize, |
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 核心线程数定义了最小可以同时运行的线程数量。maximumPoolSize
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数:
keepAliveTime
:当线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁;unit
:keepAliveTime
参数的时间单位。threadFactory
:executor 创建新线程的时候会用到。handler
:饱和策略。关于饱和策略下面单独介绍一下。ThreadPoolExecutor
饱和策略定义:简单来说,在执行
execute()
方法时如果状态一直是RUNNING
时,的执行过程如下:如果
workerCount
<corePoolSize
,则创建并启动一个线程来执行新提交的任务;
如果workerCount
>=corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
如果workerCount
>=corePoolSize
并且workerCount
<maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
如果workerCount
>=maximumPoolSize
,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。ThreadPoolTaskExecutor
定义一些策略:ThreadPoolExecutor.AbortPolicy
: 抛出RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
: 由调用线程处理该任务 ,也就是直接由调用execute
方法的线程去执行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
: 不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求。
举个例子: Spring 通过
ThreadPoolTaskExecutor
或者我们直接通过ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定RejectedExecutionHandler
饱和策略的话来配置线程池的时候默认使用的是ThreadPoolExecutor.AbortPolicy
。在默认情况下,ThreadPoolExecutor
将抛出RejectedExecutionException
来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用ThreadPoolExecutor.CallerRunsPolicy
。当最大池被填满时,此策略为我们提供可伸缩队列。
方式二:通过 Executor 框架的工具类 Executors 来实现
我们可以创建三种类型的 ThreadPoolExecutor:
FixedThreadPool
: 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。**
SingleThreadExecutor
**: 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。**
CachedThreadPool
**: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。对应
Executors
工具类中的方法如图所示:
这些new*()方法内部其实还是调用了ThreadPoolExecutor
的构造方法。例如newSingleThreadExecutor(ThreadFactory threadFactory)
方法。
《阿里巴巴Java开发手册》中强制线程池不允许使用Executors
去创建,而是通过ThreadPoolExecutor
的方式,这样的处理方式可以更加明确线程池的运行规则,从而降低资源耗尽的风险。
阿里巴巴为什么不允许使用Executors去创建线程池?
缓存队列
LinkedBlockingQueue
没有设置固定容量大小1.1
Executors.newFixedThreadPool()
1
2
3
4
5
6// 创建固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}ThreadPoolExecutor 部分参数:
corePoolSize
:线程池中核心线程数的最大值。此处为nThreads
个。maximumPoolSize
:线程池中能拥有最多线程数 。此处为nThreads
个。LinkedBlockingQueue
: 用于缓存任务的阻塞队列 。 此处没有设置容量大小,默认是Integer.MAX_VALUE
,可以认为是无界的。
综上源码可以得出,虽然
newFixedThreadPool()
中可以给定核心线程数和最大线程数,并且固定为nThreads
个,但是当线程数超过nThreads
时,多余的线程会加入到LinkedBlockingQueue
中,而LinkedBlockingQueue
相当于时无界的,会导致其无限增大,最终导致内存溢出。1.2
Executors.newSingleThreadExecutor()
1
2
3
4
5
6
7// 创建只有单个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}优点: 创建一个单线程的线程池,保证线程的顺序执行;
缺点: 与newFixedThreadPool()
相同。总结:
newFixedThreadPool()
、newSingleThreadExecutor()
底层代码 中LinkedBlockingQueue
没有设置容量大小,默认是Integer.MAX_VALUE
, 可以认为是无界的。线程池中 多余的线程会被缓存到LinkedBlockingQueue
中,最终会导致内存溢出。最大线程数量是
Integer.MAX_VALUE
2.1
Executors.newCachedThreadPool()
缓存线程池,线程池的数量可能不固定,可以根据需求自动更改数量。
1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}ThreadPoolExecutor 部分参数:
corePoolSize
:线程池中核心线程数的最大值。此处为 0 个。maximumPoolSize
:线程池中能拥有最多线程数。此处为Integer.MAX_VALUE
。可以认为是无限大。
CachedThreadPool
的corePoolSize
被设置为空(0),maximumPoolSize
被设置为Integer.MAX.VALUE
,即它是无界的,这也就意味着如果主线程提交任务的速度高于maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。CachedThreadPool()
的excute()
方法执行示意图:上图说明:
- 首先执行
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前maximumPool
中有空闲线程则执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行offer
操作与空闲线程执行的poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成,否则执行下面的步骤 2; - 当初始
maximumPool
为空,或者maximumPool
中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;
2.2
Executors.newScheduledThreadPool()
创建固定大小的线程,可以延迟或者定时地执行任务。下面介绍一下
ScheduledThreadPoolExecutor
:1
2
3
4
5
6
7
8public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}ScheduledThreadPoolExecutor
使用的任务队列DelayQueue
封装了一个PriorityQueue
,PriorityQueue
会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask
的time
变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask
的squenceNumber
变量小的先执行)。ScheduledThreadPoolExecutor
和Timer
的比较:Timer
对系统时钟的变化敏感,ScheduledThreadPoolExecutor
不是;Timer
只有一个执行线程,因此长时间运行的任务可以延迟其他任务。ScheduledThreadPoolExecutor
可以配置任意数量的线程。 此外,如果你想(通过提供ThreadFactory
),你可以完全控制创建的线程;- 在
TimerTask
中抛出的运行时异常会杀死一个线程,从而导致Timer
死机:即计划任务将不再运行。ScheduledThreadExecutor
不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute
方法ThreadPoolExecutor
)。抛出异常的任务将被取消,但其他任务将继续运行。
ScheduledThreadPoolExecutor
运行机制如图所示:ScheduledThreadPoolExecutor
的执行主要分为两大部分:- 当调用
ScheduledThreadPoolExecutor
的scheduleAtFixedRate()
方法或者scheduleWithFixedDelay()
方法时,会向ScheduledThreadPoolExecutor
的DelayQueue
添加一个实现了RunnableScheduledFuture
接口的ScheduledFutureTask
。 - 线程池中的线程从
DelayQueue
中获取ScheduledFutureTask
,然后执行任务。
ScheduledThreadPoolExecutor
为了实现周期性的执行任务,对ThreadPoolExecutor
做了如下修改:- 使用
DelayQueue
作为任务队列; - 获取任务的方不同
- 执行周期任务后,增加了额外的处理
优点: 创建一个固定大小线程池,可以定时或周期性的执行任务 ;
缺点: 与newCachedThreadPool()
相同。总结:
newCachedThreadPool()
、newScheduledThreadPool()
的底层代码中的最大线程数maximumPoolSize
是Integer.MAX_VALUE
,可以认为是无限大,如果线程池中,执行中的线程没有及时结束,并且不断地有线程加入并执行,最终会导致内存溢出。饱和策略不能自定义
Executors
底层其实是使用的ThreadPoolExecutor
的方式创建的,但是使用的是ThreadPoolExecutor
的默认策略,即AbortPolicy
。1
2
3
4
5
6
7
8
9
10
11
12
13//默认策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
//构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
示例代码1:Runnable
+ThreadPoolExecutor
首先创建一个 Runnable
接口的实现类(也可以是 Callable
接口)
MyRunnable.java
1 | import java.util.Date; |
编写测试程序,这里使用阿里巴巴推荐的方式 ThreadPoolExecutor
构造函数自定义参数的方式来创建线程池。
ThreadPoolExecutorDemo.java
1 | import java.util.concurrent.ArrayBlockingQueue; |
可以看到我们上面的代码指定了:
corePoolSize
: 核心线程数为 5。maximumPoolSize
:最大线程数 10。keepAliveTime
: 等待时间为 1L。unit
: 等待时间的单位为 TimeUnit.SECONDS。workQueue
:任务队列为ArrayBlockingQueue
,并且容量为 100。handler
:饱和策略为CallerRunsPolicy
。
Output:
1 | pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020 |
示例代码2:Callable
+ThreadPoolExecutor
MyCallable.java
1 | import java.util.concurrent.Callable; |
CallableDemo.java
1 | import java.util.ArrayList; |
输出:
1 | Wed Nov 13 22:40:41 CST 2021::pool-1-thread-1 |
线程池原理分析
通过上面代码的输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。
现在,通过分析上面的输出内容来简单分析一下线程池原理。
为了理解线程池的原理,需要首先分析一下 execute
方法。 在上述的Demo 中使用 了executor.execute(worker)
来提交一个任务到线程池中去,这个方法非常重要,下面来看源码:
1 | // 在未来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果任务无法提交执行,要么是因为这个执行器已经关闭, 要么是因为它的容量已经达到,任务由当前的 RejectedExecutionHandler处理。 |
下图展示了任务提交的处理流程:
addWorker
这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则返回 false。
1 | // 全局锁(RenntrantLock:可重入锁) |
addWorker
执行流程如下:
关于线程池源码分析更多的内容参考以下文章:
JUC 线程池 ThreadPoolExecutor 源码分析 (opens new window)
Demo代码中模拟了 10 个任务,配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。