Luckylau's Blog

Java并发之线程同步工具

CountDownLatch

​ CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。

伪代码为:

​ Main thread start
​ Create CountDownLatch for N threads
​ Create and start N threads
​ Main thread wait on latch
​ N threads complete their tasks and return
​ Main thread resume execution

主要方法:
public CountDownLatch(int count);
public void countDown();
public void await() throws InterruptedException

CountDownLatch实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package Demo;
import java.util.concurrent.CountDownLatch;
/**
 * Base class for a single health check that is meant to run on its own thread.
 *
 * <p>{@link #run()} invokes {@link #checkService()}, records whether it completed
 * without throwing, and finally counts down the latch supplied at construction
 * (if any).
 */
public abstract class BaseHealthChecker implements Runnable {
    private CountDownLatch latch;
    private String name;
    private boolean isServiceUp;

    /**
     * @param latch latch to count down once the check has finished; may be {@code null}
     * @param name  display name of the service being checked
     */
    public BaseHealthChecker(CountDownLatch latch, String name) {
        this.latch = latch;
        this.name = name;
        this.isServiceUp = false;
    }

    /** @return the display name of the checked service */
    public String getName() {
        return name;
    }

    /** @param name new display name of the checked service */
    public void setName(String name) {
        this.name = name;
    }

    /** @return {@code true} if the most recent check completed without throwing */
    public boolean isServiceUp() {
        return isServiceUp;
    }

    /** @param isServiceUp status to record for the service */
    public void setServiceUp(boolean isServiceUp) {
        this.isServiceUp = isServiceUp;
    }

    /** Performs the actual check; throwing anything marks the service as down. */
    public abstract void checkService();

    /**
     * Runs the check, stores the outcome and counts down the latch exactly once,
     * whether the check succeeded or failed.
     */
    @Override
    public void run() {
        boolean healthy = false;
        try {
            checkService();
            healthy = true;
        } catch (Throwable failure) {
            failure.printStackTrace(System.err);
        } finally {
            // Publish the outcome before releasing the latch.
            isServiceUp = healthy;
            if (latch != null) {
                latch.countDown();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package Demo;
import java.util.concurrent.CountDownLatch;
/**
 * Health checker for the cache service. The check is simulated with a fixed
 * 7-second sleep.
 */
public class CacheHealthChecker extends BaseHealthChecker {

    /**
     * @param latch latch passed through to {@link BaseHealthChecker}
     */
    public CacheHealthChecker(CountDownLatch latch) {
        super(latch, "Cache Service");
    }

    /**
     * Simulates the check by sleeping for 7 seconds.
     *
     * @throws IllegalStateException if the thread is interrupted before the check
     *         finishes; the interrupt status is restored first. Because
     *         {@code BaseHealthChecker.run()} catches every {@code Throwable} and
     *         records the service as down, an interrupted check is no longer
     *         reported as "up".
     */
    @Override
    public void checkService() {
        System.out.println("Checking " + this.getName());
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(this.getName() + " check was interrupted", e);
        }
        // NOTE(review): there is no space before "is Up", so this prints
        // "Cache Serviceis Up"; left as-is to keep the program output unchanged.
        System.out.println(this.getName() + "is Up");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package Demo;
import java.util.concurrent.CountDownLatch;
/**
 * Health checker for the database service. The check is simulated with a fixed
 * 7-second sleep.
 */
public class DatabaseHealthChecker extends BaseHealthChecker {

    /**
     * @param latch latch passed through to {@link BaseHealthChecker}
     */
    public DatabaseHealthChecker(CountDownLatch latch) {
        super(latch, "Database Service");
    }

    /**
     * Simulates the check by sleeping for 7 seconds.
     *
     * @throws IllegalStateException if the thread is interrupted before the check
     *         finishes; the interrupt status is restored first. Because
     *         {@code BaseHealthChecker.run()} catches every {@code Throwable} and
     *         records the service as down, an interrupted check is no longer
     *         reported as "up".
     */
    @Override
    public void checkService() {
        System.out.println("Checking " + this.getName());
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(this.getName() + " check was interrupted", e);
        }
        // NOTE(review): there is no space before "is Up", so this prints
        // "Database Serviceis Up"; left as-is to keep the program output unchanged.
        System.out.println(this.getName() + "is Up");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package Demo;
import java.util.concurrent.CountDownLatch;
/**
 * Health checker for the network service. The check is simulated with a fixed
 * 7-second sleep.
 */
public class NetworkHealthChecker extends BaseHealthChecker {

    /**
     * @param latch latch passed through to {@link BaseHealthChecker}
     */
    public NetworkHealthChecker(CountDownLatch latch) {
        super(latch, "Network Service");
    }

    /**
     * Simulates the check by sleeping for 7 seconds.
     *
     * @throws IllegalStateException if the thread is interrupted before the check
     *         finishes; the interrupt status is restored first. Because
     *         {@code BaseHealthChecker.run()} catches every {@code Throwable} and
     *         records the service as down, an interrupted check is no longer
     *         reported as "up".
     */
    @Override
    public void checkService() {
        System.out.println("Checking " + this.getName());
        try {
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(this.getName() + " check was interrupted", e);
        }
        System.out.println(this.getName() + " is Up");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package Demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Startup helper that runs the network, cache and database health checkers in
 * parallel and waits on a {@link CountDownLatch} until all of them have finished.
 */
public class ApplicationStartupUtil {
    private static final ApplicationStartupUtil app = new ApplicationStartupUtil();

    /** Kept public for backward compatibility; prefer {@link #getInstance()}. */
    public ApplicationStartupUtil() {
    }

    /** @return the shared instance */
    public static ApplicationStartupUtil getInstance() {
        return app;
    }

    /**
     * Runs all health checkers concurrently and blocks until each has counted
     * down the latch.
     *
     * <p>The latch and the checker list are local to each call, so concurrent
     * calls do not overwrite each other's state. The summary line is printed
     * exactly once.
     *
     * @return {@code true} only if every checker reports its service as up;
     *         {@code false} if any is down or if the calling thread is interrupted
     *         while waiting (the interrupt status is restored in that case)
     */
    public static boolean checkExternalService() {
        // The count must equal the number of checkers registered below.
        final CountDownLatch latch = new CountDownLatch(3);
        final List<BaseHealthChecker> services = new ArrayList<BaseHealthChecker>();
        services.add(new NetworkHealthChecker(latch));
        services.add(new CacheHealthChecker(latch));
        services.add(new DatabaseHealthChecker(latch));

        ExecutorService executors = Executors.newFixedThreadPool(services.size());
        for (final BaseHealthChecker checker : services) {
            executors.execute(checker);
        }
        // No more tasks will be submitted; workers exit once their check is done.
        executors.shutdown();

        boolean re = true;
        try {
            latch.await();
        } catch (InterruptedException e) {
            // The checks did not all complete: cancel them and report failure
            // instead of reading results that may still be changing.
            Thread.currentThread().interrupt();
            executors.shutdownNow();
            re = false;
        }

        if (re) {
            for (final BaseHealthChecker checker : services) {
                if (!checker.isServiceUp()) {
                    re = false;
                    break;
                }
            }
        }
        System.out.println("All services checked ,result is " + re);
        return re;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Entry point of the CountDownLatch demo: triggers the external service checks.
 */
public class Test {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        // checkExternalService() prints the overall result itself, so the
        // previously declared but never used local flag has been removed.
        ApplicationStartupUtil.checkExternalService();
    }
}
//输出:
Checking Network Service
Checking Cache Service
Checking Database Service
Network Service is Up
Cache Serviceis Up
Database Serviceis Up
All services checked ,result is true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * A conference participant that reports its arrival to a {@code Videoconference}
 * after a random delay.
 */
public class Participant implements Runnable {
    private Videoconference conference;
    private String participantName;

    /**
     * @param videoconference conference this participant will join
     * @param name            name reported on arrival
     */
    public Participant(Videoconference videoconference, String name) {
        this.conference = videoconference;
        this.participantName = name;
    }

    /** Picks a random duration in the range 0..9 and passes it to the conference. */
    @Override
    public void run() {
        final double scaled = Math.random() * 10;
        final long delay = (long) scaled;
        conference.arrive(participantName, delay);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * A video conference that starts once a fixed number of participants have
 * arrived, coordinated with a {@link CountDownLatch}.
 */
public class Videoconference implements Runnable {
    private final CountDownLatch controller;
    /** Participants still missing; read and written only inside {@code synchronized (this)}. */
    private int counter;

    /**
     * @param number number of participants to wait for
     */
    public Videoconference(int number) {
        this.controller = new CountDownLatch(number);
        this.counter = number;
    }

    /**
     * Registers the arrival of a participant after the given delay.
     *
     * <p>The delay is slept <em>outside</em> the monitor so that participants
     * wait concurrently; only the counter update and its two log lines are
     * synchronized. If the sleep is interrupted, the interrupt status is
     * restored and the arrival is still recorded so the conference does not wait
     * forever for this participant.
     *
     * @param name     participant name used in the log output
     * @param duration delay in seconds before the participant arrives
     */
    public void arrive(String name, long duration) {
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (this) {
            System.out.println(name + " has arrived.");
            System.out.println("VideoConference:Waiting for " + (--counter));
            controller.countDown();
        }
    }

    /**
     * Blocks until all participants have arrived, then announces the start.
     * Returns early, with the interrupt status restored, if interrupted while
     * waiting.
     */
    @Override
    public void run() {
        System.out.println("VideoConference:Initialization:" + controller.getCount());
        try {
            controller.await();
            System.out.printf("VideoConference: All the participants have come\n");
            System.out.printf("VideoConference: Let's start...\n");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Launches the video conference demo: one conference thread plus ten
 * participant threads.
 */
public class Main {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        Videoconference conference = new Videoconference(10);
        Thread conferenceThread = new Thread(conference);
        conferenceThread.start();

        int index = 0;
        while (index < 10) {
            Participant participant = new Participant(conference, "Participant" + index);
            new Thread(participant).start();
            index++;
        }
    }
}

CyclicBarrier

​ CyclicBarrier 类有一个整数初始值,此值表示将在同一点同步的线程数量。当其中一个线程到达确定点,它会调用await() 方法来等待其他线程。此时CyclicBarrier阻塞该线程进入休眠等待其他线程的到达。当最后一个线程调用CyclicBarrier 类的await() 方法,它唤醒所有等待的线程并继续执行它们的任务。

​ CountDownLatch和CyclicBarrier的区别在于:CountDownLatch 适用于一组线程和另一个主线程之间的工作协作。一个主线程等待一组工作线程的任务完毕才继续它的执行是使用 CountDownLatch 的主要场景;CyclicBarrier 用于一组或几组线程,比如一组线程需要在一个时间点上达成一致,例如同时开始一个工作。CyclicBarrier 的循环特性和构造函数所接受的 Runnable 参数也是 CountDownLatch 所不具备的; CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。

CyclicBarrier实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * CyclicBarrier demo: a group of workers meet at a barrier, the barrier action
 * runs once when the last one arrives, and then all workers continue.
 */
public class DemoCyclicBarrier {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        final int threadNum = 5;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, new Runnable() {
            @Override
            public void run() {
                System.out.println("Internal Barrier");
            }
        });
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        // Submit exactly as many workers as the barrier has parties. A separate
        // hard-coded count would leave the barrier waiting forever (too few
        // workers) or strand the surplus ones (too many) if it ever drifted.
        for (int i = 0; i < threadNum; i++) {
            executor.execute(new worker("worker " + i, cyclicBarrier));
        }
        executor.shutdown();
    }
}
/**
 * A task that sleeps for a random 0-7 seconds, waits at a shared
 * {@link CyclicBarrier}, and reports that it is working once the barrier trips.
 */
class worker implements Runnable {
    private String name;
    private CyclicBarrier cyclicbarrier;

    /**
     * @param name          label used in the log output (printed as-is, without
     *                      an extra prefix)
     * @param cyclicBarrier barrier shared with the other workers
     */
    public worker(String name, CyclicBarrier cyclicBarrier) {
        this.name = name;
        this.cyclicbarrier = cyclicBarrier;
    }

    /** @return the label of this worker */
    public String getName() {
        return name;
    }

    /** @param name new label of this worker */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Sleeps, waits at the barrier and prints "is working" only if the barrier
     * tripped normally. If the thread is interrupted or the barrier is broken,
     * the method returns without claiming to work; on interruption the interrupt
     * status is restored.
     */
    @Override
    public void run() {
        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
        } catch (InterruptedException e) {
            // Restore the flag and fall through: await() below then fails fast
            // and breaks the barrier, so the other parties are released with a
            // BrokenBarrierException instead of waiting for this worker forever.
            Thread.currentThread().interrupt();
        }
        System.out.println(this.getName() + " is waiting");
        try {
            cyclicbarrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
            return;
        }
        System.out.println(this.getName() + " is working");
    }
}

Semaphore

​ Semaphore 直译是信号量,可能称它是许可量更容易理解。当然,因为在计算机科学中这个名字由来已久,所以不能乱改。它的功能比较好理解,就是通过构造函数设定一个数量的许可,然后通过 acquire 方法获得许可,release 方法释放许可。它还有 tryAcquire 和 acquireUninterruptibly 方法,可以根据自己的需要选择。当一个线程想要访问某个共享资源,首先,它必须获得semaphore。如果semaphore的内部计数器的值大于0,那么semaphore减少计数器的值并允许访问共享的资源。计数器的值大于0表示,有可以自由使用的资源,所以线程可以访问并使用它们。如果计数器的值等于0,semaphore会让该线程阻塞等待,直到其他线程调用 release 释放许可、计数器重新大于0为止。在默认的情况下信号量的进入是不公平的。如果在初始化时将第二个参数设定为true,则会选择等待时间最久的一个线程进入。

Semaphore实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Semaphore demo: ten workers compete for five permits.
 */
public class SemaphoreTest {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        final Semaphore permits = new Semaphore(5);
        final ExecutorService pool = Executors.newCachedThreadPool();

        int workerId = 0;
        while (workerId < 10) {
            pool.execute(new workerThread(workerId, permits));
            workerId++;
        }
        // Already submitted workers keep running; no new ones are accepted.
        pool.shutdown();
    }
}
/**
 * A task that acquires one permit from a shared {@link Semaphore}, simulates
 * work with a random sleep of up to 10 seconds, and then returns the permit.
 */
class workerThread implements Runnable {
    private int id;
    private Semaphore semp;

    /**
     * @param id   identifier printed in the log output
     * @param semp semaphore guarding the simulated resource
     */
    public workerThread(int id, Semaphore semp) {
        this.id = id;
        this.semp = semp;
    }

    /** @return the identifier of this worker */
    public int getId() {
        return id;
    }

    /**
     * Acquires a permit, works, and always gives the permit back.
     *
     * <p>If interrupted while waiting for the permit, nothing was acquired and
     * the method simply returns. If interrupted while working, the permit is
     * still released by the {@code finally} block. In both cases the interrupt
     * status is restored.
     */
    @Override
    public void run() {
        try {
            semp.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println("workerThread id " + this.getId() + " get Access");
            Thread.sleep((long) (Math.random() * 10000));
            System.out.println("workerThread id " + this.getId() + " finish the work");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // If this release is commented out, only as many workers as there
            // are permits ever run; the rest stay blocked in acquire().
            semp.release();
        }
    }
}
//输出(未注释掉semp.release())
workerThread id 0 get Access
workerThread id 2 get Access
workerThread id 3 get Access
workerThread id 4 get Access
workerThread id 1 get Access
workerThread id 1 finish the work
workerThread id 5 get Access
workerThread id 2 finish the work
workerThread id 6 get Access
workerThread id 4 finish the work
workerThread id 7 get Access
workerThread id 6 finish the work
workerThread id 8 get Access
workerThread id 7 finish the work
workerThread id 9 get Access
workerThread id 8 finish the work
workerThread id 0 finish the work
workerThread id 3 finish the work
workerThread id 9 finish the work
workerThread id 5 finish the work
//输出(注释掉semp.release())
workerThread id 1 get Access
workerThread id 4 get Access
workerThread id 0 get Access
workerThread id 2 get Access
workerThread id 3 get Access
workerThread id 2 finish the work
workerThread id 0 finish the work
workerThread id 3 finish the work
workerThread id 4 finish the work
workerThread id 1 finish the work
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * A print job: announces itself, sends a document to the shared
 * {@code PrintQueue}, and reports when the call returns.
 */
public class Job implements Runnable {
    private PrintQueue queue;

    /**
     * @param printQueue queue the document is submitted to
     */
    public Job(PrintQueue printQueue) {
        this.queue = printQueue;
    }

    /** Logs before and after submitting a fresh document to the queue. */
    @Override
    public void run() {
        final String before = Thread.currentThread().getName();
        System.out.printf("%s:Going to print a document\n", before);

        final Object document = new Object();
        queue.printJob(document);

        final String after = Thread.currentThread().getName();
        System.out.printf("%s:The document has been printed\n", after);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.Semaphore;
public class PrintQueue {
private final Semaphore semaphore;
public PrintQueue(){
semaphore = new Semaphore(1, true);
}
public void printJob(Object document){
try {
semaphore.acquire();
Long duration = (long)(Math.random()*10000);
System.out.println(Thread.currentThread().getName()
+" PrintQueue:Printing a Job during "
+(duration/1000)+" seconds");
Thread.sleep(duration);
} catch (Exception e) {
e.printStackTrace();
} finally{
semaphore.release();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Launches the print queue demo: ten job threads share one {@code PrintQueue}.
 */
public class Main {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        final PrintQueue sharedQueue = new PrintQueue();
        final Thread[] jobs = new Thread[10];

        // Create every thread first ...
        int slot = 0;
        while (slot < jobs.length) {
            jobs[slot] = new Thread(new Job(sharedQueue), "Thread" + slot);
            slot++;
        }
        // ... then start them in index order.
        for (Thread job : jobs) {
            job.start();
        }
    }
}

Phaser

​ JDK 1.7 添加了一个新的工具Phaser,Phaser在功能上与CountDownLatch有部分重合。下面使用Phaser类来同步3个并发任务。这3个任务会在3个不同的文件夹和它们的子文件夹中搜索扩展名是.log的文件。这个任务被分成3个步骤:1.在指定的文件夹和子文件夹中获得文件扩展名为.log的文件列表。2.在控制台打印结果。3.在步骤1和步骤2的结尾我们要检查列表是否为空。如果为空,那么线程直接结束运行,并从phaser中注销(deregister)。

Phaser实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.Phaser;
/**
 * Launches the Phaser demo: three {@code FileSearch} tasks look for ".log" files
 * in three folders, synchronized through a shared {@link Phaser}.
 */
public class Main {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        final Object printLock = new Object();
        final Phaser phaser = new Phaser(3);

        final FileSearch[] searches = {
            new FileSearch("C:\\Windows", ".log", phaser, printLock),
            new FileSearch("c:\\Program Files", ".log", phaser, printLock),
            new FileSearch("c:\\Documents And Settings", ".log", phaser, printLock)
        };
        final String[] threadNames = {"Windows", "Program Files", "Documents And Settings"};

        final Thread[] threads = new Thread[searches.length];
        for (int i = 0; i < searches.length; i++) {
            threads[i] = new Thread(searches[i], threadNames[i]);
            threads[i].start();
        }

        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Terminated: " + phaser.isTerminated());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
/**
 * Recursively searches a directory for files whose name ends with a given
 * suffix, synchronizing its steps with other searches through a shared
 * {@link Phaser}.
 */
public class FileSearch implements Runnable {
    private String initPath;
    private String suffix;
    private Phaser phaser;
    private List<String> results;
    /** Lock object used to keep the result listings of different searches from interleaving. */
    private final Object controller;

    /**
     * @param initPath directory where the search starts
     * @param suffix   file-name suffix to look for (for example ".log")
     * @param phaser   phaser shared by all cooperating searches
     * @param object   lock used while printing the results
     */
    public FileSearch(String initPath, String suffix, Phaser phaser, Object object) {
        this.initPath = initPath;
        this.suffix = suffix;
        this.phaser = phaser;
        results = new ArrayList<String>();
        controller = object;
    }

    /**
     * Waits for the other parties, searches the directory tree, then either
     * deregisters from the phaser (no results) or waits for the others again and
     * prints the results.
     *
     * <p>If the search is interrupted, a message is printed, the interrupt status
     * is restored and whatever was found so far is handled as usual.
     */
    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s: Starting.\n", Thread.currentThread().getName());
        File file = new File(initPath);
        if (file.isDirectory()) {
            try {
                directoryProcess(file);
            } catch (InterruptedException e) {
                // The trailing newline keeps the next log line from fusing onto this one.
                System.out.printf("%s: The search has been interrupted\n",
                        Thread.currentThread().getName());
                Thread.currentThread().interrupt();
            }
        }
        if (resultsisEmpty()) {
            return;
        }
        showInfo();
    }

    /**
     * Reports the number of results and synchronizes with the phaser.
     *
     * @return {@code true} if nothing was found (this party has then arrived and
     *         deregistered); {@code false} after waiting for the other parties
     */
    private boolean resultsisEmpty() {
        if (results.isEmpty()) {
            System.out.printf("%s :0 results\n", Thread.currentThread().getName());
            phaser.arriveAndDeregister();
            return true;
        }
        System.out.printf("%s :%d results\n", Thread.currentThread().getName(),
                results.size());
        phaser.arriveAndAwaitAdvance();
        return false;
    }

    /** Prints every result under the shared lock, then deregisters from the phaser. */
    private void showInfo() {
        synchronized (controller) {
            for (String path : results) {
                File file = new File(path);
                System.out.printf("%s: %s\n", Thread.currentThread().getName(),
                        file.getAbsolutePath());
            }
            System.out.printf("%s: Work completed.\n", Thread.currentThread().getName());
            phaser.arriveAndDeregister();
        }
    }

    /**
     * Recursively visits a directory.
     *
     * @param file directory to process
     * @throws InterruptedException if the thread's interrupt flag was set (the
     *         check clears it)
     */
    private void directoryProcess(File file) throws InterruptedException {
        File[] entries = file.listFiles();
        // listFiles() returns null when the directory cannot be listed.
        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    directoryProcess(entry);
                } else {
                    fileProcess(entry);
                }
            }
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    }

    /**
     * Records the file's absolute path if its name ends with the suffix.
     *
     * @param file file to inspect
     * @throws InterruptedException if the thread's interrupt flag was set (the
     *         check clears it)
     */
    private void fileProcess(File file) throws InterruptedException {
        if (file.getName().endsWith(suffix)) {
            results.add(file.getAbsolutePath());
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    }
}

Exchanger

Exchanger实例

LockSupport

LockSupport实例

参考:

1.http://www.importnew.com/15731.html

2.http://blog.csdn.net/junshuaizhang/article/details/39580751

3.http://blog.csdn.net/junshuaizhang/article/details/39667289

4.http://developer.51cto.com/art/201403/432095.htm

Luckylau wechat
如果对您有价值,看官可以打赏的!