Luckylau's Blog

Java并发之Executor框架

基础简介

​ 在Java中,使用线程来异步执行任务 。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

Executors框架

Executors框架结构

​ 在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

Executor框架主要由3大部分组成如下:

任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。

任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

异步计算的结果。包括接口Future和实现Future接口的FutureTask类。

主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。

然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callabletask))。

如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。

最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

ExecuteService

排队策略

​ 直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes (Integer .MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。

​ 无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

​ 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。

submit和execute

submit有返回值,接受的是Callable任务而execute没有返回值,接受的是Runnable任务。

​ 在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable task) 方法来执行,并且返回一个 Future,是表示任务等待完成的 Future。

​ Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而且当获取返回结果时可能会抛出异常。Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有。

​ 当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCachedThreadPool{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
//ExecutorService executorService = Executors.newFixedThreadPool(3);
//ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++){
executorService.execute(new TestRunnable(i));
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable{
private int count ;
public TestRunnable(int count){
this.count=count;
System.out.println("Create Thread-"+count);
}
public void run(){
System.out.println("线程池中的"+Thread.currentThread().getName() + "被调用来处理Thread-"+count);
}
}
//输出:
Create Thread-0
Create Thread-1
Create Thread-2
线程池中的pool-1-thread-1被调用来处理Thread-0
Create Thread-3
线程池中的pool-1-thread-2被调用来处理Thread-1
Create Thread-4
线程池中的pool-1-thread-2被调用来处理Thread-4
线程池中的pool-1-thread-3被调用来处理Thread-2
线程池中的pool-1-thread-4被调用来处理Thread-3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//创建10个任务并执行
for (int i = 0; i < 10; i++){
//使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//将任务执行结果存储到List中
resultList.add(future);
}
//遍历任务的结果
for (Future<String> fs : resultList){
try{
while(!fs.isDone());//Future返回如果没有完成,则一直循环等待,直到Future返回完成
System.out.println(fs.get()); //打印各个线程(任务)执行的结果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,
* 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
System.out.println("Task id="+id+" 的call()方法被" + Thread.currentThread().getName()+"自动调用!!");
//该返回结果将被Future的get方法得到
return "Task id="+id+" 的call()方法被自动调用,任务返回的结果是:" + id + "";
}
}
//输出:
Task id=0 的call()方法被pool-1-thread-1自动调用!!
Task id=2 的call()方法被pool-1-thread-3自动调用!!
Task id=1 的call()方法被pool-1-thread-2自动调用!!
Task id=3 的call()方法被pool-1-thread-4自动调用!!
Task id=4 的call()方法被pool-1-thread-5自动调用!!
Task id=0 的call()方法被自动调用,任务返回的结果是:0
Task id=9 的call()方法被pool-1-thread-1自动调用!!
Task id=6 的call()方法被pool-1-thread-7自动调用!!
Task id=5 的call()方法被pool-1-thread-6自动调用!!
Task id=7 的call()方法被pool-1-thread-8自动调用!!
Task id=1 的call()方法被自动调用,任务返回的结果是:1
Task id=2 的call()方法被自动调用,任务返回的结果是:2
Task id=8 的call()方法被pool-1-thread-9自动调用!!
Task id=3 的call()方法被自动调用,任务返回的结果是:3
Task id=4 的call()方法被自动调用,任务返回的结果是:4
Task id=5 的call()方法被自动调用,任务返回的结果是:5
Task id=6 的call()方法被自动调用,任务返回的结果是:6
Task id=7 的call()方法被自动调用,任务返回的结果是:7
Task id=8 的call()方法被自动调用,任务返回的结果是:8
Task id=9 的call()方法被自动调用,任务返回的结果是:9

​ 从结果中可以同样可以看出,pool-1-thread-1被调用2次处理id=0和id=9的任务,submit也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。另外,需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过调用isDone()方法判断Future是否完成了返回。

服务的关闭

​ shutdown()方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务的启动并试图停止当前正在执行的任务。在终止后,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService以允许回收其资源。

Executor框架成员

Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口和Executors。

ThreadPoolExecutor详解

Executors提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。在了解常用的静态线程池之前,我们先看一下ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {..}

corePoolSize:corePoolSize定义了线程池的基本大小,也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。

maximumPoolSize:池中允许的最大线程数,表示线程池可同时活动线程数量上限。

keepAliveTime:线程池中的空闲线程所能持续的最长时间。

unit:持续时间的单位。

workQueue:包含Runnable的阻塞队列,当线程池达到基本大小时,新提交的任务将放入这个阻塞队列中,阻塞队列的实现包含三种:无界队列、有界队列以及同步移交队列。

threadFactory参数用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,方便定位问题。

handler参数定义了线程池饱和策略。当有界队列被填满后,并且线程池活动线程达到最大线程数,饱和策略开始生效。JDK提供了几种不同的RejectedExecutionHandler实现,分别是AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。AbortPolicy是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。DiscardPolicy策略会把新提交的任务直接抛弃,而DiscardOldestPolicy策略会抛弃队列首部最老的任务。CallerRunsPolicy策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量,它不会在线程池中的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。

按照如下规则运行:

1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式)。 另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest{
public static void main(String[] args){
//创建等待队列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//创建线程池,池中保存的线程数为3,允许的最大线程数为5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
//创建七个任务
Runnable t1 = new MyThreads("t1");
Runnable t2 = new MyThreads("t2");
Runnable t3 = new MyThreads("t3");
Runnable t4 = new MyThreads("t4");
Runnable t5 = new MyThreads("t5");
Runnable t6 = new MyThreads("t6");
Runnable t7 = new MyThreads("t7");
//每个任务会在一个线程上执行
pool.execute(t1);
pool.execute(t2);
pool.execute(t3);
pool.execute(t4);
pool.execute(t5);
pool.execute(t6);
pool.execute(t7);
//关闭线程池
pool.shutdown();
}
}
class MyThreads implements Runnable{
private String name;
public String getName() {
return name;
}
public MyThreads(String name) {
// TODO Auto-generated constructor stub
this.name=name;
}
@Override
public void run(){
System.out.println(Thread.currentThread().getName() + "正在执行 "+this.getName());
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
//输出
pool-1-thread-2正在执行 t2
pool-1-thread-3正在执行 t3
pool-1-thread-1正在执行 t1
pool-1-thread-1正在执行 t4
pool-1-thread-3正在执行 t6
pool-1-thread-2正在执行 t5
pool-1-thread-3正在执行 t7

CachedThreadPool详解

public static ExecutorService newCachedThreadPool()

​ CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置Integer .MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。 CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。缓存型池子通常用于执行一些生存期很短的异步型任务 ,因此在一些面向连接的daemon型SERVER中用得不多。

1
2
3
4
5
6
7
8
9
10
11
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤。

当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue. poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1)将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。

在步骤2)中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit .NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

​ SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。

FixedThreadPool详解

public static ExecutorService newFixedThreadPool(int nThreads)

1
2
3
4
5
6
7
8
9
10
11
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

​ FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads 。当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间 ,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。在任意点,大多数 nThreads 线程会处于处理任务的活动状态,如果在所有线程处于活动状态时提交附加任务 , 则在有可用线程之前,附加任务将在队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。

在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue。

线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer .MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响:

当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。

由于1,使用无界队列时maximumPoolSize将是一个无效参数。

由于1和2,使用无界队列时keepAliveTime将是一个无效参数。

由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor详解

SingleThreadExecutor是使用单个worker线程的Executor。

public static ExecutorService newSingleThreadExecutor()

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

​ 创建一个单线程化的Executor。 单例线程,任意时间池中只能有一个线程,用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)。

ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor运行机制

​ ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。 DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThread Pool Executors中没有什么意义。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

ScheduledThreadPoolExecutor的执行主要分为两大部分:当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask;线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。其中ScheduleAtFixedRate为每次执行时间为上一次任务开始起向后推一个时间间隔;ScheduleWithFixedDelay为每次执行时间为上一次任务结束起向后推一个时间间隔。由此可见,ScheduleAtFixedRate是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。

​ 对于Timer类而言, 它实现任务调度的核心类是Timer 和 TimerTask。其中 Timer 负责设定 TimerTask 的起始与间隔执行时间。使用者只需要创建一个 TimerTask的继承类,实现自己的 run 方法,然后将其丢给 Timer 去执行即可。Timer 的设计核心是一个 TaskList和一个 TaskThread。Timer 将接收到的任务丢到自己的 TaskList 中,TaskList 按照 Task的最初执行时间进行排序。TimerThread 在创建 Timer时会启动成为一个守护线程。这个线程会轮询所有任务,找到一个最近要执行的任务,然后休眠,当到达最近要执行任务的开始时间点,TimerThread被唤醒并执行该任务。之后 TimerThread 更新最近一个要执行的任务,继续休眠。Timer的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.Timer;
import java.util.TimerTask;
public class TimerTest extends TimerTask{
private String jobName = "";
private int num;
public TimerTest(String jobName , int num) {
this.jobName = jobName;
this.num = num;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("execute " + jobName);
try {
int i = 7/num;
System.out.println(i);
num--;
} catch (Exception e) {
// TODO: handle exception
System.out.println(e.getMessage());
}
}
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTest("job1",2), 1000, 1000);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TimerTest implements Runnable{
private String jobName = "";
private int num;
public TimerTest(String jobName , int num) {
this.jobName = jobName;
this.num = num;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("execute " + jobName);
try {
int i = 7/num;
System.out.println(i);
num--;
} catch (Exception e) {
// TODO: handle exception
System.out.println(e.getMessage());
}
}
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(new TimerTest("job1",2), 1000, 1000, TimeUnit.MILLISECONDS);
service.scheduleWithFixedDelay(new TimerTest("job2",2), 1000, 1000, TimeUnit.MILLISECONDS);
}
}

ScheduledThreadPoolExecutor的实现

ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask)放到一个DelayQueue中。ScheduledFutureTask主要包含3个成员变量,如下。

long型成员变量time,表示这个任务将要被执行的具体时间。

long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。

long型成员变量period,表示任务执行的间隔周期。

DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。

线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指Scheduled FutureTask的time大于等于当前时间。

线程1执行这个ScheduledFutureTask。

线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。

线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

参考文献

http://blog.csdn.net/ns_code/article/details/17465497

http://blog.csdn.net/linghu_java/article/details/17123057

http://blog.csdn.net/bairrfhoinn/article/details/16848785

http://zhangjunhd.blog.51cto.com/113473/70068/

http://www.cnblogs.com/wanqieddy/p/3853863.html

http://blog.csdn.net/defonds/article/details/9715455

《java并发编程艺术》

修订 时间
初始化章节 2017-02-23
丰富内容,梳理章节 2017-09-11
Luckylau wechat
如果对您有价值,看官可以打赏的!