diff --git a/docs/JUC/JUC.md b/docs/JUC/JUC.md index d2a3a32..a026e07 100644 --- a/docs/JUC/JUC.md +++ b/docs/JUC/JUC.md @@ -1,5 +1,7 @@ ## 死锁 +死锁是指两个或两个以上的线程(或进程)在运行过程中,因为资源竞争而造成相互等待的现象,若无外力作用则不会解除等待状态,它们之间的执行都将无法继续下去。 + Java 死锁产生的四个必要条件: 1. 互斥条件,即当资源被一个线程使用(占有)时,别的线程不能使用 @@ -11,7 +13,9 @@ Java 死锁产生的四个必要条件: 定位死锁: -检测死锁可以使用 **jconsole**工具,或者使用 **jps 定位进程 id,再用 jstack 定位死锁** +通过`jps+jstack`工具排查:`jps` + `jstack -l ` + +通过`jconsole`工具排查:首先按`win+r`调出运行窗口,然后输入`JConsole`命令 Linux 下可以通过 top 先定位到 CPU 占用高的 Java 进程,再利用 `top -Hp 进程id` 来定位是哪个线程,最后再用 jstack 的输出来看各个线程栈 @@ -23,7 +27,7 @@ Java 中的每一个对象都可以作为锁。 具体表现为以下 3 种形 1. 对于普通同步方法,锁是当前实例对象。 2. 对于静态同步方法,锁是当前类的 Class 对象。 -3. 对于同步方法块,锁是 synchonized 括号里配置的对象。 +3. 对于同步代码块,锁是 synchonized 括号里配置的对象。 注意: @@ -246,7 +250,9 @@ ObjectMonitor() { ## Java内存模型 -线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是 JMM 的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。 +Java内存模型(JMM)定义了Java程序中多线程访问主存和工作内存的规范,确保多线程并发访问时的可见性、有序性和原子性。 + +线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是 JMM 的一个抽象概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。 ![](./JUC/JMM.png) @@ -266,7 +272,7 @@ ObjectMonitor() { 一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性。 -在Java中使用了synchronized 或者`Lock`锁接口的实现类来确保原子性。 +在 Java 中,可以借助`synchronized`、各种 `Lock` 以及各种原子类实现原子性。 **有序性** @@ -274,6 +280,8 @@ JMM 允许编译器和 CPU 优化指令顺序,但通过内存屏障机制和 ` `happens-before` 规则来保证原子性、可见性以及有序性。 + + ### volatile volatile 在多处理器开发中保证了共享变量的**可见性**。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。**主要作用是保证可见性和禁止指令重排优化。** @@ -302,21 +310,6 @@ volatile重排序规则: * 当第二个操作是 volatile 写时,不管第一个操作是什么,都不能重排序。这个规则确保 volatile 写之前的操作不会被编译器重排序到 volatile 写之后。 * 当第一个操作是 volatile 读时,不管第二个操作是什么,都不能重排序。这个规则确保 volatile 读之后的操作不会被编译器重排序到 volatile 读之前。 -为了实现 volatile 的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。 - -* 在每个 volatile 写操作的前面插入一个 StoreStore 屏障。 - -* 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障。 -* 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障。 -* 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。 - -| 屏障类型 | 指令示例 | 说明 | -| ---------- | --------------------------- | ------------------------------------------------------------ | -| LoadLoad | Load1; LoadLoad; Load2; | 确保`Load1`指令数据的装载,发生于`Load2`及后续所有装载指令的数据装载之前。 | -| StoreStore | Store1; StoreStore; Store2; | 确保`Store1`数据的存储对其他处理器可见(刷新到内存中),并发生于`Store2`及后续所有存储指令的数据写入之前。 | -| LoadStore | Load1; LoadStore; Store2; | 确保`Load1`指令数据的装载,发生于`Store2`及后续所有存储指令的数据写入之前。 | -| StoreLoad | Store1; StoreLoad; Load2; | 确保`Store1`数据的存储对其他处理器可见(刷新到内存中),并发生于`Load2`及后续所有装载指令的数据装载之前。`StoreLoad Barriers`会使该屏障之前的所有内存访问指令(存储和装载)完成之后,才执行该屏障之后的内存访问指令。 | - synchronized 无法禁止指令重排和处理器优化,为什么可以保证有序性可见性? @@ -412,7 +405,9 @@ public class Singleton { ## Java线程 -现代操作系统在运行一个程序时,会为其创建一个进程。例如,启动一个 Java 程序,操作系统就会创建一个 Java 进程。现代操作系统调度的最小单元是线程,也叫轻量级进程(Light Weight Process),在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。 +进程是程序的一次执行过程,是系统运行程序的基本单位,进程是动态的。 + +线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。线程共享进程的堆和方法区资源,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈,线程也被称为轻量级进程。 @@ -429,7 +424,7 @@ public class Singleton { ![](./JUC/线程状态.jpeg) -**NEW → RUNNABLE**: +**NEW --> RUNNABLE**: 当调用 t.start() 方法时,由 NEW → RUNNABLE @@ -489,7 +484,7 @@ Daemon 线程是一种支持型线程(常被叫做守护线程),因为它主 ### 创建和运行线程 -**方法一:继承Thread** +**方法一:继承Thread重写run方法** ```java Thread t = new Thread() { @@ -537,6 +532,8 @@ System.out.println("result = " + result); **方法四:使用线程池** +**其实在`Java`中,创建线程的方式就只有一种:调用`Thread.start()`方法** + ### 查看进程线程方法 @@ -590,9 +587,9 @@ Thread 类 API: **run和start**: -直接调用 run 是在主线程中执行了 run,没有启动新的线程 ; +* 直接调用 run 是在主线程中执行了 run,没有启动新的线程 ; -使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码。 +* 使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码。 **sleep和yield**: @@ -614,6 +611,8 @@ yield : + + ### 过期的 suspend()、resume()和 stop() 不推荐使用的方法,这些方法已过时,容易破坏同步代码块,造成线程死锁: @@ -1654,6 +1653,9 @@ protected boolean tryReleaseShared(int releases) { 当调用 `await()` 方法的时候,如果 `state` 不为 0,那就证明任务还没有执行完毕,`await()` 方法就会一直阻塞。直到`count` 个线程调用了`countDown()`使 state 值被减为 0,或者调用`await()`的线程被中断,该线程才会从阻塞中被唤醒,`await()` 方法之后的语句得到执行。 ```java +public void await() throws InterruptedException { + sync.acquireSharedInterruptibly(1); +} // AbstractQueuedSynchronizer类 → acquireSharedInterruptibly()方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { @@ -1720,7 +1722,7 @@ public static void main(String[] args) { 与 CountDownLatch 的区别: -1. `CountDownLatch` 的实现是基于 AQS 共享模式的,而 `CycliBarrier` 是基于AQS 独占模式(ReentrantLock) 和 `Condition` 的。 +1. `CountDownLatch` 的实现是基于 AQS 共享模式的,而 `CycliBarrier` 是基于AQS 独占模式( `ReentrantLock` ) 和 `Condition` 的。 2. CountDownLatch 是一个线程阻塞等待其他线程到达一个节点之后才能继续执行,这个过程其他线程不会阻塞;CyclicBarrier是各个线程阻塞等待所有线程都达到一个节点后,所有线程继续执行。 @@ -1732,9 +1734,7 @@ public static void main(String[] args) { Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景。 -`Semaphore` 是通过 `AQS` 实现的。AQS 维护了一个 FIFO(先进先出)的同步队列和一个同步状态 `state`,该状态用来表示剩余的许可数量。 - - +`Semaphore` 是通过 `AQS` 实现的。AQS 维护了一个 FIFO(先进先出)的同步队列和一个同步状态 `state`,该状态用来表示剩余的许可数量。调用`semaphore.acquire()` ,线程尝试获取许可证,如果 `state >= 0` 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 `state` 的值 `state=state-1`。如果 `state<0` 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。 ```java public class SemaphoreTest { @@ -1842,11 +1842,402 @@ protected final boolean tryReleaseShared(int releases) { +## 并发容器 + +### BlockingQueue + +阻塞队列与普通队列最大的不同点在于:支持队列内元素的阻塞添加与阻塞弹出。 + +```java +public interface BlockingQueue extends Queue { + // 如果队列未满则将元素e插入队列尾部,插入成功返回true, + // 如果队列已满,则抛IllegalStateException异常 + boolean add(E e); + + // 如果队列未满则将元素e插入队列尾部,插入成功返回true + boolean offer(E e); + + // 如果队列未满则将元素e插入队列尾部,插入成功返回true, + // 如果该队列已满,则在指定的等待时间之内阻塞至可用空间出现 + // 如果超出指定时间还未将元素插入队列则返回(可响应线程中断) + boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; + + // 将元素插入队列的尾部,如果该队列已满,则一直阻塞等待 + void put(E e) throws InterruptedException; + + // 获取并移除队列的头部元素,如果没有元素则阻塞等待, + // 直到有线程添加元素后再唤醒等待线程执行该操作 + E take() throws InterruptedException; + + // 获取并移除队列的头部元素,在指定的等待时间之内阻塞等待获取元素, + // 如果超出指定时间还未获取到元素则返回(可响应线程中断) + E poll(long timeout, TimeUnit unit) throws InterruptedException; + + // 从队列中移除某个指定元素,移除成功返回true,没有该元素则返回false + boolean remove(Object o); + + // 获取队列剩余的可用空位 + // 假设队列长度为10,已有3个元素,调用该方法则返回7 + int remainingCapacity(); + + // 检查队列中是否存在指定元素,存在返回true,反之false + public boolean contains(Object o); +} +``` + +**ArrayBlockingQueue**: + +```java +public class ArrayBlockingQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + // ArrayBlockingQueue构造器:指定队列长度 + public ArrayBlockingQueue(int capacity) { + this(capacity, false); + } + // 构造器:指定队列长度与公平模式 + public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } + + // 内部存储元素的数组结构 + final Object[] items; + + // 记录获取元素的下标(take、poll、peek、remove方法都会用到) + int takeIndex; + + // 记录添加元素的下标(put、offer、add方法都会用到) + int putIndex; + + // 当前队列中元素的数量 + int count; + + // 控制并发的ReentrantLock锁对象 + final ReentrantLock lock; + + // 用于控制获取元素线程的condition对象 + private final Condition notEmpty; + + // 用于控制添加元素线程的condition对象 + private final Condition notFull; + + // 迭代器对象 + transient Itrs itrs = null; +} +``` + +`ArrayBlockingQueue`内部使用一个数组成员`items`存储所有的队列元素,分别使用三个数值:`takeIndex`、`putIndex`以及`count`记录添加与获取元素的数组位置与队列中的元素个数,同时内部使用`ReentrantLock`解决线程安全问题,用两个`Condition`对象:`notEmpty`、`notFull`控制“写”线程与“读”线程的阻塞。 + +```java +// ArrayBlockingQueue类 → put()方法 +public void put(E e) throws InterruptedException { + // 检查元素是否为空,为空则抛出空指针异常 + checkNotNull(e); + // 获取ReentrantLock成员锁对象 + final ReentrantLock lock = this.lock; + // 可响应中断式获取锁 + lock.lockInterruptibly(); + try { + // 如果队列元素已满 + while (count == items.length) + // 阻塞当前添加元素的线程 + notFull.await(); + // 如果队列元素未满则执行添加操作 + enqueue(e); + } finally { + // 释放锁 + lock.unlock(); + } +} +// ArrayBlockingQueue类 → enqueue()方法 +private void enqueue(E x) { + // 获取存储元素的items数组成员 + final Object[] items = this.items; + // 将元素放在数组的putIndex下标位置 + items[putIndex] = x; + // 对putIndex+1,+1后如果=数组长度了则重置为0 + if (++putIndex == items.length) + putIndex = 0; + // 记录队列元素的数值count+1 + count++; + // 唤醒等待获取队列元素的线程 + notEmpty.signal(); +} +``` + + + +**LinkedBlockingQueue**: + +LinkedBlockingQueue采用了读写分离的思想提升了容器整体的吞吐量。 + +```java +public class LinkedBlockingQueue extends AbstractQueue + implements BlockingQueue, java.io.Serializable { + // 构造器:可指定队列长度 + public LinkedBlockingQueue(int capacity) { + // 如果指定的队列长度为0或小于0则抛出异常 + if (capacity <= 0) throw new IllegalArgumentException(); + // 将传入的指定长度赋值给capacity成员 + this.capacity = capacity; + // 初始化空的节点作为队列头节点 + last = head = new Node(null); + } + // 构造器:不指定长度默认则为Integer.MAX_VALUE + public LinkedBlockingQueue() { + this(Integer.MAX_VALUE); + } + + // LinkedBlockingQueue类 → Node内部类 + static class Node { + // 当前节点存储的元素本身 + E item; + // 当前节点的后继节点 + Node next; + // 构造器 + Node(E x) { item = x; } + } + + // 队列的长度(可以指定长度,默认为Integer.MAX_VALUE) + private final int capacity; + + // 原子计数器:记录队列中元素的个数 + private final AtomicInteger count = new AtomicInteger(); + + // 队列(内部链表)的头节点 + transient Node head; + + // 队列(内部链表)的尾节点 + private transient Node last; + + // 读锁:线程从队列中获取元素时,使用这把锁 + private final ReentrantLock takeLock = new ReentrantLock(); + + // 获取元素时,队列为空,线程加入该condition队列等待 + private final Condition notEmpty = takeLock.newCondition(); + + // 写锁:线程向队列中添加元素时,使用这把锁 + private final ReentrantLock putLock = new ReentrantLock(); + + // 添加元素时,队列已满,线程加入该condition队列等待 + private final Condition notFull = putLock.newCondition(); +} +``` + +`LinkedBlockingQueue`因为是基于链表结构实现的队列容器,所以通过`Node`内部类构建了一个单向链表,同时使用`AtomicInteger`原子类记录队列中元素数量,`head、last`分别指向队列的头部以及尾部,同时使用`takeLock、putLock`两个`ReentrantLock`控制队列容器的读写并发访问。 + +```java +// LinkedBlockingQueue类 → put()方法 +public void put(E e) throws InterruptedException { + // 如果元素为空则抛出空指针异常 + if (e == null) throw new NullPointerException(); + int c = -1; + // 将要添加的元素封装成node节点 + Node node = new Node(e); + // 拿到写锁 + final ReentrantLock putLock = this.putLock; + // 获取当前队列的元素数量 + final AtomicInteger count = this.count; + // 可响应中断式加锁 + putLock.lockInterruptibly(); + try { + // 如果队列已满 + while (count.get() == capacity) { + // 挂起当前线程 + notFull.await(); + } + // 如果队列未满,将封装的node节点加入队列 + enqueue(node); + // 更新count计数器并获取更新前的count值 + c = count.getAndIncrement(); + // 如果队列还未满 + if (c + 1 < capacity) + // 唤醒下一个添加线程,执行元素添加操作 + notFull.signal(); + } finally { + // 释放锁 + putLock.unlock(); + } + if (c == 0) + // 如果存在元素则唤醒take线程 + signalNotEmpty(); +} + +// LinkedBlockingQueue类 → enqueue()方法 +private void enqueue(Node node) { + // 将新来的节点添加到链表的尾部 + last = last.next = node; +} +``` + + + +### CopyOnWrite + +写时复制容器是计算机程序设计领域惯用的一种优化思想,其核心思想是,如果有多个调用者同时请求相同资源,他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本给该调用者,而其他调用者所见到的最初的资源仍然保持不变,修改完成后再修改原有引用指向。 + + + +在JUC包中,写时复制容器主要提供了两种:`CopyOnWriteArrayList`与`CopyOnWriteArraySet`,在使用这两个容器时,读操作不会加锁,写操作时则会先获取锁,然后再复制一份原有数据进行修改,修改完成后再修改原有引用指向。 + +CopyOnWrite缺陷: + +* 内存占用问题。因为CopyOnWrite容器每次在发生修改时都会复制一个新的数组,所以当数组数据过大时对内存消耗比较高。 + +* 数据不一致性问题。CopyOnWrite容器保证的是最终一致性,由于写操作是在复制的数据副本上进行的,所以读操作不能立即看到写操作的结果。 +* 不适合频繁修改的场景: `CopyOnWrite` 适合读多写少的场景,如果写操作频繁,复制操作的开销会变得很大,影响性能。 + + + +```java +public E get(int index) { + return get(getArray(), index); +} + +// CopyOnWriteArrayList类 → set()方法 +public E set(int index, E element) { + // 获取锁对象并加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 获取内部存储数据的数组成员:array + Object[] elements = getArray(); + // 获取数组中指定下标原有的数据 + E oldValue = get(elements, index); + // 如果指定下标位置原本存储的数据与新的数据不同 + if (oldValue != element) { + // 获取数组的长度 + int len = elements.length; + // 拷贝一个新的数组对象 + Object[] newElements = Arrays.copyOf(elements, len); + // 将指定下标位置的元素修改为指定的数据 + newElements[index] = element; + // 将成员array的引用从原本的数组改为新的数组 + setArray(newElements); + } else { + // 如果指定下标位置原本存储的数据与新的数据相同 + // 不做任何更改 + setArray(elements); + } + // 返回原本下标位置的值 + return oldValue; + } finally { + // 释放锁/解锁 + lock.unlock(); + } +} +// CopyOnWriteArrayList类 → add()方法 +public void add(int index, E element) { + // 获取锁/加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 获取内部存储数据的数组成员:array + Object[] elements = getArray(); + int len = elements.length; + // 如果指定下标位置超出数组长度或小于0则抛出异常 + if (index > len || index < 0) + throw new IndexOutOfBoundsException("Index: "+index+ + ", Size: "+len); + // 创建一个新的数组对象 + Object[] newElements; + // 计算插入的下标位置是在数组中间还在数组最后 + int numMoved = len - index; + // 如果在数组最后,那么拷贝原本的数组并长度+1,留个空位 + if (numMoved == 0) + newElements = Arrays.copyOf(elements, len + 1); + // 如果要在数组中间插入数据 + else { + // 先创建一个长度为len+1的新数组 + newElements = new Object[len + 1]; + // 然后将拷贝老数组中的所有数据拷贝过来 + // 但是将下标为index的位置空出来 + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index, newElements, index + 1, + numMoved); + } + // 将要添加的数据设置到数组的index下标位置 + newElements[index] = element; + // 将成员array的引用从原本的数组改为新的数组 + setArray(newElements); + } finally { + // 释放锁/解锁 + lock.unlock(); + } +} +// CopyOnWriteArrayList类 → remove()方法 +public E remove(int index) { + // 获取锁/加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 拷贝原本的数组 + Object[] elements = getArray(); + // 获取数组长度与数组中要移除的值 + int len = elements.length; + E oldValue = get(elements, index); + // 计算要移除的位置是在数组的最后还是在数组的中间 + int numMoved = len - index - 1; + // 如果在数组最后 + if (numMoved == 0) + // 拷贝数组时,将最后一个元素不拷贝即可 + // 拷贝完成后重新更改引用指向 + setArray(Arrays.copyOf(elements, len - 1)); + // 如果要移除的位置是在数组中间 + else { + // 创建一个长度为原本长度-1的新数组 + Object[] newElements = new Object[len - 1]; + // 在拷贝数据时,将指定位置的元素不拷贝即可 + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index + 1, newElements, index, + numMoved); + // 更改成员array的引用指向 + setArray(newElements); + } + // 返回被移除的值 + return oldValue; + } finally { + // 释放锁/解锁 + lock.unlock(); + } +} +``` + + + +CopyOnWriteArraySet实际上是基于CopyOnWriteArrayList实现的: + +```java +public class CopyOnWriteArraySet extends AbstractSet + implements java.io.Serializable { + // 内部存储数据的结构 + private final CopyOnWriteArrayList al; + // 构造器 + public CopyOnWriteArraySet() { + al = new CopyOnWriteArrayList(); + } +} +``` + + + +### ConcurrentHashMap + +Java7 中 `ConcurrentHashMap` 使用的分段锁,也就是每一个 Segment 上同时只有一个线程可以操作,每一个 `Segment` 都是一个类似 `HashMap` 数组的结构,它可以扩容,它的冲突会转化为链表。但是 `Segment` 的个数一但初始化就不能改变。 + +Java8 中的 `ConcurrentHashMap` 使用的 `Synchronized` 锁加 CAS 的机制。结构也由 Java7 中的 **`Segment` 数组 + `HashEntry` 数组 + 链表** 进化成了 **Node 数组 + 链表 / 红黑树**,Node 是类似于一个 HashEntry 的结构。它的冲突再达到一定大小时会转化成红黑树,在冲突小于一定数量时又退回链表。 + + + ## 无锁 ### CAS -CAS 的全称是 Compare-And-Swap,依赖于CPU的原子性指令实现。 +CAS 的全称是 Compare-And-Swap,依赖于CPU的原子性指令实现。在 Java 中,实现 CAS 操作的一个关键类是`Unsafe`。 @@ -1861,7 +2252,7 @@ CAS 实现原子操作的三大问题: 1. ABA 问题。因为 CAS 需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加 1,那么 A→B→A 就会变成 1A→2B→3A。从 Java 1.5 开始,JDK 的 Atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。 -2. 循环时间长开销大。自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。 +2. 循环时间长开销大。CAS 经常会用到自旋操作来进行重试,也就是不成功就一直循环执行直到成功。如果长时间不成功,会给 CPU 带来非常大的执行开销。 3. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性。从 Java 1.5 开始, JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行 CAS 操作。 @@ -2004,32 +2395,60 @@ static final class Cell { ## 线程池 +[Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html) + 作用: 1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。 -### 线程池状态 +关键属性: -ThreadPoolExecutor 使用 **int 的高 3 位来表示线程池状态,低 29 位表示线程数量** +```java +public class ThreadPoolExecutor extends AbstractExecutorService { -| 状态 | 高3位 | 接收新任务 | 处理阻塞任务队列 | 说明 | -| ---------- | ----- | ---------- | ---------------- | ----------------------------------------- | -| RUNNING | 111 | Y | Y | | -| SHUTDOWN | 000 | N | Y | 不接收新任务,但处理阻塞队列剩余任务 | -| STOP | 001 | N | N | 中断正在执行的任务,并抛弃阻塞队列任务 | -| TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 | -| TERMINATED | 011 | - | - | 终止状态 | + // 控制变量-存放状态和线程数 + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); -这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值 + // 任务队列,必须是阻塞队列 + private final BlockingQueue workQueue; -```java -// c 为旧值, ctlOf 返回结果为新值 -ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))); + // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合 + private final HashSet workers = new HashSet<>(); + + // 全局锁 + private final ReentrantLock mainLock = new ReentrantLock(); -// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们 -private static int ctlOf(int rs, int wc) { return rs | wc; } + // awaitTermination方法使用的等待条件变量 + private final Condition termination = mainLock.newCondition(); + + // 记录峰值线程数 + private int largestPoolSize; + + // 记录已经成功执行完毕的任务数 + private long completedTaskCount; + + // 线程工厂,用于创建新的线程实例 + private volatile ThreadFactory threadFactory; + + // 拒绝执行处理器,对应不同的拒绝策略 + private volatile RejectedExecutionHandler handler; + + // 空闲线程等待任务的时间周期,单位是纳秒 + private volatile long keepAliveTime; + + // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效 + private volatile boolean allowCoreThreadTimeOut; + + // 核心线程数 + private volatile int corePoolSize; + + // 线程池容量 + private volatile int maximumPoolSize; + + // 省略其他代码 +} ``` @@ -2058,9 +2477,9 @@ public ThreadPoolExecutor(int corePoolSize, - ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。 - - LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。 + - LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。Executors.newFixedThreadPool()使用了这个队列。 - - SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。 + - SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,Executors.newCachedThreadPool 使用了这个队列。 - PriorityBlockingQueue:一个具有优先级的无限阻塞队列。 @@ -2070,61 +2489,50 @@ public ThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler 下有 4 个实现类: - - AbortPolicy:让调用者抛出 RejectedExecutionException 异常,**默认策略** - - CallerRunsPolicy:让调用者线程执行 + - AbortPolicy:抛出 RejectedExecutionException 异常,**默认策略** + - CallerRunsPolicy:让调用者线程处理当前任务 - DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常 - - DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务 - - + - DiscardOldestPolicy:放弃队列中最早的任务,再次提交当前任务 -### 工作原理 - -如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务; -如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue; +### 线程池状态 -如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务 ; +ThreadPoolExecutor 使用 **int 的高 3 位来表示线程池状态,低 29 位表示线程数量** -如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。 +| 状态 | 高3位 | 接收新任务 | 处理阻塞队列任务 | 说明 | +| ---------- | ----- | ---------- | ---------------- | ------------------------------------------------------------ | +| RUNNING | 111 | Y | Y | 能接收新提交的任务和处理阻塞队列中的任务 | +| SHUTDOWN | 000 | N | Y | 不接收新任务,但处理阻塞队列剩余任务 | +| STOP | 001 | N | N | 不接收新任务,也不处理阻塞队列剩余任务,会中断正在执行的任务 | +| TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 | +| TERMINATED | 011 | - | - | 终止状态 | -![](./JUC/线程池工作原理.jpeg) +这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值 ```java -public void execute(Runnable command) { - if (command == null) throw new NullPointerException(); - // 如果线程数小于基本线程数,则创建线程并执行当前任务 - if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { - // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。 - if (runState == RUNNING && workQueue.offer(command)) { - if (runState != RUNNING || poolSize == 0) - ensureQueuedTaskHandled(command); - } - // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量, - // 则创建一个线程执行任务。 - else if (!addIfUnderMaximumPoolSize(command)) - // 抛出 RejectedExecutionException 异常 - reject(command); // is shutdown or saturated - } -} +private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); +private static final int COUNT_BITS = Integer.SIZE - 3; +private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; + +private static final int RUNNING = -1 << COUNT_BITS; +private static final int SHUTDOWN = 0 << COUNT_BITS; +private static final int STOP = 1 << COUNT_BITS; +private static final int TIDYING = 2 << COUNT_BITS; +private static final int TERMINATED = 3 << COUNT_BITS; + +// 通过ctl值获取运行状态 高3位 +private static int runStateOf(int c) { return c & ~COUNT_MASK; } +// 通过ctl值获取工作线程数 低29位 +private static int workerCountOf(int c) { return c & COUNT_MASK; } + +// 通过运行状态和工作线程数计算ctl的值,或运算 +private static int ctlOf(int rs, int wc) { return rs | wc; } ``` -线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。 +线程池状态的跃迁图: -```java -public void run() { - try { - Runnable task = firstTask; - firstTask = null; - while (task != null || (task = getTask()) != null) { - runTask(task); - task = null; - } - } finally { - workerDone(this); - } -} -``` +![](./JUC/线程池状态变化图.png) @@ -2164,6 +2572,8 @@ Future 接口有 5 个方法,它们分别是取消任务的方法 `cancel()` `submit()`方法用于提交需要返回值的任务。线程池会返回一个 `Future` 类型的对象,通过这个 `Future` 对象可以判断任务是否执行成功,并且可以通过 `Future` 的 `get()`方法来获取返回值,`get()`方法会阻塞当前线程直到任务完成,而使用 `get(long timeout,TimeUnit unit)`方法的话,如果在 `timeout` 时间内任务还没有执行完,就会抛出 `java.util.concurrent.TimeoutException` + + ### 关闭方法 可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。 @@ -2179,21 +2589,79 @@ ExecutorService 类 API: + +### 工作原理 + +1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。 +2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。 +3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。 +4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。 +5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。 + +![](./JUC/任务调度流程.png) + + + +```java +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + // 获取ctl的值 + int c = ctl.get(); + // 判断如果当前工作线程数小于核心线程数,则创建新的核心线程并且执行传入的任务 + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + // 如果创建新的核心线程成功则直接返回 + return; + // 这里说明创建核心线程失败,需要更新ctl的临时变量c + c = ctl.get(); + } + // 当前工作线程数大于等于corePoolSize + // 判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务(放入任务失败返回false) + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + // 对线程池的运行中状态做二次检查 + // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理 + if (! isRunning(recheck) && remove(command)) + reject(command); + // 如果当前工作线程数量为0,则必须要新创建一个线程处理队列中的任务 + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + // 如果向任务队列投放任务失败,则会尝试创建非核心线程传入任务执行 + // 创建非核心线程失败,此时需要拒绝执行任务 + else if (!addWorker(command, false)) + // 调用拒绝策略处理任务 - 返回 + reject(command); +} +``` + +`addWork() `的两个参数,第一个是需要提交的线程` Runnable firstTask`,第二个参数是 `boolean` 类型,表示是否为核心线程。 `execute()` 中有三处调用了 `addWork() `: + +- 第一次,`addWorker(command, true)`,如果当前工作线程总数小于corePoolSize,则直接创建核心线程执行任务。 +- 第二次,`addWorker(null, false); `,如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向队列放入任务,这里会二次检查线程池运行状态。如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。为什么这里是 null ?之前已经把`command`提交到阻塞队列了`workQueue.offer(command)`,所以提交一个空线程,直接从阻塞队列里面取就可以了。 +- 第三次,`if (!addWorker(command, false))`,阻塞队列满了,则会尝试创建非核心线程传入任务实例执行。 + + + ### 开发要求 阿里巴巴 Java 开发手册要求: -- **线程资源必须通过线程池提供,不允许在应用中自行显式创建线程** +- 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程 - 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题 - 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者过度切换的问题 -- 线程池**不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式**,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险 +- 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险。 Executors 返回的线程池对象弊端如下: - - FixedThreadPool 和 SingleThreadExecutor:请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM(内存溢出) - - CacheThreadPool 和 ScheduledThreadPool:允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM + - FixedThreadPool 和 SingleThreadExecutor:工作队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM(内存溢出) + - CacheThreadPool :允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM + - ScheduledThreadPool 和 SingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列 `DelayedWorkQueue` ,任务队列最大长度为 `Integer.MAX_VALUE` ,可能堆积大量的请求,从而导致 OOM。 + + 创建多大容量的线程池合适? @@ -2247,7 +2715,7 @@ Executor 框架主要由 3 大部分组成如下: Executors 可以创建 3 种类型的 ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool 和 CachedThreadPool。 -##### **FixedThreadPool**: +##### **FixedThreadPool** ```java public static ExecutorService newFixedThreadPool(int nThreads) { @@ -2321,9 +2789,9 @@ public static ExecutorService newCachedThreadPool() { ![](./JUC/CachedThreadPool执行流程.jpeg) -1. 首先执行 SynchronousQueue.offer(Runnable task)。如果当前 maximumPool 中有空闲线程正在执行 SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成。 -2. 当初始 maximumPool 为空,或者 maximumPool 中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)。此时 CachedThreadPool 会创建一个新线程执行任务,execute()方法执行完成。 -3. 新创建的线程将任务执行完后,会执行 SynchronousQueue.poll (keepAliveTime,TimeUnit.NANOSECONDS)。这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒钟。如果 60 秒钟内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。 +1. 首先执行 `SynchronousQueue.offer(Runnable task)`。如果当前 maximumPool 中有空闲线程正在执行 `SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)`,那么主线程执行 offer 操作与空闲线程执行的 poll 操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成。 +2. 当初始 maximumPool 为空,或者 maximumPool 中当前没有空闲线程时,将没有线程执行 `SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)`。此时 CachedThreadPool 会创建一个新线程执行任务,execute()方法执行完成。 +3. 新创建的线程将任务执行完后,会执行 `SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)`。这个 poll 操作会让空闲线程最多在 SynchronousQueue 中等待 60 秒钟。如果 60 秒钟内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。 @@ -2348,11 +2816,11 @@ public ScheduledThreadPoolExecutor(int corePoolSize) { ![](./JUC/ScheduledThreadPool执行流程.jpeg) -1. 当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate()方法或者 scheduleWithFixedDelay()方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFutur 接口的 ScheduledFutureTask。 +1. 当调用` ScheduledThreadPoolExecutor` 的 `scheduleAtFixedRate()` 方法或者 `scheduleWithFixedDelay()` 方法时,会向 `ScheduledThreadPoolExecutor` 的 `DelayQueue` 添加一个实现了 `RunnableScheduledFutur` 接口的 `ScheduledFutureTask`。 -2. 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。 +2. 线程池中的线程从 `DelayQueue` 中获取 `ScheduledFutureTask`,然后执行任务。 -ScheduledFutureTask 主要包含 3 个成员变量: +`ScheduledFutureTask` 主要包含 3 个成员变量: * long time,表示这个任务将要被执行的具体时间。 * long sequenceNumber,表示这个任务被添加到 ScheduledThreadPoolExecutor 中的序号。 @@ -2403,9 +2871,27 @@ public E take() throws InterruptedException { Future 接口和实现 Future 接口的 FutureTask 类用来表示异步计算的结果。当我们把 Runnable 接口或 Callable 接口的实现类提交(submit)给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 时,ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 会向我们返回一个 FutureTask 对象。 +```java +public interface Future { + // 取消任务执行 + // 成功取消返回 true,否则返回 false + boolean cancel(boolean mayInterruptIfRunning); + // 判断任务是否被取消 + boolean isCancelled(); + // 判断任务是否已经执行完成 + boolean isDone(); + // 获取任务执行结果 + V get() throws InterruptedException, ExecutionException; + // 指定时间内没有返回计算结果就抛出 TimeOutException 异常 + V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutExceptio +} +``` + + + #### Runnable 接口和 Callable 接口 -Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。它们之间的区别是 Runnable 不会返回结果,而 Callable 可以返回结果。 +Runnable 接口和 Callable 接口的实现类,都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。它们之间的区别是 Runnable 不会返回结果,而 Callable 可以返回结果和抛出异常。 @@ -2425,7 +2911,7 @@ Synchronized与ReetrantLock则是利用上述第一点:破坏多线程条件 -ThreadLocal 实现每一个线程都有自己专属的本地变量副本来避免共享,从而避免了线程安全问题。 +ThreadLocal 实现每一个线程都有自己专属的本地变量副本来避免共享,使用 `get()` 和 `set()` 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。 ### **ThreadLocal 的工作原理** @@ -2433,7 +2919,7 @@ ThreadLocal 的目标是让不同的线程有不同的变量 V,那最直接的 ![](./JUC/ThreadLocal.png) -`Thread`类有一个类型为`ThreadLocal.ThreadLocalMap`的实例变量`threadLocals`,也就是说每个线程有一个自己的`ThreadLocalMap`。这个 `threadLocals` 就是每条线程用来存储变量副本的,key值为当前 `ThreadLocal` 对象,value为变量值。 +每个`Thread`都有一个类型为`ThreadLocal.ThreadLocalMap`的实例变量`threadLocals`,也就是说每个线程有一个自己的`ThreadLocalMap`。这个 `threadLocals` 就是每条线程用来存储变量副本的,key值为当前 `ThreadLocal` 对象,value为变量值。 ```java // ThreadLocal类 @@ -2452,7 +2938,7 @@ public class ThreadLocal { } ``` -每个线程在往`ThreadLocal`里放值的时候,都会往自己的`ThreadLocalMap`里存,读也是以`ThreadLocal`作为引用,在自己的`map`里找对应的`key`,从而实现了**线程隔离**。 +每个线程在往`ThreadLocal`里放值的时候,都会往自己的`ThreadLocalMap`里存,读也是以`ThreadLocal`作为引用,在自己的`map`里找对应的`key`,从而实现了线程隔离。 @@ -2533,7 +3019,7 @@ protected T initialValue() { `ThreadLocalMap` 中使用的 key 为 `ThreadLocal` 的弱引用,而 value 是强引用。所以,如果 `ThreadLocal` 没有被外部强引用的情况下,在垃圾回收的时候,Entry.key 会被清理掉,而 Entry.value 不会被清理掉。 -这样一来,`ThreadLocalMap` 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。`ThreadLocalMap` 实现中已经考虑了这种情况,在调用 `set()`、`get()`、`remove()` 方法的时候,会清理掉 key 为 null 的记录。使用完 `ThreadLocal`方法后最好手动调用`remove()`方法 +这样一来,`ThreadLocalMap` 中就会出现 key 为 null 的 Entry。假如不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。`ThreadLocalMap` 实现中已经考虑了这种情况,在调用 `set()`、`get()`、`remove()` 方法的时候,会清理掉 key 为 null 的记录。使用完 `ThreadLocal`方法后最好手动调用`remove()`方法。 @@ -2582,6 +3068,47 @@ public class ThreadLocal { +### ThreadLocalMap.get() + +```java +// ThreadLocal类 -> ThreadLocalMap内部类 -> getEntry()方法 +private Entry getEntry(ThreadLocal key) { + int i = key.threadLocalHashCode & (table.length - 1); + // 获取table[i]位置的元素,如果不为空并且key相同则返回 + Entry e = table[i]; + if (e != null && e.get() == key) + return e; + // 如果key不相同则遍历整个table[i]之后的元素获取对应key的值 + else + return getEntryAfterMiss(key, i, e); +} + +// ThreadLocal类 -> ThreadLocalMap内部类 -> getEntryAfterMiss()方法 +private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { + Entry[] tab = table; + int len = tab.length; + // 遍历整个table[i]之后的元素 + while (e != null) { + ThreadLocal k = e.get(); + // 如果key相同则返回对应的元素 + if (k == key) + return e; + if (k == null) + expungeStaleEntry(i); + else + i = nextIndex(i, len); + e = tab[i]; + } + return null; +} +``` + +**第一种情况:** 通过查找`key`值计算出散列表中位置,然后该位置中的`Entry.key`和查找的`key`一致,则直接返回。 + +**第二种情况:** `slot`位置中的`Entry.key`和要查找的`key`不一致:往后遍历查找,如果时遇到`key=null`,触发一次探测式数据回收操作,执行`expungeStaleEntry()`方法,直到Entry为null或找到匹配值。 + + + ### ThreadLocalMap.set() ```java @@ -2674,6 +3201,8 @@ private void replaceStaleEntry(ThreadLocal key, Object value, ![](./JUC/set.png) +> 绿色块`Entry`代表正常数据,灰色块代表`Entry`的`key`值为`null`,已被垃圾回收。白色块表示`Entry`为`null` + 散列数组下标为 7 位置对应的`Entry`数据`key`为`null`,表明此数据`key`值已经被垃圾回收掉了,此时就会执行`replaceStaleEntry()`方法,进行探测式数据清理工作。 数据清理工作: @@ -2692,47 +3221,6 @@ private void replaceStaleEntry(ThreadLocal key, Object value, -### ThreadLocalMap.get() - -```java -// ThreadLocal类 -> ThreadLocalMap内部类 -> getEntry()方法 -private Entry getEntry(ThreadLocal key) { - int i = key.threadLocalHashCode & (table.length - 1); - // 获取table[i]位置的元素,如果不为空并且key相同则返回 - Entry e = table[i]; - if (e != null && e.get() == key) - return e; - // 如果key不相同则遍历整个table[i]之后的元素获取对应key的值 - else - return getEntryAfterMiss(key, i, e); -} - -// ThreadLocal类 -> ThreadLocalMap内部类 -> getEntryAfterMiss()方法 -private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { - Entry[] tab = table; - int len = tab.length; - // 遍历整个table[i]之后的元素 - while (e != null) { - ThreadLocal k = e.get(); - // 如果key相同则返回对应的元素 - if (k == key) - return e; - if (k == null) - expungeStaleEntry(i); - else - i = nextIndex(i, len); - e = tab[i]; - } - return null; -} -``` - -**第一种情况:** 通过查找`key`值计算出散列表中位置,然后该位置中的`Entry.key`和查找的`key`一致,则直接返回。 - -**第二种情况:** `slot`位置中的`Entry.key`和要查找的`key`不一致:往后遍历查找,如果时遇到`key=null`,触发一次探测式数据回收操作,执行`expungeStaleEntry()`方法,直到Entry为null或找到匹配值。 - - - ### 清理工作 `ThreadLocalMap`的两种过期`key`数据清理方式:**探测式清理**和**启发式清理**。 @@ -2813,6 +3301,52 @@ private boolean cleanSomeSlots(int i, int n) { 在`ThreadLocalMap.set()`方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中`Entry`的数量已经达到了列表的扩容阈值`(len*2/3)`,就开始执行`rehash()`逻辑: +```java +private void rehash() { + expungeStaleEntries(); + + if (size >= threshold - threshold / 4) + resize(); +} + +private void expungeStaleEntries() { + Entry[] tab = table; + int len = tab.length; + for (int j = 0; j < len; j++) { + Entry e = tab[j]; + if (e != null && e.get() == null) + expungeStaleEntry(j); + } +} +private void resize() { + Entry[] oldTab = table; + int oldLen = oldTab.length; + int newLen = oldLen * 2; + Entry[] newTab = new Entry[newLen]; + int count = 0; + + for (int j = 0; j < oldLen; ++j) { + Entry e = oldTab[j]; + if (e != null) { + ThreadLocal k = e.get(); + if (k == null) { + e.value = null; + } else { + int h = k.threadLocalHashCode & (newLen - 1); + while (newTab[h] != null) + h = nextIndex(h, newLen); + newTab[h] = e; + count++; + } + } + } + + setThreshold(newLen); + size = count; + table = newTab; +} +``` + `rehash()`首先是会进行探测式清理工作,从`table`的起始位置往后清理。清理完成之后,`table`中可能有一些`key`为`null`的`Entry`数据被清理掉,所以此时通过判断`size >= threshold - threshold / 4` 也就是`size >= threshold * 3/4` 来决定是否扩容。 扩容后的`tab`的大小为`oldLen * 2`,然后遍历老的散列表,重新计算`hash`位置,然后放到新的`tab`数组中,如果出现`hash`冲突则往后寻找最近的`entry`为`null`的槽位,遍历完成之后,`oldTab`中所有的`entry`数据都已经放入到新的`tab`中了。 diff --git "a/docs/JUC/JUC/\344\273\273\345\212\241\350\260\203\345\272\246\346\265\201\347\250\213.png" "b/docs/JUC/JUC/\344\273\273\345\212\241\350\260\203\345\272\246\346\265\201\347\250\213.png" new file mode 100644 index 0000000..fbe43d0 Binary files /dev/null and "b/docs/JUC/JUC/\344\273\273\345\212\241\350\260\203\345\272\246\346\265\201\347\250\213.png" differ diff --git "a/docs/JUC/JUC/\347\272\277\347\250\213\346\261\240\347\212\266\346\200\201\345\217\230\345\214\226\345\233\276.png" "b/docs/JUC/JUC/\347\272\277\347\250\213\346\261\240\347\212\266\346\200\201\345\217\230\345\214\226\345\233\276.png" new file mode 100644 index 0000000..6fa9494 Binary files /dev/null and "b/docs/JUC/JUC/\347\272\277\347\250\213\346\261\240\347\212\266\346\200\201\345\217\230\345\214\226\345\233\276.png" differ diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..1429952 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,2 @@ +## 欢迎来到 **WNote**! +