Luckylau's Blog

Runnable,Callable及Future,FutureTask的用法

内容 时间
初始化 2017-06-22
更新FutureTask 2017-09-11

Runnable

Runnable是个接口,在它里面只声明了一个run()方法:

1
2
3
public interface Runnable {
public abstract void run();
}

使用很简单:

1.实现该接口并重写run方法 2.利用该类的对象创建线程 3.线程启动时就会自动调用该对象的run方法

相对于继承Thread来创建线程方式,使用Runnable可以让你的实现类同时实现多个接口,而相对于Callable及Future,Runnable方法并不返回任务执行结果且不能抛出异常。

通常在开发中结合ExecutorService使用,将任务的提交与任务的执行解耦开,同时也能更好地利用Executor提供的各种特性。

1
2
3
4
5
6
7
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new Runnable() {
public void run() {
//TODO
}
});
executor.shutdown();

Callable

它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

Callable并不像Runnable那样通过Thread的start方法就能启动实现类的run方法,所以它通常利用ExecutorService的submit方法去启动call方法自执行任务,而ExecutorService的submit又返回一个Future类型的结果,因此Callable通常也与Future一起使用。

1
2
3
4
5
6
ExecutorService pool = Executors.newCachedThreadPool();
Future<String> future = pool.submit(new Callable{
public void call(){
//TODO
}
});

或者利用FutureTask封装Callable再由Thread去启动(少用)

1
2
3
4
5
6
7
FutureTask<String> task = new FutureTask(new Callable{
public void call(){
//TODO
}
});
Thead thread = new Thread(task);
thread.start();

通过Executors.callable(Runnable task,T result)可以执行Runnable并返回”结果”,但是这个结果并不是Runnable的执行结果(Runnable的run方法是void类型),而是执行者预定义的结果,这点可以从其实现原理RunnableAdpter源码看出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);//通过RunnableAdapter实现
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result; //将传入的结果的直接返回
}
}

Future

它也是一个接口,主要对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。Future提供了以下几个方法。

1
2
3
4
5
6
7
8
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true;

isDone方法表示任务是否已经完成,若任务完成,则返回true;

get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

FutureTask

FutureTask是Future接口的一个唯一实现类。FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

我们查看源码发现FutureTask类实现了RunnableFuture接口,而RunnableFuture继承了Runnable接口和Future接口。

1
2
3
4
public class FutureTask<V> implements RunnableFuture<V>{}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

FutureTask提供了2个构造器:

1
2
3
4
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

FutureTask 有两个很重要的属性 分别是 state 和runner ,futureTask之所以可以支持cancel操作 就是因为这两个属性
其中 state为 枚举值:

1
2
3
4
5
6
7
NEW 新建 0
COMPLETING 执行中 1
NORMAL 正常 2
EXCEPTIONAL 异常 3
CANCELLED 取消 4
INTERRUPTING 中断中 5
INTERRUNPED 被中断 6

state的状态变化可以有四种方式:

1
2
3
4
NEW->COMPLETING->NORMAL 正常完成的流程
NEW->COMPLETING->EXCEPTIONAL 出现异常的流程
NEW->CANCELED 被取消
NEW->INTERRUNPING->INTERRRUNPTED 被中断

FutureTask执行多任务计算的使用场景

示例一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class FutureTaskForMultiCompute {
public static void main(String[] args) {
// TODO Auto-generated method stub
List<FutureTask<Result>> taskList = new ArrayList<FutureTask<Result>>();
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0 ; i < 10; i++) {
final Result result = new Result();
FutureTask<Result> ft =new FutureTask<Result>(new ComputeTask(result,i+"号线程"),result);
taskList.add(ft);
exec.submit(ft);
}
System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");
System.out.println("我在等待他们的返回结果....");
long total = 0;
for(FutureTask<Result> ft:taskList){
try {
Result res=ft.get();
System.out.println(res.toString());
total +=res.getDuration();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
exec.shutdown();
System.out.println("总耗时为: "+total);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.TimeUnit;
public class ComputeTask implements Runnable{
private String name;
private Result result;
public ComputeTask(Result result,String name){
this.result = result;
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {
// TODO Auto-generated method stub
Long duration = (long)(Math.random()*20);
try {
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
result.setName(this.name);
result.setDuration(duration);
System.out.println("子线程"+this.getName()+"已完成计算任务");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Result{
private String name;
private Long duration;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public Long getDuration() {
return duration;
}
public void setDuration(Long duration) {
this.duration = duration;
}
@Override
public String toString() {
return "Result [name=" + name + ", duration=" + duration + "]";
}
}

示例二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class ComputeTask implements Callable<Result>{
private String name;
private Result result;
public ComputeTask(Result result,String name){
this.result = result;
this.name = name;
}
public String getName() {
return name;
}
@Override
public Result call() throws Exception {
// TODO Auto-generated method stub
Long duration = (long)(Math.random()*20);
try {
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
result.setName(this.name);
result.setDuration(duration);
System.out.println("子线程"+this.getName()+"已完成计算任务");
return result;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class FutureTaskForMultiCompute {
public static void main(String[] args) {
// TODO Auto-generated method stub
List<FutureTask<Result>> taskList = new ArrayList<FutureTask<Result>>();
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0 ; i < 10; i++) {
final Result result = new Result();
FutureTask<Result> ft =new FutureTask<Result>(new ComputeTask(result,i+"号线程"));
taskList.add(ft);
exec.submit(ft);
}
System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");
System.out.println("我在等待他们的返回结果....");
long total = 0;
for(FutureTask<Result> ft:taskList){
try {
Result res=ft.get();
System.out.println(res.toString());
total +=res.getDuration();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
exec.shutdown();
System.out.println("总耗时为: "+total);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Result{
private String name;
private Long duration;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public Long getDuration() {
return duration;
}
public void setDuration(Long duration) {
this.duration = duration;
}
@Override
public String toString() {
return "Result [name=" + name + ", duration=" + duration + "]";
}
}

FutureTask在高并发环境下确保任务只执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Connection {
private static Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private String name;
private static Random random = new Random();
private static ReentrantLock lock = new ReentrantLock();
public Connection (String name){
this.name = name;
}
public static Connection getConnection(String key){
try{
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}
else{
//创建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
}
finally{
lock.unlock();
}
}
private static Connection createConnection(){
return new Connection("db-connect-"+random.nextInt(1000));
}
public String getName() {
return name;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class RequestDemo implements Runnable{
private String key;
public RequestDemo(String key){
this.key = key;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(Thread.currentThread().getName() + "请求连接");
Connection con=Connection.getConnection(key);
System.out.println(Thread.currentThread().getName() + " 获得的连接名字为:"+con.getName());
}
}
1
2
3
4
5
6
7
8
9
public class Main {
public static void main(String[] args) {
for(int i = 0; i < 100 ; i++){
RequestDemo re = new RequestDemo("db");
Thread th =new Thread(re);
th.start();
}
}
}

在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。(我们将上述的HashMap换成ConcurrentHash,去掉lock,发现Connection会被创建多次),我们使用FutureTask可以解决这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.FutureTask;
public class Connection {
private static ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
private String name;
private static Random random = new Random();
public Connection (String name){
this.name = name;
}
public static Connection getConnection(String key) throws Exception{
FutureTask<Connection>connectionTask=connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
}
private static Connection createConnection(){
return new Connection("db-connect-"+random.nextInt(1000));
}
public String getName() {
return name;
}
}

FutureTask的实现

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。java.util.concurrent中的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。

每一个基于AQS实现的同步器都会包含两种类型的操作,如下:

至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。
至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquire Shared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。

Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,Future Task所有的的公有方法都直接委托给了内部私有的Sync。

FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法:

调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实现的tryAcquire Shared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为:state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。

如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。

当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤醒它的后继线程。

最后返回计算的结果或抛出异常。

FutureTask.run()方法的执行过程:

执行在构造函数中指定的任务(Callable.call())

以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。

AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程。

调用FutureTask.done() 。

参考:

http://www.cnblogs.com/MOBIN/p/6185387.html

http://www.importnew.com/17572.html

http://blog.csdn.net/chdjj/article/details/38900521

http://blog.csdn.net/liulipuo/article/details/39029643

http://blog.csdn.net/linchunquan/article/details/22382487

http://blog.csdn.net/javazejian/article/details/50896505

《java的并发编程艺术》

Luckylau wechat
如果对您有价值,看官可以打赏的!