JUC refers to the three concurrency packages under java.util:
java.util.concurrent
java.util.concurrent.atomic
java.util.concurrent.locks
[toc]
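Biased locks, lightweight locks, and heavyweight locks
Biased lock (enabled by default through JDK 14; disabled by default since JDK 15)
The ID of the owning thread is stored in the Mark Word, which suits code where the lock is rarely contended.
By default the JVM waits 4 s after startup before objects go from unlocked to biasable.
When another thread accesses the object, the lock is upgraded to a lightweight lock.
A biased Mark Word has no room for the hash code, so calling hashCode() revokes the biased lock.
Bulk rebias: once the number of bias revocations exceeds the threshold (20), the JVM rebiases in bulk.
Bulk revoke: once the number of bias revocations exceeds the threshold (40), the JVM revokes all biased locks, and objects created afterwards are not biasable.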
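Lightweight lock
Entering a synchronized block:
1) If the last three bits of the object's Mark Word are 001 (unlocked), the Mark Word is copied into a Lock Record in the current stack frame (the Lock Record also holds a reference to the object). The object's Mark Word is then changed so that its last two bits are 00 and its upper 30 bits hold the address of the Lock Record.
2) If the last two bits of the Mark Word are already 00 (lightweight locked), the JVM checks whether the Mark Word points into the current thread's stack. If it does, this is a reentrant acquisition: another Lock Record is added whose saved Mark Word is null and which references the object. If it does not, several threads are competing for the lock, so it is upgraded to a heavyweight lock; the competing thread spins and, if that fails, adds itself to the EntryList.
Exiting a synchronized block:
1) If the Lock Record's saved Mark Word is null, the record is discarded, which decrements the reentrancy count by one.
2) If it is not null, a CAS tries to restore the saved Mark Word into the object header. If the CAS fails, the lock has been inflated to a heavyweight lock in the meantime.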
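Heavyweight lock
Entering a synchronized block:
A monitor object is requested for the Java object. The upper 30 bits of the object's Mark Word point to that monitor, and the last two bits are set to 10. The monitor is a JVM-internal object that relies on operating-system synchronization primitives. Its Owner field identifies the thread that currently holds the lock, and its EntryList field holds the queue of threads blocked waiting for the lock.
Exiting a synchronized block:
The monitor is located through the object's Mark Word, Owner is cleared, and a BLOCKED thread in the EntryList is woken up.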
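The EntryList behaviour described above can be observed from plain Java. The following is a minimal sketch, not a view into the JVM's monitor itself: the class and method names (EntryListSketch, stateOfContender) are made up for illustration. The main thread holds a lock while a second thread tries to enter the same synchronized block, and the sketch polls until the second thread reports the BLOCKED state.

```java
class EntryListSketch {

    /** Returns the state observed for a thread that is stuck entering a monitor we hold. */
    static Thread.State stateOfContender() {
        final Object lock = new Object();
        Thread contender = new Thread(() -> {
            synchronized (lock) {
                // Entered only after the owner below has left its block.
            }
        }, "contender");

        Thread.State observed;
        try {
            synchronized (lock) {               // this thread is now the monitor's owner
                contender.start();
                long deadline = System.nanoTime() + 2_000_000_000L;
                // Poll until the contender is blocked on the monitor, or give up after 2 s.
                while ((observed = contender.getState()) != Thread.State.BLOCKED
                        && System.nanoTime() < deadline) {
                    Thread.sleep(5);            // sleep() keeps the lock, so the contender stays out
                }
            }                                   // leaving the block lets the contender in
            contender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(stateOfContender());
    }
}
```

The polling loop is there only because thread start-up takes an unpredictable amount of time; the interesting part is that the contender cannot leave BLOCKED until the owner exits its block.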
Mark Word of an ordinary object on a 32-bit JVM

| Mark Word (32 bits) | Lock state |
| --- | --- |
| hashcode (25 bits), age (4 bits), biased_lock: 0, lock: 01 | Normal |
| thread (23 bits), epoch (2 bits), age (4 bits), biased_lock: 1, lock: 01 | Biased |
| ptr_to_lock_record (30 bits), lock: 00 | Lightweight Locked |
| ptr_to_heavyweight_monitor (30 bits), lock: 10 | Heavyweight Locked |
| lock: 11 | Marked for GC |
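As a reading aid for the layout above, here is a small sketch that classifies a 32-bit Mark Word value by its low bits. It works on plain int values supplied by the caller; it does not read real object headers, and the class name MarkWordSketch is hypothetical.

```java
class MarkWordSketch {

    /** Classifies a 32-bit Mark Word value using the lock and biased_lock bits. */
    static String lockState(int markWord) {
        int lock = markWord & 0b11;                 // lowest two bits: lock
        boolean biased = (markWord & 0b100) != 0;   // third-lowest bit: biased_lock
        switch (lock) {
            case 0b01:
                return biased ? "Biased" : "Normal";
            case 0b00:
                return "Lightweight Locked";
            case 0b10:
                return "Heavyweight Locked";
            default:
                return "Marked for GC";
        }
    }

    public static void main(String[] args) {
        System.out.println(lockState(0b001));       // biased_lock 0, lock 01
        System.out.println(lockState(0b101));       // biased_lock 1, lock 01
        System.out.println(lockState(0xABCDEF00));  // pointer bits followed by lock 00
    }
}
```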
```java
package com.pl.demo1_locking;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.TestState")
public class Test {

    // A static synchronized method locks on the Class object ...
    public synchronized static void f1() {
    }

    // ... so it is equivalent to synchronizing on Test.class explicitly.
    public static void f2() {
        synchronized (Test.class) {
        }
    }

    static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Room room = new Room();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                room.increment();
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                room.decrement();
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        log.debug("" + room.getCount());
    }
}

// All three methods lock on the same Room instance, so count stays consistent.
class Room {
    private int count = 0;

    public synchronized void decrement() {
        count--;
    }

    public void increment() {
        synchronized (this) {
            count++;
        }
    }

    public synchronized int getCount() {
        return count;
    }
}
```
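sleep() and wait()
sleep() is a static method of Thread; wait() is a method of Object.
sleep() does not have to be used together with synchronized; wait() can only be called while holding the object's lock, that is, inside synchronized.
sleep() does not release the object lock while sleeping; wait() releases it while waiting.
With a timeout, both put the thread in the TIMED_WAITING state; wait() without a timeout puts it in WAITING.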
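The thread states mentioned above can be checked with Thread.getState(). This is a minimal sketch with made-up names (SleepWaitStateSketch, observe): it starts a daemon thread, polls until that thread reports WAITING or TIMED_WAITING, and then interrupts it so it can finish.

```java
class SleepWaitStateSketch {

    /** Runs body in a new thread and returns the first waiting state it is seen in. */
    static Thread.State observe(Runnable body) {
        Thread t = new Thread(body, "observed");
        t.setDaemon(true);
        t.start();
        long deadline = System.nanoTime() + 2_000_000_000L;
        Thread.State state = t.getState();
        try {
            while (state != Thread.State.WAITING
                    && state != Thread.State.TIMED_WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(5);
                state = t.getState();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        t.interrupt();                      // let the observed thread leave sleep()/wait()
        return state;
    }

    static Thread.State stateDuringSleep() {
        return observe(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // expected: observe() interrupts us
            }
        });
    }

    static Thread.State stateDuringWait(long timeoutMillis) {
        Object lock = new Object();
        return observe(() -> {
            synchronized (lock) {           // wait() requires owning the monitor
                try {
                    lock.wait(timeoutMillis);   // 0 means wait without a timeout
                } catch (InterruptedException ignored) {
                    // expected: observe() interrupts us
                }
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("sleep(10s): " + stateDuringSleep());
        System.out.println("wait(10s):  " + stateDuringWait(10_000));
        System.out.println("wait():     " + stateDuringWait(0));
    }
}
```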
```java
package com.pl.demo2_sleepAndWait;

import lombok.extern.slf4j.Slf4j;

import static com.pl.util.Sleeper.sleep;

// sleep() keeps the lock: t1..t5 cannot enter the block until t0 has left it.
@Slf4j(topic = "c.test")
class A {
    static final Object o = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (o) {
                log.debug("begin");
                sleep(2.5);
                log.debug("other");
            }
        }, "t0").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                synchronized (o) {
                    log.debug("1111");
                }
            }, "t" + (i + 1)).start();
        }
    }
}

// wait() releases the lock: all three threads can enter the block and wait on o.
@Slf4j(topic = "c.test")
public class Test {
    static final Object o = new Object();

    public static void main(String[] args) {
        // The original called LockSupport.park() here, which blocked main
        // before any thread was started; it has been removed.
        new Thread(() -> {
            log.debug("begin");
            synchronized (o) {
                try {
                    o.wait();           // no timeout: waits until notified
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("other");
        }, "t1").start();

        new Thread(() -> {
            log.debug("begin");
            synchronized (o) {
                try {
                    o.wait(500);        // wakes on timeout before the notifyAll below
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("other");
        }, "t2").start();

        new Thread(() -> {
            log.debug("begin");
            synchronized (o) {
                try {
                    o.wait(2000);       // woken early by the notifyAll below
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            log.debug("other");
        }, "t3").start();

        sleep(1);
        synchronized (o) {
            o.notifyAll();
        }
    }
}
```
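Guarded suspension
Design pattern: guarded suspension (used when one thread waits for the result produced by another thread)
When a single result has to be passed from one thread to another, both threads can share one GuardedObject.
When results flow continuously from one thread to another, use a message queue (producer/consumer).
join and Future are implemented with this pattern.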
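To illustrate the Future side of that remark, here is a minimal sketch in which a FutureTask plays the role of the guarded object: the consumer blocks in get() until the producer thread has set the result. The class name FutureAsGuardedObject and the value 42 are placeholders. The sketch shows the pattern from the caller's point of view and makes no claim about how FutureTask blocks internally.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureAsGuardedObject {

    /** Waits (with a timeout) for a result computed by another thread. */
    static int awaitResult() {
        // The producer: computes the result some time after it is started.
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(100);          // stands in for real work such as a download
            return 42;
        });
        new Thread(task, "producer").start();

        try {
            // The consumer: suspended here until the result exists or 5 s pass.
            return task.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult());
    }
}
```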
GuardedObject

```java
package com.pl.demo3_guardedSuspension;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.test")
public class GuardedObjectTest {
    public static void main(String[] args) {
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            Object o = guardedObject.get(5000);
            if (o != null) {
                log.debug("download finished");
            } else {
                log.debug("download failed");
            }
        }, "t1").start();

        new Thread(() -> {
            log.debug("preparing download");
            guardedObject.complete();
        }, "t2").start();
    }
}

@Slf4j(topic = "c.test")
class GuardedObject {
    private Object response;

    // Waits up to timeout milliseconds for the result; timeout == 0 waits indefinitely.
    public Object get(long timeout) {
        long begin = System.currentTimeMillis();
        long elapsed = 0;
        synchronized (this) {
            while (response == null) {
                long remaining = timeout - elapsed;
                // Stop once the budget is used up; never call wait(0) by accident,
                // because wait(0) means "wait forever".
                if (timeout > 0 && remaining <= 0) {
                    log.debug("timed out");
                    break;
                }
                log.debug("start waiting");
                try {
                    wait(timeout > 0 ? remaining : 0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                elapsed = System.currentTimeMillis() - begin;
            }
            return response;
        }
    }

    public Object get() {
        return get(0);
    }

    public void complete() {
        synchronized (this) {
            try {
                log.debug("download started");
                wait(4000);     // simulates a slow download; releases the lock meanwhile
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            this.response = 1;
            this.notifyAll();
        }
    }
}
```
Producer/consumer message queue

```java
package com.pl.demo3_guardedSuspension;

import lombok.extern.slf4j.Slf4j;

import java.util.Deque;
import java.util.LinkedList;

public class ProducerConsumerQueueTest {
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(3);
        for (int i = 0; i < 5; i++) {
            int id = i;
            new Thread(() -> {
                messageQueue.put(new Message(id, "content" + id));
            }, "Producer-" + i).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                messageQueue.take();
            }, "Consumer-" + i).start();
        }
    }
}

@Slf4j(topic = "c.test")
class MessageQueue {
    // Per-instance queue (the original declared it static, sharing it across all queues).
    private final Deque<Message> list = new LinkedList<>();
    private final int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    public Message take() {
        synchronized (list) {
            while (list.isEmpty()) {
                try {
                    log.debug("queue is empty, cannot take");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = list.remove();
            log.debug("consumed: " + message);
            list.notifyAll();       // wake producers waiting for free space
            return message;
        }
    }

    public void put(Message message) {
        synchronized (list) {
            while (list.size() >= capacity) {
                try {
                    log.debug("queue is full, waiting for a consumer");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("added: " + message);
            list.addLast(message);
            list.notifyAll();       // wake consumers waiting for a message
        }
    }
}

class Message {
    private final int id;
    private final Object value;

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    public int getId() {
        return id;
    }

    public Object getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Message{" + "id=" + id + ", value=" + value + '}';
    }
}
```
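LockSupport.park() and LockSupport.unpark()
Every thread has its own Parker object, which consists of _counter, _cond, and _mutex.
LockSupport.park():
1) The current thread calls Unsafe.park().
2) If _counter is 0, the thread acquires the _mutex lock and continues.
3) The thread blocks on the _cond condition variable.
4) _counter is set to 0.
If _counter is already 1 when park() is called, the thread does not block: _counter is reset to 0 and the call returns immediately.
LockSupport.unpark(t1):
1) The current thread calls Unsafe.unpark(t1), which sets _counter to 1.
2) t1 is woken up from the _cond condition variable.
3) t1 resumes running.
4) _counter is set to 0.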
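One consequence of the _counter mechanism is that unpark() may be called before park(): the permit is remembered, so the later park() returns at once. Below is a minimal sketch of that behaviour with a hypothetical class name (ParkPermitSketch). It uses parkNanos() with a 2-second limit so the program cannot hang even if the assumption were wrong.

```java
import java.util.concurrent.locks.LockSupport;

class ParkPermitSketch {

    /** Returns how many milliseconds park took when unpark had already been called. */
    static long parkAfterUnparkMillis() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // makes the permit available (_counter = 1)
        LockSupport.unpark(self);   // a second unpark does not add a second permit

        long start = System.nanoTime();
        LockSupport.parkNanos(2_000_000_000L);  // consumes the permit instead of blocking
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("park returned after " + parkAfterUnparkMillis() + " ms");
    }
}
```

This is why the alternating-print example with park() and unpark() further down works even though unpark(t1) may run before t1 has reached its first park().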
```java
package com.pl.demo4_park;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class Test {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("start");
            sleep(1);
            log.debug("park");
            LockSupport.park();     // blocks until main calls unpark(t1)
            log.debug("resume");
        }, "t1");
        t1.start();

        sleep(2);
        LockSupport.unpark(t1);
    }
}
```
Deadlock and livelock

```java
package com.pl.demo5_deadLockAndLiveLock;

import lombok.extern.slf4j.Slf4j;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class Test {
    public static void main(String[] args) {
        testLiveLock();
    }

    // Deadlock: t1 holds lockA and wants lockB, t2 holds lockB and wants lockA.
    public static void testDeadLock() {
        Object lockA = new Object();
        Object lockB = new Object();
        new Thread(() -> {
            synchronized (lockA) {
                log.debug("enter lockA");
                sleep(1);
                synchronized (lockB) {
                    log.debug("enter lockB");
                }
            }
        }, "t1").start();

        new Thread(() -> {
            synchronized (lockB) {
                log.debug("enter lockB");
                sleep(0.5);
                synchronized (lockA) {
                    log.debug("enter lockA");
                }
            }
        }, "t2").start();
    }

    // volatile so that each thread sees the other's updates.
    static volatile int count = 50;

    // Livelock: neither thread is blocked, but each keeps undoing the other's
    // progress, so neither loop condition is likely to become false.
    public static void testLiveLock() {
        new Thread(() -> {
            while (count > 0) {
                sleep(0.2);
                count--;
                log.debug("count = {} ", count);
            }
        }, "t1").start();

        new Thread(() -> {
            while (count < 100) {
                sleep(0.2);
                count++;
                log.debug("count = {} ", count);
            }
        }, "t2").start();
    }
}
```
Properties of ReentrantLock
Reentrant

```java
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.test")
public class ReentrantTest {
    private static final ReentrantLock lock = new ReentrantLock();

    // main -> m1 -> m2 all run on the same thread, which re-acquires the lock
    // it already holds; each lock() is paired with one unlock().
    public static void main(String[] args) {
        lock.lock();
        try {
            log.debug("enter main");
            m1();
        } finally {
            lock.unlock();
        }
    }

    private static void m1() {
        lock.lock();
        try {
            log.debug("enter m1");
            m2();
        } finally {
            lock.unlock();
        }
    }

    private static void m2() {
        lock.lock();
        try {
            log.debug("enter m2");
        } finally {
            lock.unlock();
        }
    }
}
```
Interruptible

```java
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class InterruptibleTest {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            try {
                log.debug("trying to get lock");
                // Unlike lock(), this wait can be cancelled by interrupt().
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("fail to get lock");
                return;
            }
            try {
                log.debug("get lock");
            } finally {
                lock.unlock();
            }
        }, "t1");

        lock.lock();        // main holds the lock, so t1 has to wait
        t1.start();
        sleep(2);
        t1.interrupt();     // t1 gives up waiting with an InterruptedException
    }
}
```
Lock timeout

```java
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class TimeoutTest {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("trying to get lock");
            try {
                // Wait at most 5 s for the lock, then give up.
                if (!lock.tryLock(5, TimeUnit.SECONDS)) {
                    log.debug("fail to get lock");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            }
            try {
                log.debug("get lock");
            } finally {
                lock.unlock();
            }
        }, "t1");

        log.debug("get lock");
        lock.lock();
        t1.start();
        sleep(3);
        lock.unlock();      // released within t1's 5 s budget, so t1 succeeds
    }
}
```
Fair lock

```java
package com.pl.demo6_reentrantLock;

import java.util.concurrent.locks.ReentrantLock;

public class FairTest {
    public static void main(String[] args) {
        // true selects the fair ordering policy; the no-argument constructor is non-fair.
        new ReentrantLock(true);
    }
}
```
Condition variables

```java
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class ConditionTest {
    static ReentrantLock room = new ReentrantLock();
    static boolean hasCon1 = false;
    static boolean hasCon2 = false;
    // One lock, two separate wait sets.
    static Condition condition1 = room.newCondition();
    static Condition condition2 = room.newCondition();

    public static void main(String[] args) {
        new Thread(() -> {
            room.lock();
            try {
                log.debug("state of con1: " + hasCon1);
                // while, not if: re-check the predicate after every wake-up
                // to guard against spurious wake-ups.
                while (!hasCon1) {
                    log.debug("con1 is not satisfied. waiting...");
                    try {
                        condition1.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("con1 is satisfied.");
            } finally {
                room.unlock();
            }
        }, "t1").start();

        new Thread(() -> {
            room.lock();
            try {
                log.debug("state of con2: " + hasCon2);
                while (!hasCon2) {
                    log.debug("con2 is not satisfied. waiting...");
                    try {
                        condition2.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("con2 is satisfied.");
            } finally {
                room.unlock();
            }
        }, "t2").start();

        sleep(2);
        new Thread(() -> {
            room.lock();
            try {
                hasCon1 = true;
                condition1.signalAll();     // wakes only the threads waiting on condition1
            } finally {
                room.unlock();
            }
        }, "tt1").start();

        sleep(3);
        new Thread(() -> {
            room.lock();
            try {
                hasCon2 = true;
                condition2.signalAll();
            } finally {
                room.unlock();
            }
        }, "tt2").start();
    }
}
```
Application

```java
package com.pl.demo6_reentrantLock;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

// Dining philosophers: tryLock() never blocks, so a philosopher who cannot get
// the second chopstick puts the first one down instead of deadlocking.
public class PhilosopherEating {
    public static void main(String[] args) {
        Chopstick c1 = new Chopstick("c1");
        Chopstick c2 = new Chopstick("c2");
        Chopstick c3 = new Chopstick("c3");
        Chopstick c4 = new Chopstick("c4");
        Chopstick c5 = new Chopstick("c5");
        new Philosopher("张三", c1, c2).start();
        new Philosopher("李四", c2, c3).start();
        new Philosopher("王五", c3, c4).start();
        new Philosopher("赵六", c4, c5).start();
        new Philosopher("孙七", c5, c1).start();
    }
}

@Slf4j(topic = "c.phi")
class Philosopher extends Thread {
    private final Chopstick leftChop;
    private final Chopstick rightChop;

    public Philosopher(String name, Chopstick leftChop, Chopstick rightChop) {
        this.setName(name);
        this.leftChop = leftChop;
        this.rightChop = rightChop;
    }

    @Override
    public void run() {
        while (true) {
            log.debug(this.getName() + " is trying to get " + leftChop.getName());
            if (leftChop.tryLock()) {
                log.debug(this.getName() + " get " + leftChop.getName());
                try {
                    log.debug(this.getName() + " is trying to get " + rightChop.getName());
                    if (rightChop.tryLock()) {
                        log.debug(this.getName() + " get " + rightChop.getName());
                        try {
                            eat();
                        } finally {
                            log.debug(this.getName() + " drop " + rightChop.getName());
                            rightChop.unlock();
                        }
                    } else {
                        log.debug(this.getName() + " can't get " + rightChop.getName());
                    }
                } finally {
                    log.debug(this.getName() + " drop " + leftChop.getName());
                    leftChop.unlock();
                }
            } else {
                log.debug(this.getName() + " can't get " + leftChop.getName());
            }
        }
    }

    private void eat() {
        log.debug("eating");
        Sleeper.sleep(0.2);
    }
}

// Each chopstick is itself a lock.
class Chopstick extends ReentrantLock {
    private final String name;

    public Chopstick(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
```
Alternating execution
Classic wait() and notify()

```java
package com.pl.demo7_alternateExecution;

public class Test {
    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify(1, 5);
        new Thread(() -> {
            wn.print("a", 1, 2);
        }, "t1").start();
        new Thread(() -> {
            wn.print("b", 2, 3);
        }, "t2").start();
        new Thread(() -> {
            wn.print("c", 3, 1);
        }, "t3").start();
    }
}

class WaitNotify {
    private int flag;               // whose turn it is
    private final int loopNumber;   // how many rounds to print

    public WaitNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }

    // Prints s when flag == waitFlag, then hands the turn to nextFlag.
    public void print(String s, int waitFlag, int nextFlag) {
        for (int i = 0; i < loopNumber; i++) {
            synchronized (this) {
                while (flag != waitFlag) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                System.out.print(s);
                flag = nextFlag;
                this.notifyAll();
            }
        }
    }
}
```
park() and unpark()

```java
package com.pl.demo7_alternateExecution;

import java.util.concurrent.locks.LockSupport;

public class TestPark {
    static Thread t1, t2, t3;

    public static void main(String[] args) {
        ParkUnpark pu = new ParkUnpark(5);
        // The lambdas read the static fields only when they run, after all three are assigned.
        t1 = new Thread(() -> {
            pu.print("a", t2);
        });
        t2 = new Thread(() -> {
            pu.print("b", t3);
        });
        t3 = new Thread(() -> {
            pu.print("c", t1);
        });
        t1.start();
        t2.start();
        t3.start();
        LockSupport.unpark(t1);     // kick off the cycle
    }
}

class ParkUnpark {
    private final int loopNumber;

    public ParkUnpark(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    // Waits for its own permit, prints, then hands a permit to the next thread.
    public void print(String s, Thread next) {
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(s);
            LockSupport.unpark(next);
        }
    }
}
```
ReentrantLock

```java
package com.pl.demo7_alternateExecution;

import com.pl.util.Sleeper;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TestReen {
    public static void main(String[] args) {
        AwaitSignal as = new AwaitSignal(5);
        Condition c1 = as.newCondition();
        Condition c2 = as.newCondition();
        Condition c3 = as.newCondition();
        new Thread(() -> {
            as.print("a", c1, c2);
        }, "t1").start();
        new Thread(() -> {
            as.print("b", c2, c3);
        }, "t2").start();
        new Thread(() -> {
            as.print("c", c3, c1);
        }, "t3").start();

        // Give all three threads time to reach await(); a signal sent earlier would be lost.
        Sleeper.sleep(1);
        as.lock();
        try {
            c1.signalAll();
        } finally {
            as.unlock();
        }
    }
}

class AwaitSignal extends ReentrantLock {
    private final int loopNumber;

    public AwaitSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    // Waits on curr, prints, then signals next.
    public void print(String s, Condition curr, Condition next) {
        for (int i = 0; i < loopNumber; i++) {
            lock();
            try {
                curr.await();
                System.out.print(s);
                next.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                unlock();
            }
        }
    }
}
```
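volatile
volatile guarantees visibility and ordering. The mechanism behind both is the memory barrier (also called a memory fence).
A write barrier is inserted after a write to a volatile variable: changes to shared variables made before the barrier are synchronized to main memory, and code before the write barrier is never reordered to after it.
A read barrier is inserted before a read of a volatile variable: reads of shared variables after the barrier are loaded from main memory, and code after the read barrier is never reordered to before it.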
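A common use of these two guarantees is safe publication through a flag: an ordinary field is written first, then a volatile flag, and a reader that sees the flag is also guaranteed to see the ordinary field. The sketch below assumes made-up names (VolatilePublicationSketch, payload, ready) and is meant as an illustration of the rule, not as a benchmark or a proof.

```java
class VolatilePublicationSketch {
    static int payload = 0;                 // plain field, written before the flag
    static volatile boolean ready = false;  // volatile flag that publishes payload

    /** Publishes 42 through the volatile flag and returns what the reader saw. */
    static int publishAndRead() {
        payload = 0;
        ready = false;
        final int[] seen = new int[1];

        Thread reader = new Thread(() -> {
            while (!ready) {                // volatile read: always sees the latest value
                Thread.yield();
            }
            seen[0] = payload;              // ordered after the volatile read of ready
        }, "reader");
        reader.start();

        payload = 42;                       // ordinary write ...
        ready = true;                       // ... made visible by the volatile write after it

        try {
            reader.join();                  // join() also makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead());
    }
}
```

Swapping the two writes in the publishing thread would break the guarantee: the reader could then see ready == true while payload is still 0.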
```java
package com.pl.demo8_volitale;

import com.pl.util.Sleeper;

public class Test {
    // Without volatile, t1 might never observe the update and could loop forever.
    static volatile boolean flag = true;

    public static void main(String[] args) {
        new Thread(() -> {
            while (flag) {
            }
        }, "t1").start();

        Sleeper.sleep(1);
        flag = false;
    }
}
```
Atomic
AtomicInteger

```java
package com.pl.demo9_atomic;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicTest {
    public static void main(String[] args) {
        Account.demo(new AccountUnsafe(10000));
        Account.demo(new AccountCAS(10000));

        AtomicInteger i = new AtomicInteger(0);
        // xxxAndGet returns the new value
        System.out.println(i.addAndGet(1));                         // 1
        System.out.println(i.incrementAndGet());                    // 2
        System.out.println(i.updateAndGet(value -> value + 1));     // 3
        System.out.println(i.get());                                // 3
        // getAndXxx returns the previous value
        System.out.println(i.getAndAdd(1));                         // 3
        System.out.println(i.getAndIncrement());                    // 4
        System.out.println(i.getAndUpdate(value -> value + 1));     // 5
        System.out.println(i.get());                                // 6
    }
}

// Lock-free: retry the CAS until no other thread changed the balance in between.
class AccountCAS implements Account {
    private AtomicInteger balance;

    public AccountCAS(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(Integer amount) {
        while (true) {
            int prev = balance.get();
            int next = prev - amount;
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

// Despite its name, this version is made thread-safe with synchronized.
class AccountUnsafe implements Account {
    Integer balance;

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public synchronized Integer getBalance() {
        return balance;
    }

    @Override
    public synchronized void withdraw(Integer amount) {
        balance -= amount;
    }
}

interface Account {
    Integer getBalance();

    void withdraw(Integer amount);

    // Starts 1000 threads that each withdraw 10, then prints the balance and the elapsed microseconds.
    static void demo(Account account) {
        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        long start = System.nanoTime();
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance() + ", " + (end - start) / 1000);
    }
}
```
AtomicReference

```java
package com.pl.demo9_atomic;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
    public static void main(String[] args) {
        DecimalAccount.demo(new DecimalAccountImp(new BigDecimal("1")));
    }
}

class DecimalAccountImp implements DecimalAccount {
    private AtomicReference<BigDecimal> balance;

    public DecimalAccountImp(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = balance.get();
            BigDecimal next = prev.subtract(amount);
            // compareAndSet compares references, so prev must be the object read above.
            if (balance.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

interface DecimalAccount {
    BigDecimal getBalance();

    void withdraw(BigDecimal amount);

    // Starts 1000 threads that each withdraw 0.001, then prints the balance and the elapsed microseconds.
    static void demo(DecimalAccount account) {
        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            threads.add(new Thread(() -> {
                account.withdraw(new BigDecimal("0.001"));
            }));
        }
        long start = System.nanoTime();
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance() + ", " + (end - start) / 1000);
    }
}
```
AtomicStampedReference

```java
package com.pl.demo9_atomic;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicStampedReference;

@Slf4j(topic = "c.test")
public class AtomicStampedReferenceTest {
    static AtomicStampedReference<String> ref = new AtomicStampedReference<String>("AAA", 0);

    public static void main(String[] args) {
        int stamp = ref.getStamp();
        log.debug("before sleeping : {}", stamp);
        other();    // two other threads change AAA -> CCC -> AAA (the ABA case)
        Sleeper.sleep(1);
        log.debug("after sleeping: {}", ref.getStamp());
        // The value is AAA again, but the stamp read earlier is stale, so this CAS is rejected.
        log.debug("try changing AAA to BBB, result: {}",
                ref.compareAndSet("AAA", "BBB", stamp, stamp + 1));
    }

    private static void other() {
        new Thread(() -> {
            int stamp = ref.getStamp();
            log.debug("before: {}", stamp);
            log.debug("try changing AAA to CCC, result: {}",
                    ref.compareAndSet("AAA", "CCC", stamp, stamp + 1));
            log.debug("after: {}", ref.getStamp());
        }, "t1").start();

        Sleeper.sleep(0.5);

        new Thread(() -> {
            int stamp = ref.getStamp();
            log.debug("before: {}", stamp);
            log.debug("try changing CCC to AAA, result: {}",
                    ref.compareAndSet("CCC", "AAA", stamp, stamp + 1));
            log.debug("after: {}", ref.getStamp());
        }, "t2").start();
    }
}
```
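LongAdder
LongAdder is an atomic accumulator that keeps several accumulation cells. When threads contend, they add to different cells, and the cells are summed at the end, which reduces the number of failed CAS attempts.
Under contention it is much faster than a plain AtomicLong.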
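The cell idea can be sketched in a few lines. The following is a deliberately simplified illustration, not LongAdder's algorithm: it uses a fixed number of cells in an AtomicLongArray and picks a cell from the thread's hash code, whereas the real class grows its cell table on demand and pads cells against false sharing. The class name StripedCounterSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLongArray;

class StripedCounterSketch {
    private final AtomicLongArray cells;

    StripedCounterSketch(int stripes) {
        this.cells = new AtomicLongArray(stripes);
    }

    /** Adds one to the cell chosen for the calling thread. */
    void increment() {
        int index = (Thread.currentThread().hashCode() & 0x7fffffff) % cells.length();
        cells.incrementAndGet(index);
    }

    /** Sums all cells; like LongAdder.sum(), this is not an atomic snapshot. */
    long sum() {
        long total = 0;
        for (int i = 0; i < cells.length(); i++) {
            total += cells.get(i);
        }
        return total;
    }

    /** Runs threadCount threads that each increment incrementsPerThread times. */
    static long demo(int threadCount, int incrementsPerThread) {
        StripedCounterSketch counter = new StripedCounterSketch(8);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment();
                }
            }, "t" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100_000));
    }
}
```

Once all threads have finished, the total is exact because every increment lands in exactly one cell; only reads taken while increments are still running can be slightly stale.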
Usage

```java
package com.pl.demo10_longAdder;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;

@Slf4j(topic = "c.test")
public class Test {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            demo(AtomicLong::new, AtomicLong::getAndIncrement);
        }
        log.debug("--------------------");
        for (int i = 0; i < 5; i++) {
            demo(LongAdder::new, LongAdder::increment);
        }
    }

    // Four threads each apply the action 500,000 times; logs the result and the elapsed time.
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            threads.add(new Thread(() -> {
                for (int j = 0; j < 500_000; j++) {
                    action.accept(adder);
                }
            }, "t" + i));
        }
        long start = System.nanoTime();
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        long end = System.nanoTime();
        log.debug("{}, cost: {} ms", adder, (end - start) / 1000_000);
    }
}
```
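LongAdder source code
Three core fields: cells, base, and cellsBusy (declared in Striped64, the superclass of LongAdder)

```java
// Table of cells, created lazily; when non-null, its size is a power of 2.
transient volatile Cell[] cells;

// Base value, updated via CAS; used mainly when there is no contention.
transient volatile long base;

// Spinlock (0 = free, 1 = held, acquired via CAS) used when creating or resizing cells.
transient volatile int cellsBusy;
```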
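The add() method

```java
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // Fast path: no cells yet and the CAS on base succeeds, so nothing more to do.
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        // Otherwise try this thread's cell; fall back to longAccumulate() if
        // the table or the cell is missing, or if the CAS on the cell fails.
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
```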
The longAccumulate() method

```java
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current();        // force initialization of the probe
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // true if the last slot was non-empty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // try to attach a new Cell
                    Cell r = new Cell(x);   // optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // at max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // retry with expanded table
            }
            h = advanceProbe(h);            // rehash to try a different cell
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
            break;                          // fall back on using base
    }
}
```
The sum() method

```java
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    // Adds every non-null cell to base; concurrent updates may or may not be included.
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
```
Implementing a connection pool with AtomicIntegerArray

```java
package com.pl.demo11_customPool;

import lombok.extern.slf4j.Slf4j;

import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class CustomPoolTest {
    public static void main(String[] args) {
        Pool pool = new Pool(2);
        int threadCount = 5;
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                Connection connection = pool.borrow();
                try {
                    Thread.sleep(new Random().nextInt(2000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    pool.free(connection);
                }
            }, "t" + (i + 1)).start();
        }
    }
}

@Slf4j(topic = "c.Test")
class Pool {
    private final int poolSize;
    private Connection[] connections;
    // states[i]: 0 = connection i is free, 1 = connection i is in use
    private AtomicIntegerArray states;

    public Pool(int poolSize) {
        this.poolSize = poolSize;
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection();
        }
    }

    public Connection borrow() {
        while (true) {
            for (int i = 0; i < poolSize; i++) {
                if (states.get(i) == 0) {
                    // CAS so that only one thread can claim a free slot.
                    if (states.compareAndSet(i, 0, 1)) {
                        log.debug("get conn[" + i + "]");
                        return connections[i];
                    }
                }
            }
            // No free connection: wait until free() notifies.
            synchronized (this) {
                try {
                    log.debug("waiting...");
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public void free(Connection connection) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == connection) {
                states.set(i, 0);
                log.debug("free conn[" + i + "]");
                synchronized (this) {
                    this.notifyAll();
                }
                break;      // each connection occupies exactly one slot
            }
        }
    }

    class MockConnection implements Connection { ...... }
}
```
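The final keyword
final:
1) A final class cannot be extended.
2) A final method cannot be overridden by a subclass.
3) A final variable cannot be modified once it has been assigned (for a primitive type the value cannot change; for a reference type the referenced address cannot change).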
```java
public class FinalTest {
    // static final fields initialized with compile-time constants
    static final int A = 10;
    static final int B = Short.MAX_VALUE + 1;

    // final instance fields
    final int a = 20;
    final int b = Integer.MAX_VALUE;

    // non-final fields, for comparison
    static int C = 15;
    int c = 30;
}

class UseFinal {
    void test() {
        // A and B are compile-time constants: the compiler copies their values into UseFinal.
        System.out.println(FinalTest.A);
        System.out.println(FinalTest.B);
        System.out.println(new FinalTest().a);
        System.out.println(new FinalTest().b);
        // C and c are not final, so they are read from FinalTest at run time.
        System.out.println(FinalTest.C);
        System.out.println(new FinalTest().c);
    }
}
```
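A variable becomes a macro variable when it meets all three of the following conditions:
1) It is declared final.
2) Its initial value is given where the final variable is declared.
3) That initial value can be determined at compile time.
The compiler replaces every use of a macro variable in the program with the variable's value. This is called macro substitution.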
```java
package com.pl.demo12_final;

public class MacroVariableTest {
    public static void main(String[] args) {
        String hw = "hello world";
        String hello = "hello";
        final String finalWorld2 = "hello";         // macro variable
        final String finalWorld3 = hello;           // final, but the value is not known at compile time
        final String finalWorld4 = "he" + "llo";    // macro variable (constant expression)

        String hw1 = hello + " world";
        String hw2 = finalWorld2 + " world";
        String hw3 = finalWorld3 + " world";
        String hw4 = finalWorld4 + " world";

        System.out.println(hw == hw1);  // false: concatenated at run time
        System.out.println(hw == hw2);  // true: folded to "hello world" at compile time
        System.out.println(hw == hw3);  // false: concatenated at run time
        System.out.println(hw == hw4);  // true: folded to "hello world" at compile time
    }
}
```
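Thread pool
Custom thread pool
taskQueue: the task queue
workers: wrappers around the core threads
rejectPolicy: the rejection policy
coreSize: the maximum number of core threads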
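For orientation, the same four parts also appear as constructor arguments of the JDK's own java.util.concurrent.ThreadPoolExecutor. The sketch below is for comparison only and is not part of the custom pool in these notes; the class name JdkPoolSketch, the 50 ms task, and the choice of CallerRunsPolicy are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class JdkPoolSketch {

    /** Submits taskCount short tasks and returns how many of them completed. */
    static int runTasks(int taskCount) {
        AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                          // corePoolSize (coreSize)
                2,                                          // maximumPoolSize: no extra threads
                10, TimeUnit.SECONDS,                       // idle timeout for non-core threads
                new ArrayBlockingQueue<>(3),                // bounded task queue (taskQueue)
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy (rejectPolicy)

        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(50);                       // stands in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }

        pool.shutdown();                                    // accept no new tasks, finish queued ones
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(8));
    }
}
```

With two workers and a queue of three, some of the eight tasks are rejected; CallerRunsPolicy runs those in the submitting thread, so none are dropped.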
    package com.pl.demo13_threadPool;

    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    @Slf4j(topic = "c.test")
    public class MyThreadPoolTest {
        public static void main(String[] args) {
            // 2 core threads, queue capacity 3. When the queue is full,
            // wait up to 500 ms for space and silently drop the task otherwise.
            ThreadPool threadPool = new ThreadPool(2, 3, (queue, task) -> {
                queue.offer(task, 500, TimeUnit.MILLISECONDS);
            });
            for (int i = 0; i < 8; i++) {
                int j = i;
                threadPool.execute(() -> {
                    Sleeper.sleep(1);
                    log.debug("{}", j);
                });
            }
        }
    }

    // Strategy invoked when the queue is full; the caller decides what to do with the task.
    @FunctionalInterface
    interface RejectPolicy<T> {
        void reject(BlockingQueue<T> blockingQueue, T task);
    }

    @Slf4j(topic = "c.test")
    class ThreadPool {
        private final BlockingQueue<Runnable> taskQueue;
        private final HashSet<Worker> workers = new HashSet<>();
        private final RejectPolicy<Runnable> rejectPolicy;
        private final int coreSize;

        public ThreadPool(int coreSize, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
            this.coreSize = coreSize;
            this.taskQueue = new BlockingQueue<>(queueCapacity);
            this.rejectPolicy = rejectPolicy;
        }

        public void execute(Runnable task) {
            synchronized (workers) {
                if (workers.size() < coreSize) {
                    // Below the core size: start a new worker for this task
                    Worker worker = new Worker(task);
                    workers.add(worker);
                    log.debug("new worker: {} to run {}", worker, task);
                    worker.start();
                } else {
                    // Otherwise enqueue, or apply the rejection policy if the queue is full
                    taskQueue.tryPut(rejectPolicy, task);
                }
            }
        }

        class Worker extends Thread {
            private Runnable task;

            public Worker(Runnable task) {
                this.task = task;
            }

            @Override
            public void run() {
                // Run the initial task, then keep polling the queue; exit after 10 s of idleness
                while (task != null || (task = taskQueue.take(10, TimeUnit.SECONDS)) != null) {
                    try {
                        log.debug("running {}", task);
                        task.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        task = null;
                    }
                }
                synchronized (workers) {
                    workers.remove(this);
                    log.debug("removed {}", this);
                }
            }
        }
    }

    @Slf4j(topic = "c.test")
    class BlockingQueue<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition fullWaitSet = lock.newCondition();  // producers wait here
        private final Condition emptyWaitSet = lock.newCondition(); // consumers wait here
        private final int capacity;

        public BlockingQueue(int capacity) {
            this.capacity = capacity;
        }

        // Enqueue if there is room, otherwise delegate to the rejection policy
        public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
            lock.lock();
            try {
                if (queue.size() >= capacity) {
                    rejectPolicy.reject(this, task);
                } else {
                    queue.add(task);
                    log.debug("added {} to the queue", task);
                    emptyWaitSet.signalAll();
                }
            } finally {
                lock.unlock();
            }
        }

        // Timed take: returns null when the timeout elapses
        public T take(long timeout, TimeUnit timeUnit) {
            lock.lock();
            try {
                long nanos = timeUnit.toNanos(timeout);
                while (queue.isEmpty()) {
                    try {
                        if (nanos <= 0) {
                            return null;
                        }
                        // awaitNanos returns the remaining time, which also guards against spurious wakeups
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signalAll();
                return t;
            } finally {
                lock.unlock();
            }
        }

        // Blocking put: waits indefinitely for space
        public void put(T task) {
            lock.lock();
            try {
                while (queue.size() >= capacity) {
                    try {
                        fullWaitSet.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                queue.add(task);
                log.debug("added {} to the queue", task);
                emptyWaitSet.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // Timed put: returns false when the timeout elapses
        public boolean offer(T task, long timeout, TimeUnit timeUnit) {
            lock.lock();
            try {
                long nanos = timeUnit.toNanos(timeout);
                while (queue.size() >= capacity) {
                    try {
                        // <= 0 (not < 0): awaitNanos may return exactly 0 on timeout
                        if (nanos <= 0) {
                            return false;
                        }
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                queue.add(task);
                log.debug("added {} to the queue", task);
                emptyWaitSet.signalAll();
                return true;
            } finally {
                lock.unlock();
            }
        }

        public int size() {
            lock.lock();
            try {
                return queue.size();
            } finally {
                lock.unlock();
            }
        }
    }
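The JDK thread pool: ThreadPoolExecutor
It keeps an AtomicInteger named ctl: the high 3 bits encode the pool state and the low 29 bits hold the worker thread count, so both can be updated with a single CAS.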
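| State | High 3 bits of ctl | Description |
|---|---|---|
| RUNNING | 111 | Accepts new tasks and processes queued tasks |
| SHUTDOWN | 000 | Rejects new tasks but still processes queued tasks |
| STOP | 001 | Rejects new tasks, does not process queued tasks, and interrupts running tasks |
| TIDYING | 010 | All tasks have finished and there are no active threads; about to terminate |
| TERMINATED | 011 | Terminated |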
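The packing of state and thread count into one int can be sketched as follows. The constant and method names mirror those in the JDK 8 `ThreadPoolExecutor` source, but the class `CtlSketch` itself is a hypothetical stand-alone illustration, not the JDK class.

```java
// Hypothetical sketch: a 3-bit run state and a 29-bit worker count in one int.
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // mask for the low 29 bits

    // Run states live in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;       // 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // 000
    static final int STOP       =  1 << COUNT_BITS;       // 001
    static final int TIDYING    =  2 << COUNT_BITS;       // 010
    static final int TERMINATED =  3 << COUNT_BITS;       // 011

    static int ctlOf(int runState, int workerCount) { return runState | workerCount; }
    static int runStateOf(int ctl)    { return ctl & ~CAPACITY; }
    static int workerCountOf(int ctl) { return ctl & CAPACITY; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(ctl) == RUNNING); // true
        System.out.println(workerCountOf(ctl));         // 5
    }
}
```

Because RUNNING has the sign bit set, it is negative, and the states compare as RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED. State checks can therefore be plain integer comparisons.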
Constructor:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
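corePoolSize – number of core threads (kept alive once created)
maximumPoolSize – maximum number of threads (core threads + emergency threads)
keepAliveTime – how long an idle emergency thread stays alive
unit – time unit of keepAliveTime
workQueue – the blocking queue that holds pending tasks
threadFactory – thread factory, mainly used to give threads meaningful names
handler – rejection policy. When the queue is full, an emergency thread is created to run the task; if the pool has already reached maximumPoolSize, the rejection policy is applied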
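The order in which these parameters take effect (core thread, then queue, then emergency thread, then rejection) can be observed with a small experiment. This is a sketch under assumed sizes (1 core thread, 2 threads maximum, queue capacity 1). The class name `PoolOrderSketch` is hypothetical, and the tasks simply block on a latch so the pool stays saturated.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolOrderSketch {
    /** Returns {pool size, queued tasks, rejected tasks} after submitting four blocking tasks. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        for (int i = 0; i < 4; i++) {
            try {
                // task 1 -> core thread, task 2 -> queue, task 3 -> emergency thread, task 4 -> rejected
                pool.execute(blocker);
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int[] result = {pool.getPoolSize(), pool.getQueue().size(), rejected};
        release.countDown();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("threads=" + r[0] + ", queued=" + r[1] + ", rejected=" + r[2]);
    }
}
```

Note that the emergency thread runs the third task directly rather than taking the older queued task, so submission order is not strictly preserved once the queue fills up.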
    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;

    @Slf4j(topic = "c.test")
    public class ThreadPoolExecutorTest {
        public static void main(String[] args) {
            ExecutorService pool1 = Executors.newFixedThreadPool(2);
            ExecutorService pool2 = Executors.newSingleThreadExecutor();
            ExecutorService pool3 = Executors.newCachedThreadPool();
            ScheduledExecutorService pool4 = Executors.newScheduledThreadPool(1);

            log.debug("begin");
            // With a single scheduler thread, a slow task delays the ones behind it
            pool4.schedule(() -> {
                log.debug("task1");
                Sleeper.sleep(3);
            }, 1, TimeUnit.SECONDS);
            // The exception is captured in the returned future and not logged;
            // it does not stop later tasks from running
            pool4.schedule(() -> {
                log.debug("task2");
                int i = 1 / 0;
            }, 1, TimeUnit.SECONDS);
            pool4.schedule(() -> {
                log.debug("task3");
            }, 1, TimeUnit.SECONDS);
        }

        // SynchronousQueue has no capacity: put() blocks until another thread calls take()
        private static void synchronusQueueTest() {
            SynchronousQueue<Integer> integers = new SynchronousQueue<>();
            new Thread(() -> {
                try {
                    log.debug("putting {}", 1);
                    integers.put(1);
                    log.debug("putted {}", 1);

                    log.debug("putting {}", 2);
                    integers.put(2);
                    log.debug("putted {}", 2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, "t1").start();

            Sleeper.sleep(1);

            new Thread(() -> {
                try {
                    log.debug("taking {}", 1);
                    integers.take();
                    log.debug("took {}", 1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, "t2").start();

            Sleeper.sleep(1);

            new Thread(() -> {
                try {
                    log.debug("taking {}", 2);
                    integers.take();
                    log.debug("took {}", 2);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, "t3").start();
        }

        private static void testPool(ExecutorService pool1) {
            pool1.execute(() -> {
                log.debug("1");
                Sleeper.sleep(1);
            });
            pool1.execute(() -> {
                log.debug("2");
                Sleeper.sleep(1);
            });
            pool1.execute(() -> {
                log.debug("3");
                Sleeper.sleep(1);
            });
        }
    }
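Lock concepts
A lock is a mechanism for "keeping things from getting mixed up".
Spin locks and blocking locks:
A spin lock is based on busy waiting, whereas a blocking lock is based on suspending the thread.
With a spin lock, a thread that finds the lock already held keeps looping until the lock becomes available. It suits low-contention cases where the lock is held only briefly, because it avoids the cost of a thread context switch;
With a blocking lock, a thread that finds the lock already held is suspended and gives up the CPU until the lock becomes available. It suits high-contention cases, because it avoids wasting CPU cycles on spinning and makes fair queueing easier.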
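A minimal spin lock can be sketched with a single CAS loop. The class `SpinLockSketch` and its `demo` method are hypothetical, for illustration only; the sketch is non-reentrant and unfair, and production code would normally use `ReentrantLock` instead.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class SpinLockSketch {
    private final AtomicBoolean locked = new AtomicBoolean(false);
    private int counter; // guarded by this spin lock

    void lock() {
        // Busy-wait: keep retrying the CAS instead of suspending the thread
        while (!locked.compareAndSet(false, true)) {
            Thread.yield(); // scheduling hint; Thread.onSpinWait() is an alternative on Java 9+
        }
    }

    void unlock() {
        locked.set(false);
    }

    /** Two threads each increment the counter 10,000 times under the spin lock. */
    static int demo() {
        SpinLockSketch spin = new SpinLockSketch();
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                spin.lock();
                try {
                    spin.counter++;
                } finally {
                    spin.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return spin.counter;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 20000
    }
}
```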
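Optimistic and pessimistic locks:
An optimistic lock assumes conflicts are rare. It does not block other threads before modifying shared data; instead it detects conflicts at update time with a version number, timestamp, or similar mechanism, and rolls back or retries when a conflict is found. It suits read-heavy workloads;
A pessimistic lock assumes conflicts are likely. A thread locks the shared data before accessing it so that no other thread can read or modify it at the same time, which guarantees consistency. It suits write-heavy workloads or frequent write-write conflicts.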
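The version-check-and-retry idea behind optimistic locking can be sketched with `AtomicStampedReference`, where the stamp plays the role of the version number. The class `OptimisticUpdateSketch`, its initial balance of 100, and its method names are assumptions made for this illustration.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class OptimisticUpdateSketch {
    // Value 100 with version (stamp) 0
    private final AtomicStampedReference<Integer> balance =
            new AtomicStampedReference<>(100, 0);

    /** Read value and version, compute, then commit only if nothing changed; retry otherwise. */
    void add(int delta) {
        int[] stampHolder = new int[1];
        for (;;) {
            Integer current = balance.get(stampHolder);
            int stamp = stampHolder[0];
            if (balance.compareAndSet(current, current + delta, stamp, stamp + 1)) {
                return;
            }
            // Another thread committed first: loop and retry with fresh values
        }
    }

    /** Single attempt that fails when the caller's version is stale. */
    boolean setIfVersion(int expectedVersion, int newValue) {
        return balance.compareAndSet(
                balance.getReference(), newValue, expectedVersion, expectedVersion + 1);
    }

    int value()   { return balance.getReference(); }
    int version() { return balance.getStamp(); }

    public static void main(String[] args) {
        OptimisticUpdateSketch s = new OptimisticUpdateSketch();
        s.add(10);
        System.out.println(s.value() + " v" + s.version()); // 110 v1
        System.out.println(s.setIfVersion(0, 0));            // false: version 0 is stale
    }
}
```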
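Reentrant and non-reentrant locks:
A reentrant lock lets the same thread acquire the lock multiple times; a non-reentrant lock does not.
With a reentrant lock, a thread that already holds the lock and requests it again succeeds, and the hold count is incremented. The thread must release the lock the same number of times to free it completely. This avoids self-deadlock and gives more flexibility;
With a non-reentrant lock, a thread that already holds the lock and requests it again is blocked until the lock is released. Since the only thread that could release it is the one now blocked, the thread deadlocks on itself.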
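The hold count of a reentrant lock can be observed directly through `ReentrantLock.getHoldCount()`. The wrapper class `ReentrancySketch` is hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReentrancySketch {
    /** Returns the hold count after a nested lock, after one unlock, and after the final unlock. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        lock.lock();
        try {
            lock.lock(); // same thread: succeeds immediately and bumps the count
            try {
                counts[0] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
            counts[1] = lock.getHoldCount();
        } finally {
            lock.unlock();
        }
        counts[2] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        int[] c = holdCounts();
        System.out.println(c[0] + " " + c[1] + " " + c[2]); // 2 1 0
    }
}
```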
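Biased, lightweight, and heavyweight locks:
Biased, lightweight, and heavyweight locks are the JVM's progressive lock-escalation optimization for different levels of contention.
Biased locking optimizes the uncontended case, lightweight locking the low-contention case, and heavyweight locking the high-contention case.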
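Shared and exclusive locks:
A shared lock lets multiple threads read a shared resource at the same time; an exclusive lock lets only one thread access it.
With a shared lock, several threads can hold the lock simultaneously and read in parallel, but while any shared lock is held no thread can acquire the exclusive lock. Shared locks suit read-heavy workloads and improve concurrency;
With an exclusive lock, while one thread holds it no other thread can acquire either the exclusive lock or a shared lock. Exclusive locks are used where integrity and consistency must be guaranteed, allowing only one thread to modify the data at a time.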
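These compatibility rules can be probed with `ReentrantReadWriteLock` and non-blocking `tryLock()` calls from a second thread. The class `SharedExclusiveSketch` and its helper methods are hypothetical names used for this sketch.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class SharedExclusiveSketch {
    /** Try to take a lock without blocking; release it again if that worked. */
    static boolean tryAndRelease(Lock lock) {
        if (lock.tryLock()) {
            lock.unlock();
            return true;
        }
        return false;
    }

    /**
     * Returns {read while read held, write while read held, read while write held},
     * each probed from a second thread.
     */
    static boolean[] probe() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] r = new boolean[3];
        try {
            rw.readLock().lock();
            try {
                Thread t = new Thread(() -> {
                    r[0] = tryAndRelease(rw.readLock());  // shared + shared: compatible
                    r[1] = tryAndRelease(rw.writeLock()); // shared + exclusive: refused
                });
                t.start();
                t.join();
            } finally {
                rw.readLock().unlock();
            }
            rw.writeLock().lock();
            try {
                Thread t = new Thread(() -> r[2] = tryAndRelease(rw.readLock())); // exclusive + shared: refused
                t.start();
                t.join();
            } finally {
                rw.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return r;
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // true false false
    }
}
```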
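Fair and non-fair locks:
A fair lock grants the lock in the order threads requested it, first come first served. A non-fair lock lets a later thread barge in ahead of waiting threads.
A fair lock guarantees fairness but may increase thread-switching overhead;
A non-fair lock may give better throughput, but some threads may wait a long time before they get the lock.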
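Table locks and row locks:
A table lock locks an entire database table; a row lock locks a single row in a table.
A table lock is coarse-grained and can reduce concurrency, but it is simple and fits certain scenarios.
A row lock is fine-grained and improves concurrency, but requires more complex lock management.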
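The Lock interface and AQS
synchronized cannot break the "no preemption" condition for deadlock: a thread that fails to acquire the monitor is blocked, and a blocked thread can do nothing, including release the resources it already holds. The Lock interface therefore offers another way to lock, one that can respond to interruption, time out, or fail immediately instead of blocking.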
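One way Lock helps here is `tryLock` with a timeout: a thread that cannot get the second lock gives up and releases the first, which a thread blocked on `synchronized` cannot do. The class `TimedLockSketch`, the 50 ms timeout, and the lock names `a` and `b` are assumptions chosen for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class TimedLockSketch {
    /** Takes lock a, then tries b for a bounded time; a is released whether or not b was obtained. */
    static boolean acquireBoth(ReentrantLock a, ReentrantLock b, long timeoutMs)
            throws InterruptedException {
        a.lock();
        try {
            if (b.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
                try {
                    return true; // both locks held here: do the real work
                } finally {
                    b.unlock();
                }
            }
            return false; // timed out: back off instead of blocking forever
        } finally {
            a.unlock();
        }
    }

    /** Returns {result while b is held elsewhere, a is free afterwards, result once b is free}. */
    static boolean[] demo() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            b.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                b.unlock();
            }
        });
        try {
            holder.start();
            held.await();
            boolean first = acquireBoth(a, b, 50);
            boolean aFree = !a.isLocked();
            release.countDown();
            holder.join();
            boolean second = acquireBoth(a, b, 50);
            return new boolean[] {first, aFree, second};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // false true true
    }
}
```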
A custom non-reentrant blocking lock
    package com.pl.demo14_lockAndAqs;

    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.*;

    @Slf4j(topic = "c.test")
    public class Test {
        public static void main(String[] args) {
            MyLock lock = new MyLock();
            new Thread(() -> {
                lock.lock();
                log.debug("locking");
                try {
                    Sleeper.sleep(1);
                } finally {
                    log.debug("unlocking");
                    lock.unlock();
                }
            }, "t1").start();

            new Thread(() -> {
                lock.lock();
                log.debug("locking");
                // The lock is not reentrant: this second lock() blocks t2 on itself forever
                lock.lock();
                log.debug("locking");
                try {
                    Sleeper.sleep(1);
                } finally {
                    log.debug("unlocking");
                    lock.unlock();
                }
            }, "t2").start();
        }
    }

    class MyLock implements Lock {

        // state 0 = unlocked, 1 = locked
        class MySync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int arg) {
                // Only the owner may release
                if (!isHeldExclusively()) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0); // volatile write last, so the cleared owner is visible first
                return true;
            }

            @Override
            protected boolean isHeldExclusively() {
                // Must be true only for the thread that owns the lock
                return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
            }

            public Condition newCondition() {
                return new ConditionObject();
            }
        }

        private final MySync sync;

        public MyLock() {
            sync = new MySync();
        }

        @Override
        public void lock() {
            sync.acquire(1);
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }

        @Override
        public void unlock() {
            sync.release(1);
        }

        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
How ReentrantLock works
Suppose thread T0 keeps holding the lock, so the later threads T1, T2, and T3 are blocked, as shown:
graph TB;
subgraph Sync[NonfairSync]
s[state=1];
e[exclusiveOwnerThread]
h[head pointer];
t[tail pointer];
end
h --> dum["Dummy Node(ws = Node.SIGNAL)"] <--> t1["Thread1 Node(ws = Node.SIGNAL)"]
t1 <--> t2["Thread2 Node(ws = Node.SIGNAL)"] <--> t3["Thread3 Node(ws = 0)"]<--> t
e --> t0[Thread0]
Lock acquisition flow (JDK 8 source, abridged)
    // ReentrantLock
    public void lock() {
        sync.lock();
    }

    // ReentrantLock.NonfairSync
    final void lock() {
        // Fast path: try to barge in with a single CAS
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    // AbstractQueuedSynchronizer
    public final void acquire(int arg) {
        // Try once more; on failure enqueue a node and wait in the queue
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // Only the node right behind head may try to acquire
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // Mark the predecessor SIGNAL, then park until woken
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // ReentrantLock.Sync
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // Lock is free: compete for it without checking the queue
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // Reentrant acquire: just increase the hold count
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
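Now T0 finishes and releases the lock. If there is contention, a newly arriving thread T4 may grab the lock right after the release (the lock is non-fair); T4 then runs and T1 stays blocked;
If there is no contention, T1 acquires the lock, as shown: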
graph TB;
subgraph Sync[NonfairSync]
s[state=1];
e[exclusiveOwnerThread]
h[head pointer];
t[tail pointer];
end
dum["Dummy Node(ws = Node.SIGNAL)"]
h --> t1["T1 Node -> Null Node<br/>(ws = Node.SIGNAL)"] <--> t2["Thread2 Node(ws = Node.SIGNAL)"] <--> t3["Thread3 Node(ws = 0)"]<--> t
e --> t0["Thread 0 -> null -> Thread 1"]
Lock release flow:
    // ReentrantLock
    public void unlock() {
        sync.release(1);
    }

    // AbstractQueuedSynchronizer
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // A non-zero waitStatus on head means a successor is waiting to be woken
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    // ReentrantLock.Sync
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // The lock is fully released only when the hold count drops to 0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
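Suppose T0, after acquiring the lock, enters the wait queue of condition variable ConditionObject1 and releases the lock. T1 then acquires the lock, also enters the wait queue of ConditionObject1, and releases the lock. T2 then acquires the lock, as shown: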
graph TB;
subgraph ConditionObject1
fw[firstWaiter]
lw[lastWaiter]
end
fw --> node1["T0 Node(ws = Node.CONDITION)"]
node1 --> node2["T1 Node(ws = Node.CONDITION)"]
lw --> node2
subgraph Sync[NonfairSync]
s[state=1];
e[exclusiveOwnerThread]
h[head pointer];
t[tail pointer];
end
dum["Dummy Node(ws = Node.SIGNAL)"]
t1["T1 Node -> Null Node<br/>(ws = Node.SIGNAL)"]
h <--> t2["T2 Node -> Null Node<br/>(ws = Node.SIGNAL)"] <--> t3["T3 Node(ws = 0)"]<--> t
e --> t0["T0 -> null -> <br/> T1 -> null -> T2"]
Condition variable await():
    // AbstractQueuedSynchronizer.ConditionObject
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Append a CONDITION node to this condition's wait queue
        Node node = addConditionWaiter();
        // Release the lock completely, remembering the hold count
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // Park until signal() moves the node to the sync queue
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Re-acquire the lock with the saved hold count
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    // AbstractQueuedSynchronizer
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
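Suppose T2, after acquiring the lock, calls signal() on ConditionObject1. This transfers T0's node from the condition's wait queue to the lock's blocking (sync) queue, as shown: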
graph TB;
subgraph ConditionObject1
fw[firstWaiter]
lw[lastWaiter]
end
fw --> node2["T1 Node(ws = Node.CONDITION)"]
lw --> node2
subgraph Sync[NonfairSync]
s[state=1];
e[exclusiveOwnerThread]
h[head pointer];
t[tail pointer];
end
h <--> t2["Null Node<br/>(ws = Node.SIGNAL)"] <--> t3["T3 Node(ws = Node.SIGNAL)"]
t3<--> node1["T0 Node(ws = 0)"] <--> t
e --> t0["T2"]
Condition variable signal():
    // AbstractQueuedSynchronizer.ConditionObject
    public final void signal() {
        // Only the lock owner may signal
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    private void doSignal(Node first) {
        // Detach the first waiter and transfer it; skip nodes that were cancelled
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

    // AbstractQueuedSynchronizer
    final boolean transferForSignal(Node node) {
        // A failed CAS means the node was cancelled
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // Append to the sync queue; p is the node's predecessor there
        Node p = enq(node);
        int ws = p.waitStatus;
        // If the predecessor cannot signal this node, wake the thread so it resynchronizes
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
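How ReentrantReadWriteLock works
The read lock and the write lock share a single state: the write lock uses the low 16 bits and the read lock uses the high 16 bits.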
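The split of one int into two 16-bit counters can be sketched as below. The constant and method names follow those in the JDK 8 `ReentrantReadWriteLock.Sync` source, while the class `RwStateSketch` is a hypothetical stand-alone copy for illustration.

```java
// Hypothetical sketch: read count in the high 16 bits, write hold count in the low 16 bits.
final class RwStateSketch {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = 1 << SHARED_SHIFT;       // adding this increments the read count
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; // 65535 holds at most per counter
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    /** Number of read holds encoded in the state. */
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    /** Number of (reentrant) write holds encoded in the state. */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    public static void main(String[] args) {
        int threeReaders = 3 * SHARED_UNIT;
        System.out.println(sharedCount(threeReaders) + " " + exclusiveCount(threeReaders)); // 3 0

        // A writer that re-entered once and also took the read lock
        int writerWithRead = 2 + SHARED_UNIT;
        System.out.println(sharedCount(writerWithRead) + " " + exclusiveCount(writerWithRead)); // 1 2
    }
}
```

This explains the checks in the acquisition code: `c != 0` with `exclusiveCount(c) == 0` means only readers hold the lock, so a writer must wait.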
Write lock acquisition flow:
    // ReentrantReadWriteLock.WriteLock
    public void lock() {
        sync.acquire(1);
    }

    // AbstractQueuedSynchronizer
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    // ReentrantReadWriteLock.Sync
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // c != 0 and w == 0: readers hold the lock; otherwise another writer may hold it
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant write acquire
            setState(c + acquires);
            return true;
        }
        // Lock is free: the fairness policy decides whether to queue or compete
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
Read lock acquisition flow:
    // ReentrantReadWriteLock.ReadLock
    public void lock() {
        sync.acquireShared(1);
    }

    // AbstractQueuedSynchronizer
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // Become head and wake the following shared waiters as well
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // ReentrantReadWriteLock.Sync
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // Fail if another thread holds the write lock (the writer itself may downgrade)
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        int r = sharedCount(c);
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            // Bookkeeping of per-thread read hold counts
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        return fullTryAcquireShared(current);
    }
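StampedLock
A read-write lock with stamps, suited to read-heavy workloads. It is not reentrant, and its key feature is the optimistic read.
To read, first take an optimistic stamp with stamp = sl.tryOptimisticRead();, read the data, then call validate(stamp). If validation fails because a write happened in between, fall back to a real read lock.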
    package com.pl.demo15_reentrantRWLockAndStampedLock;

    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.locks.StampedLock;

    @Slf4j(topic = "c.test")
    public class StampedLockTest {
        public static void main(String[] args) {
            DataContainerWithStampedLock data = new DataContainerWithStampedLock("data");
            new Thread(() -> {
                log.debug("{}", data.read());
            }, "t1").start();
            Sleeper.sleep(0.5);
            new Thread(() -> {
                log.debug("{}", data.read());
            }, "t2").start();
            Sleeper.sleep(0.5);
            // The write overlaps both reads, so their optimistic stamps fail validation
            new Thread(() -> {
                data.write("new1");
            }, "t3").start();
        }
    }

    @Slf4j(topic = "c.test")
    class DataContainerWithStampedLock {
        private Object data;
        private final StampedLock sl = new StampedLock();

        public DataContainerWithStampedLock(Object data) {
            this.data = data;
        }

        public Object read() {
            log.debug("begin");
            long stamp = sl.tryOptimisticRead();
            // Copy the field first and validate afterwards:
            // a value read after validate() could already be stale
            Object snapshot = data;
            Sleeper.sleep(2); // simulate a slow read
            log.debug("tryOptimisticRead {}", stamp);
            if (sl.validate(stamp)) {
                log.debug("optimisticRead finish {}", stamp);
                return snapshot;
            }
            // Validation failed: fall back to a real read lock
            log.debug("update to readLock");
            // Acquire outside try so unlockRead() only runs when the lock is held
            stamp = sl.readLock();
            try {
                log.debug("read lock {}", stamp);
                log.debug("read lock finish {}", stamp);
                return data;
            } finally {
                log.debug("readLock unlock");
                sl.unlockRead(stamp);
            }
        }

        public void write(Object data) {
            long stamp = sl.writeLock();
            log.debug("writeLock() {}", stamp);
            try {
                log.debug("writing");
                Sleeper.sleep(2);
                this.data = data;
            } finally {
                log.debug("unlockWrite()");
                sl.unlockWrite(stamp);
            }
        }
    }
Semaphore
A semaphore limits how many threads can access a shared resource at the same time.
    package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;

    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.Semaphore;

    @Slf4j(topic = "c.test")
    public class SemaphoreTest {
        public static void main(String[] args) {
            // At most 3 of the 10 threads run the guarded section at once
            Semaphore s = new Semaphore(3);

            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    // Acquire outside the try block: if acquire() is interrupted,
                    // no permit was taken and none must be released
                    try {
                        s.acquire();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    try {
                        log.debug("acquire");
                        Sleeper.sleep(2);
                    } finally {
                        s.release();
                        log.debug("release");
                    }
                }, "t" + i).start();
            }
        }
    }
acquire()
    // Semaphore
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    static final class NonfairSync extends Sync {
        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    // Semaphore.Sync: state holds the number of available permits
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Negative result: not enough permits, caller must queue
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // AbstractQueuedSynchronizer
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
release()
    // Semaphore
    public void release() {
        sync.releaseShared(1);
    }

    // Semaphore.Sync
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // AbstractQueuedSynchronizer
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
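CountDownLatch and CyclicBarrier
CountDownLatch
Lets one or more threads wait until a set of operations in other threads completes, then continue.
It is used for thread coordination: await() waits for the count to reach zero, and countDown() decrements the count by one.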
    package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;

    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.CountDownLatch;

    @Slf4j(topic = "c.test")
    public class CountdownLatchTest {
        public static void main(String[] args) {
            // The count is 2, so await() returns after the first two countDown() calls
            // (t1 and t2), without waiting for t3
            CountDownLatch latch = new CountDownLatch(2);
            new Thread(() -> {
                log.debug("begin");
                Sleeper.sleep(1);
                latch.countDown();
                log.debug("end");
            }, "t1").start();
            new Thread(() -> {
                log.debug("begin");
                Sleeper.sleep(2);
                latch.countDown();
                log.debug("end");
            }, "t2").start();
            new Thread(() -> {
                log.debug("begin");
                Sleeper.sleep(3);
                latch.countDown();
                log.debug("end");
            }, "t3").start();

            try {
                log.debug("waiting");
                latch.await();
                log.debug("end");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
How it works
    // JDK source, abridged
    public class CountDownLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;

            // state holds the remaining count
            Sync(int count) {
                setState(count);
            }

            int getCount() {
                return getState();
            }

            // await() succeeds only once the count is zero
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c - 1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }

        private final Sync sync;

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }

        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        public void countDown() {
            sync.releaseShared(1);
        }

        public long getCount() {
            return sync.getCount();
        }
    }
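CyclicBarrier
Lets a group of threads wait for one another at a common barrier point before continuing.
The barrier is reusable: once it trips, the count resets for the next round.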
    package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;

    import com.pl.util.Sleeper;
    import lombok.extern.slf4j.Slf4j;

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    @Slf4j(topic = "c.test")
    public class CyclicBarrierTest {
        public static void main(String[] args) {
            // The pool size matches the number of parties, so each round's
            // two tasks run together and meet at the barrier
            ExecutorService pool = Executors.newFixedThreadPool(2);
            // The barrier action runs each time both parties have arrived
            CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
                log.debug("task1 task2 end");
            });

            // Three rounds reuse the same barrier
            for (int i = 0; i < 3; i++) {
                pool.submit(() -> {
                    log.debug("begin");
                    Sleeper.sleep(1);
                    try {
                        cyclicBarrier.await();
                        log.debug("end");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        throw new RuntimeException(e);
                    }
                });
                pool.submit(() -> {
                    log.debug("begin");
                    Sleeper.sleep(3);
                    try {
                        cyclicBarrier.await();
                        log.debug("end");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
    }
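LinkedBlockingQueue and ConcurrentLinkedQueue
LinkedBlockingQueue uses two locks, so one consumer thread and one producer thread can run at the same time; producers are still serialized among themselves, and so are consumers.
The linked list holds a dummy node plus the head and last pointers.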
flowchart LR
head --> Dummy
subgraph Dummy
item0[item]
next0[next]
end
subgraph Node1
item1[item]
next1[next]
end
subgraph Node2
item2[item]
next2[next]
end
next0 --> Node1
next1 --> Node2
last --> Node2
    // Held by put, offer, etc.
    private final ReentrantLock putLock = new ReentrantLock();
    // Held by take, poll, etc.
    private final ReentrantLock takeLock = new ReentrantLock();
put()
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // count is atomic because producers and consumers update it under different locks
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // Queue full: wait on notFull
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            // Still room after this put: wake the next producer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this put: wake a waiting consumer
        if (c == 0)
            signalNotEmpty();
    }
take()
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // Queue empty: wait on notEmpty
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            // Elements remain after this take: wake the next consumer
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this take: wake a waiting producer
        if (c == capacity)
            signalNotFull();
        return x;
    }
ConcurrentLinkedQueue
ConcurrentLinkedQueue is the non-blocking counterpart: where LinkedBlockingQueue takes locks, it updates its pointers with CAS.
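The CAS-retry style that replaces locking can be illustrated on a simpler structure. The sketch below is a Treiber stack, not the queue algorithm that ConcurrentLinkedQueue actually uses; the class `CasStackSketch` is hypothetical and only shows the "read, compute, compareAndSet, retry" loop.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lock-free stack showing the CAS retry pattern (not the JDK queue algorithm).
final class CasStackSketch<E> {
    private static final class Node<E> {
        final E item;
        final Node<E> next;

        Node(E item, Node<E> next) {
            this.item = item;
            this.next = next;
        }
    }

    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    void push(E item) {
        for (;;) {
            Node<E> old = top.get();
            // Succeeds only if no other thread changed top in the meantime
            if (top.compareAndSet(old, new Node<>(item, old))) {
                return;
            }
        }
    }

    /** Returns the most recently pushed item, or null if the stack is empty. */
    E pop() {
        for (;;) {
            Node<E> old = top.get();
            if (old == null) {
                return null;
            }
            if (top.compareAndSet(old, old.next)) {
                return old.item;
            }
        }
    }

    public static void main(String[] args) {
        CasStackSketch<Integer> stack = new CasStackSketch<>();
        stack.push(1);
        stack.push(2);
        System.out.println(stack.pop()); // 2
        System.out.println(stack.pop()); // 1
        System.out.println(stack.pop()); // null
    }
}
```

No thread ever blocks here: a thread that loses the race simply rereads the current state and tries again.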
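CopyOnWriteArrayList
It separates reads from writes and trades space for time. It suits read-heavy workloads, and reads are only weakly consistent.
Concurrency and consistency pull against each other, so a trade-off is needed.
A write operation copies the underlying array and works on the copy: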
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock(); // writers are serialized
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // Copy into a new array that is one slot longer
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            // Publish the new array; readers holding the old one are unaffected
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
Read operations take no lock:
    public void forEach(Consumer<? super E> action) {
        if (action == null) throw new NullPointerException();
        // Iterates over the array that was current when the call started
        Object[] elements = getArray();
        int len = elements.length;
        for (int i = 0; i < len; ++i) {
            @SuppressWarnings("unchecked") E e = (E) elements[i];
            action.accept(e);
        }
    }
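The weak consistency of reads can be seen with an iterator: it works on the array snapshot taken when it was created and does not see later writes. The class `SnapshotIterationSketch` is a hypothetical name for this demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SnapshotIterationSketch {
    /** Creates an iterator, then adds an element, then drains the iterator. */
    static List<Integer> iterateWhileAdding() {
        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        List<Integer> seen = new ArrayList<>();
        Iterator<Integer> it = list.iterator(); // captures the current array
        list.add(4);                            // replaces the array; the iterator keeps the old one
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding()); // [1, 2, 3]
    }
}
```

The same code with an `ArrayList` would throw `ConcurrentModificationException`; here the reader trades freshness for never being blocked or failing.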