Java进阶系列-线程池的使用

唉,线程池真要详细写的话感觉好多啊,所以我这就挑重点写了,想更深入的可以直接看jdk源码。

emm…阿里告诫我们不要显示的创建线程,因为这样会很浪费资源,比如说来一个任务就创建一个线程给他的话,那么总得执行时间就是创建新线程的时间+任务执行时间+销毁线程的时间,如果任务本身执行时间远小于线程创建销毁的时间的话,这样的操作就会很不合理,线程也没法复用,就浪费掉了。

所以阿里给我们的建议是用线程池,来新任务直接丢给池子就好了。

先看看线程池一共有五种状态,分别是:

  1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务
  2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态)
  3. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
  4. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态
  5. TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做

其中进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态
  • 线程池状态不是TIDYING状态或TERMINATED状态
  • 如果线程池状态是SHUTDOWN并且workerQueue为空
  • workerCount为0
  • 设置TIDYING状态成功

线程池的核心类是java.util.concurrent下的ThreadPoolExecutor类,来看下继承关系

Executor是超级接口,只有一个抽象方法void execute(Runnable command)

ExecutorService接口继承了Executor,是真正的线程池接口,增加了线程池相关的方法

AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法

TheadPoolExecutor继承了AbstractExecutorService,是线程池的具体实现

ScheduledExecutorService接口继承了ExecutorService接口,提供了带”周期执行”功能ExecutorService

ScheduledThreadPoolExecutor既继承了TheadPoolExecutor线程池,也实现了ScheduledExecutorService接口,是带”周期执行”功能的线程池

Executors是线程池的静态工厂,提供了快捷创建线程池的静态方法,方便不太熟悉线程池机制的开发者调用,其中包含了以下:

  • newFixedThreadPool(创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,其中参数 corePoolSize 和 maximumPoolSize 相等,阻塞队列基于LinkedBlockingQueue)
  • newSingleThreadExecutor(初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列)
  • newCachedThreadPool(创建一个可缓存工作线程的线程池,默认存活时间60秒,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列)
  • newScheduledThreadPool(初始化的线程池可以在指定的时间内周期性的执行所提交的任务)

但是阿里明确建议了不要用线程池的工厂

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

那就进入ThreadPoolExecutor的学习

直接看他的几个构造函数,提供了4个重载,其实就只有一个,因为其他的三个肯定都是调用的参数最全的那一个

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize, 
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize(核心线程数,在当前线程数小于它时,进来一个任务就会分配一个新线程执行。注意一下线程池的初始线程数是为0的,除非调用ThreadPoolExecutor的prestartAllCoreThreads()方法,线程池会启动所有核心线程)
  • maximumPoolSize(当上面的核心线程数不够用时的补救措施,怎么说呢,就是如果corePoolSize里的线程都在忙碌状态,阻塞队列也满了,这时会创建新线程去执行,但不会无休止的创建,这个前提就是线程数最大不能超过maximumPoolSize,超过就实施下面提到的拒绝策略)
  • keepAliveTime(空闲线程的存活时间,在allowCoreThreadTimeOut为false时,大于corePoolSize的线程会在存活时间后销毁,allowCoreThreadTimeOut设置为true时,线程池里的所有线程都会受存活时间的影响)
  • workQueue(workQueue必须是BlockingQueue阻塞队列,当线程池中的线程数超过corePoolSize的时候,线程会进入阻塞队列进行阻塞等待)
  • threadFactory(可参考ThreadPoolExecutor内部的DefaultThreadFactory)
  • RejectedExecutionHandler(上面提到的拒绝策略,有AbortPolicy:直接抛出异常,默认策略;CallerRunsPolicy:用调用者所在的线程来执行任务;DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy:直接丢弃任务)

关于workQueue的实现有SynchronousQueue、LinkedBlockingQueue(底层是链表,属于无界队列)、ArrayBlockingQueue(底层是数组,有界)。

关于SynchronousQueue这个队列很有意思,他没容量的,isEmpty()方法永远返回是true,机制就是每个插入操作必须等待另一个线程的对应移除操作,每个移除操作也必须等待另一个线程的对应插入操作,所以ThreadPoolExecutor试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,这么看来这个阻塞队列其实就是舍去了排队策略,newCachedThreadPool工厂就是用的它来实现的。严格来说内部其实不能存任何一个元素的它,不算容器,我觉得像个类似信道的东西,重点在传递,有兴趣的可以单独去看下,实现很复杂。

现在整理下线程池调度策略

  1. 如果线程池中的线程数量少于corePoolSize,就创建新的线程来执行新添加的任务
  2. 如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将队列中的任务交付给空闲的线程执行)
  3. 如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务
  4. 如果线程池中的线程数量等于了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理

好了,现在我们实践下吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; ++i) {
threadPool.execute(() -> {
try {
Thread.currentThread().sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "已被执行");
});
System.out.println("线程池中线程数目:" + threadPool.getPoolSize()
+ ",队列中等待执行的任务数目:" + threadPool.getQueue().size()
+ ",已执行完成的任务数目:" + threadPool.getCompletedTaskCount());
}
threadPool.shutdown();
}
}

每个线程让他sleep5秒,然后输出自己当前的线程名。这里lambda不懂得去看我之前的文章。

现在是创建了5个线程,正好等于定义的corePoolSize大小,所以不会有线程排队的,最后线程池里会有5个线程

输出:

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完成的任务数目:0
pool-1-thread-1已被执行
pool-1-thread-4已被执行
pool-1-thread-3已被执行
pool-1-thread-5已被执行
pool-1-thread-2已被执行

然后增大循环次数,改成i < 6,这时候应该会有一个任务会在阻塞队列里等的

输出:

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完成的任务数目:0
pool-1-thread-4已被执行
pool-1-thread-1已被执行
pool-1-thread-5已被执行
pool-1-thread-3已被执行
pool-1-thread-2已被执行
pool-1-thread-1已被执行

看到了吧,最后一个任务最开始被放到了队列里,但线程池中线程数依然是5,因为队列没满,我定义的有界队列容量是10,所以继续增大循环

现在i < 13差不多队列就会满了,这时候线程数会超过corePoolSize

输出:

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完成的任务数目:0
pool-1-thread-2已被执行
pool-1-thread-6已被执行
pool-1-thread-5已被执行
pool-1-thread-4已被执行
pool-1-thread-1已被执行
pool-1-thread-8已被执行
pool-1-thread-7已被执行
pool-1-thread-3已被执行
pool-1-thread-4已被执行
pool-1-thread-6已被执行
pool-1-thread-5已被执行
pool-1-thread-2已被执行
pool-1-thread-1已被执行

最后我们看看到maximumPoolSize大小时的报异常,i < 15吧,应该够了

输出:

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完成的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完成的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完成的任务数目:0
Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task Main$$Lambda$1/303563356@776ec8df rejected from java.util.concurrent.ThreadPoolExecutor@4eec7777[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at Main.main(Main.java:11)
pool-1-thread-1已被执行
pool-1-thread-3已被执行
pool-1-thread-5已被执行
pool-1-thread-4已被执行
pool-1-thread-2已被执行
pool-1-thread-8已被执行
pool-1-thread-7已被执行
pool-1-thread-6已被执行
pool-1-thread-9已被执行
pool-1-thread-10已被执行
pool-1-thread-3已被执行
pool-1-thread-4已被执行
pool-1-thread-5已被执行
pool-1-thread-1已被执行
pool-1-thread-2已被执行

果然异常了,这里还可以观察到有的线程名比如pool-1-thread-3出现了好几次,这就是线程被复用的效果。

差不多完了,累死,其实很多细节的东西得自己慢慢看,这里只是基本的入门。

文章作者: Shawn Qin
文章链接: https://qinshuang1998.github.io/2019/02/09/java-progress-01/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Shawn's Blog