Luckylau's Blog

Java并发之同步控制

synchronized

在Java中,synchronized是用来表示同步的,我们可以synchronized来修饰一个方法。也可以synchronized来修饰方法里面的一个语句块。但是要注意在static方法和非static方法使用synchronized。大家都知道,static的方法属于类方法,它属于这个类,那么static获取到的锁,是属于类的锁。而非static方法获取到的锁,是属于当前对象的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Account {
private double balance;
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
public synchronized void addAmount(double amount){
System.out.println("addAmount...");
double tmp = balance;
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
tmp+=amount;
balance = tmp;
System.out.println("after addAmount: "+balance);
}
public synchronized void subtractAmount(double amount){
System.out.println("subtractAmount......");
double tmp = balance;
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
tmp-=amount;
balance=tmp;
System.out.println("after subtractAmount: "+balance);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Bank implements Runnable{
private Account account;
public Bank(Account account) {
this.account = account;
}
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++){
account.subtractAmount(1000);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Company implements Runnable{
private Account account;
public Company(Account account) {
this.account = account;
}
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++){
account.addAmount(1000);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Main {
public static void main(String[] args) {
Account account = new Account();
account.setBalance(1000);
Company company = new Company(account);
Thread companyThread = new Thread(company);
Bank bank = new Bank(account);
Thread bankThread = new Thread(bank);
System.out.printf("Account:Initial balance:%f\n", account.getBalance());
bankThread.start();
companyThread.start();
try {
bankThread.join();
companyThread.join();
System.out.printf("Account:Initial balance:%f\n", account.getBalance());
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
}

​ 当你使用synchronized关键字来保护代码块时,你必须通过一个对象的引用作为参数。通常,你将会使用this关键字来引用执行该方法的对象,但是你也可以使用其他对象引用。通常情况下,这些对象被创建只有这个目的。比如,你在一个类中有被多个线程共享的两个独立属性。你必须同步访问每个变量,如果有一个线程访问一个属性和另一个线程在同一时刻访问另一个属性,这是没有问题的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class Cinema {
private long vacanciesCinema1;
private long vacanciesCinema2;
private final Object controlCinema1,controlCinema2;
public Cinema(){
controlCinema1 = new Object();
controlCinema2 = new Object();
vacanciesCinema1 = 20;
vacanciesCinema2 = 20;
}
public boolean sellTickets1(int number) {
System.out.println("sellTickets1");
synchronized (controlCinema1) {
if(number < vacanciesCinema1){
vacanciesCinema1 -=number;
return true;
}else{
return false;
}
}
}
public boolean sellTickets2(int number) {
System.out.println("sellTickets2");
synchronized (controlCinema2) {
if(number < vacanciesCinema2){
vacanciesCinema2 -=number;
return true;
}else{
return false;
}
}
}
public boolean returnTickets1(int number){
System.out.println("returnTickets1");
synchronized (controlCinema1) {
vacanciesCinema1 +=number;
return true;
}
}
public boolean returnTickets2(int number){
System.out.println("returnTickets2");
synchronized (controlCinema2) {
vacanciesCinema2 +=number;
return true;
}
}
public long getVacanciesCinema1() {
return vacanciesCinema1;
}
public void setVacanciesCinema1(long vacanciesCinema1) {
this.vacanciesCinema1 = vacanciesCinema1;
}
public long getVacanciesCinema2() {
return vacanciesCinema2;
}
public void setVacanciesCinema2(long vacanciesCinema2) {
this.vacanciesCinema2 = vacanciesCinema2;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TicketOffice1 implements Runnable{
private Cinema cinema;
public TicketOffice1(Cinema cinema){
this.cinema = cinema;
}
@Override
public void run() {
cinema.sellTickets1(2);
cinema.sellTickets1(2);
cinema.sellTickets2(2);
cinema.returnTickets1(2);
cinema.sellTickets1(2);
cinema.sellTickets2(2);
cinema.sellTickets2(2);
cinema.sellTickets2(2);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TicketOffice2 implements Runnable{
private Cinema cinema;
public TicketOffice2(Cinema cinema){
this.cinema = cinema;
}
@Override
public void run() {
cinema.sellTickets2(2);
cinema.sellTickets2(2);
cinema.sellTickets1(2);
cinema.sellTickets1(1);
cinema.returnTickets2(2);
cinema.sellTickets1(2);
cinema.sellTickets2(2);
cinema.sellTickets1(2);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main {
public static void main(String[] args) {
Cinema cinema = new Cinema();
TicketOffice1 ticketOffice1 = new TicketOffice1(cinema);
Thread thread1 = new Thread(ticketOffice1,"TicketOffice1");
TicketOffice2 ticketOffice2 = new TicketOffice2(cinema);
Thread thread2 = new Thread(ticketOffice2,"TicketOffice2");
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.printf("Room 1 Vacancies: %d\n",cinema.getVacanciesCinema1());
System.out.printf("Room 2 Vacancies: %d\n",cinema.getVacanciesCinema2());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class EventStorage {
private int maxSize;
private List<Date> storage;
public EventStorage(){
maxSize = 10;
storage = new LinkedList<Date>();
}
public synchronized void set(){
while(storage.size() == maxSize){
System.out.println("--------------满仓-----------------");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
((LinkedList<Date>) storage).offer(new Date());
System.out.println("Set:"+storage.size());
notifyAll();
}
public synchronized void get(){
while(storage.size()==0){
System.out.println("---------------------------等待中--------------------");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.printf("Get:%d:%s\n",storage.size(),((LinkedList<Date>) storage).poll());
notifyAll();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Producer implements Runnable {
private EventStorage storage;
public Producer(EventStorage storage){
this.storage = storage;
}
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<100;i++){
storage.set();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class Consumer implements Runnable {
private EventStorage storage;
public Consumer(EventStorage storage){
this.storage = storage;
}
@Override
public void run() {
for(int i=0;i<100;i++){
storage.get();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args) {
EventStorage stor= new EventStorage();
Consumer consumer = new Consumer(stor);
Thread th1 = new Thread(consumer);
Producer producer = new Producer(stor);
Thread th2 = new Thread(producer);
th1.start();
th2.start();
}
}

Lock

Lock 接口比synchronized关键字提供更多额外的功能,在使用Lock时需要注意的是要释放Lock锁。

lock的接口

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}

一般使用的方式为

1
2
3
4
5
6
7
8
9
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){
}finally{
lock.unlock(); //释放锁
}

​ 另外一个重要的概念是Condition。Condition 将 Object的通信方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 通信方法的使用。

​ 在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

​ Condition的强大之处在于它可以为多个线程间建立不同的Condition, 使用synchronized/wait()只有一个阻塞队列,notifyAll会唤起所有阻塞队列下的线程,而使用lock/condition,可以实现多个阻塞队列,signalAll只会唤起某个阻塞队列下的阻塞线程。

我们来看一下它们在实现生产者何消费者的不同。

synchronized/wait()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class EventStorage {
private int maxSize;
private List<Date> storage;
public EventStorage(){
maxSize = 10;
storage = new LinkedList<Date>();
}
public synchronized void set(){
while(storage.size() == maxSize){
System.out.println("--------------满仓-----------------");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
((LinkedList<Date>) storage).offer(new Date());
System.out.println(Thread.currentThread().getName()+" Set:"+storage.size());
notifyAll();
}
public synchronized void get(String name){
while(storage.size()==0){
System.out.println(name+"---------------------------等待中--------------------");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.printf(name+"Get:%d:%s\n",storage.size(),((LinkedList<Date>) storage).poll());
notifyAll();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Producer implements Runnable {
private EventStorage storage;
public Producer(EventStorage storage){
this.storage = storage;
}
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
storage.set();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Consumer implements Runnable{
private EventStorage storage;
private String name;
public Consumer(EventStorage storage,String name){
this.storage = storage;
this.name = name;
}
@Override
public void run() {
// TODO Auto-generated method stub
while(true){
storage.get(name);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Main {
public static void main(String[] args) {
EventStorage stor= new EventStorage();
for(int i = 0 ;i < 6; i++) {
Consumer consumer = new Consumer(stor,i+"号消费者");
Thread th1 = new Thread(consumer);
th1.start();
}
for(int i =0 ; i < 2; i++) {
Producer producer = new Producer(stor);
Thread th2 = new Thread(producer,i+"号生产者");
th2.start();
}
}
}
lock/condition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class EventStorage {
private final Lock lock;
private final Condition notFull;
private final Condition notEmpty;
private int maxSize;
private List<Date> storage;
public EventStorage(){
maxSize = 10;
storage = new LinkedList<Date>();
lock = new ReentrantLock();
notFull =lock.newCondition();
notEmpty =lock.newCondition();
}
public void set(){
lock.lock();
try {
while(storage.size() == maxSize){
System.out.println("--------------满仓-----------------");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
TimeUnit.SECONDS.sleep(1);
((LinkedList<Date>) storage).offer(new Date());
System.out.println(Thread.currentThread().getName()+" Set:"+storage.size());
notEmpty.signalAll();
} catch (InterruptedException e ) {
// TODO: handle exception
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get(String name){
lock.lock();
try {
while(storage.size()==0){
System.out.println(name+"---------------------------等待中--------------------");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
TimeUnit.SECONDS.sleep(1);
System.out.printf(name+"Get:%d:%s\n",storage.size(),((LinkedList<Date>) storage).poll());
notFull.signalAll();
} catch (InterruptedException e) {
// TODO: handle exception
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PrintQueue {
private final Lock queuelock = new ReentrantLock();
public void printJob(Object document){
queuelock.lock();//获取Lock对象的控制权
try {
Long duration = (long)(Math.random()*10000);
System.out.println(Thread.currentThread().getName()
+"PrintQueue:Printing a Job during "
+(duration/1000)+" seconds");
Thread.sleep(duration);
} catch (Exception e) {
e.printStackTrace();
} finally{
queuelock.unlock();//释放Lock对象的控制
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Job implements Runnable {
private PrintQueue printQueue;
public Job(PrintQueue printQueue){
this.printQueue = printQueue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.printf("%s:Going to print a document\n",
Thread.currentThread().getName());
printQueue.printJob(new Object());
System.out.printf("%s:The document has been printed\n",Thread.currentThread().getName());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class Main {
public static void main(String[] args) {
PrintQueue printQueue = new PrintQueue();
Thread thread[] = new Thread[10];
for(int i=0;i<10;i++){
thread[i] = new Thread(new Job(printQueue),"Thread"+i);
}
for(int i=0;i<10;i++){
thread[i].start();
}
}
}

ReentrantReadWriteLock

ReentrantReadWriteLock,首先要做的是与ReentrantLock划清界限。它和后者都是单独的实现,彼此之间没有继承或实现的关系,它是实现了ReadWriteLock接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

ReadLock获取的是共享锁,WriteLock获取的是独占锁。当写操作时,其他线程无法读取或写入数据,而当读操作时,其它线程无法写入数据,但却可以读取数据 。所以读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁。

ReentrantReadWriteLock具有如下特性:

公平性:非公平锁(默认) 这个和独占锁的非公平性一样,由于读线程之间没有锁竞争,所以读操作没有公平性和非公平性,写操作时,由于写操作可能立即获取到锁,所以会推迟一个或多个读操作或者写操作。因此非公平锁的吞吐量要高于公平锁。公平锁 利用AQS的CLH队列,释放当前保持的锁(读锁或者写锁)时,优先为等待时间最长的那个写线程分配写入锁,当前前提是写线程的等待时间要比所有读线程的等待时间要长。同样一个线程持有写入锁或者有一个写线程已经在等待了,那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞,直到最先的写线程释放锁。如果读线程的等待时间比写线程的等待时间还有长,那么一旦上一个写线程释放锁,这一组读线程将获取锁。

重入性:读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁。当然了只有写线程释放了锁,读线程才能获取重入锁。写线程获取写入锁后可以再次获取读取锁,但是读线程获取读取锁后却不能获取写入锁。读写锁最多支持65535个递归写入锁和65535个递归读取锁。

锁降级:写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级的特性。

1
2
3
4
5
6
7
8
ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.writeLock().lock();
System.out.println("writeLock");
rtLock.readLock().lock();
System.out.println("get read lock");
//虽然不会导致死锁,但没有正确的释放锁。从写锁降级成读锁,并不会自动释放当前线程获取的写锁,仍然需要显示的释放,否则别的线程永远也获
//取不到写锁。
rtLock.writeLock().unlock();

锁升级:读取锁是不能直接升级为写入锁的。因为获取一个写入锁需要释放所有读取锁,所以如果有两个读取锁视图获取写入锁而都不释放读取锁时就会发生死锁。

下面就是锁升级,ReentrantReadWriteLock是不支持的,会产生死锁。

1
2
3
4
5
ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.readLock().lock();
System.out.println("get readLock.");
rtLock.writeLock().lock();
System.out.println("blocking");

可以这样实现:

1
2
3
4
5
6
ReadWriteLock rtLock = new ReentrantReadWriteLock();
rtLock.readLock().lock();
System.out.println("get readLock.");
rtLock.readLock().unlock();
rtLock.writeLock().lock();
System.out.println("blocking");

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main {
public static void main(String[] args) {
PricesInfo priceinfo = new PricesInfo();
Reader [] readers = new Reader[5];
Thread [] threadreaders = new Thread[5];
for(int i =0 ;i <5;i ++){
readers[i]=new Reader(priceinfo);
threadreaders[i]=new Thread(readers[i]);
}
for ( int i = 0; i < 5; i++ ) {
threadreaders[i].start();
}
Writer writer = new Writer(priceinfo);
new Thread(writer).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Reader implements Runnable {
private PricesInfo pricesInfo;
public Reader(PricesInfo pricesInfo){
this.pricesInfo = pricesInfo;
}
@Override
public void run() {
// TODO Auto-generated method stub
for(int i=0;i<15;i++){
pricesInfo.getPrice();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import java.util.Random;
public class Writer implements Runnable{
private PricesInfo pricesInfo;
public Writer(PricesInfo pricesInfo){
this.pricesInfo = pricesInfo;
}
@Override
public void run() {
for(int i=0;i<3;i++){
int p1 = new Random().nextInt(47);
int p2 = new Random().nextInt(47)*10;
pricesInfo.setPrices(p1, p2);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PricesInfo {
private int price1;
private int price2;
private ReadWriteLock lock;
public PricesInfo(){
price1 = 1;
price2 = 1;
lock = new ReentrantReadWriteLock();
}
public void getPrice() {
lock.readLock().lock();
System.out.println(Thread.currentThread().getName()+" begin to getPrice");
isSleep(true);
System.out.printf("%s:Price 1:%d Price 2:%d\n", Thread.currentThread().getName(),getPrice1(),getPrice2());
lock.readLock().unlock();
}
public int getPrice1() {
return price1;
}
public int getPrice2() {
return price2;
}
public void setPrices(int price1,int price2){
lock.writeLock().lock();
System.out.println("Writer:Attempt to modify the prices1: "+price1+" price2:"+price2);
isSleep(true);
System.out.println("Writer:Prices have been modified,prices1:"+price1+" price2:"+price2);
this.price1 = price1;
this.price2 = price2;
lock.writeLock().unlock();
}
private static void isSleep(boolean bool){
if(bool){
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
}
}
}

StampedLock

Luckylau wechat
如果对您有价值,看官可以打赏的!