Java线程池类

线程池的优点

  1. 线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。

  2. 可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。

线程池的原理

当我们把一个Runnable交给线程池去执行的时候,这个线程池处理的流程是这样的:

  1. 当线程数小于corePoolSize时,创建线程执行任务。
  2. 当线程数大于等于corePoolSize并且workQueue没有满时,放入workQueue中
  3. 线程数大于等于corePoolSize并且当workQueue满时,新任务新建线程运行,线程总数要小于maximumPoolSize
  4. 当线程总数等于maximumPoolSize并且workQueue满了的时候执行handler的rejectedExecution。也就是拒绝策略。

任务拒接策略(RejectedExecutionHandler)

当队列和线程池都满了的时候,再有新的任务到达,就必须要有一种办法来处理新来的任务。Java线程池中提供了以下四种策略:

  1. AbortPolicy: 直接抛异常
  2. CallerRunsPolicy:让调用者帮着跑这个任务
  3. DiscardOldestPolicy:会抛弃任务队列中最旧的任务,再把这个新任务添加进去。
  4. DiscardPolicy:不处理,直接扔掉

ExecutorService

Java中的线程池类有两个,分别是:ThreadPoolExecutor和ScheduledThreadPoolExecutor,这两个类都继承自ExecutorService。利用这两个类,可以创建各种不同的Java线程池,为了方便我们创建线程池,Java API提供了Executors工厂类来帮助我们创建各种各样的线程池。

Java线程池ExecutorService继承树:

ExecutorService继承树

ExecutorService的方法

execute(Runnable)

这个方法接收一个Runnable实例,并且异步的执行,例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ExecutorService executorService = Executors.newSingleThreadExecutor();

//java 7
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});

//java 8 lambda
executorService.execute(() -> {
System.out.println("Asynchronous task");
});

executorService.shutdown();

这个方法有个问题,就是没有办法获知task的执行结果。如果我们想获得task的执行结果,我们可以传入一个Callable的实例。

submit(Runnable)

submit(Runnable)可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕,例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
//java 7
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});

//java 8 lambda
Future future = executorService.execute(() -> {
System.out.println("Asynchronous task");
});

future.get(); //returns null if the task has finished correctly.

如果任务执行完成,future.get()方法会返回一个null。注意,future.get()方法会产生阻塞。

submit(Callable)

submit(Callable)和submit(Runnable)类似,也会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//java 7
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});

//java 8 lambda
Future<String> future = executorService.submit(() -> {
System.out.println("Asynchronous Callable");
return "Callable Result";
});


System.out.println("future done? " + future.isDone());

String result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

如果任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。

Timeouts

任何future.get()调用都会阻塞,然后等待直到callable中止。在最糟糕的情况下,一个callable持续运行——因此使你的程序将没有响应。我们可以简单的传入一个时长来避免这种情况。

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService executor = Executors.newFixedThreadPool(1);

Future<Integer> future = executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
return 123;
} catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
});

future.get(1, TimeUnit.SECONDS);

运行上面的代码将会产生一个TimeoutException:

1
2
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)

我们指定的最长等待时间为1分钟,而这个callable在返回结果之前实际需要两分钟。

invokeAny(…)

invokeAny(…)方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中任意一个成功的结果(如果没有异常发生),如果有正常或者异常的返回,那么那些没有完成的任务将会取消。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ExecutorService executorService = Executors.newSingleThreadExecutor();

//java 7
Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});

//java 8
List<Callable<String>> callables = Arrays.asList(
() -> "task2",
() -> "task1",
() -> "task3"
);

String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();

执行上面代码,每次执行都会返回一个结果,并且返回的结果是变化的。

invokeAll(…)

invokeAll(…)与 invokeAny(…)类似也是接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
System.out.println("future.get = " + future.get());
}

executorService.shutdown();

ExecutorService的关闭

当我们使用完成ExecutorService之后应该关闭它,否则它里面的线程会一直处于运行状态。

举个例子,如果的应用程序是通过main()方法启动的,在这个main()退出之后,如果应用程序中的ExecutorService没有关闭,这个应用将一直运行。之所以会出现这种情况,是因为ExecutorService中运行的线程会阻止JVM关闭。

如果要关闭ExecutorService中执行的线程,我们可以调用 ExecutorService.shutdown() 方法。在调用shutdown()方法之后,ExecutorService不会立即关闭,但是它不再接收新的任务,直到当前所有线程执行完成才会关闭,所有在shutdown()执行之前提交的任务都会被执行。

如果我们想立即关闭ExecutorService,我们可以调用 ExecutorService.shutdownNow()方法 。这个动作将跳过所有正在执行的任务和被提交还没有执行的任务。但是它并不对正在执行的任务做任何保证,有可能它们都会停止,也有可能执行完成。

通常关闭executors的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
try {
System.out.println("attempt to shutdown executor");
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.err.println("tasks interrupted");
} finally {
if (!executor.isTerminated()) {
System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}

executor通过等待指定的时间让当前执行的任务终止来“温柔的”关闭executor。在等待最长N分钟的时间后,execuote最终会通过中断所有的正在执行的任务关闭。

Executors

Executors的工厂方法提供的5种不同的线程池。Executors只是一个工厂类,它所有的方法返回的都是ThreadPoolExecutor、ScheduledThreadPoolExecutor这两个类的实例。

重载后的版本,需要多传入实现了ThreadFactory接口的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1. newFixedThreadPool(int nThreads)

创建可重用固定大小(nThreads,大小不能超过int的最大值)的线程池,以共享的无界队列方式来运行这些线程,缓冲任务的队列为LinkedBlockingQueue,大小为整型的最大数。

当使用此线程池时,在同执行的任务数量超过传入的线程池大小值后,将会放入LinkedBlockingQueue,在LinkedBlockingQueue中的任务需要等待线程空闲后再执行,如果放入LinkedBlockingQueue中的任务超过整型的最大数时,抛出RejectedExecutionException。

2. newCachedThreadPool()

缓存线程池大小是不定值,可以需要创建不同数量的线程。

在使用缓存型池时,先查看池中有没有空闲的线程,如果有,就复用。如果没有,就新建新的线程加入池中。

缓存型池子通常用于执行一些生存期很短的异步型任务。

3. newSingleThreadExecutor()

创建大小为1的固定线程池,同时执行任务(task)的只有一个,其它的任务(task)都放在LinkedBlockingQueue中排队等待执行。

4. newScheduledThreadPool(int corePoolSize)

该方法返回一个可以控制线程池内线程延时或周期性执行某任务的线程池。

5. newSingleThreadScheduledExecutor()

该方法返回一个可以控制线程池内线程定时或周期性执行某任务的线程池。只不过和上面的区别是该线程池大小为1,而上面的可以指定线程池的大小。

ThreadPoolExecutor

先来通过ThreadPoolExecutor的构造方法了解一下这个类:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

主要参数有:

corePoolSize 核心线程的数量,不超过这个参数数量的线程会被保留在线程池内,即使它们是空闲的,如果设置了allowCoreThreadTimeOut为true除外。

maximumPoolSize 线程池所允许拥有线程的最大数量,当任务队列的任务已满,线程数已达到最大数量,任务会被拒绝。

keepAliveTime 当线程池的线程数量超过核心线程的数量,这些非核心线程会尝试在keepAliveTime内获取队列内的任务,如果获取失败则被线程池移除并终止。

unit 超时时间的单位。

workQueue 任务的阻塞队列,缓存将要执行的Runnable任务,由各线程轮询该任务队列获取任务执行。

threadFactory 线程创建的工厂。

handler 当任务由于线程数量或者任务队列达到上限,会执行该接口的方法处理任务的拒绝。

ThreadPoolExecutor的状态变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是ThreadPoolExecutor的同步状态变量。

workerCountOf()方法取得当前线程池的线程数量,算法是将ctl的值取低29位。

runStateOf()方法取得线程池的状态,算法是将ctl的值取高3位:

  1. RUNNING 111 表示正在运行
  2. SHUTDOWN 000 表示拒绝接收新的任务
  3. STOP 001 表示拒绝接收新的任务并且不再处理任务队列中剩余的任务,并且中断正在执行的任务。
  4. TIDYING 010 表示所有线程已停止,准备执行terminated()方法。
  5. TERMINATED 011 表示已执行完terminated()方法。

Executor.execute(Runnable command)

该方法将使用线程池执行Runnable对象的run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or rated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

以上代码对应了三种情况:

  1. 线程池的线程数量小于核心线程数量上限,开启核心线程执行任务。
  2. 线程池的线程数量不小于核心线程数量上限,或者开启核心线程失败,尝试将任务以非阻塞的方式添加到任务队列。
  3. 任务队列以满导致添加任务失败,开启新的非核心线程执行任务。

回顾FixedThreadPool,因为它配置的corePoolSize与maximumPoolSize相等,所以不会执行到情况3,并且因为workQueue为默认的LinkedBlockingQueue,其长度为Integer.MAX_VALUE,几乎不可能出现任务无法被添加到workQueue的情况,所以FixedThreadPool的所有任务执行在核心线程中。

而CachedThreadPool的corePoolSize为0,表示它不会执行到情况1,因为它的maximumPoolSize为Integer.MAX_VALUE,所以几乎没有线程数量上限,因为它的workQueue为SynchronousQueue,所以当线程池里没有闲置的线程SynchronousQueue就会添加任务失败,因此会执行到情况3添加新的线程执行任务。

addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is table
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker这个方法先尝试在线程池运行状态为RUNNING并且线程数量未达上限的情况下通过CAS操作将线程池数量+1,接着在ReentrantLock同步锁的同步保证下判断线程池为运行状态,然后把Worker添加到HashSet workers中。如果添加成功则执行Worker的内部线程。

Worker构造方法

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

这里指定了第一个要执行的任务,并通过线程池的线程工厂创建线程。可以发现这个线程的参数为this,即Worker对象,因为Worker实现了Runnable因此可以被当成任务执行,执行的即Worker实现的run方法:

1
2
3
public void run() {
runWorker(this);
}

runWorker方法

因为Worker为ThreadPoolExecutor的内部类,因此runWorker方法实际是ThreadPoolExecutor定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

这个方法是线程池复用线程的核心代码,注意Worker继承了AbstractQueuedSynchronizer,在执行每个任务前通过lock方法加锁,执行完后通过unlock方法解锁,这种机制用来防止运行中的任务被中断。在执行任务时先尝试获取firstTask,即构造方法传入的Runnable对象,然后尝试从getTask方法中获取任务队列中的任务。在任务执行前还要再次判断线程池是否已经处于STOP状态或者线程被中断。

注意这里w.lock方法是在获取到任务后才执行的,也就是如果线程获取到任务前都未加锁,这样能保证showDown方法尝试获取该锁中断空闲的线程,详见后面的解析。

当线程被中断、抛出异常、不能及时得到任务,processWorkerExit方法用于最后将线程回收。

getTask方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

还记得Executor.execute方法的情况是将任务添加到任务队列,getTask方法就是从任务队列中同步地取出任务。

这个方法通过一个循环不断轮询任务队列有没有任务到来,首先判断线程池是否处于正常运行状态,通过超时配置有两种方法取出任务:

  1. BlockingQueue.poll 阻塞指定的时间尝试获取任务,如果超过指定的时间还未获取到任务就返回null。
  2. BlockingQueue.take 这种方法会在取到任务前一直阻塞。

FixedThreadPool使用的是take方法,所以会线程会一直阻塞等待任务。CachedThreadPool使用的是poll方法,也就是说CachedThreadPool中的线程如果在60秒内未获取到队列中的任务就会被终止。

到此ThreadPoolExecutor是怎么执行Runnable任务的分析结束。

ExecutorService.shutdown()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

ExecutorService是Executor的子类,也是ThreadPoolExecutor的基类。首先通过mainLock加锁同步修改线程池状态为SHUTDOWN,然后通过interruptIdleWorkers方法中断空闲线程,OnShowDown方法是留给子类去实现的。

interruptIdleWorkers(boolean onlyOne)方法也是先用mainLock加锁同步,然后循环找出所有Worker中Thread未中断的,通过tryLock方法尝试获取锁。还记得上文的runWorker方法Worker的锁是在获取任务时才加的,interruptIdleWorkers方法通过竞争该锁抢先中断线程,这样就导致未执行任务的线程被中断了,而正在执行任务的线程不受影响,并且可以继续执行任务队列中的任务。

ExecutorService.shutdownNow()

与ExecutorService.shutdown()不同的是,shutdownNow方法除了让线程池拒绝接收新的任务,并且不再执行任务队列里未执行的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

首先mainLock同步将状态改为STOP,然后中断所有线程。

1
2
3
4
5
6
7
8
9
10
11
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

interruptWorkers方法将对所有Worker执行interruptIfStarted,即将所有运行中的线程中断:

1
2
3
4
5
6
7
8
9
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

还记得Worker的构造函数中执行了setState(-1),而在runWorker方法中通过unlock将state改为0,因此可以被interruptWorkers方法中断。

这里注意的是中断并不意味着线程就一定停止工作,除非在任务中捕获InterruptedException退出任务。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor同ThreadPoolExecutor一样也可以从 Executors线程池工厂创建,所不同的是它具有定时执行,以周期或间隔循环执行任务等功能。

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,因此它具有ThreadPoolExecutor的所有能力。
通过super方法的参数可知,核心线程的数量即传入的参数,而线程池的线程数为Integer.MAX_VALUE,几乎为无上限。
这里采用了DelayedWorkQueue任务队列,也是定时任务的核心,留在后面分析。

ScheduledThreadPoolExecutor实现了ScheduledExecutorService 中的接口:

1
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

延时执行Callable任务的功能。

1
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

延时执行Runnable任务的功能。

1
2
3
4
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

可以延时循环执行周期性任务。

假设任务执行时间固定2s,period为1s,因为任务的执行时间大于规定的period,所以任务会每隔2s(任务执行时间)开始执行一次。如果任务执行时间固定为0.5s,period为1s,因为任务执行时间小于period,所以任务会每隔1s(period)开始执行一次。实际任务的执行时间即可能是大于period的,也可能小于period,scheduleAtFixedRate的好处就是每次任务的开始时间间隔必然大于等于period。

四种执行定时任务的方法

1
2
3
4
5
6
7
schedule(Runnable command,long delay, TimeUnit unit)

schedule(Callable<V> callable, long delay, TimeUnit unit)

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)