Java线程池介绍

线程池

为什么要使用线程池?

池化技术 (Pool) 是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁建连的资源开销。我们日常工作中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。

普通情况下,我们需要使用线程的时候就直接去创建一个线程,这样子操作的方式十分简单,但是在高并发的情况下就会产生问题:

并发的线程数量过多,而每个线程完成其任务后便会被销毁,往往其执行的时间很短,这样频繁地创建线程就会大大降低系统的效率,因为频繁创建和销毁线程时间的开销很大。

线程池提供了一种限制和管理资源的Pool。每个线程池还会维护一些基本信息,例如已经完成任务的数量。线程池可以使得线程可以复用,就是执行完一个任务但不被销毁,而是可以继续执行其他的任务。

《Java 并发编程的艺术》中关于使用线程池的优点

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

实现Runnable接口和Callable接口的区别

两者最大的区别,实现Callable接口的任务线程能返回执行结果,而实现Runnable接口的任务线程不能返回执行结果。Runnable自Java 1.0以来就一直存在,而Callable在Java 1.5后才加入,目的就是为了处理Runnable不支持的场景。

工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result)

Runnable.java

1
2
3
4
5
6
7
@FunctionalInterface
public interface Runnable {
/**
* 被线程执行,没有返回值也无法抛出异常
*/
public abstract void run();
}

Callable.java

1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果,或在无法这样做时抛出异常。
* @return 计算得出的结果
* @throws 如果无法计算结果,则抛出异常
*/
V call() throws Exception;
}

执行execute()方法和submit()方法的区别

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

  2. submit()方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象。通过这个Future对象可以判断任务是否执行成功,并且可以通过Future对象的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后直接返回,这时候有可能任务还没有执行完。

    接下来看一下AbstractExecutorService 接口下的sumbit方法:

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

上面的方法中调用了newTaskFor方法并且返回了一个FutureTask对象:

1
2
3
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

excute()方法没有返回值:

1
2
3
public void execute(Runnable command) {
...
}

示例一:使用get()方法获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executorService = Executors.newFixedThreadPool(3);

Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

String s = submit.get();
System.out.println(s);
executorService.shutdown();

输出:

1
abc

示例2:使用get(long timeout,TimeUnit unit)方法获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executorService = Executors.newFixedThreadPool(3);

Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

String s = submit.get(3, TimeUnit.SECONDS);
System.out.println(s);
executorService.shutdown();

输出:

1
2
3
Exception in thread "main" java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
at Test.main(Test.java:42)

如何创建线程池

方法一:通过构造方法

image-20220210192503528

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;

  2. unit : keepAliveTime 参数的时间单位。

  3. threadFactory :executor 创建新线程的时候会用到。

  4. handler :饱和策略。关于饱和策略下面单独介绍一下。

    ThreadPoolExecutor 饱和策略定义:

    简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

    如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
    如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
    如果workerCount >= corePoolSize 并且 workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
    如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。ThreadPoolTaskExecutor 定义一些策略:

    • ThreadPoolExecutor.AbortPolicy 抛出 RejectedExecutionException来拒绝新任务的处理。
    • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务 ,也就是直接由调用execute方法的线程去执行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
    • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。
    • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

    举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

方式二:通过 Executor 框架的工具类 Executors 来实现

我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

  • **SingleThreadExecutor**: 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

  • **CachedThreadPool**: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

    对应Executors工具类中的方法如图所示:

image-20220210193814940

​ 这些new*()方法内部其实还是调用了ThreadPoolExecutor的构造方法。例如newSingleThreadExecutor(ThreadFactory threadFactory)方法。

image-20220210215206393


《阿里巴巴Java开发手册》中强制线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式可以更加明确线程池的运行规则,从而降低资源耗尽的风险。

阿里巴巴为什么不允许使用Executors去创建线程池?

  1. 缓存队列LinkedBlockingQueue没有设置固定容量大小

    1.1 Executors.newFixedThreadPool()

    1
    2
    3
    4
    5
    6
    // 创建固定大小的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    ThreadPoolExecutor 部分参数:

    • corePoolSize :线程池中核心线程数的最大值。此处为 nThreads个。
    • maximumPoolSize :线程池中能拥有最多线程数 。此处为 nThreads 个。
    • LinkedBlockingQueue : 用于缓存任务的阻塞队列 。 此处没有设置容量大小,默认是 Integer.MAX_VALUE,可以认为是无界的。

    综上源码可以得出,虽然newFixedThreadPool()中可以给定核心线程数和最大线程数,并且固定为nThreads个,但是当线程数超过nThreads时,多余的线程会加入到LinkedBlockingQueue中,而LinkedBlockingQueue相当于时无界的,会导致其无限增大,最终导致内存溢出。

    1.2 Executors.newSingleThreadExecutor()

    1
    2
    3
    4
    5
    6
    7
    // 创建只有单个线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    优点: 创建一个单线程的线程池,保证线程的顺序执行;
    缺点:newFixedThreadPool() 相同。

    总结:newFixedThreadPool()newSingleThreadExecutor() 底层代码 中 LinkedBlockingQueue 没有设置容量大小,默认是 Integer.MAX_VALUE, 可以认为是无界的。线程池中 多余的线程会被缓存到 LinkedBlockingQueue中,最终会导致内存溢出。

  2. 最大线程数量是Integer.MAX_VALUE

    2.1 Executors.newCachedThreadPool()

    缓存线程池,线程池的数量可能不固定,可以根据需求自动更改数量。

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    ThreadPoolExecutor 部分参数:

    • corePoolSize :线程池中核心线程数的最大值。此处为 0 个。
    • maximumPoolSize :线程池中能拥有最多线程数。此处为 Integer.MAX_VALUE 。可以认为是无限大。

    CachedThreadPoolcorePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

    CachedThreadPool()excute()方法执行示意图:

    image-20220213214438769

    上图说明:

    1. 首先执行 SynchronousQueue.offer(Runnable task) 提交任务到任务队列。如果当前 maximumPool 中有空闲线程则执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成,否则执行下面的步骤 2;
    2. 当初始 maximumPool 为空,或者 maximumPool 中没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤 1 将失败,此时 CachedThreadPool 会创建新线程执行任务,execute 方法执行完成;

    2.2 Executors.newScheduledThreadPool()

    创建固定大小的线程,可以延迟或者定时地执行任务。下面介绍一下ScheduledThreadPoolExecutor

    1
    2
    3
    4
    5
    6
    7
    8
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueuePriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTasktime 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTasksquenceNumber 变量小的先执行)。

    ScheduledThreadPoolExecutorTimer 的比较:

    • Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;
    • Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;
    • TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机:即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。

    ScheduledThreadPoolExecutor运行机制如图所示:

    image-20220213215519598

    ScheduledThreadPoolExecutor 的执行主要分为两大部分:

    1. 当调用 ScheduledThreadPoolExecutorscheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向 ScheduledThreadPoolExecutorDelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask
    2. 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。

    ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor做了如下修改:

    • 使用 DelayQueue 作为任务队列;
    • 获取任务的方不同
    • 执行周期任务后,增加了额外的处理

    优点: 创建一个固定大小线程池,可以定时或周期性的执行任务 ;
    缺点:newCachedThreadPool() 相同。

    总结:newCachedThreadPool()newScheduledThreadPool() 的底层代码中的最大线程数maximumPoolSizeInteger.MAX_VALUE,可以认为是无限大,如果线程池中,执行中的线程没有及时结束,并且不断地有线程加入并执行,最终会导致内存溢出。

  3. 饱和策略不能自定义

    Executors 底层其实是使用的 ThreadPoolExecutor 的方式创建的,但是使用的是 ThreadPoolExecutor 的默认策略,即 AbortPolicy

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //默认策略
    private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

    //构造函数
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    Executors.defaultThreadFactory(), defaultHandler);
    }

示例代码1:Runnable+ThreadPoolExecutor

首先创建一个 Runnable 接口的实现类(也可以是 Callable 接口)

MyRunnable.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.Date;

/**
* 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {

private String command;

public MyRunnable(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return this.command;
}
}

编写测试程序,这里使用阿里巴巴推荐的方式 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {
// 核心线程数
private static final int CORE_POOL_SIZE = 5;
// 最大线程数
private static final int MAX_POOL_SIZE = 10;
// 阻塞队列的容量
private static final int QUEUE_CAPACITY = 100;
// 生存时间
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10。
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100。
  6. handler:饱和策略为 CallerRunsPolicy

Output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020

示例代码2:Callable+ThreadPoolExecutor

MyCallable.java

1
2
3
4
5
6
7
8
9
10
import java.util.concurrent.Callable;

public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
//返回执行当前 Callable 的线程名字
return Thread.currentThread().getName();
}
}

CallableDemo.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallableDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;

public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

List<Future<String>> futureList = new ArrayList<>();
Callable<String> callable = new MyCallable();
for (int i = 0; i < 10; i++) {
//提交任务到线程池
Future<String> future = executor.submit(callable);
//将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到的返回值
futureList.add(future);
}
for (Future<String> fut : futureList) {
try {
System.out.println(new Date() + "::" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//关闭线程池
executor.shutdown();
}
}

输出:

1
2
3
4
5
6
7
8
9
10
Wed Nov 13 22:40:41 CST 2021::pool-1-thread-1
Wed Nov 13 22:40:42 CST 2021::pool-1-thread-2
Wed Nov 13 22:40:42 CST 2021::pool-1-thread-3
Wed Nov 13 22:40:42 CST 2021::pool-1-thread-4
Wed Nov 13 22:40:42 CST 2021::pool-1-thread-5
Wed Nov 13 22:40:42 CST 2021::pool-1-thread-3
Wed Nov 13 22:40:43 CST 2021::pool-1-thread-2
Wed Nov 13 22:40:43 CST 2021::pool-1-thread-1
Wed Nov 13 22:40:43 CST 2021::pool-1-thread-4
Wed Nov 13 22:40:43 CST 2021::pool-1-thread-5

线程池原理分析

通过上面代码的输出结果可以看出:线程池首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。

现在,通过分析上面的输出内容来简单分析一下线程池原理。

为了理解线程池的原理,需要首先分析一下 execute方法。 在上述的Demo 中使用 了executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面来看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 在未来的某个时间执行给定的任务。该任务可以在新线程或现有池线程中执行。如果任务无法提交执行,要么是因为这个执行器已经关闭,	要么是因为它的容量已经达到,任务由当前的 RejectedExecutionHandler处理。
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
return c & COUNT_MASK;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null)
throw new NullPointerException();
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();

// 下面会涉及到3步操作
// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果当前执行的任务数量大于等于 corePoolSize的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
//这种情况就是核心线程池已满,等待队列已满但是还没有达到maximumPoolSize(即线程池还没有满),那就创建一个新的线程去执行该任务。
else if (!addWorker(command, false))
reject(command);
}

下图展示了任务提交的处理流程:

图解线程池实现原理

addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则返回 false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
 // 全局锁(RenntrantLock:可重入锁)
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问
private int largestPoolSize;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}


/**
* 添加新的工作线程到线程池
* @param firstTask 要执行
* @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
// SHUTDOWN=0
// 首先判断线程池是否是非RUNNING状态,不是RUNNING状态那么继续判断线程池是不是STOP状态,是STOP状态直接返回false,不是STOP状态继续判断firstTask是否为null,如果firstTask为null,那么直接返回false,如果firstTask不为null,那么继续判断工作队列是否为空,如果为空,那么返回false,否则往下执行。
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
//获取线程池中线程的数量
int wc = workerCountOf(c);
// core参数为true的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int c = ctl.get();
//isRunning(c) 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
//(runStateLessThan(c, STOP) && firstTask == null)) 如果线程池状态小于STOP,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
// firstTask == null证明只新建线程而不执行任务
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
//如果这个线程不是未启动状态,会抛出异常
//Thread.State.NEW: Thread state for a thread which has not yet started.
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
// 工作线程启动成功
workerAdded = true;
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker执行流程如下:

申请线程执行流程图

关于线程池源码分析更多的内容参考以下文章:

JUC 线程池 ThreadPoolExecutor 源码分析 (opens new window)

Java线程池实现原理及其在美团业务中的实践

如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答

Demo代码中模拟了 10 个任务,配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

作者

Cindy

发布于

2022-03-12

许可协议

CC BY-NC-SA 4.0

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×