Fork me on GitHub

java线程池详解

使用线程池的好处

  1. 降低资源销毁。不用频繁的创建销毁线程,线程可以循环重复使用。
  2. 提高响应速度。每当任务到达时,无需创建新线程。
  3. 提高线程的可管理性。线程池可以统一分配、调优和监控。可以根据系统的承受能力,调整线程的数量,防止因为消耗过多内存导致服务器崩溃。

创建线程

创建线程池有两种方法。一、构造方法创建。二、通过Executor框架的工具类Executors实现。

一、构造方法创建

在ThreadPoolExecutor类中提供了四个构造方法。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

参数介绍

这里使用了上面构造方法的第一种进行创建一个线程池

1
2
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(
        10, 15, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1024));

第一个参数corePoolSize=10 表示这个线程池初始化了10个线程在里面工作
第二个参数maximumPoolSize=15 表示如果10个线程不够用了,就会自动增加到最多15个线程
第三个参数keepAliveTime=60 结合第四个参数TimeUnit.SECONDS,表示经过60秒,多出来的线程还没有接到活儿,就会回收,最后保持池子里就10个
第四个参数TimeUnit.SECONDS 第三参数的单位为秒,有7种静态属性。
第五个参数 new LinkedBlockingQueue() 用来放任务的集合。有三个选择

  • ArrayBlockingQueue;
  • LinkedBlockingQueue;
  • SynchronousQueue;

拒绝策略

handler:表示当处理任务时的四种拒绝策略:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
  • ThreadPoolExecutor.DiscardPolicy:丢弃任务(当前将要加入队列的任务),但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最老的任务,然后重新尝试执行任务,即重复此过程。
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

二、通过Executor框架的工具类Executors实现

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。适合处理短时间工作任务。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时或者周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,如何有异常结束,会有另一个线程去取代它。保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  • newWorkStealingPool:1.8版本出现,利用working-stealing算法,可窃取任务,并行处理,需要穿一个并行级别的参数,如果不传,则被设定为默认的CPU数量。
1
2
3
4
5
6
ExecutorService c= Executors.newCachedThreadPool();
ExecutorService f= Executors.newFixedThreadPool(3);
ScheduledExecutorService s= Executors.newScheduledThreadPool(5);
ExecutorService sin= Executors.newSingleThreadExecutor(); 
// 设置并行级别为2,即默认每时每刻只有2个线程同时执行
ExecutorService m = Executors.newWorkStealingPool(2);

三、不要使用Executors默认创建线程池的方式

另外注意:不要使用Executors默认创建线程池的方式,这可能会导致OOM,因为LinkedBlockingQueue时未指定容量,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。
上面提到的问题主要体现在newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并不是说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式默认创建的最大线程数可能是Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM。
反例:

1
 ExecutorService executor = Executors.newFixedThreadPool(15);//不要这样做

正例

1
2
ExecutorService executor = new ThreadPoolExecutor(
10, 10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));

线程池类的社会关系

关系

ThreadPoolExecutor——>AbstractExecutorService——>ExecutorService——>Executor
线程池类ThreadPoolExecutor在包java.util.concurrent下, 继承了AbstractExecutorService 抽象类,该抽象类实现了ExecutorService接口,而该接口继承了Executor接口

分析

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
ExecutorService接口声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService基本实现了ExecutorService中声明的所有方法;
在ThreadPoolExecutor类中有几个非常重要的方法:

  • execute()
  • submit()
  • shutdown()
  • shutdownNow()

execute()可以向线程池提交一个任务。
submit()也是用来向线程池提交任务的,但是它和execute()方法不的是能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
shutdown()和shutdownNow()是用来关闭线程池的。

线程池的状态(5种)

其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010,该状态表示线程池对线程进行整理优化;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011,该状态表示线程池停止工作;

实现原理

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果阻塞队列满了,那就创建新的线程执行当前任务;
直到线程池中的线程数达到maxPoolSize,这时再有任务来,只能执行reject()处理该任务。

如何合理配置线程池的大小?

根据《Java并发编程实战》对于计算密集型的任务,在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的效率。(即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费。)

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
如果是IO密集型任务(比如数据库数据交互、文件上传下载、网络数据传输等等),参考值可以设置为2*NCPU
tips:这行代码可以查看Ncpu
System.out.println(Runtime.getRuntime().availableProcessors());//输出运行时可用处理器

参考博文
这篇比较详细,还有源码解读
关于ThreadFactory与BlockingQueue