JUC 指的是 java.util
偏向锁、轻量级锁、重量级锁 偏向锁 (默认开启)
用 ThreadID 替换 Mark Word。有利于冲突很少的场景 虚拟机默认在 4s 后从无锁变为偏向锁 有其他线程访问时升级为轻量级锁 此种情况没有地方存储哈希值,当调用 hashcode()
时会撤销偏向锁 批量重偏向:撤销偏向锁次数超过阈值(20),JVM 会进行批量重偏向 批量撤销:撤销偏向锁次数超过阈值(40),JVM 会撤销所有的偏向锁,且之后创建的对象默认没有偏向锁
轻量级锁 进入 synchronized
代码块: 若对象的 Mark Word 的后三位为 001(无锁),将对象的 Mark Word 转移到栈帧的 Lock Record 区域中(Lock Record 区域还有一个指向该对象的引用),该对象的 Mark Word 后两位标记为 00,前 30 位改为 Lock Record 地址 若对象的 Mark Word 的后两位为 00(轻量级锁),则检查对象的 Mark Word 是否指向当前线程的栈帧,如果是,则该栈帧的 Lock Record 区域中置为 null 和对象的引用,否则表明有多个线程竞争锁【升级成重量级锁,自旋尝试失败后将自己加入 EntryList】
退出 synchronized
代码块: 若 Lock Record 锁记录为 null,重置锁记录,表示重入计数减一 若 Lock Record 锁记录不为 null,尝试用 CAS 恢复。若失败,则表示进行了锁膨胀或升级为重量级锁
重量级锁 进入 synchronized
代码块: 申请 monitor对象,将 Java 对象的 Mark Word 中前 30 位指向对应 monitor 对象,后两位置为 11。monitor 对象位于操作系统,其 Owner 属性指明当前占有锁的对象,EntryList 属性保存等待该锁而被阻塞的线程队列
退出 synchronized
代码块: 根据 Java 对象的 Mark Word 找到 monitor 对象,将 Owner 置空,唤醒 EntryList 中 BLOCKED 线程
32 位 JVM 普通对象
Mark Word(32位)
biased_lock: 0
lock: 01
biased_lock: 1
lock: 01
lock: 00
Lightweight Locked
lock: 10
Heavyweight Locked
lock: 11
Marked for GC
package com.pl.demo1_locking;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.TestState") public class Test { public synchronized static void f1 () { } public static void f2 () { synchronized (Test.class) { } } static int count = 0 ; public static void main (String[] args) throws InterruptedException { Room room = new Room (); Thread t1 = new Thread (() -> { for (int i = 0 ; i < 5000 ; i++) { room.increment(); } }, "t1" ); Thread t2 = new Thread (() -> { for (int i = 0 ; i < 5000 ; i++) { room.decrement(); } }, "t2" ); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("" +room.getCount()); } } class Room { private int count = 0 ; public synchronized void decrement () { count--; } public void increment () { synchronized (this ) { count++; } } public synchronized int getCount () { return count; } }
sleep() 和 wait()
是 Thread
类的 static
是 Object
不需要强制和 synchronized
必须获得对象锁和 synchronized
package com.pl.demo2_sleepAndWait;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.LockSupport;import static com.pl.util.Sleeper.sleep;@Slf4j(topic = "c.test") class A { static final Object o = new Object (); public static void main (String[] args) throws InterruptedException { new Thread (() -> { synchronized (o) { log.debug("begin" ); sleep(2.5 ); log.debug("other" ); } }, "t0" ).start(); for (int i = 0 ; i < 5 ; i++) { new Thread (() -> { synchronized (o) { log.debug("1111" ); } }, "t" + (i + 1 )).start(); } } } @Slf4j(topic = "c.test") public class Test { static final Object o = new Object (); public static void main (String[] args) { LockSupport.park(); new Thread (() -> { log.debug("begin" ); synchronized (o) { try { o.wait(); } catch (InterruptedException e) { throw new RuntimeException (e); } } log.debug("other" ); }, "t1" ).start(); new Thread (() -> { log.debug("begin" ); synchronized (o) { try { o.wait(500 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } log.debug("other" ); }, "t2" ).start(); new Thread (() -> { log.debug("begin" ); synchronized (o) { try { o.wait(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } log.debug("other" ); }, "t3" ).start(); sleep(1 ); synchronized (o) { o.notifyAll(); } } }
保护性暂停 设计模式:保护性暂停(用于一个线程等待另一个线程的执行结果)
一个结果需要从一个线程传递到另一个线程,可关联同一个 GuardedObject
package com.pl.demo3_guardedSuspension;import lombok.extern.slf4j.Slf4j;@Slf4j(topic = "c.test") public class GuardedObjectTest { public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject (); new Thread (() -> { Object o = guardedObject.get(5000 ); if (o != null ) { log.debug("下载完成" ); } else { log.debug("下载失败" ); } }, "t1" ).start(); new Thread ( () -> { log.debug("准备下载" ); guardedObject.complete(); }, "t2" ).start(); } } @Slf4j(topic = "c.test") class GuardedObject { private Object response; public Object get (long timeout) { long begin = System.currentTimeMillis(); long delay = 0 ; synchronized (this ) { while (response == null ) { if (delay > timeout) { log.debug("超时!!" ); break ; } log.debug("开始等待" ); try { wait(timeout - delay); } catch (InterruptedException e) { e.printStackTrace(); } delay = System.currentTimeMillis() - begin; } } return response; } public Object get () { return get(0 ); } public void complete () { synchronized (this ) { try { log.debug("开始下载" ); wait(4000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } this .response = 1 ; this .notifyAll(); } }
package com.pl.demo3_guardedSuspension;import lombok.extern.slf4j.Slf4j;import java.util.Deque;import java.util.LinkedList;public class ProducerConsumerQueueTest { public static void main (String[] args) { MessageQueue messageQueue = new MessageQueue (3 ); for (int i = 0 ; i < 5 ; i++) { int id = i; new Thread (() -> { messageQueue.put(new Message (id, "内容" + id)); }, "Producer-" + i).start(); }; for (int i = 0 ; i < 5 ; i++) { new Thread (() -> { messageQueue.take(); }, "Consumer-" + i).start(); } } } @Slf4j(topic = "c.test") class MessageQueue { static final Deque<Message> list = new LinkedList <>(); private int capacity = 0 ; public MessageQueue (int capacity) { this .capacity = capacity; } public Message take () { synchronized (list) { while (list.isEmpty()) { try { log.debug("队列空,无法获取" ); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = list.remove(); log.debug("消耗了:" + message); list.notifyAll(); return message; } } public void put (Message message) { synchronized (list) { while (list.size() >= capacity) { try { log.debug("队列满,等待消耗" ); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("添加了:" + message); list.addLast(message); list.notifyAll(); } } } class Message { private final int id; private final Object value; public Message (int id, Object value) { this .id = id; this .value = value; } public int getId () { return id; } public Object getValue () { return value; } @Override public String toString () { return "Message{" + "id=" + id + ", value=" + value + '}' ; } }
LockSupport.park()、LockSupport.unpark() 每个线程都有自己的一个 Parker 对象,包括 _counter, _cond, _mutex
: 1、当前线程调用 Unsafe.park()
2、若 _counter 为 0,获得 _mutex 互斥锁并继续 3、线程进入 _cond 条件变量阻塞 4、_counter 设置为 0
1、当前线程调用 Unsafe.unpark(t1)
,_counter 设置为 1 2、唤醒 _cond 条件变量中的 t1 3、t1 恢复运行 4、_counter 设置为 0
package com.pl.demo4_park;import lombok.extern.slf4j.Slf4j;import java.util.HashMap;import java.util.concurrent.locks.LockSupport;import static com.pl.util.Sleeper.sleep;@Slf4j(topic = "c.test") public class Test { public static void main (String[] args) { Thread t1 = new Thread (() -> { log.debug("start" ); sleep(1 ); log.debug("park" ); LockSupport.park(); log.debug("resume" ); }, "t1" ); t1.start(); sleep(2 ); LockSupport.unpark(t1); } }
package com.pl.demo5_deadLockAndLiveLock;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import static com.pl.util.Sleeper.sleep;@Slf4j(topic = "c.test") public class Test { public static void main (String[] args) { testLiveLock(); } public static void testDeadLock () { Object lockA = new Object (); Object lockB = new Object (); new Thread (() -> { synchronized (lockA) { log.debug("enter lockA" ); sleep(1 ); synchronized (lockB) { log.debug("enter lockB" ); } } }, "t1" ).start(); new Thread (() -> { synchronized (lockB) { log.debug("enter lockB" ); sleep(0.5 ); synchronized (lockA) { log.debug("enter lockA" ); } } }, "t2" ).start(); } static Integer count = 50 ; public static void testLiveLock () { new Thread (() -> { while (count > 0 ) { sleep(0.2 ); count--; log.debug("count = {} " , count); } }, "t1" ).start(); new Thread (() -> { while (count < 100 ) { sleep(0.2 ); count++; log.debug("count = {} " , count); } }, "t2" ).start(); } }
package com.pl.demo6_reentrantLock;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.test") public class ReentrantTest { private static final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock (); public static void main (String[] args) { lock.lock(); try { log.debug("enter main" ); m1(); } finally { lock.unlock(); } } private static void m1 () { lock.lock(); try { log.debug("enter m1" ); m2(); } finally { lock.unlock(); } } private static void m2 () { lock.lock(); try { log.debug("enter m2" ); } finally { lock.unlock(); } } }
package com.pl.demo6_reentrantLock;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.ReentrantLock;import static com.pl.util.Sleeper.sleep;@Slf4j(topic = "c.test") public class InterruptibleTest { private static final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock (); public static void main (String[] args) { Thread t1 = new Thread (() -> { try { log.debug("trying to get lock" ); lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("fail to get lock" ); return ; } try { log.debug("get lock" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); t1.start(); sleep(2 ); t1.interrupt(); } }
package com.pl.demo6_reentrantLock;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReentrantLock;import static com.pl.util.Sleeper.sleep;@Slf4j(topic = "c.test") public class TimeoutTest { private static final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock (); public static void main (String[] args) { Thread t1 = new Thread (() -> { log.debug("trying to get lock" ); try { if (!lock.tryLock(5 , TimeUnit.SECONDS)) { log.debug("fail to get lock" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); return ; } try { log.debug("get lock" ); } finally { lock.unlock(); } }, "t1" ); log.debug("get lock" ); lock.lock(); t1.start(); sleep(3 ); lock.unlock(); } }
package com.pl.demo6_reentrantLock;import java.util.concurrent.locks.ReentrantLock;public class FairTest { public static void main (String[] args) { new ReentrantLock (true ); } }
package com.pl.demo6_reentrantLock;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import static com.pl.util.Sleeper.sleep;@Slf4j(topic = "c.test") public class ConditionTest { static ReentrantLock room = new ReentrantLock (); static boolean hasCon1 = false ; static boolean hasCon2 = false ; static Condition condition1 = room.newCondition(); static Condition condition2 = room.newCondition(); public static void main (String[] args) { new Thread (() -> { room.lock(); try { log.debug("state of con1: " + hasCon1); if (!hasCon1) { log.debug("con1 is not satisfied. waiting..." ); try { condition1.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("con1 is satisfied." ); } finally { room.unlock(); } }, "t1" ).start(); new Thread (() -> { room.lock(); try { log.debug("state of con2: " + hasCon2); if (!hasCon2) { log.debug("con2 is not satisfied. waiting..." ); try { condition2.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("con2 is satisfied." ); } finally { room.unlock(); } }, "t2" ).start(); sleep(2 ); new Thread (() -> { room.lock(); try { hasCon1 = true ; condition1.signalAll(); } finally { room.unlock(); } }, "tt1" ).start(); sleep(3 ); new Thread (() -> { room.lock(); try { hasCon2 = true ; condition2.signalAll(); } finally { room.unlock(); } }, "tt2" ).start(); } }
package com.pl.demo6_reentrantLock;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.ReentrantLock;public class PhilosopherEating { public static void main (String[] args) { Chopstick c1 = new Chopstick ("c1" ); Chopstick c2 = new Chopstick ("c2" ); Chopstick c3 = new Chopstick ("c3" ); Chopstick c4 = new Chopstick ("c4" ); Chopstick c5 = new Chopstick ("c5" ); new Philosopher ("张三" , c1, c2).start(); new Philosopher ("李四" , c2, c3).start(); new Philosopher ("王五" , c3, c4).start(); new Philosopher ("赵六" , c4, c5).start(); new Philosopher ("孙七" , c5, c1).start(); } } @Slf4j(topic = "c.phi") class Philosopher extends Thread { private final Chopstick leftChop; private final Chopstick rightChop; public Philosopher (String name, Chopstick leftChop, Chopstick rightChop) { this .setName(name); this .leftChop = leftChop; this .rightChop = rightChop; } @Override public void run () { while (true ) { log.debug(this .getName() + " is trying to get " + leftChop.getName()); if (leftChop.try
实现交替执行 传统的 wait()、notify() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.pl.demo7_alternateExecution;public class Test { public static void main (String[] args) { WaitNotify wn = new WaitNotify (1 , 5 ); new Thread (() -> { wn.print("a" , 1 , 2 ); }, "t1" ).start(); new Thread (() -> { wn.print("b" , 2 , 3 ); }, "t2" ).start(); new Thread (() -> { wn.print("c" , 3 , 1 ); }, "t3" ).start(); } } class WaitNotify { public void print (String s, int waitFlag, int nextFlag) { for (int i = 0 ; i < loopNumber; i++) { synchronized (this ) { while (flag != waitFlag) { try { wait(); } catch (InterruptedException e) { throw new RuntimeException (e); } } System.out.print(s); flag = nextFlag; this .notifyAll(); } } } private int flag; private final int loopNumber; public WaitNotify (int flag, int loopNumber) { this .flag = flag; this .loopNumber = loopNumber; } }
park()、unpark() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package com.pl.demo7_alternateExecution;import com.pl.util.Sleeper;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.LockSupport;public class TestPark { static Thread t1, t2, t3; public static void main (String[] args) { ParkUnpark pu = new ParkUnpark (5 ); t1 = new Thread (() -> { pu.print("a" , t2); }); t2 = new Thread (() -> { pu.print("b" , t3); }); t3 = new Thread (() -> { pu.print("c" , t1); }); t1.start(); t2.start(); t3.start(); LockSupport.unpark(t1); } } class ParkUnpark { public void print (String s, Thread next) { for (int i = 0 ; i < loopNumber; i++) { LockSupport.park(); System.out.print(s); LockSupport.unpark(next); } } private final int loopNumber; public ParkUnpark (int loopNumber) { this .loopNumber = loopNumber; } }
ReentrantLock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.pl.demo7_alternateExecution;import com.pl.util.Sleeper;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class TestReen { public static void main (String[] args) { AwaitSignal as = new AwaitSignal (5 ); Condition c1 = as.newCondition(); Condition c2 = as.newCondition(); Condition c3 = as.newCondition(); new Thread (() -> { as.print("a" , c1, c2); },"t1" ).start(); new Thread (() -> { as.print("b" , c2, c3); },"t2" ).start(); new Thread (() -> { as.print("c" , c3, c1); },"t3" ).start(); Sleeper.sleep(1 ); as.lock(); try { c1.signalAll(); } finally { as.unlock(); } } } class AwaitSignal extends ReentrantLock { public void print (String s, Condition curr, Condition next) { for (int i = 0 ; i < loopNumber; i++) { lock(); try { curr.await(); System.out.print(s); next.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { unlock(); } } } private final int loopNumber; public AwaitSignal (int loopNumber) { this .loopNumber = loopNumber; } }
volitale volatile
保证可见性、有序性原理:内存屏障(Memory Barrier/ Memory Fence) 对 volatile
变量的写指令后会加入写屏障,屏障前的对共享变量的改写操作,会同步到主存。不会将写屏障之前的代码排在写屏障后 对 volatile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.pl.demo8_volitale;import com.pl.util.Sleeper;public class Test { static volatile boolean flag = true ; public static void main (String[] args) { new Thread (() -> { while (flag) { } }, "t1" ).start(); Sleeper.sleep(1 ); flag = false ; } }
Atomic AtomicInteger 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 package com.pl.demo9_atomic;import java.util.ArrayList;import java.util.concurrent.atomic.AtomicInteger;public class AtomicTest { public static void main (String[] args) { Account.demo(new AccountUnsafe (10000 )); Account.demo(new AccountCAS (10000 )); AtomicInteger i = new AtomicInteger (0 ); System.out.println(i.addAndGet(1 )); System.out.println(i.incrementAndGet()); System.out.println(i.updateAndGet(value -> value + 1 )); System.out.println(i.get()); System.out.println(i.getAndAdd(1 )); System.out.println(i.getAndIncrement()); System.out.println(i.getAndUpdate(value -> value + 1 )); System.out.println(i.get()); } } class AccountCAS implements Account { private AtomicInteger balance; public AccountCAS (Integer balance) { this .balance = new AtomicInteger (balance); } @Override public Integer getBalance () { return balance.get(); } @Override public void withdraw (Integer amount) { while (true ) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) { break ; } } } } class AccountUnsafe implements Account { Integer balance; public AccountUnsafe (Integer balance) { this .balance = balance; } @Override public synchronized Integer getBalance () { return balance; } @Override public synchronized void withdraw (Integer amount) { balance -= amount; } } interface Account { Integer getBalance () ; void withdraw (Integer amount) ; static void demo (Account account) { ArrayList<Thread> threads = new ArrayList <>(); for (int i = 0 ; i < 1000 ; i++) { threads.add(new Thread (() -> { account.withdraw(10 ); })); } long start = System.nanoTime(); threads.forEach(Thread::start); threads.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + ", " + (end - start) / 1000 ); } }
AtomicReference 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package com.pl.demo9_atomic;import java.math.BigDecimal;import java.util.ArrayList;import java.util.concurrent.atomic.AtomicReference;public class AtomicReferenceTest { public static void main (String[] args) { DecimalAccount.demo(new DecimalAccountImp (new BigDecimal ("1" ))); } } class DecimalAccountImp implements DecimalAccount { private AtomicReference<BigDecimal> balance; public DecimalAccountImp (BigDecimal balance) { this .balance = new AtomicReference <>(balance); } @Override public BigDecimal getBalance () { return balance.get(); } @Override public void withdraw (BigDecimal amount) { while (true ) { BigDecimal prev = balance.get(); BigDecimal next = prev.subtract(amount); if (balance.compareAndSet(prev, next)) { break ; } } } } interface DecimalAccount { BigDecimal getBalance () ; void withdraw (BigDecimal amount) ; static void demo (DecimalAccount account) { ArrayList<Thread> threads = new ArrayList <>(); for (int i = 0 ; i < 1000 ; i++) { threads.add(new Thread (() -> { account.withdraw(new BigDecimal ("0.001" )); })); } long start = System.nanoTime(); threads.forEach(Thread::start); threads.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + ", " + (end - start) / 1000 ); } }
AtomicStampedReference 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package com.pl.demo9_atomic;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicStampedReference;@Slf4j(topic = "c.test") public class AtomicStampedReferenceTest { static AtomicStampedReference<String> ref = new AtomicStampedReference <String>("AAA" , 0 ); public static void main (String[] args) { int stamp = ref.getStamp(); log.debug("before sleeping : {}" , stamp); other(); Sleeper.sleep(1 ); log.debug("after sleeping: {}" , ref.getStamp()); log.debug("try changing AAA to CCC, result: {}" , ref.compareAndSet("AAA" , "BBB" , stamp, stamp + 1 )); } private static void other () { new Thread (() -> { int stamp = ref.getStamp(); log.debug("before: {}" , stamp); log.debug("try changing AAA to CCC, result: {}" , ref.compareAndSet("AAA" , "CCC" , stamp, stamp + 1 )); log.debug("after: {}" , ref.getStamp()); }, "t1" ).start(); Sleeper.sleep(0.5 ); new Thread (() -> { int stamp = ref.getStamp(); log.debug("before: {}" , stamp); log.debug("try changing CCC to AAA, result: {}" , ref.compareAndSet("CCC" , "AAA" , stamp, stamp + 1 )); log.debug("after: {}" , ref.getStamp()); }, "t2" ).start(); } }
LongAdder 原子累加器,设置多个累加单元 cell,累加时操作不同的 cell 变量,最后将结果汇总,减少了 CAS 尝试失败
速度比普通的 AtomicLong 快很多
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package com.pl.demo10_longAdder;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.concurrent.atomic.AtomicLong;import java.util.concurrent.atomic.LongAdder;import java.util.function.Consumer;import java.util.function.Supplier;@Slf4j(topic = "c.test") public class Test { public static void main (String[] args) { for (int i = 0 ; i < 5 ; i++) { demo( AtomicLong::new , AtomicLong::getAndIncrement ); } log.debug("--------------------" ); for (int i = 0 ; i < 5 ; i++) { demo( LongAdder::new , LongAdder::increment ); } } private static <T> void demo (Supplier<T> adderSupplier, Consumer<T> action) { T adder = adderSupplier.get(); ArrayList<Thread> threads = new ArrayList <>(); for (int i = 0 ; i < 4 ; i++) { threads.add(new Thread (() -> { for (int j = 0 ; j < 500_000 ; j++) { action.accept(adder); } }, "t" + i)); } long start = System.nanoTime(); threads.forEach(Thread::start); for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { throw new RuntimeException (e); } } long end = System.nanoTime(); log.debug("{}, cost: {} ms" , adder, (end - start) / 1000_000 ); } }
LongAdder 源码 三个核心属性:cells、base、cellsBusy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 transient volatile Cell[] cells;transient volatile long base;transient volatile int cellsBusy;
1 2 3 4 5 6 7 8 9 10 11 12 13 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } }
1 2 3 4 5 6 7 8 9 10 11 12 public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
利用 AtomicIntegerArray 实现连接池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 package com.pl.demo11_customPool;import lombok.extern.slf4j.Slf4j;import java.sql.*;import java.util.Map;import java.util.Properties;import java.util.Random;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicIntegerArray;public class CustomPoolTest { public static void main (String[] args) { Pool pool = new Pool (2 ); int threadCount = 5 ; for (int i = 0 ; i < threadCount; i++) { new Thread (() -> { Connection connection = pool.borrow(); try { Thread.sleep(new Random ().nextInt(2000 )); } catch (InterruptedException e) { e.printStackTrace(); } finally { pool.free(connection); } }, "t" + (i + 1 )).start(); } } } @Slf4j(topic = "c.Test") class Pool { private final int poolSize; private Connection[] connections; private AtomicIntegerArray states; public Pool (int poolSize) { this .poolSize = poolSize; this .connections = new Connection [poolSize]; this .states = new AtomicIntegerArray (new int [poolSize]); for (int i = 0 ; i < poolSize; i++) { connections[i] = new MockConnection (); } } public Connection borrow () { while (true ) { for (int i = 0 ; i < poolSize; i++) { if (states.get(i) == 0 ) { if (states.compareAndSet(i, 0 , 1 )) { log.debug("get conn[" + i + "]" ); return connections[i]; } } } synchronized (this ) { try { log.debug("waiting..." ); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public void free (Connection connection) { for (int i = 0 ; i < poolSize; i++) { if (connections[i] == connection) { states.set(i, 0 ); log.debug("free conn[" + i + "]" ); synchronized (this ) { this .notifyAll(); } } } } class MockConnection implements Connection { ...... } }
final 关键字 final
: 1)修饰的类不能被继承 2)修饰的方法不能被子类重写 3)修饰的变量一旦赋值就不能修改(表现为基本数据类型的值不能发生变化,引用类型变量所引用的地址不会发生改变)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class FinalTest { static final int A = 10 ; static final int B = Short.MAX_VALUE + 1 ; final int a = 20 ; final int b = Integer.MAX_VALUE; static int C = 15 ; int c = 30 ; } class UseFinal { void test () { System.out.println(FinalTest.A); System.out.println(FinalTest.B); System.out.println(new FinalTest ().a); System.out.println(new FinalTest ().b); System.out.println(FinalTest.C); System.out.println(new FinalTest ().c); } }
如果一个变量满足以下三个条件时,该变量就会成为一个宏变量: 1)被 final
修饰 2)在定义该 final
变量时就指定了初始值 3)该初始值在编译时就能够唯一指定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.pl.demo12_final;public class MacroVariableTest { public static void main (String[] args) { String hw = "hello world" ; String hello = "hello" ; final String finalWorld2 = "hello" ; final String finalWorld3 = hello; final String finalWorld4 = "he" + "llo" ; String hw1 = hello + " world" ; String hw2 = finalWorld2 + " world" ; String hw3 = finalWorld3 + " world" ; String hw4 = finalWorld4 + " world" ; System.out.println(hw == hw1); System.out.println(hw == hw2); System.out.println(hw == hw3); System.out.println(hw == hw4); } }
线程池 自定义线程池 taskQueue:任务队列 workers:核心线程的包装 rejectPolicy:拒绝策略 coreSize:最大核心数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 package com.pl.demo13_threadPool;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;import java.util.Deque;import java.util.HashSet;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.test") public class MyThreadPoolTest { public static void main (String[] args) { ThreadPool threadPool = new ThreadPool (2 , 3 , (queue, task) -> { queue.offer(task, 500 , TimeUnit.MILLISECONDS); }); for (int i = 0 ; i < 8 ; i++) { int j = i; threadPool.execute(() -> { Sleeper.sleep(1 ); log.debug("{}" ,j); }); } } } @FunctionalInterface interface RejectPolicy <T> { void reject (BlockingQueue<T> blockingQueue, T task) ; } @Slf4j(topic = "c.test") class ThreadPool { private final BlockingQueue<Runnable> taskQueue; private final HashSet<Worker> workers = new HashSet <>(); private final RejectPolicy<Runnable> rejectPolicy; private final int coreSize; public void execute (Runnable task) { synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker (task); workers.add(worker); log.debug("新增worker: {} 用以执行 {}" , worker, task); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool (int coreSize, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .taskQueue = new BlockingQueue <>(queueCapacity); this .rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.take(10 , TimeUnit.SECONDS)) != null ) { try { log.debug("开始执行 {}" , task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (workers) { workers.remove(this ); log.debug("移除 {}" , this ); } } } } @Slf4j(topic = "c.test") class BlockingQueue <T> { private final Deque<T> queue = new ArrayDeque <>(); private final ReentrantLock lock = new ReentrantLock (); private final Condition fullWaitSet = lock.newCondition(); private final Condition emptyWaitSet = lock.newCondition(); private final int capacity; public BlockingQueue (int capacity) { this .capacity = capacity; } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() >= capacity) { rejectPolicy.reject(this , task); } else { queue.add(task); log.debug("将 {} 加入阻塞队列" , task); emptyWaitSet.signalAll(); } } finally { lock.unlock(); } } public T take (long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0 ) { return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { throw new RuntimeException (e); } } T t = queue.removeFirst(); fullWaitSet.signalAll(); return t; } finally { lock.unlock(); } } public void put (T task) { lock.lock(); try { while (queue.size() >= capacity) { try { fullWaitSet.await(); } catch (InterruptedException e) { throw new RuntimeException (e); } } queue.add(task); log.debug("将 {} 加入阻塞队列" , task); emptyWaitSet.signalAll(); } finally { lock.unlock(); } } public boolean offer (T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() >= capacity) { try { if (nanos < 0 ) { return false ; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { throw new RuntimeException (e); } } queue.add(task); log.debug("将 {} 加入阻塞队列" , task); emptyWaitSet.signalAll(); return true ; } finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
JDK 提供的线程池 ThreadPoolExecutor
其中使用 AtomicInteger
类型的 ctl 的高 3 位标识线程池状态,低 29 位表示线程数量
对应 ctl 的高 3 位
任务全部执行完毕,活动线程为 0,即将进入终结
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize – 核心线程数(创建后就一直保留) maximumPoolSize – 最大线程数(核心线程数 + 救急线程数) keepAliveTime – 生存时间。针对救急线程 unit – 时间单位。针对救急线程的生存时间 workQueue – 阻塞队列 threadFactory – 线程工厂,主要用于为线程起名字 handler – 拒绝策略。当阻塞队列满,由救急线程执行任务,若救急线程也忙,则执行拒绝策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 @Slf4j(topic = "c.test") public class ThreadPoolExecutorTest { public static void main (String[] args) { ExecutorService pool1 = Executors.newFixedThreadPool(2 ); ExecutorService pool2 = Executors.newSingleThreadExecutor(); ExecutorService pool3 = Executors.newCachedThreadPool(); ScheduledExecutorService pool4 = Executors.newScheduledThreadPool(1 ); log.debug("begin" ); pool4.schedule(() -> { log.debug("task1" ); Sleeper.sleep(3 ); }, 1 , TimeUnit.SECONDS); pool4.schedule(() -> { log.debug("task2" ); int i = 1 / 0 ; }, 1 , TimeUnit.SECONDS); pool4.schedule(() -> { log.debug("task3" ); }, 1 , TimeUnit.SECONDS); } private static void synchronusQueueTest () { SynchronousQueue<Integer> integers = new SynchronousQueue <>(); new Thread (() -> { try { log.debug("putting {}" , 1 ); integers.put(1 ); log.debug("putted {}" , 1 ); log.debug("putting {}" , 2 ); integers.put(2 ); log.debug("putted {}" , 2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }, "t1" ).start(); Sleeper.sleep(1 ); new Thread (() -> { try { log.debug("taking {}" , 1 ); integers.take(); log.debug("took {}" , 1 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }, "t2" ).start(); Sleeper.sleep(1 ); new Thread (() -> { try { log.debug("taking {}" , 2 ); integers.take(); log.debug("took {}" , 2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }, "t3" ).start(); } private static void testPool (ExecutorService pool1) { pool1.execute(() -> { log.debug("1" ); Sleeper.sleep(1 ); }); pool1.execute(() -> { log.debug("2" ); Sleeper.sleep(1 ); }); pool1.execute(() -> { log.debug("3" ); Sleeper.sleep(1 ); }); } }
锁相关概念 锁:"避免发生混乱"的机制
偏向锁、轻量级锁和重量级锁是 JVM 在不同竞争情况下的渐进的锁优化升级机制
Lock 接口与 AQS synchronized
无法破坏“不可抢占”条件。 因为 synchronized
自定义一种不可重入阻塞锁机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 package com.pl.demo14_lockAndAqs;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.*;@Slf4j(topic = "c.test") public class Test { public static void main (String[] args) { MyLock lock = new MyLock (); new Thread (() -> { lock.lock(); log.debug("locking" ); try { Sleeper.sleep(1 ); } finally { log.debug("unlocking" ); lock.unlock(); } }, "t1" ).start(); new Thread (() -> { lock.lock(); log.debug("locking" ); lock.lock(); log.debug("locking" ); try { Sleeper.sleep(1 ); } finally { log.debug("unlocking" ); lock.unlock(); } }, "t2" ).start(); new ReentrantLock (); } } class MyLock implements Lock { class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } @Override protected boolean tryRelease (int arg) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } public Condition newCondition () { return new ConditionObject (); } } private final MySync sync; public MyLock () { sync = new MySync (); } @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } }
ReentrantLock 原理 假设 T0 线程一直保持锁,使得后续的 T1、T2、T3 线程被阻塞,如图:
graph TB;
subgraph Sync[NonfairSync]
h --> dum["Dummy Node(ws = Node.SIGNAL)"] <--> t1["Thread1 Node(ws = Node.SIGNAL)"]
t1 <--> t2["Thread2 Node(ws = Node.SIGNAL)"] <--> t3["Thread3 Node(ws = 0)"]<--> t
e --> t0[Thread0]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public void lock () { sync.lock(); } final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
此时 T0 线程执行完毕,释放锁,假设有竞争,T0 线程执行完毕释放锁后,被新线程 T4 抢到锁,则 T4 开始执行,T1 继续阻塞;
假设没有竞争,则 T1 线程获得锁,如图:
graph TB;
subgraph Sync[NonfairSync]
dum["Dummy Node(ws = Node.SIGNAL)"]
h --> t1["T1 Node -> Null Node<br/>(ws = Node.SIGNAL)"] <--> t2["Thread2 Node(ws = Node.SIGNAL)"] <--> t3["Thread3 Node(ws = 0)"]<--> t
e --> t0["Thread 0 -> null -> Thread 1"]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
假设 T0 获得锁后,进入条件变量 ConditionObject1 的等待队列,T0 释放锁,T1 获得锁,T1 获得锁后,也进入条件变量 ConditionObject1 的等待队列,T1 释放锁,T2 获得锁,如图:
graph TB;
subgraph ConditionObject1
fw --> node1["T0 Node(ws = Node.CONDITION)"]
node1 --> node2["T1 Node(ws = Node.CONDITION)"]
lw --> node2
subgraph Sync[NonfairSync]
dum["Dummy Node(ws = Node.SIGNAL)"]
t1["T1 Node -> Null Node<br/>(ws = Node.SIGNAL)"]
h <--> t2["T2 Node -> Null Node<br/>(ws = Node.SIGNAL)"] <--> t3["T3 Node(ws = 0)"]<--> t
e --> t0["T0 -> null -> <br/> T1 -> null -> T2"]
条件变量 await():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
假设 T2 获得锁后,调用 ConditionObject1 的 signal(),唤醒 T0 并将其加入阻塞队列,如图:
graph TB;
subgraph ConditionObject1
fw --> node2["T1 Node(ws = Node.CONDITION)"]
lw --> node2
subgraph Sync[NonfairSync]
h <--> t2["Null Node<br/>(ws = Node.SIGNAL)"] <--> t3["T3 Node(ws = Node.SIGNAL)"]
t3<--> node1["T0 Node(ws = 0)"] <--> t
e --> t0["T2"]
条件变量 signal():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
ReentrantReadWriteLock原理 读锁和写锁共用一个 state,写锁使用低 16 位,读锁使用高 16 位
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
StampedLock 带戳的读写锁,适用于读多写少的场景,不支持可重入,原理是乐观读。
读取时,先乐观读 stamp = sl.tryOptimisticRead();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 package com.pl.demo15_reentrantRWLockAndStampedLock;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.locks.StampedLock;@Slf4j(topic = "c.test") public class StampedLockTest { public static void main (String[] args) { DataContainerWithStampedLock data = new DataContainerWithStampedLock ("data" ); new Thread (() -> { log.debug("{}" , data.read()); }, "t1" ).start(); Sleeper.sleep(0.5 ); new Thread (() -> { log.debug("{}" , data.read()); }, "t2" ).start(); Sleeper.sleep(0.5 ); new Thread (() -> { data.write("new1" ); }, "t3" ).start(); } } @Slf4j(topic = "c.test") class DataContainerWithStampedLock { private Object data; private StampedLock sl = new StampedLock (); public DataContainerWithStampedLock (Object data) { this .data = data; } public Object read () { log.debug("begin" ); long stamp = sl.tryOptimisticRead(); Sleeper.sleep(2 ); log.debug("tryOptimisticRead {}" , stamp); if (sl.validate(stamp)) { log.debug("optimisticRead finish {}" , stamp); return data; } log.debug("update to readLock" ); try { stamp = sl.readLock(); log.debug("read lock {}" , stamp); log.debug("read lock finish {}" , stamp); return data; } finally { log.debug("readLock unlock" ); sl.unlockRead(stamp); } } public void write (Object data) { long stamp = sl.writeLock(); log.debug("writeLock() {}" , stamp); try { log.debug("writing" ); Sleeper.sleep(2 ); this .data = data; } finally { log.debug("unlockWrite()" ); sl.unlockWrite(stamp); } } }
Semaphore 信号量。用来限制同时访问共享资源的线程上限。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Semaphore;@Slf4j(topic = "c.test") public class SemaphoreTest { public static void main (String[] args) { Semaphore s = new Semaphore (3 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { s.acquire(); log.debug("acquire" ); Sleeper.sleep(2 ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { s.release(); log.debug("release" ); } }, "t" + i).start(); } } }
acquire() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } static final class NonfairSync extends Sync { NonfairSync(int permits ) { super (permits ); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
release() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void release () { sync.releaseShared(1 ); } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
CountDownLatch 和 CyclicBarrier CountDownLatch 允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;@Slf4j(topic = "c.test") public class CountdownLatchTest { public static void main (String[] args) { CountDownLatch latch = new CountDownLatch (2 ); new Thread (() -> { log.debug("begin" ); Sleeper.sleep(1 ); latch.countDown(); log.debug("end" ); }, "t1" ).start(); new Thread (() -> { log.debug("begin" ); Sleeper.sleep(2 ); latch.countDown(); log.debug("end" ); }, "t2" ).start(); new Thread (() -> { log.debug("begin" ); Sleeper.sleep(3 ); latch.countDown(); log.debug("end" ); }, "t3" ).start(); try { log.debug("waiting" ); latch.await(); log.debug("end" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } } private final Sync sync; public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public boolean await (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void countDown () { sync.releaseShared(1 ); } public long getCount () { return sync.getCount(); } }
CyclicBarrier 允许一组线程相互之间等待,达到一个共同点,再继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;import com.pl.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;@Slf4j(topic = "c.test") public class CyclicBarrierTest { public static void main (String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2 ); CyclicBarrier cyclicBarrier = new CyclicBarrier (2 , () -> { log.debug("task1 task2 end" ); }); for (int i = 0 ; i < 3 ; i++) { pool.submit(() -> { log.debug("begin" ); Sleeper.sleep(1 ); try { cyclicBarrier.await(); log.debug("end" ); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException (e); } }); pool.submit(() -> { log.debug("begin" ); Sleeper.sleep(3 ); try { cyclicBarrier.await(); log.debug("end" ); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException (e); } }); } } }
LinkedBlockingQueue 和 ConcurrentLinkedQueue 使用两把锁,同一时刻允许一个消费者线程和一个生产者线程同时执行,生产者线程间仍然串行,消费者线程间仍然串行
链表中存在一个 Dummy 节点以及 last、head 指针
flowchart LR
head --> Dummy
subgraph Dummy
subgraph Node1
subgraph Node2
next0 --> Node1
next1 --> Node2
last --> Node2
1 2 3 4 5 private final ReentrantLock putLock = new ReentrantLock ();private final ReentrantLock takeLock = new ReentrantLock ();
put() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
take() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull() return x; }
ConcurrentLinkedQueue 将 LinkedBlockingQueue 的锁用 CAS 实现
CopyOnWriteArrayList 采读写分离,空间换时间,适用于读多写少的情况,存在读弱一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
1 2 3 4 5 6 7 8 9 public void forEach (Consumer<? super E> action) { if (action == null ) throw new NullPointerException (); Object[] elements = getArray(); int len = elements.length; for (int i = 0 ; i < len; ++i) { @SuppressWarnings("unchecked") E e = (E) elements[i]; action.accept(e); } }