1. ReentrantReadWriteLock
本章路线总纲
无锁→独占锁→读写锁→邮戳锁
关于锁的大厂面试题
你知道Java里面有哪些锁?
你说你用过读写锁,锁饥饿问题是什么?有没有比读写锁更快的锁?
StampedLock知道吗?(邮戳锁/票据锁)
ReentrantReadWriteLock有锁降级机制,你知道吗?
1.1 读写锁ReentrantReadWriteLock
读写锁:一个资源能够被多个读线程访问,或者被一个写线程访问但是不能同时存在读写线程。
它只允许读读共存,而读写和写写依然是互斥的,大多实际场景是“读/读”线程间并不存在互斥关系,只有”读/写”线程或”写/写”线程间的操作需要互斥的。因此引入ReentrantReadWriteLock
一个ReentrantReadWriteLock同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁(切菜还是拍蒜选一个)。也即一个资源可以被多个读操作访问―或一个写操作访问,但两者不能同时进行。只有在读多写少情景下,读写锁才具有较高的性能体现。
排他锁和共享锁
排它锁:又称独占锁,独享锁 synchronized,ReentrantLock都是排它锁。以ReentrantLock为例,会发现读操作互斥,只能一个线程一个线程读,性能差
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| package com.bilibili.juc.rwlock;
import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
class CoResource { Map<String,String> map = new HashMap<>(); Lock lock = new ReentrantLock(); ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void write(String key ,String value) { lock.lock(); try { System.out.println(Thread.currentThread().getName()+"\t"+"正在写入"); map.put(key,value); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t"+"完成写入"); }finally { lock.unlock(); } }
public void read(String key) { lock.lock(); try { System.out.println(Thread.currentThread().getName()+"\t"+"正在读取"); String result = map.get(key); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t"+"完成读取"+"\t"+result); }finally { lock.unlock(); } }
}
public class ReentrantLockDemo { public static void main(String[] args) { CoResource coResource = new CoResource();
for (int i = 1; i <=10; i++) { int finalI = i; new Thread(() -> { coResource.write(finalI +"", finalI +""); },String.valueOf(i)).start(); }
for (int i = 1; i <=10; i++) { int finalI = i; new Thread(() -> { coResource.read(finalI +""); },String.valueOf(i)).start(); }
} } 输出 1 正在写入 1 完成写入 3 正在写入 3 完成写入 4 正在写入 4 完成写入 2 正在写入 2 完成写入 5 正在写入 5 完成写入 6 正在写入 6 完成写入 7 正在写入 7 完成写入 8 正在写入 8 完成写入 9 正在写入 9 完成写入 10 正在写入 10 完成写入 1 正在读取 1 完成读取 1 2 正在读取 2 完成读取 2 4 正在读取 4 完成读取 4 6 正在读取 6 完成读取 6 5 正在读取 5 完成读取 5 3 正在读取 3 完成读取 3 7 正在读取 7 完成读取 7 8 正在读取 8 完成读取 8 9 正在读取 9 完成读取 9 10 正在读取 10 完成读取 10
|
共享锁:又称为读锁,获得共享锁后,可以查看,但无法删除和修改数 据, 其他线程此时业也可以获取到共享锁,也可以查看但是 无法修改和 删除数据
- 共享锁和排它锁典型是ReentranReadWriteLock
- 写锁是排它锁
- 读锁是共享锁
读写锁ReentrantRWLock优势
大实际生活中多实际场景是“读/读”线程间并不存在互斥关系, 只有“读/写”线程或”写/写”线程间的操作需要互斥的。因此引入ReentrantReadWriteLock。
ReentrantReadWriteLock它只允许读读共存,而读写和写写依然是互斥的,
一个ReentrantReaçWriteLock同时只能存在一个写锁但是可以存在多个读锁,但不能同时存在写锁和读锁(切菜还是拍蒜选一个)。 也即
- 一个资源可以被多个读操作访问
- 一个写操作访问,
- 读写仍然互斥。
只有在读多写少情景之下,读写锁才具有较高的性能体现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
| package com.bilibili.juc.rwlock;
import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyResource { Map<String,String> map = new HashMap<>(); Lock lock = new ReentrantLock(); ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void write(String key ,String value) { rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName()+"\t"+"正在写入"); map.put(key,value); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t"+"完成写入"); }finally { rwLock.writeLock().unlock(); } }
public void read(String key) { rwLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName()+"\t"+"正在读取"); String result = map.get(key); try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t"+"完成读取"+"\t"+result); }finally { rwLock.readLock().unlock(); } }
}
public class ReentrantReadWriteLockDemo { public static void main(String[] args) { MyResource myResource = new MyResource();
for (int i = 1; i <=10; i++) { int finalI = i; new Thread(() -> { myResource.write(finalI +"", finalI +""); },String.valueOf(i)).start(); }
for (int i = 1; i <=10; i++) { int finalI = i; new Thread(() -> { myResource.read(finalI +""); },String.valueOf(i)).start(); }
} } 输出: 2 正在写入 2 完成写入 4 正在写入 4 完成写入 1 正在写入 1 完成写入 3 正在写入 3 完成写入 5 正在写入 5 完成写入 6 正在写入 6 完成写入 7 正在写入 7 完成写入 8 正在写入 8 完成写入 9 正在写入 9 完成写入 10 正在写入 10 完成写入 1 正在读取 2 正在读取 3 正在读取 4 正在读取 5 正在读取 6 正在读取 7 正在读取 8 正在读取 9 正在读取 10 正在读取 1 完成读取 1 10 完成读取 10 4 完成读取 4 8 完成读取 8 9 完成读取 9 2 完成读取 2 3 完成读取 3 5 完成读取 5 6 完成读取 6 7 完成读取 7
|
1.2 锁降级
ReentrantReadWriteLock锁降级 : 将写入锁降级为读锁(类似Linux文件读写权限理解,就像写权限要高于读权限一样),
写锁的降级,降级成为了读锁
1)如果同一个线程持有了写锁,在没有释放写锁的情况下,它还可以继续获得读锁。这就是写锁的降级,降级成为了读锁。
2)规则惯例,先获取写锁,然后获取读锁,再释放写锁的次序。
3)如果释放了写锁,那么就完全转换为读锁。
锁降级是为了让当前线程感知到数据的变化,目的是保证数据可见性
如果有线程在读,那么写线程是无法获取写锁的,是悲观锁的策略
在ReentrantReadWriteLock中,当读锁被使用时,如果有线程尝试获取写锁,该写线程会被阻塞。所以,需要释放所有读锁,才可获取写锁。
写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性。因为,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作。
ReentrantReadWriteLock读的过程中不允许写,只有等待线程都释放了读锁,当前线程才能获取写锁,也就是写入必须等待,这是一种悲观的读锁,人家还在读着那,你先别去写,省的数据乱。
分析StampedLock(后面详细讲解),会发现它改进之处在于:
读的过程中也允许获取写锁介入(相当牛B,读和写两个操作也让你“共享”(注意引号)),这样会导致我们读的数据就可能不一致所以,需要额外的方法来判断读的过程中是否有写入,这是一种乐观的读锁。
显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
1.3 为什么要锁降级?
锁降级确实不太贴切,明明是锁切换,在写锁释放前由写锁切换成了读锁。问题的关键其实是为什么要在锁切换前就加上读锁呢?防止释放写锁的瞬间被其他线程拿到写锁然后修改了数据,然后本线程在拿到读锁后读取数据就发生了错乱。但是,我把锁的范围加大一点不就行了吗?在写锁的范围里面完成读锁里面要干的事。缺点呢就是延长了写锁的占用时长,导致性能下降。对于中小公司而言没必要,随便在哪都能把这点性能捡回来了!
2.4 锁饥饿问题
ReentrantReadWriteLock实现了读写分离,但是一旦读操作比较多的时候,想要获取写锁就变得比较困难了,假如当前1000个线程,999个读,1个写,有可能999个读取线程长时间抢到了锁,那1个写线程就悲剧了因为当前有可能会一直存在读锁,而无法获得写锁,根本没机会写。
如何缓解锁饥饿问题?
使用”公平”策略可以一定程度上缓解这个问题,但是”公平”策略是以牺牲系统吞吐量为代价的
StampedLock类的乐观读锁闪亮登场
2. 邮戳锁StampedLock
2.1 StampedLock横空出世
StampedLock(也叫票据锁)是JDK1.8中新增的一个读写锁,也是对JDK1.5中的读写锁ReentrantReadWriteLock的优化。
stamp(戳记,long类型)
代表了锁的状态。当stamp返回零时,表示线程获取锁失败。并且,当释放锁或者转换锁的时候,需要传入最初获取的stamp值。
ReentrantReadWriteLock的读锁被占用的时候,其他线程尝试获取写锁的时候会被阻塞。但是,StampedLock采取乐观获取锁后,其他线程尝试获取写锁时不会被阻塞,这其实是对读锁的优化,所以,在获取乐观读锁后,还需要对结果进行校验。
ReentrantReadWriteLock
允许多个线程向时读,但是只允许一个线程写,在线程获取到写锁的时候,其他写操作和读操作都会处于阻塞状态,
读锁和写锁也是互斥的,所以在读的时候是不允许写的,读写锁比传统的synchronized速度要快很多,原因就是在于ReentrantReadWriteLock支持读并发,读读可以共享。
对短的只读代码段,使用乐观模式通常可以减少争用并提高吞吐量
2.2 ReentrantLock、ReentrantReadWriteLock、StampedLock性能比较
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
| public class ReentrantReadWriteLockTest { static Lock lock = new ReentrantLock(); static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); static StampedLock stampedLock = new StampedLock(); static int read = 1000; static int write = 3; static long mills = 10;
public static void main(String[] args) { testReentrantLock(); testReentrantReadWriteLock();
testStampedLock(); }
public static void testStampedLock() { ExecutorService executorService = Executors.newFixedThreadPool(100); ExecutorService executorServiceWrite = Executors.newFixedThreadPool(3); CountDownLatch latch = new CountDownLatch(read + write); long l = System.currentTimeMillis(); for (int i = 0; i < read; i++) { executorService.execute(() -> {
readStampedLock(); latch.countDown(); }); } for (int i = 0; i < write; i++) { executorServiceWrite.execute(() -> { writeStampedLock(); latch.countDown();
}); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); executorServiceWrite.shutdown(); System.out.println("testStampedLock执行耗时:" + (System.currentTimeMillis() - l)); }
public static void testReentrantLock() { ExecutorService executorService = Executors.newFixedThreadPool(100); ExecutorService executorServiceWrite = Executors.newFixedThreadPool(3); CountDownLatch latch = new CountDownLatch(read + write); long l = System.currentTimeMillis(); for (int i = 0; i < read; i++) { executorService.execute(() -> { read(); latch.countDown(); }); } for (int i = 0; i < write; i++) { executorServiceWrite.execute(() -> { write(); latch.countDown(); }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); executorServiceWrite.shutdown(); System.out.println("testReentrantLock执行耗时:" + (System.currentTimeMillis() - l)); }
public static void testReentrantReadWriteLock() { ExecutorService executorService = Executors.newFixedThreadPool(100); ExecutorService executorServiceWrite = Executors.newFixedThreadPool(3); CountDownLatch latch = new CountDownLatch(read + write); long l = System.currentTimeMillis(); for (int i = 0; i < read; i++) { executorService.execute(() -> { readLock(); latch.countDown(); }); } for (int i = 0; i < write; i++) { executorServiceWrite.execute(() -> { writeLock(); latch.countDown();
}); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); executorServiceWrite.shutdown(); System.out.println("testReentrantReadWriteLock执行耗时:" + (System.currentTimeMillis() - l)); }
public static void tryOptimisticRead() { long stamp = stampedLock.tryOptimisticRead(); try { Thread.sleep(mills); if (!stampedLock.validate(stamp)) { long readLock = stampedLock.readLock(); try { } finally { stampedLock.unlock(readLock); } } } catch (InterruptedException e) { e.printStackTrace(); } }
public static void readStampedLock() { long stamp = stampedLock.readLock(); try { Thread.sleep(mills); } catch (InterruptedException e) { e.printStackTrace(); } finally { stampedLock.unlock(stamp); } }
public static void writeStampedLock() { long stamp = stampedLock.writeLock(); try { Thread.sleep(mills); } catch (InterruptedException e) { e.printStackTrace(); } finally { stampedLock.unlock(stamp); } }
public static void readLock() { readWriteLock.readLock().lock(); try { Thread.sleep(mills); } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } }
public static void writeLock() { readWriteLock.writeLock().lock(); try { Thread.sleep(mills); } catch (InterruptedException e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } }
public static void read() { lock.lock(); try { Thread.sleep(mills); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public static void write() { lock.lock(); try { Thread.sleep(mills); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
|
执行结果
1 2 3
| testReentrantLock执行耗时:15868 testReentrantReadWriteLock执行耗时:218 testStampedLock执行耗时:221
|
根据执行结果可以明显看出在读多写少的情况下,ReentrantLock的性能是比较差的,而ReentrantReadWriteLock和StampedLock性能差不多相同,而StampedLock主要是为了解决ReentrantReadWriteLock可能出现的锁饥饿问题。
2.2 StampedLock总结
StampedLock的特点
所有获取锁的方法,都返回一个邮戳( Stamp) , Stamp为零表示获取失败,其余都表示成功;
所有释放锁的方法,都需要一个邮戳(Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致;
StampedLock是不可重入的,危险(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
StampedLock有三种访问模式
Reading (读模式悲观):功能和ReentrantReadWriteLock的读锁类似
Writing(写模式):功能和ReentrantRedWriteLock的写锁类似
Optimistic reading(乐观读模式):无锁机制,类似于数据库中的乐观锁,支持读写并发,很乐观认对为读取时没人修改,假如被修改再实现升级为悲观读模式
主要API
tryOptimisticRead():加乐观读锁
validate(long stamp):校验乐观读锁执行过程中有无写锁搅局
StampedLock的缺点
StampedLock 不支持重入,没有Re开头
StampedLock的悲观读锁和写锁都不支持条件变量(Condition),这个也需要注意。
使用StampedLock一定不要调用中断操作,即不要调用interrupt()方法
三个工具
①. CountDownLatch(闭锁) 做减法
- ①. CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞
- ②. 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
- ③. 计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class CountDownLatchDemo { public static void main(String[] args) throws Exception{
CountDownLatch countDownLatch=new CountDownLatch(6); for (int i = 1; i <=6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t"); countDownLatch.countDown(); },i+"").start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t班长关门走人,main线程是班长"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public enum CountryEnum {
one(1,"齐"),two(2,"楚"),three(3,"燕"), four(4,"赵"),five(5,"魏"),six(6,"韩");
private Integer retCode; private String retMessage;
private CountryEnum(Integer retCode,String retMessage){ this.retCode=retCode; this.retMessage=retMessage; }
public static CountryEnum getCountryEnum(Integer index){ CountryEnum[] countryEnums = CountryEnum.values(); for (CountryEnum countryEnum : countryEnums) { if(countryEnum.getRetCode()==index){ return countryEnum; } } return null; }
public Integer getRetCode() { return retCode; }
public String getRetMessage() { return retMessage; } }
public class CountDownLatchDemo { public static void main(String[] args) throws Exception{ CountDownLatch countDownLatch=new CountDownLatch(6); for (int i = 1; i <=6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t"+"**国,被灭"); countDownLatch.countDown(); },CountryEnum.getCountryEnum(i).getRetMessage()).start();
} countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t"+"**秦国一统江湖"); } }
|
- ⑤. 实验CountDownLatch去解决时间等待问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class AtomicIntegerDemo { AtomicInteger atomicInteger=new AtomicInteger(0); public void addPlusPlus(){ atomicInteger.incrementAndGet(); } public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch=new CountDownLatch(10); AtomicIntegerDemo atomic=new AtomicIntegerDemo(); for (int i = 1; i <= 10; i++) { new Thread(()->{ try{ for (int j = 1; j <= 100; j++) { atomic.addPlusPlus(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t"+"获取到的result:"+atomic.atomicInteger.get()); } }
|
- ①. CyclicBarrier的字面意思是可循环(Cyclic) 使用的屏障(barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫做同步点)时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法
- ②. 代码验证:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{ System.out.println("召唤龙珠"); }); for (int i = 1; i <=7; i++) { final int temp=i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t收集到了第"+temp+"颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); }
} }
|
- ①. acquire(获取)当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
- ②. release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
- ③. 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
- ④. 代码验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class SemaphoreDemo { public static void main(String[] args) { Semaphore semaphore=new Semaphore(3); for (int i = 1; i <=6; i++) { new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"\t抢占了车位"); semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"\t离开了车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } },String.valueOf(i)).start(); } } }
|