JUC 指的是 java.util 三个并发编程工具包

  1. java.util.concurrent
  2. java.util.concurrent.atomic
  3. java.util.concurrent.locks

[toc]

偏向锁、轻量级锁、重量级锁

偏向锁

(默认开启)

用 ThreadID 替换 Mark Word。有利于冲突很少的场景
虚拟机默认在 4s 后从无锁变为偏向锁
有其他线程访问时升级为轻量级锁
此种情况没有地方存储哈希值,当调用 hashcode() 时会撤销偏向锁
批量重偏向:撤销偏向锁次数超过阈值(20),JVM 会进行批量重偏向
批量撤销:撤销偏向锁次数超过阈值(40),JVM 会撤销所有的偏向锁,且之后创建的对象默认没有偏向锁

轻量级锁

进入 synchronized 代码块:
若对象的 Mark Word 的后三位为 001(无锁),将对象的 Mark Word 转移到栈帧的 Lock Record 区域中(Lock Record 区域还有一个指向该对象的引用),该对象的 Mark Word 后两位标记为 00,前 30 位改为 Lock Record 地址
若对象的 Mark Word 的后两位为 00(轻量级锁),则检查对象的 Mark Word 是否指向当前线程的栈帧,如果是,则该栈帧的 Lock Record 区域中置为 null 和对象的引用,否则表明有多个线程竞争锁【升级成重量级锁,自旋尝试失败后将自己加入 EntryList】

退出 synchronized 代码块:
若 Lock Record 锁记录为 null,重置锁记录,表示重入计数减一
若 Lock Record 锁记录不为 null,尝试用 CAS 恢复。若失败,则表示进行了锁膨胀或升级为重量级锁

重量级锁

进入 synchronized 代码块:
申请 monitor对象,将 Java 对象的 Mark Word 中前 30 位指向对应 monitor 对象,后两位置为 11。monitor 对象位于操作系统,其 Owner 属性指明当前占有锁的对象,EntryList 属性保存等待该锁而被阻塞的线程队列

退出 synchronized 代码块:
根据 Java 对象的 Mark Word 找到 monitor 对象,将 Owner 置空,唤醒 EntryList 中 BLOCKED 线程

32 位 JVM 普通对象
Mark Word(32位) 表示的锁状态
hashcode(25位) age(4位) biased_lock: 0 lock: 01 Normal
thread(23位) epoch(2位) age(4位) biased_lock: 1 lock: 01 Biased
ptr_to_lock_record(30位) lock: 00 Lightweight Locked
ptr_to_heavyweight_monitor(30位) lock: 10 Heavyweight Locked
lock: 11 Marked for GC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.pl.demo1_locking;

import lombok.extern.slf4j.Slf4j;

/*
32位 Jvm 中每个普通非数组 Java 对象的 Object Header 为 64 位(数组对象额外多了 32 位的长度标记)
包括 32 位的 Mark Word 和 32 位的 Klass Point

|----------------------------------------------------------------------------------------------|
| Mark Word(32bits) | State |
|----------------------------------------------------------------------------------------------|
| hashcode:25 | age:4 | biased_lock:0 | lock:01 | Normal |
|----------------------------------------------------------------------------------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | lock:01 | Biased |
|----------------------------------------------------------------------------------------------|
| ptr_to_lock_record:30 | lock:00 | Lightweight Locked |
|----------------------------------------------------------------------------------------------|
| ptr_to_heavyweight_monitor:30 | lock:10 | Heavyweight Locked |
|----------------------------------------------------------------------------------------------|
| | lock:11 | Marked for GC |
|----------------------------------------------------------------------------------------------|

|----------------------------------------------------------------------------------------------|
| Mark Word(64bits) | State |
|----------------------------------------------------------------------------------------------|
| unused:25|identity_hashcode:31|unused:1|age:4|biased_lock:0| lock:01 | Normal |
|----------------------------------------------------------------------------------------------|
| thread:54| epoch:2 |unused:1|age:4|biased_lock:1| lock:01 | Biased |
|----------------------------------------------------------------------------------------------|
| ptr_to_lock_record:62 | lock:00 | Lightweight Locked |
|----------------------------------------------------------------------------------------------|
| ptr_to_heavyweight_monitor:62 | lock:10 | Heavyweight Locked |
|----------------------------------------------------------------------------------------------|
| | lock:11 | Marked for GC |
|----------------------------------------------------------------------------------------------|

*/


@Slf4j(topic = "c.TestState")
public class Test {

public synchronized static void f1() {

}
// 在 static 方法上加 synchronized 即锁住类对象,等价于
public static void f2() {
synchronized (Test.class) {

}
}



static int count = 0;
public static void main(String[] args) throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
room.increment();
}
}, "t1");

Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
room.decrement();
}
}, "t2");
t1.start();
t2.start();
// 虽然调用的是 t1.join(),但由于 t1.join() 获取了当前线程(main)的锁,实际上阻塞的是当前线(main)
// 在 join() 中,使用了 synchronized 来使获取到当前线程的锁,确保只有一个线程可以进入 join() 方法
// t1 执行完毕后,会由 JVM 底层执行唤醒等待线程的功能
t1.join();
t2.join();
log.debug(""+room.getCount());
}

}

class Room {
private int count = 0;


/*
在方法上加 synchronized 即锁住 this 对象,等价于
public void decrement() {
synchronized (this) {
count--;
}
}
*/
public synchronized void decrement() {
count--;
}

public void increment() {
synchronized (this) {
count++;
}
}
public synchronized int getCount() {
return count;
}
}

sleep() 和 wait()

  • sleep()Thread 类的 static 方法,wait()Object 的方法
  • sleep() 不需要强制和 synchronized 配合,wait() 必须获得对象锁和 synchronized 一起使用
  • sleep() 睡眠时不会释放对象锁,wait() 在等待时释放对象锁
  • 线程状态都是 TIMED_WAITING
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.pl.demo2_sleepAndWait;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
class A {
static final Object o = new Object();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (o) {
log.debug("begin");
sleep(2.5);
log.debug("other");
}
}, "t0").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
synchronized (o) {
log.debug("1111");
}
}, "t" + (i + 1)).start();
}
}
}
@Slf4j(topic = "c.test")
public class Test {

static final Object o = new Object();

public static void main(String[] args) {
LockSupport.park();
new Thread(() -> {
log.debug("begin");
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("other");
}, "t1").start();

new Thread(() -> {
log.debug("begin");
synchronized (o) {
try {
o.wait(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("other");
}, "t2").start();

new Thread(() -> {
log.debug("begin");
synchronized (o) {
try {
o.wait(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.debug("other");
}, "t3").start();


sleep(1);

synchronized (o) {
// o.notify();
o.notifyAll();
}
}
}

保护性暂停

设计模式:保护性暂停(用于一个线程等待另一个线程的执行结果)

  • 一个结果需要从一个线程传递到另一个线程,可关联同一个 GuardedObject
  • 多个结果不断从一个线程传递到另一个线程,可使用消息队列(生产者消费者)

joinFuture 的实现采用的就是此模式

GuardedObject

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.pl.demo3_guardedSuspension;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.test")
public class GuardedObjectTest {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
Object o = guardedObject.get(5000);
if (o != null) {
log.debug("下载完成");
} else {
log.debug("下载失败");
}
}, "t1").start();
new Thread( () -> {
log.debug("准备下载");
guardedObject.complete();
}, "t2").start();
}
}

@Slf4j(topic = "c.test")
class GuardedObject {
// 待传递的结果
private Object response;

// 获取结果
public Object get(long timeout) {
long begin = System.currentTimeMillis();
long delay = 0;
synchronized (this) {
// 解决虚假唤醒
while (response == null) {
if (delay > timeout) {
log.debug("超时!!");
break;
}
log.debug("开始等待");
try {
// 防止虚假唤醒导致的等待时间异常
wait(timeout - delay);
} catch (InterruptedException e) {
e.printStackTrace();
}
delay = System.currentTimeMillis() - begin;
}
}
return response;
}

public Object get() {
return get(0);
}

public void complete() {
synchronized (this) {
try {
log.debug("开始下载");
// 模拟下载耗时
wait(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.response = 1;
this.notifyAll();
}
}

生产者消费者消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.pl.demo3_guardedSuspension;

import lombok.extern.slf4j.Slf4j;

import java.util.Deque;
import java.util.LinkedList;

public class ProducerConsumerQueueTest {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(3);
for (int i = 0; i < 5; i++) {
int id = i;
new Thread(() -> {
messageQueue.put(new Message(id, "内容" + id));
}, "Producer-" + i).start();
};

for (int i = 0; i < 5; i++) {
new Thread(() -> {
messageQueue.take();
}, "Consumer-" + i).start();
}

}
}

@Slf4j(topic = "c.test")
class MessageQueue {
// 消息队列
static final Deque<Message> list = new LinkedList<>();
private int capacity = 0;

public MessageQueue(int capacity) {
this.capacity = capacity;
}

public Message take() {
synchronized (list) {
while (list.isEmpty()) {
try {
log.debug("队列空,无法获取");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.remove();
log.debug("消耗了:" + message);
list.notifyAll();
return message;
}
}

public void put(Message message) {
synchronized (list) {
while (list.size() >= capacity) {
try {
log.debug("队列满,等待消耗");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

log.debug("添加了:" + message);
list.addLast(message);
list.notifyAll();
}
}


}

class Message {
private final int id;

private final Object value;

public Message(int id, Object value) {
this.id = id;
this.value = value;
}

public int getId() {
return id;
}

public Object getValue() {
return value;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}

LockSupport.park()、LockSupport.unpark()

每个线程都有自己的一个 Parker 对象,包括 _counter, _cond, _mutex

LockSupport.park()
1、当前线程调用 Unsafe.park()
2、若 _counter 为 0,获得 _mutex 互斥锁并继续
3、线程进入 _cond 条件变量阻塞
4、_counter 设置为 0

LockSupport.unpark(t1)
1、当前线程调用 Unsafe.unpark(t1),_counter 设置为 1
2、唤醒 _cond 条件变量中的 t1
3、t1 恢复运行
4、_counter 设置为 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.pl.demo4_park;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.concurrent.locks.LockSupport;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class Test {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("start");
sleep(1);
log.debug("park");
// LockSupport.park() 暂停当前线程
// 当前线程进入 WAIT 状态
LockSupport.park();
log.debug("resume");
}, "t1");
t1.start();
sleep(2);
// LockSupport.unpark(thread) 恢复指定的线程
// 可以先于 park() 调用
LockSupport.unpark(t1);

}
}

死锁、活锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.pl.demo5_deadLockAndLiveLock;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class Test {

// 可使用 jps 查看 Java 进程 id,然后 jstack [id] 查看进程的详细情况
public static void main(String[] args) {
// testDeadLock();
testLiveLock();
}

public static void testDeadLock() {
Object lockA = new Object();
Object lockB = new Object();

new Thread(() -> {
synchronized (lockA) {
log.debug("enter lockA");
sleep(1);
synchronized (lockB) {
log.debug("enter lockB");
}
}
}, "t1").start();

new Thread(() -> {
synchronized (lockB) {
log.debug("enter lockB");
sleep(0.5);
synchronized (lockA) {
log.debug("enter lockA");
}
}
}, "t2").start();

}


static Integer count = 50;

public static void testLiveLock() {

new Thread(() -> {
while (count > 0) {
sleep(0.2);
count--;
log.debug("count = {} ", count);
}
}, "t1").start();

new Thread(() -> {
while (count < 100) {
sleep(0.2);
count++;
log.debug("count = {} ", count);
}
}, "t2").start();


}
}

ReentrantLock 的性质

可重入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.test")
public class ReentrantTest {

// ReentrantLock
private static final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
lock.lock();
try {
log.debug("enter main");
// 可重入
m1();
} finally {
lock.unlock();
}
}

private static void m1() {
lock.lock();
try {
log.debug("enter m1");
m2();
} finally {
lock.unlock();
}
}

private static void m2() {
lock.lock();
try {
log.debug("enter m2");
} finally {
lock.unlock();
}
}
}

可打断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class InterruptibleTest {

private static final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock();


public static void main(String[] args) {
Thread t1 = new Thread(() -> {
// lockInterruptibly() 加的锁可被打断
try {
log.debug("trying to get lock");
// 普通的 lock() 无法被打断
// lockInterruptibly() 可被打断
// 若有竞争,进入阻塞队列,可被其他线程调用 interrupt() 打断
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("fail to get lock");
return;
}

try {
log.debug("get lock");
} finally {
lock.unlock();
}

}, "t1");

lock.lock();
t1.start();

sleep(2);
t1.interrupt();
}
}

锁超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class TimeoutTest {

private static final java.util.concurrent.locks.ReentrantLock lock = new ReentrantLock();


public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("trying to get lock");
// tryLock() 在指定时间内尝试获取锁,一旦成功则返回 true,否则返回 false
// 可被打断
try {
if (!lock.tryLock(5, TimeUnit.SECONDS)) {
log.debug("fail to get lock");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
return;
}

try {
log.debug("get lock");
} finally {
lock.unlock();
}

}, "t1");

log.debug("get lock");
lock.lock();
t1.start();
sleep(3);
lock.unlock();
}
}

公平锁

1
2
3
4
5
6
7
8
9
10
11
package com.pl.demo6_reentrantLock;

import java.util.concurrent.locks.ReentrantLock;

public class FairTest {
// ReentrantLock 默认是不公平的(即释放锁时,被阻塞的线程进行竞争,而不是先来先得)
public static void main(String[] args) {
// 一般不用公平锁,因为会降低并发度
new ReentrantLock(true);
}
}

条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.pl.demo6_reentrantLock;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static com.pl.util.Sleeper.sleep;

@Slf4j(topic = "c.test")
public class ConditionTest {
static ReentrantLock room = new ReentrantLock();
static boolean hasCon1 = false;
static boolean hasCon2 = false;

// 不同的条件变量
static Condition condition1 = room.newCondition();
static Condition condition2 = room.newCondition();

public static void main(String[] args) {
new Thread(() -> {
room.lock();
try {
log.debug("state of con1: " + hasCon1);
if (!hasCon1) {
log.debug("con1 is not satisfied. waiting...");
try {
// 调用相应 condition 的 await() 进入相应的阻塞队列
condition1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("con1 is satisfied.");
} finally {
room.unlock();
}
}, "t1").start();


new Thread(() -> {
room.lock();
try {
log.debug("state of con2: " + hasCon2);
if (!hasCon2) {
log.debug("con2 is not satisfied. waiting...");
try {
condition2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("con2 is satisfied.");
} finally {
room.unlock();
}
}, "t2").start();

sleep(2);

new Thread(() -> {
room.lock();
try {
hasCon1 = true;
condition1.signalAll();
} finally {
room.unlock();
}
}, "tt1").start();

sleep(3);

new Thread(() -> {
room.lock();
try {
hasCon2 = true;
condition2.signalAll();
} finally {
room.unlock();
}
}, "tt2").start();
}
}

应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.pl.demo6_reentrantLock;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

// 用 ReentrantLock 解决哲学家就餐问题
public class PhilosopherEating {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("c1");
Chopstick c2 = new Chopstick("c2");
Chopstick c3 = new Chopstick("c3");
Chopstick c4 = new Chopstick("c4");
Chopstick c5 = new Chopstick("c5");
new Philosopher("张三", c1, c2).start();
new Philosopher("李四", c2, c3).start();
new Philosopher("王五", c3, c4).start();
new Philosopher("赵六", c4, c5).start();
new Philosopher("孙七", c5, c1).start();

}
}

@Slf4j(topic = "c.phi")
class Philosopher extends Thread {

private final Chopstick leftChop;

private final Chopstick rightChop;

public Philosopher(String name, Chopstick leftChop, Chopstick rightChop) {
this.setName(name);
this.leftChop = leftChop;
this.rightChop = rightChop;
}

@Override
public void run() {
// dead lock
/*
while (true) {
log.debug(this.getName() + " is trying to get " + leftChop.getName());
synchronized (leftChop) {
log.debug(this.getName() + " get " + leftChop.getName());
log.debug(this.getName() + " is trying to get " + rightChop.getName());
synchronized (rightChop) {
log.debug(this.getName() + " get " + rightChop.getName());
eat();
}
}
}
*/
while (true) {
log.debug(this.getName() + " is trying to get " + leftChop.getName());
if (leftChop.tryLock()) {
log.debug(this.getName() + " get " + leftChop.getName());
try {
log.debug(this.getName() + " is trying to get " + rightChop.getName());
if (rightChop.tryLock()) {
log.debug(this.getName() + " get " + rightChop.getName());
try {
eat();
} finally {
log.debug(this.getName() + " drop " + rightChop.getName());
rightChop.unlock();
}
} else {
log.debug(this.getName() + " can't get " + rightChop.getName());
}
} finally {
log.debug(this.getName() + " drop " + leftChop.getName());
leftChop.unlock();
}
} else {
log.debug(this.getName() + " can't get " + leftChop.getName());
}

}
}

private void eat() {
log.debug("eating");
Sleeper.sleep(0.2);
}

}

class Chopstick extends ReentrantLock {
private final String name;

public Chopstick(String name) {
this.name = name;
}

public String getName() {
return name;
}
}

实现交替执行

传统的 wait()、notify()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.pl.demo7_alternateExecution;

public class Test {
public static void main(String[] args) {
WaitNotify wn = new WaitNotify(1, 5);
// 三个线程都对同一个 wn 加锁
new Thread(() -> {
wn.print("a", 1, 2);
}, "t1").start();
new Thread(() -> {
wn.print("b", 2, 3);
}, "t2").start();
new Thread(() -> {
wn.print("c", 3, 1);
}, "t3").start();
}
}

class WaitNotify {
public void print(String s, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (flag != waitFlag) {
try {
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.print(s);
flag = nextFlag;
this.notifyAll();
}
}
}

private int flag;

private final int loopNumber;

public WaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
}

park()、unpark()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.pl.demo7_alternateExecution;

import com.pl.util.Sleeper;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

public class TestPark {
static Thread t1, t2, t3;
public static void main(String[] args) {
ParkUnpark pu = new ParkUnpark(5);
t1 = new Thread(() -> {
pu.print("a", t2);
});

t2 = new Thread(() -> {
pu.print("b", t3);
});

t3 = new Thread(() -> {
pu.print("c", t1);
});

t1.start();
t2.start();
t3.start();

LockSupport.unpark(t1);
}
}

class ParkUnpark {

public void print(String s, Thread next) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(s);
LockSupport.unpark(next);
}
}

private final int loopNumber;

public ParkUnpark(int loopNumber) {
this.loopNumber = loopNumber;
}
}

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.pl.demo7_alternateExecution;

import com.pl.util.Sleeper;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TestReen {
public static void main(String[] args) {
AwaitSignal as = new AwaitSignal(5);
Condition c1 = as.newCondition();
Condition c2 = as.newCondition();
Condition c3 = as.newCondition();
new Thread(() -> {
as.print("a", c1, c2);
},"t1").start();

new Thread(() -> {
as.print("b", c2, c3);
},"t2").start();

new Thread(() -> {
as.print("c", c3, c1);
},"t3").start();

// 需要手动启动
Sleeper.sleep(1);
as.lock();
try {
c1.signalAll();
} finally {
as.unlock();
}
}
}

class AwaitSignal extends ReentrantLock {

public void print(String s, Condition curr, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
curr.await();
System.out.print(s);
next.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}


private final int loopNumber;


public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}

volitale

volatile 保证可见性、有序性原理:内存屏障(Memory Barrier/ Memory Fence)
volatile 变量的写指令后会加入写屏障,屏障前的对共享变量的改写操作,会同步到主存。不会将写屏障之前的代码排在写屏障后
volatile 变量的读指令前会加入读屏障,屏障后的对共享变量的读取操作,会从主存中加载。不会将读屏障之后的代码排在读屏障前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.pl.demo8_volitale;

import com.pl.util.Sleeper;

public class Test {

// volatile 修饰成员变量和静态成员变量,避免线程到自己的工作缓存中查找变量
// 使得线程必须到主存中获取值,保证可见性
// volatile 写屏障,避免指令重排序
// ps:synchronized 既可保证可见性,又可保证原子性
static volatile boolean flag = true;
// static boolean flag = true;

public static void main(String[] args) {

new Thread(() -> {
while (flag) {

}
}, "t1").start();

Sleeper.sleep(1);
// 若 flag 没有 volatile 修饰,则线程到自己的缓存中获取到 flag 的值为 true,此处修改就不会使 t1 停下
flag = false;

}
}

Atomic

AtomicInteger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package com.pl.demo9_atomic;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// CAS 和 volatile 可以实现无锁并发,适用于线程数少、CPU 核心数多的情况
// CAS 基于乐观锁的思想
// synchronized 基于悲观锁的思想
public class AtomicTest {
public static void main(String[] args) {
Account.demo(new AccountUnsafe(10000));
Account.demo(new AccountCAS(10000));

AtomicInteger i = new AtomicInteger(0);
// ++i
System.out.println(i.addAndGet(1));
// 等价于
System.out.println(i.incrementAndGet());
// 等价于
System.out.println(i.updateAndGet(value -> value + 1));

System.out.println(i.get());

// i++
System.out.println(i.getAndAdd(1));
// 等价于
System.out.println(i.getAndIncrement());
// 等价于
System.out.println(i.getAndUpdate(value -> value + 1));

System.out.println(i.get());
}
}

class AccountCAS implements Account {

// 使用 AtomicInteger 原子类
private AtomicInteger balance;

public AccountCAS(Integer balance) {
this.balance = new AtomicInteger(balance);
}

@Override
public Integer getBalance() {
return balance.get();
}

@Override
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
// 内部是原子的
// 底层是 Unsafe 类的 CAS(Compare and Swap)方法
if (balance.compareAndSet(prev, next)) {
break;
}
}
// 等价于
// balance.updateAndGet(value -> value - amount);
}
}

class AccountUnsafe implements Account {

Integer balance;

public AccountUnsafe(Integer balance) {
this.balance = balance;
}

@Override
public synchronized Integer getBalance() {
return balance;
}

@Override
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
}

interface Account {
Integer getBalance();

void withdraw(Integer amount);

static void demo(Account account) {
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
threads.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + ", " + (end - start) / 1000);
}

}

AtomicReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.pl.demo9_atomic;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
public static void main(String[] args) {
DecimalAccount.demo(new DecimalAccountImp(new BigDecimal("1")));
}
}

class DecimalAccountImp implements DecimalAccount {
// AtomicReference<T> 对象数据类型
private AtomicReference<BigDecimal> balance;

public DecimalAccountImp(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}

@Override
public BigDecimal getBalance() {
return balance.get();
}

@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = balance.get();
BigDecimal next = prev.subtract(amount);
// 底层是 Unsafe 类的方法
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
}

interface DecimalAccount {
BigDecimal getBalance();

void withdraw(BigDecimal amount);

static void demo(DecimalAccount account) {
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
threads.add(new Thread(() -> {
account.withdraw(new BigDecimal("0.001"));
}));
}
long start = System.nanoTime();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + ", " + (end - start) / 1000);
}

}

AtomicStampedReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.pl.demo9_atomic;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicStampedReference;

@Slf4j(topic = "c.test")
public class AtomicStampedReferenceTest {

// 带版本号的原子引用,进行 CAS 是会比较版本号
// 若只需要关心是否有过更改,可用 AtomicMarkableReference,只维护一个布尔值
static AtomicStampedReference<String> ref = new AtomicStampedReference<String>("AAA", 0);

public static void main(String[] args) {
int stamp = ref.getStamp();
log.debug("before sleeping : {}", stamp);
// 途中被其他线程操作
other();
Sleeper.sleep(1);
log.debug("after sleeping: {}", ref.getStamp());
// 此时 (stamp + 1) != ref.getStamp(),修改失败
log.debug("try changing AAA to CCC, result: {}",
ref.compareAndSet("AAA", "BBB", stamp, stamp + 1));

}

private static void other() {
// "AAA"-> "CCC" -> "AAA"
new Thread(() -> {
int stamp = ref.getStamp();
log.debug("before: {}", stamp);
log.debug("try changing AAA to CCC, result: {}",
ref.compareAndSet("AAA", "CCC", stamp, stamp + 1));
log.debug("after: {}", ref.getStamp());
}, "t1").start();

Sleeper.sleep(0.5);

new Thread(() -> {
int stamp = ref.getStamp();
log.debug("before: {}", stamp);
log.debug("try changing CCC to AAA, result: {}",
ref.compareAndSet("CCC", "AAA", stamp, stamp + 1));
log.debug("after: {}", ref.getStamp());
}, "t2").start();
}
}


LongAdder

原子累加器,设置多个累加单元 cell,累加时操作不同的 cell 变量,最后将结果汇总,减少了 CAS 尝试失败

速度比普通的 AtomicLong 快很多

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.pl.demo10_longAdder;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;

@Slf4j(topic = "c.test")
public class Test {

public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(
AtomicLong::new,
AtomicLong::getAndIncrement
);
}

log.debug("--------------------");

for (int i = 0; i < 5; i++) {
demo(
LongAdder::new,
LongAdder::increment
);
}

}

// Supplier<T>: 0 参数 -> 1 结果
// Consumer<T>: 1 参数 -> 0 结果
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
ArrayList<Thread> threads = new ArrayList<>();
for (int i = 0; i < 4; i++) {
threads.add(new Thread(() -> {
for (int j = 0; j < 500_000; j++) {
action.accept(adder);
}
}, "t" + i));
}
long start = System.nanoTime();
threads.forEach(Thread::start);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long end = System.nanoTime();
log.debug("{}, cost: {} ms", adder, (end - start) / 1000_000);

}
}

LongAdder 源码

三个核心属性:cells、base、cellsBusy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;

/**
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;

/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;

add() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

longAccumulate() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
*
* @param x the value
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

sum() 方法

1
2
3
4
5
6
7
8
9
10
11
12
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

利用 AtomicIntegerArray 实现连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package com.pl.demo11_customPool;

import lombok.extern.slf4j.Slf4j;

import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class CustomPoolTest {
public static void main(String[] args) {
Pool pool = new Pool(2);
int threadCount = 5;
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
Connection connection = pool.borrow();
try {
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.free(connection);
}
}, "t" + (i + 1)).start();
}
}
}

/**
* 自定义连接池
*/
@Slf4j(topic = "c.Test")
class Pool {
/**
* 连接池大小
*/
private final int poolSize;

/**
* 所有的连接
*/
private Connection[] connections;

/**
* 保存每个连接的状态,0 表示空闲,1 表示繁忙
*/
private AtomicIntegerArray states;

public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
// 默认全 0,表示全空闲
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection();
}
}

public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("get conn[" + i + "]");
return connections[i];
}
}
}
synchronized (this) {
try {
log.debug("waiting...");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public void free(Connection connection) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == connection) {
// 此处不发生冲突,直接 set
states.set(i, 0);
log.debug("free conn[" + i + "]");
synchronized (this) {
this.notifyAll();
}
}
}
}

class MockConnection implements Connection {
......
}
}

final 关键字

final
1)修饰的类不能被继承
2)修饰的方法不能被子类重写
3)修饰的变量一旦赋值就不能修改(表现为基本数据类型的值不能发生变化,引用类型变量所引用的地址不会发生改变)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FinalTest {
// 设置 final 变量时,会在 putfield 指令后加入写屏障,保证其他线程读取时不会读到 0
static final int A = 10;
static final int B = Short.MAX_VALUE + 1;
final int a = 20;
final int b = Integer.MAX_VALUE;

static int C = 15;
int c = 30;
}

class UseFinal {
void test() {
// final 修饰的变量对应的字节码
// BIPUSH 10
System.out.println(FinalTest.A);
// LDC 32768
System.out.println(FinalTest.B);
// BIPUSH 20
System.out.println(new FinalTest().a);
// LDC 2147483647
System.out.println(new FinalTest().b);

// 未被 final 修饰的变量对应的字节码
// GETSTATIC com/pl/demo12_final/FinalTest.C : I
System.out.println(FinalTest.C);
// GETFIELD com/pl/demo12_final/FinalTest.e : I
System.out.println(new FinalTest().c);
}
}

如果一个变量满足以下三个条件时,该变量就会成为一个宏变量:
1)被 final 修饰
2)在定义该 final 变量时就指定了初始值
3)该初始值在编译时就能够唯一指定

编译器会把程序所有用到宏变量的地方直接替换成该变量的值,这就是宏替换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.pl.demo12_final;

public class MacroVariableTest {

public static void main(String[] args) {
String hw = "hello world";
String hello = "hello";

// 宏变量,值为 "hello"
final String finalWorld2 = "hello";

// 不是宏变量
final String finalWorld3 = hello;

// 宏变量,值为 "hello"
final String finalWorld4 = "he" + "llo";

// 不是宏变量
String hw1 = hello + " world";

// 相当于 String hw2 = "hello" + " world";
// 也就相当于 String hw2 = "hello world";
String hw2 = finalWorld2 + " world";

String hw3 = finalWorld3 + " world";

// 相当于 String hw4 = "hello" + " world";
// 也就相当于 String hw4 = "hello world";
String hw4 = finalWorld4 + " world";

System.out.println(hw == hw1); // false
System.out.println(hw == hw2); // true
System.out.println(hw == hw3); // false
System.out.println(hw == hw4); // true
}
}

线程池

自定义线程池

taskQueue:任务队列
workers:核心线程的包装
rejectPolicy:拒绝策略
coreSize:最大核心数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package com.pl.demo13_threadPool;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.test")
public class MyThreadPoolTest {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 3, (queue, task) -> {
// 自定义实现拒绝策略
// 死等
// queue.put(task);
// 带超时
queue.offer(task, 500, TimeUnit.MILLISECONDS);
});
for (int i = 0; i < 8; i++) {
int j = i;
threadPool.execute(() -> {
// 每个任务都霸占线程池中的线程 1 s
Sleeper.sleep(1);
log.debug("{}" ,j);
});
}
}
}

/**
* 拒绝策略
*/
@FunctionalInterface
interface RejectPolicy<T> {
/**
* 拒绝方法
* @param blockingQueue 阻塞队列
* @param task 当前欲执行的任务
*/
void reject(BlockingQueue<T> blockingQueue, T task);
}

@Slf4j(topic = "c.test")
class ThreadPool {
/**
* 任务队列
*/
private final BlockingQueue<Runnable> taskQueue;

/**
* 线程集合
*/
private final HashSet<Worker> workers = new HashSet<>();

/**
* 拒绝策略
*/
private final RejectPolicy<Runnable> rejectPolicy;

/**
* 核心线程数
*/
private final int coreSize;

/**
* 利用线程池执行任务
* @param task 待执行的任务
*/
public void execute(Runnable task) {
// 直接执行或放入阻塞队列
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
log.debug("新增worker: {} 用以执行 {}", worker, task);
worker.start();
} else {
// 当队列满时,可有多种执行策略
// 1、死等
// 2、带超时等待
// 3、调用者放弃任务执行
// 4、调用者抛出异常
// 5、调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);

}
}
}

public ThreadPool(int coreSize, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}

/**
* 对线程进行包装,使其执行完一个任务后不立即结束
*/
class Worker extends Thread {
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
// 当前线程不断执行任务
while (task != null || (task = taskQueue.take(10, TimeUnit.SECONDS)) != null) {
try {
log.debug("开始执行 {}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
// 阻塞队列中的所有任务执行完成
synchronized (workers) {
workers.remove(this);
log.debug("移除 {}", this);
}
}
}

}


/**
* 阻塞队列
* @param <T> 队列中的任务类型
*/
@Slf4j(topic = "c.test")
class BlockingQueue<T> {
/**
* 任务队列
* 由于此应用场景只在对头和队尾操作元素,使用 Deque 的 ArrayDeque 实现而不是 LinkedList
*/
private final Deque<T> queue = new ArrayDeque<>();

/**
* 锁
*/
private final ReentrantLock lock = new ReentrantLock();

/**
* 生产者条件变量
*/
private final Condition fullWaitSet = lock.newCondition();

/**
* 消费者条件变量
*/
private final Condition emptyWaitSet = lock.newCondition();

/**
* 最大容量
*/
private final int capacity;

public BlockingQueue(int capacity) {
this.capacity = capacity;
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() >= capacity) {
// 根据自定义拒绝策略执行
rejectPolicy.reject(this, task);
} else {
queue.add(task);
log.debug("将 {} 加入阻塞队列", task);
// 唤醒 emptyWaitSet 上的消费者线程
emptyWaitSet.signalAll();
}
} finally {
lock.unlock();
}
}

/**
* 从阻塞队列中获取任务
* @param timeout 最长等待时间,即该线程暂时无任务时的空闲持续时间
* @param timeUnit 时间单位
* @return T 获取到的任务
*/
public T take(long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 超时
if (nanos <= 0) {
return null;
}
// 阻塞队列为空,则无法获取,进入等待
// awaitNanos 返回剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
T t = queue.removeFirst();
// 唤醒 fullWaitSet 上的生产者线程
fullWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}

/**
* 往阻塞队列中添加任务,死等
* @param task 待添加的任务
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() >= capacity) {
try {
// 阻塞队列为满,则无法添加,进入等待
fullWaitSet.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.add(task);
log.debug("将 {} 加入阻塞队列", task);
// 唤醒 emptyWaitSet 上的消费者线程
emptyWaitSet.signalAll();
} finally {
lock.unlock();
}
}

/**
* 带超时时间的添加任务
* @param task 待添加的任务
* @param timeout 超时时间
* @param timeUnit 时间单位
* @return 返回是否获取成功
*/
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() >= capacity) {
try {
if (nanos < 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.add(task);
log.debug("将 {} 加入阻塞队列", task);
emptyWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}

/**
* 获取当前阻塞队列中任务的个数
* @return 队列中任务个数
*/
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}

JDK 提供的线程池

ThreadPoolExecutor

其中使用 AtomicInteger 类型的 ctl 的高 3 位标识线程池状态,低 29 位表示线程数量

状态名 对应 ctl 的高 3 位 说明
RUNNING 111 可接收新任务,可处理阻塞队列任务
SHUTDOWN 000 不接收新任务,可处理阻塞队列任务
STOP 001 不接收新任务,不理阻塞队列任务
TIDYING 010 任务全部执行完毕,活动线程为 0,即将进入终结
TERMINATED 011 已经终结

构造方法:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

corePoolSize – 核心线程数(创建后就一直保留)
maximumPoolSize – 最大线程数(核心线程数 + 救急线程数)
keepAliveTime – 生存时间。针对救急线程
unit – 时间单位。针对救急线程的生存时间
workQueue – 阻塞队列
threadFactory – 线程工厂,主要用于为线程起名字
handler – 拒绝策略。当阻塞队列满,由救急线程执行任务,若救急线程也忙,则执行拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@Slf4j(topic = "c.test")
public class ThreadPoolExecutorTest {

public static void main(String[] args) {

// 固定大小线程池,无救急线程
// 采用 LinkedBlockingQueue 阻塞队列
ExecutorService pool1 = Executors.newFixedThreadPool(2);
// testPool(pool1);

// 固定一个核心线程,无救急线程
// 用于多个任务排队执行
ExecutorService pool2 = Executors.newSingleThreadExecutor();
// testPool(pool2);

// 无核心线程,无限救急线程,救急线程存活时间为 1 min
// 采用 SynchronousQueue 阻塞队列,其容量为 0,不能存储,放入与取出必须一一对应
// 适用于任务密集,且每个任务执行时间短的场景
ExecutorService pool3 = Executors.newCachedThreadPool();
// synchronusQueueTest();

// 按计划执行任务。任务之间不会产生影响
ScheduledExecutorService pool4 = Executors.newScheduledThreadPool(1);
/*
// 固定频率执行
pool4.scheduleAtFixedRate(() -> {
log.debug("running");
}, 1, 1, TimeUnit.SECONDS);
*/
// 延时执行
log.debug("begin");
pool4.schedule(() -> {
log.debug("task1");
Sleeper.sleep(3);
}, 1, TimeUnit.SECONDS);
pool4.schedule(() -> {
log.debug("task2");
int i = 1 / 0;
}, 1, TimeUnit.SECONDS);
pool4.schedule(() -> {
log.debug("task3");
}, 1, TimeUnit.SECONDS);

}

/**
* 验证 SynchronousQueue 性质
*/
private static void synchronusQueueTest() {
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("putting {}", 1);
integers.put(1);
log.debug("putted {}", 1);
log.debug("putting {}", 2);
integers.put(2);
log.debug("putted {}", 2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t1").start();
Sleeper.sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
log.debug("took {}", 1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t2").start();
Sleeper.sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
log.debug("took {}", 2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, "t3").start();
}

private static void testPool(ExecutorService pool1) {
pool1.execute(() -> {
log.debug("1");
Sleeper.sleep(1);
});
pool1.execute(() -> {
log.debug("2");
Sleeper.sleep(1);
});
pool1.execute(() -> {
log.debug("3");
Sleeper.sleep(1);
});
}
}

锁相关概念

锁:"避免发生混乱"的机制

  • 自旋锁和阻塞锁:

     自旋锁是一种基于忙等待的锁机制,而阻塞锁是一种基于线程阻塞的锁机制。
     自旋锁使用忙等待的策略,即当线程请求获取锁时,如果发现锁已被其他线程占用,该线程会一直在一个循环中自旋等待,直到锁可用为止,通常在低竞争情况下使用,以避免线程切换的开销;
     阻塞锁同步机制,当线程请求获取锁时,如果发现锁已被其他线程占用,该线程会被阻塞并释放CPU资源,直到锁可用为止,通常在高竞争情况下使用,以避免资源浪费和提供更好的公平性。
    
  • 乐观锁和悲观锁:

     乐观锁假设并发冲突的概率低,在修改共享数据之前不会阻塞其他线程的访问,而是在更新数据时通过版本号、时间戳等机制来检测冲突并解决,如果检测到冲突,则进行回滚或重试操作,适用于读多写少的场景;
     悲观锁假设并发冲突的概率高,线程在访问共享数据时会将其锁定,以防止其他线程同时访问或修改,保证数据的一致性,适用于写多读少或写写冲突频繁的场景。
    
  • 可重入锁和不可重入锁:

     可重入锁是一种允许同一个线程多次获取锁的机制,而不可重入锁不允许同一个线程多次获取锁。
     可重入锁机制下,一个线程已经获得了锁,并且再次请求获取该锁时,请求会成功并计算锁的持有次数,线程需要释放相同次数的锁才能完全释放锁。可以避免死锁和提供更高的灵活性;
     不可重入锁机制下,一个线程已经获得了锁,并且再次请求获取该锁时,请求会被阻塞,直到该锁被释放,可能导致线程在持有锁期间的阻塞或死锁。
    
  • 偏向锁、轻量级锁和重量级锁:

     偏向锁、轻量级锁和重量级锁是 JVM 在不同竞争情况下的渐进的锁优化升级机制
     偏向锁针对无竞争情况进行优化,轻量级锁针对低竞争情况进行优化,而重量级锁针对高竞争情况进行优化。
    
  • 共享锁和独占锁:

     共享锁允许多个线程同时读取共享资源,而独占锁只允许一个线程独占访问共享资源。
     共享锁机制下,多个线程可以同时持有共享锁,并行地读取数据,但在持有共享锁期间,其他线程无法获取独占锁。共享锁用于读多写少的场景,可以提高并发性能;
     独占锁机制下,一个线程持有独占锁时,其他线程无法同时获取独占锁或共享锁,独占锁用于需要确保数据完整性和一致性的场景,一次只允许一个线程修改数据。
    
  • 公平锁和非公平锁:

     公平锁按照线程请求锁的顺序来获取锁,先到先得。非公平锁则允许后来的线程在竞争中插队。
     公平锁提供了公平性,但可能导致线程切换开销增加;
     非公平锁可能提供更好的吞吐量,但可能导致某些线程长时间无法获得锁。
    
  • 表锁和行锁:

     表锁是对整个数据库表进行锁定,行锁是对表中的单行记录进行锁定。
     表锁的粒度较大,可能导致并发性能下降,但简单且适用于某些特定场景。
     行锁的粒度较小,可以提高并发性能,但需要更复杂的锁管理机制。
    

Lock 接口与 AQS

synchronized 无法破坏“不可抢占”条件。
因为 synchronized 申请资源时,若申请不到,线程就被阻塞了,此时阻塞态的线程无所作为,也释放不了线程已占有的资源。故提供另一种方式加锁:Lock 接口

自定义一种不可重入阻塞锁机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package com.pl.demo14_lockAndAqs;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

@Slf4j(topic = "c.test")
public class Test {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
log.debug("locking");
try {
Sleeper.sleep(1);
} finally {
log.debug("unlocking");
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
log.debug("locking");
// 不可重入
lock.lock();
log.debug("locking");
try {
Sleeper.sleep(1);
} finally {
log.debug("unlocking");
lock.unlock();
}
}, "t2").start();
new ReentrantLock();
}
}

class MyLock implements Lock {

/**
* 基于 AQS 实现的一种不可重入阻塞锁机制
*/
class MySync extends AbstractQueuedSynchronizer {
/**
* 0 表示无锁,1 表示有锁
*/
// private volatile int state;

/**
* 获取独占锁
* @param arg 实现可重入锁时用来计数,不可重入锁不需要
* @return 是否获取成功
*/
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 类似 monitor 设置 owner 线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/**
* 释放锁
* @param arg 实现可重入锁时用来计数,不可重入锁不需要
* @return 是否释放成功
*/
@Override
protected boolean tryRelease(int arg) {
// 注意以下两条语句执行先后顺序:
// 先释放,再标识
setExclusiveOwnerThread(null);
// state 由 volatile 修饰
setState(0);
return true;
}

/**
* 检测当前线程是否持有独占锁
* @return 是否持有独占锁
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

public Condition newCondition() {
return new ConditionObject();
}
}

private final MySync sync;

public MyLock() {
sync = new MySync();
}


/**
* 普通加锁
*/
@Override
public void lock() {
sync.acquire(1);
}

/**
* 特殊加锁
* 破坏“不可抢占”条件(某个线程申请不到其想要的资源时,主动释放它已占有的资源)之
* 能响应中断:阻塞态的线程能够响应中断信号,就有机会释放曾经持有的锁
*/
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

/**
* 特殊加锁
* 破坏“不可抢占”条件之
* 非阻塞地获取锁:如果尝试获取锁失败,并不进入阻塞状态,而是直接返回
*/
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

/**
* 特殊加锁
* 破坏“不可抢占”条件之
* 支持超时:若线程在一段时间内,都没有获取到锁,不是进入阻塞态,而是返回一个错误
* @param time 最大等待时间
* @param unit 时间单位
* @return 是否成功获取锁
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

/**
* 释放锁
*/
@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.newCondition();
}
}

ReentrantLock 原理

假设 T0 线程一直保持锁,使得后续的 T1、T2、T3 线程被阻塞,如图:

加锁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public void lock() {
sync.lock();
}

// class NonfairSync extends Sync
final void lock() {
if (compareAndSetState(0, 1))
// 加锁成功,设 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则进行尝试
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 当前线程为阻塞队列第一个,尝试获得锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

// class Sync extends AbstractQueuedSynchronizer
// 返回当前线程是否获取到锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 再次尝试加锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

此时 T0 线程执行完毕,释放锁,假设有竞争,T0 线程执行完毕释放锁后,被新线程 T4 抢到锁,则 T4 开始执行,T1 继续阻塞;

假设没有竞争,则 T1 线程获得锁,如图:

释放锁流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void unlock() {
sync.release(1);
}

// AbstractQueuedSynchronizer#release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

假设 T0 获得锁后,进入条件变量 ConditionObject1 的等待队列,T0 释放锁,T1 获得锁,T1 获得锁后,也进入条件变量 ConditionObject1 的等待队列,T1 释放锁,T2 获得锁,如图:

条件变量 await():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// AbstractQueuedSynchronizer.ConditionObject#await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

// AbstractQueuedSynchronizer#fullyRelease
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

假设 T2 获得锁后,调用 ConditionObject1 的 signal(),唤醒 T0 并将其加入阻塞队列,如图:

条件变量 signal():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// AbstractQueuedSynchronizer.ConditionObject#signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
// If cannot change waitStatus, the node has been cancelled.
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

ReentrantReadWriteLock原理

读锁和写锁共用一个 state,写锁使用低 16 位,读锁使用高 16 位

写锁上锁流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquire(1);
}

// AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// ReentrantReadWriteLock.Sync
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// state 不为 0 表示有读锁或写锁
// 为读锁或者为其他线程的写锁,则获取失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 为自己的写锁
// 重入次数溢出
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 锁重入
setState(c + acquires);
return true;
}
// state 为 0 表示目前没有锁,则尝试加锁
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

读锁上锁流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// ReentrantReadWriteLock.WriteLock
public void lock() {
sync.acquireShared(1);
}

// AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

//AbstractQueuedSynchronizer
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 有条件争抢锁
if (p == head) {
// 尝试上锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

// ReentrantReadWriteLock.Sync
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 有其他线程的写锁,则失败
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}


StampedLock

带戳的读写锁,适用于读多写少的场景,不支持可重入,原理是乐观读。

读取时,先乐观读 stamp = sl.tryOptimisticRead();,若validate 失败,则升级为读锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.pl.demo15_reentrantRWLockAndStampedLock;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.StampedLock;

@Slf4j(topic = "c.test")
public class StampedLockTest {
public static void main(String[] args) {
DataContainerWithStampedLock data = new DataContainerWithStampedLock("data");
new Thread(() -> {
log.debug("{}", data.read());
}, "t1").start();
Sleeper.sleep(0.5);
new Thread(() -> {
log.debug("{}", data.read());
}, "t2").start();
Sleeper.sleep(0.5);
new Thread(() -> {
data.write("new1");
}, "t3").start();
}
}

@Slf4j(topic = "c.test")
class DataContainerWithStampedLock {
private Object data;

private StampedLock sl = new StampedLock();

public DataContainerWithStampedLock(Object data) {
this.data = data;
}

public Object read() {
log.debug("begin");
// 返回 stamp
long stamp = sl.tryOptimisticRead();
Sleeper.sleep(2);
log.debug("tryOptimisticRead {}", stamp);
if (sl.validate(stamp)) {
log.debug("optimisticRead finish {}", stamp);
return data;
}
// 升级成读锁
log.debug("update to readLock");
try {
stamp = sl.readLock();
log.debug("read lock {}", stamp);
log.debug("read lock finish {}", stamp);
return data;
} finally {
log.debug("readLock unlock");
sl.unlockRead(stamp);
}
}

public void write(Object data) {
long stamp = sl.writeLock();
log.debug("writeLock() {}", stamp);
try {
log.debug("writing");
Sleeper.sleep(2);
this.data = data;
} finally {
log.debug("unlockWrite()");
sl.unlockWrite(stamp);
}
}

}

Semaphore

信号量。用来限制同时访问共享资源的线程上限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;

@Slf4j(topic = "c.test")
public class SemaphoreTest {
public static void main(String[] args) {
// 限制同时最多 3 个线程
Semaphore s = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 获得
s.acquire();
log.debug("acquire");
Sleeper.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 释放
s.release();
log.debug("release");
}
}, "t" + i).start();
}
}
}

acquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Semaphore
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

// Semaphore.Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}


private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

release()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void release() {
sync.releaseShared(1);
}

// Semaphore.Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

// AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

CountDownLatch 和 CyclicBarrier

CountDownLatch

允许一个或多个线程,等待其他一组线程完成操作,再继续执行。

用于进行线程同步协助,await() 用于等待计数归零,countDown() 将计数减一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;

@Slf4j(topic = "c.test")
public class CountdownLatchTest {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
log.debug("begin");
Sleeper.sleep(1);
latch.countDown();
log.debug("end");
}, "t1").start();

new Thread(() -> {
log.debug("begin");
Sleeper.sleep(2);
latch.countDown();
log.debug("end");
}, "t2").start();

new Thread(() -> {
log.debug("begin");
Sleeper.sleep(3);
latch.countDown();
log.debug("end");
}, "t3").start();

try {
log.debug("waiting");
latch.await();
log.debug("end");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

public long getCount() {
return sync.getCount();
}
}

CyclicBarrier

允许一组线程相互之间等待,达到一个共同点,再继续执行

支持复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.pl.demo16_semaphoreAndCountDownLatchAndCyclicBarrier;

import com.pl.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j(topic = "c.test")
public class CyclicBarrierTest {
public static void main(String[] args) {

ExecutorService pool = Executors.newFixedThreadPool(2);

CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
log.debug("task1 task2 end");
});

for (int i = 0; i < 3; i++) {
// 计数为 0 时,下次会重置
pool.submit(() -> {
log.debug("begin");
Sleeper.sleep(1);
try {
// 2 - 1 == 1 >= 0 允许执行
cyclicBarrier.await();
log.debug("end");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});

pool.submit(() -> {
log.debug("begin");
Sleeper.sleep(3);
try {
// 1 - 1 == 0 >= 0 允许执行
cyclicBarrier.await();
log.debug("end");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
}
}
}

LinkedBlockingQueue 和 ConcurrentLinkedQueue

使用两把锁,同一时刻允许一个消费者线程和一个生产者线程同时执行,生产者线程间仍然串行,消费者线程间仍然串行

链表中存在一个 Dummy 节点以及 last、head 指针

1
2
3
4
5
// 用于 put/offer,保证 last 节点的线程安全
private final ReentrantLock putLock = new ReentrantLock();

// 用于 take/poll,保证 head 节点的线程安全
private final ReentrantLock takeLock = new ReentrantLock();

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// count 表示元素数量
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 满了等待
while (count.get() == capacity) {
// 等待 putLock 的条件变量:notFull
notFull.await();
}
// 有空位,入队
enqueue(node);
// 计数加一
c = count.getAndIncrement();
// 自己 put 之后,队列还有空位,唤醒一个 put 线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列中有一个元素, 叫醒一个 take 线程
if (c == 0)
// 调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争
signalNotEmpty();
}

take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列中只有一个空位时, 叫醒 put 线程
// 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacity
if (c == capacity)
// 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争
signalNotFull()
return x;
}

ConcurrentLinkedQueue

将 LinkedBlockingQueue 的锁用 CAS 实现

CopyOnWriteArrayList

采读写分离,空间换时间,适用于读多写少的情况,存在读弱一致性。

并发与一致性是矛盾的,需要权衡

写操作时,会创建一份副本,在副本上操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 创建副本
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

读操作时没有上锁

1
2
3
4
5
6
7
8
9
public void forEach(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
Object[] elements = getArray();
int len = elements.length;
for (int i = 0; i < len; ++i) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
action.accept(e);
}
}