Fork me on GitHub

读源码后把自实现的线程池改成了这样

这个标题我取得一股营销号味道hhh,很早之前学习多线程的时候有自己写过一个简易的线程池,最近读了源码发现自己以前写的代码有很多可以改进的地方。我尤其喜欢用迭代的方式去学习——先实现,然后深入学习,最后改进之前实现的,完成一个迭代。因为这种方式更能看到自己的思路的过程,更明白所以然。好了,闲话不多说。开撸。

本文分为三个部分:
一、如何实现一个简单的线程池?
二、jdk线程池源码分析
三、改进自实现的线程池

一、如何实现一个简单的线程池?

思路分析:线程池其实就是一个生产者消费者模型。

首先我们需要一个队列作为任务容器作为存放任务, 还需要多个线程去执行任务, 消费者线程不断查询任务队列是否有任务,如果没有该线程等待,如果有任务,取出一个任务,唤醒所有在等待获取队列的线程,释放掉锁。执行任务线程的run方法。

分析完毕,在开始写代码之前想想具体做法。

  • 任务容器,用LinkedList实现。
  • add方法,用于把任务线程放入任务容器。
  • 构造函数,一次性启动3个 消费者线程。
  • 任务:一个有run方法的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ThreadPool {
// 线程池大小
private int corePoolSize;
// 任务容器
private LinkedList<Runnable> workQueue = new LinkedList<Runnable>();
private volatile boolean RUNNING =true;
public ThreadPool(int corePoolSize) {
//创建时候初始化线程
this.corePoolSize = corePoolSize;
synchronized (workQueue) {
for (int i = 0; i < corePoolSize; i++) {
new Worker("线程" + i).start();
}
}
}
//将任务线程放入线程池
public void execute(Runnable r) {
synchronized (workQueue) {
workQueue.addLast(r);
//放入后唤醒所有wait线程
workQueue.notifyAll();
}
}

class Worker extends Thread {
Runnable task;
public Worker(String name) {
super(name);
}
@Override
public void run() {
//锁住任务队列,试图取任务,失败则该线程wait
while (RUNNING) {
synchronized (workQueue) {
while (workQueue.isEmpty()) {
try {
workQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
task = workQueue.removeLast();
workQueue.notifyAll();
}
//取到线程即可释放锁
System.out.println(this.getName() + "获得到并且开始执行任务");
task.run();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestThreadPool {
public static void main(String[] args) throws InterruptedException {
//线程池大小初始化为3
ThreadPool threadPool=new ThreadPool(3);
//生成10个任务放入线程池
for (int i = 0; i < 10; i++) {
Runnable task= new Runnable(){
@Override
public void run() {
System.out.println("做任务中。。。");
}
};
threadPool.add(task);
Thread.sleep(1000);
}
}
}

启动TestThreadPool,输出

大功告成。

注意

ConsumerThread类中这段代码:

1
2
3
4
5
synchronized (tasks) {
//试图取任务。。。
}
//取到线程即可释放锁
task.run();

从任务中取任务时候是获取任务队列锁的,但执行任务的时候要先释放锁。如果不释放就去执行,那它执行的过程中其他线程都得等着,相当于一次只能有一个线程进行任务

2019年06月13号注:最近读书《java开发手册》OOP规约第11条:构造方法里禁止加入任何业务逻辑,如果有初始化逻辑,请放在init中。上面代码违背了这个规约。我这里也不改了,做个反例。再接下来的代码中改进

二、jdk线程池源码分析

线程池的重要方法

execute()接受任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 表示 “线程池状态” 和 “线程数” 的整数
int c = ctl.get();
//分为3种情况
// 情况1:如果当前线程数少于corePoolSize,直接新创建一个 worker 线程,并把当前 command 作为这个线程firstTask
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 添加任务成功
return;
// 返回 false 代表线程池不允许提交任务
c = ctl.get();
}
// 情况2:要么当前线程数大于等于corePoolSize,要么刚刚 addWorker 失败了
// 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//下面这个分支的意图是:担心任务提交到队列中了,但是线程都关闭了
// 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 情况3:如果 workQueue 队列满了,以 maximumPoolSize 为界创建新的 worker,
else if (!addWorker(command, false))
// 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
reject(command);
}

解释一下这行代码, 这个方法会返回一个表示 “线程池状态” 和 “线程数” 的整数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//一开始的状态是ctlOf(RUNNING, 0)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;

// 000 11111111111111111111111111111
// 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 运算结果为 111跟29个0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;

// 将整数 c 的低 29 位修改为 0,就得到了线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }

线程池的状态

  • RUNNING:接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个

重点区分SHUTDOWN和STOP。

在提一下关闭线程池的方法。

shutdown():设置 线程池的状态 为 SHUTDOWN,然后中断所有没有正在执行任务的线程

shutdownNow():设置 线程池的状态 为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
使用建议:一般调用shutdown()关闭线程池;若任务不一定要执行完,则调用shutdownNow()

addWorker()方法:增加工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// 参数一:第一个任务。参数二:是否使用核心线程数 corePoolSize 作为创建线程的界限
private boolean addWorker(Runnable firstTask, boolean core) {
//打破多重循环的关键字retry
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//下面代码的意图:担心创建线程的时候,线程池已经关闭了
// 如满足以下条件之一,那么不创建新的 worker:
// 1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
// 2. firstTask != null
// 3. workQueue.isEmpty()
// SHUTDOWN 的语义:不允许提交新的任务,但是要把已经进入到 workQueue 的任务执行完,所以在满足条件的基础上,是允许创建新的 Worker 的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功,那么就是所有创建线程前的条件校验都满足了,准备创建线程执行任务了
// 这里失败的话,说明有其他线程也在尝试往线程池中创建线程
if (compareAndIncrementWorkerCount(c))
//跳出多重循环
break retry;
// 由于有并发,重新再读取一下 ctl
c = ctl.get();
// 正常如果是 CAS 失败的话,进到下一个里层的for循环就可以了
// 可是如果是因为其他线程导致线程池的状态发生了变更,比如有其他线程关闭了这个线程池
// 那么需要回到外层的for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

/*
* 啊终于可以开始创建线程了
*/

// worker 是否已经启动
boolean workerStarted = false;
// 是否已将这个 worker 添加到 workers 这个 HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 把 firstTask 传给 worker 的构造方法
w = new Worker(firstTask);
// 取 worker 中的线程对象,Worker的构造方法会调用 ThreadFactory 来创建一个新的线程
final Thread t = w.thread;
if (t != null) {
// 这个是整个线程池的全局锁,持有这个锁才能让下面的操作“顺理成章”,
// 因为关闭一个线程池需要这个锁,至少我持有锁的期间,线程池不会被关闭
mainLock.lock();
try {

int c = ctl.get();
int rs = runStateOf(c);

// 小于 SHUTTDOWN 那就是 RUNNING
// 如果等于 SHUTDOWN,不接受新的任务,但是会继续执行等待队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker 里面的 thread 可不能是已经启动的
if (t.isAlive())
throw new IllegalThreadStateException();
// 加到 workers 这个 HashSet 中
workers.add(w);
int s = workers.size();
// largestPoolSize 用于记录 workers 中的个数的最大值
// 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功的话,启动这个线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,将其减掉
if (! workerStarted)
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}

总结

线程池的线程创建时机

  • 线程池是懒加载的,声明的时候并不会创建好线程等待任务,而是当提交第一个任务时才去新建线程;
  • 当提交一个任务时,如果当前线程数小于corePoolSize,就直接创建一个新线程执行任务;
  • 如果当前线程数大于corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
  • 如果阻塞队列满了,并且当前线程数小于maxPoolSize,那就创建新的线程执行当前任务;
  • 如果池里的线程数大于maxPoolSize,这时再有任务来,只能调用拒绝策略

工厂模式

从线程池的创建讲起,线程池有两种创建方式。

一是ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

二是executorService= Executors.newFixedThreadPool(3);

第二钟方式用到了工厂模式,点进方法newFixedThreadPool(3)去看是返回一个定义好初始化的参数线程池实例,代码如下:

1
2
3
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

第二种方法最终还是会导向方法一ThreadPoolExecutor的构造方法ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}

工厂模式的好处是:

  • 用户只要知道名字就行了,比如我想创建一个固定长度的线程池,只需要调用newFixedThreadPool()方法,而不需要在了解ThreadPoolExecutor构造函数之后自己new一个配置一堆参数。不需要关注太多细节。
  • 用工厂模式可以消除循环依赖,如果一个类构造函数的参数变了,所有实例化这个类的代码都得改。

阻塞队列

上一篇我是用synchronized锁住任务队列,工作线程wait和notify去实现线程阻塞去任务队列去任务,jdk是用LinkedBlockingQueue的put和take方法来实现。如果put或者take操作无法立即执行,这两个方法调用将会发生阻塞,直到能够执行。

抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

延迟加载Lazy_load

线程池是懒加载的,声明的时候并不会创建好线程等待任务,而是当提交第一个任务时才去新建线程;

三、改进自实现的线程池

改进

1.实现延迟加载。我们是在构造函数创建线程,而java线程池只有当提交一个任务时,线程池才会创建一个新线程执行任务,直到当前线程数等于corePoolSize。这样会更节约资源。

2.这里我们用LinkedList加线程wait()notify()来实现,也可以像java线程池一样用阻塞链表LinkedBlockingQueue来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {
//工作线程数
private int workerCount;
// 线程池大小
private int corePoolSize;
// 任务容器
private BlockingQueue<Runnable> workQueue;
//工作线程容器
private Set<Worker> workers;
// 任务容器

private static ThreadPool threadPool;

public static ThreadPool newThreadPool() {
threadPool = new ThreadPool(3);
return threadPool;
}

private ThreadPool(int corePoolSize) {
this.corePoolSize = corePoolSize;
//LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,一直加任务线程可能会造成OOM内存溢出
workQueue = new LinkedBlockingQueue<>(1024);//这里指定大小
workers = new HashSet<>();
}

public void execute(Runnable r) {
if (workerCount < corePoolSize) {
addWorker(r);
} else {
try {
workQueue.put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private void addWorker(Runnable r) {
workerCount++;
Worker worker = new Worker(r);
Thread t = worker.thread;
workers.add(worker);
t.start();
}

class Worker implements Runnable {
Runnable task;
Thread thread;

public Worker(Runnable task) {
this.task = task;
this.thread = new Thread(this);
}

@Override
public void run() {
while (true) {
Runnable task = this.task;
// 执行当前的任务,所以把这个任务置空,以免造成死循环
this.task = null;
if (task != null || (task = getTask()) != null) {
task.run();
}
}
}
}

private Runnable getTask() {
Runnable r = null;
try {
r = workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return r;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TestThreadPool {
public static void main(String[] args) throws InterruptedException {

ThreadPool threadPool=ThreadPool.newThreadPool();
//生成10个任务放入线程池
for (int i = 0; i < 10; i++) {
Runnable task= new Runnable(){
@Override
public void run() {
System.out.println("做任务中。。。");
}
};
threadPool.execute(task);
Thread.sleep(1000);
}
}
}

参考

模式方面菜鸟教程写的不错

Java高并发之BlockingQueue

https://blog.csdn.net/GitHub_boy/article/details/51046574