diff --git a/README.md b/README.md index ea31a328..889a4cf1 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,14 @@ ThreadPoolExecutor 源码解析 +ReentrantLock源码解析 + +- 参考文章 [ReentrantLock源码分析](https://java.isture.com/thread/concurrent/ReentrantLock/source/ReentrantLock%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90.html) + +AbstractQueuedSynchronizer(AQS源码分析) + +- 参考文章 [锁核心类AQS详解](https://java.isture.com/thread/JUCLock/锁核心类AQS详解.html) + ## 其他 Object 源码解析 diff --git a/src/java/util/concurrent/CountDownLatch.java b/src/java/util/concurrent/CountDownLatch.java index b0c80a9a..0799a25f 100644 --- a/src/java/util/concurrent/CountDownLatch.java +++ b/src/java/util/concurrent/CountDownLatch.java @@ -161,25 +161,33 @@ public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; + + // 构造器 Sync(int count) { setState(count); } - + // 返回当前计数 int getCount() { return getState(); } + // 试图在共享模式下获取对象状态 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } + // 试图设置状态来反映共享模式下的一个释放 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero + // 无限循环 for (;;) { + // 获取状态 int c = getState(); - if (c == 0) + if (c == 0)// 没有被线程占有 return false; + // 下一个状态 int nextc = c-1; + // 比较并且设置成功 if (compareAndSetState(c, nextc)) return nextc == 0; } @@ -197,6 +205,7 @@ protected boolean tryReleaseShared(int releases) { */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); + // 初始化状态数 this.sync = new Sync(count); } diff --git a/src/java/util/concurrent/FutureTask.java b/src/java/util/concurrent/FutureTask.java index 9a3747a5..bba43a66 100644 --- a/src/java/util/concurrent/FutureTask.java +++ b/src/java/util/concurrent/FutureTask.java @@ -89,6 +89,7 @@ public class FutureTask implements RunnableFuture { * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ + //任务状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; @@ -99,15 +100,20 @@ public class FutureTask implements RunnableFuture { private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ + //内部持有的callable任务,运行完毕后置空 private Callable callable; /** The result to return or exception to throw from get() */ + //从get()中返回的结果或抛出的异常 private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ + //运行callable的线程 private volatile Thread runner; /** Treiber stack of waiting threads */ + //使用Treiber栈保存等待线程 private volatile WaitNode waiters; /** + * //返回执行结果或抛出异常 * Returns result or throws exception for completed task. * * @param s completed state value @@ -129,6 +135,8 @@ private V report(int s) throws ExecutionException { * @param callable the callable task * @throws NullPointerException if the callable is null */ + //这个构造函数会把传入的Callable变量保存在this.callable字段中, + // 该字段定义为private Callable callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。 public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); @@ -148,6 +156,7 @@ public FutureTask(Callable callable) { * {@code Future f = new FutureTask(runnable, null)} * @throws NullPointerException if the runnable is null */ + public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable @@ -162,12 +171,13 @@ public boolean isDone() { } public boolean cancel(boolean mayInterruptIfRunning) { + //如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception - if (mayInterruptIfRunning) { + if (mayInterruptIfRunning) {//可以在运行时中断 try { Thread t = runner; if (t != null) @@ -177,7 +187,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { } } } finally { - finishCompletion(); + finishCompletion();//移除并唤醒所有等待线程 } return true; } @@ -193,6 +203,7 @@ public V get() throws InterruptedException, ExecutionException { } /** + * //获取执行结果 * @throws CancellationException {@inheritDoc} */ public V get(long timeout, TimeUnit unit) @@ -230,7 +241,7 @@ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state - finishCompletion(); + finishCompletion(); //执行完毕,唤醒等待线程 } } @@ -253,6 +264,7 @@ protected void setException(Throwable t) { } public void run() { + //新建任务,CAS替换runner为当前线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) @@ -271,7 +283,7 @@ public void run() { setException(ex); } if (ran) - set(result); + set(result);//设置执行结果 } } finally { // runner must be non-null until state is settled to @@ -281,7 +293,7 @@ public void run() { // leaked interrupts int s = state; if (s >= INTERRUPTING) - handlePossibleCancellationInterrupt(s); + handlePossibleCancellationInterrupt(s);//处理中断逻辑 } } @@ -331,6 +343,7 @@ protected boolean runAndReset() { private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. + //在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt @@ -364,12 +377,12 @@ static final class WaitNode { private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { - if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { - for (;;) { + if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待线程 + for (;;) {//自旋遍历等待线程 Thread t = q.thread; if (t != null) { q.thread = null; - LockSupport.unpark(t); + LockSupport.unpark(t);//唤醒等待线程 } WaitNode next = q.next; if (next == null) @@ -380,7 +393,7 @@ private void finishCompletion() { break; } } - + //任务完成后调用函数,自定义扩展 done(); callable = null; // to reduce footprint @@ -398,8 +411,8 @@ private int awaitDone(boolean timed, long nanos) final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; - for (;;) { - if (Thread.interrupted()) { + for (;;) {//自旋 + if (Thread.interrupted()) {//获取并清除中断状态 removeWaiter(q); throw new InterruptedException(); } @@ -407,7 +420,7 @@ private int awaitDone(boolean timed, long nanos) int s = state; if (s > COMPLETING) { if (q != null) - q.thread = null; + q.thread = null;//置空等待节点的线程 return s; } else if (s == COMPLETING) // cannot time out yet @@ -415,18 +428,19 @@ else if (s == COMPLETING) // cannot time out yet else if (q == null) q = new WaitNode(); else if (!queued) + //CAS修改waiter queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { - removeWaiter(q); + removeWaiter(q);//超时,移除等待节点 return state; } - LockSupport.parkNanos(this, nanos); + LockSupport.parkNanos(this, nanos);//阻塞当前线程 } else - LockSupport.park(this); + LockSupport.park(this);//阻塞当前线程 } } @@ -442,9 +456,10 @@ else if (timed) { */ private void removeWaiter(WaitNode node) { if (node != null) { - node.thread = null; + node.thread = null;//首先置空线程 retry: for (;;) { // restart on removeWaiter race + //依次遍历查找 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) diff --git a/src/java/util/concurrent/ScheduledThreadPoolExecutor.java b/src/java/util/concurrent/ScheduledThreadPoolExecutor.java index 8c0a4519..66ca3db6 100644 --- a/src/java/util/concurrent/ScheduledThreadPoolExecutor.java +++ b/src/java/util/concurrent/ScheduledThreadPoolExecutor.java @@ -150,6 +150,7 @@ public class ScheduledThreadPoolExecutor */ /** + * //关闭后继续执行已经存在的周期任务 * False if should cancel/suppress periodic tasks on shutdown. */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; @@ -157,17 +158,20 @@ public class ScheduledThreadPoolExecutor /** * False if should cancel non-periodic tasks on shutdown. */ + //关闭后继续执行已经存在的延时任务 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** * True if ScheduledFutureTask.cancel should remove from queue */ + //取消任务后移除 private volatile boolean removeOnCancel = false; /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. */ + //为相同延时的任务提供的顺序编号,保证任务之间的FIFO顺序 private static final AtomicLong sequencer = new AtomicLong(); /** @@ -181,9 +185,11 @@ private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { /** Sequence number to break ties FIFO */ + //为相同延时任务提供的顺序编号 private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ + //任务可以执行的时间,纳秒级 private long time; /** @@ -192,14 +198,17 @@ private class ScheduledFutureTask * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ + //重复任务的执行周期时间,纳秒级。 private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ + //重新入队的任务 RunnableScheduledFuture outerTask = this; /** * Index into delay queue, to support faster cancellation. */ + //延迟队列的索引,以支持更快的取消操作 int heapIndex; /** @@ -266,13 +275,14 @@ public boolean isPeriodic() { /** * Sets the next time to run for a periodic task. + * //设置下一次执行任务的时间 */ private void setNextRunTime() { long p = period; - if (p > 0) + if (p > 0) //固定速率执行,scheduleAtFixedRate time += p; else - time = triggerTime(-p); + time = triggerTime(-p); //固定延迟执行,scheduleWithFixedDelay } public boolean cancel(boolean mayInterruptIfRunning) { @@ -286,14 +296,14 @@ public boolean cancel(boolean mayInterruptIfRunning) { * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { - boolean periodic = isPeriodic(); - if (!canRunInCurrentRunState(periodic)) + boolean periodic = isPeriodic();;//是否为周期任务 + if (!canRunInCurrentRunState(periodic))//当前状态是否可以执行 cancel(false); - else if (!periodic) + else if (!periodic)//不是周期任务,直接执行 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { - setNextRunTime(); - reExecutePeriodic(outerTask); + setNextRunTime();//设置下一次运行时间 + reExecutePeriodic(outerTask);//重排序一个周期任务 } } } @@ -322,16 +332,16 @@ boolean canRunInCurrentRunState(boolean periodic) { * @param task the task */ private void delayedExecute(RunnableScheduledFuture task) { - if (isShutdown()) + if (isShutdown())//池已关闭,执行拒绝策略 reject(task); else { - super.getQueue().add(task); + super.getQueue().add(task);//任务入队 if (isShutdown() && - !canRunInCurrentRunState(task.isPeriodic()) && - remove(task)) + !canRunInCurrentRunState(task.isPeriodic()) &&//判断run-after-shutdown参数 + remove(task))//移除任务 task.cancel(false); else - ensurePrestart(); + ensurePrestart();//启动一个新的线程等待任务 } } @@ -341,13 +351,15 @@ private void delayedExecute(RunnableScheduledFuture task) { * * @param task the task */ + //重排序一个周期任务 void reExecutePeriodic(RunnableScheduledFuture task) { - if (canRunInCurrentRunState(true)) { - super.getQueue().add(task); + if (canRunInCurrentRunState(true)) {//池关闭后可继续执行 + super.getQueue().add(task);//任务入列 + //重新检查run-after-shutdown参数,如果不能继续运行就移除队列任务,并取消任务的执行 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else - ensurePrestart(); + ensurePrestart();//启动一个新的线程等待任务 } } @@ -355,33 +367,38 @@ void reExecutePeriodic(RunnableScheduledFuture task) { * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. Invoked within super.shutdown. */ + //取消并清除由于关闭策略不应该运行的所有任务 @Override void onShutdown() { BlockingQueue q = super.getQueue(); + //获取run-after-shutdown参数 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); - if (!keepDelayed && !keepPeriodic) { + if (!keepDelayed && !keepPeriodic) {//池关闭后不保留任务 + //依次取消任务 for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture) ((RunnableScheduledFuture) e).cancel(false); - q.clear(); + q.clear();//清除等待队列 } - else { + else {//池关闭后保留任务 // Traverse snapshot to avoid iterator exceptions + //遍历快照以避免迭代器异常 for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture t = (RunnableScheduledFuture)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled + //如果任务已经取消,移除队列中的任务 if (q.remove(t)) t.cancel(false); } } } } - tryTerminate(); + tryTerminate(); //终止线程池 } /** @@ -495,6 +512,7 @@ private long triggerTime(long delay, TimeUnit unit) { /** * Returns the trigger time of a delayed action. + * //计算固定延迟任务的执行时间 */ long triggerTime(long delay) { return now() + @@ -555,6 +573,10 @@ public ScheduledFuture schedule(Callable callable, * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ + /** + * 创建一个周期执行的任务,第一次执行延期时间为initialDelay, + * 之后每隔period执行一次,不等待第一次执行完成就开始计时 + */ public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, @@ -563,14 +585,15 @@ public ScheduledFuture scheduleAtFixedRate(Runnable command, throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); + //构建RunnableScheduledFuture任务类型 ScheduledFutureTask sft = new ScheduledFutureTask(command, null, - triggerTime(initialDelay, unit), - unit.toNanos(period)); - RunnableScheduledFuture t = decorateTask(command, sft); - sft.outerTask = t; - delayedExecute(t); + triggerTime(initialDelay, unit),//计算任务的延迟时间 + unit.toNanos(period));//计算任务的执行周期 + RunnableScheduledFuture t = decorateTask(command, sft);//执行用户自定义逻辑 + sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行 + delayedExecute(t);//执行任务 return t; } @@ -579,6 +602,10 @@ public ScheduledFuture scheduleAtFixedRate(Runnable command, * @throws NullPointerException {@inheritDoc} * @throws IllegalArgumentException {@inheritDoc} */ + /** + * 创建一个周期执行的任务,第一次执行延期时间为initialDelay, + * 在第一次执行完之后延迟delay后开始下一次执行 + */ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, @@ -587,14 +614,15 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); + //构建RunnableScheduledFuture任务类型 ScheduledFutureTask sft = new ScheduledFutureTask(command, null, - triggerTime(initialDelay, unit), - unit.toNanos(-delay)); - RunnableScheduledFuture t = decorateTask(command, sft); - sft.outerTask = t; - delayedExecute(t); + triggerTime(initialDelay, unit),//计算任务的延迟时间 + unit.toNanos(-delay));//计算任务的执行周期 + RunnableScheduledFuture t = decorateTask(command, sft);//执行用户自定义逻辑 + sft.outerTask = t;//赋值给outerTask,准备重新入队等待下一次执行 + delayedExecute(t);//执行任务 return t; } diff --git a/src/java/util/concurrent/ThreadPoolExecutor.java b/src/java/util/concurrent/ThreadPoolExecutor.java index e2fba877..df4234cf 100644 --- a/src/java/util/concurrent/ThreadPoolExecutor.java +++ b/src/java/util/concurrent/ThreadPoolExecutor.java @@ -343,6 +343,8 @@ public class ThreadPoolExecutor extends AbstractExecutorService { * that workerCount is 0 (which sometimes entails a recheck -- see * below). */ + //这个属性是用来存放 当前运行的worker数量以及线程池状态的 + //int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; @@ -410,6 +412,7 @@ private void decrementWorkerCount() { * return null even if it may later return non-null when delays * expire. */ + //存放任务的阻塞队列 private final BlockingQueue workQueue; /** @@ -431,6 +434,7 @@ private void decrementWorkerCount() { * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ + //worker的集合,用set来存放 private final HashSet workers = new HashSet(); /** @@ -442,6 +446,7 @@ private void decrementWorkerCount() { * Tracks largest attained pool size. Accessed only under * mainLock. */ + //历史达到的worker数最大值 private int largestPoolSize; /** @@ -479,6 +484,7 @@ private void decrementWorkerCount() { /** * Handler called when saturated or shutdown in execute. */ + //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略 private volatile RejectedExecutionHandler handler; /** @@ -487,6 +493,7 @@ private void decrementWorkerCount() { * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ + //超出coreSize的worker的生存时间 private volatile long keepAliveTime; /** @@ -501,12 +508,14 @@ private void decrementWorkerCount() { * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ + //常驻worker的数量 private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ + //最大worker的数量,一般当workQueue满了才会用到这个参数 private volatile int maximumPoolSize; /** @@ -568,10 +577,13 @@ private final class Worker private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ + //运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker final Thread thread; /** Initial task to run. Possibly null. */ + //当一个worker刚创建的时候,就先尝试执行这个任务 Runnable firstTask; /** Per-thread task counter */ + //记录完成任务的数量 volatile long completedTasks; /** @@ -581,6 +593,7 @@ private final class Worker Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; + //创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法 this.thread = getThreadFactory().newThread(this); } @@ -725,6 +738,7 @@ private void interruptWorkers() { mainLock.lock(); try { for (Worker w : workers) + //遍历所有worker,然后调用中断方法 w.interruptIfStarted(); } finally { mainLock.unlock(); @@ -754,8 +768,12 @@ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { + //遍历所有的worker for (Worker w : workers) { Thread t = w.thread; + //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它 + //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能 + //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); @@ -1088,6 +1106,7 @@ private Runnable getTask() { */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); + // 先执行firstTask,再从workerQueue中取task(getTask()) Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts @@ -1355,9 +1374,13 @@ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { + //检查是否可以关闭线程 checkShutdownAccess(); + //设置线程池状态 advanceRunState(SHUTDOWN); + //尝试中断worker interruptIdleWorkers(); + //预留方法,留给子类实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); @@ -1387,9 +1410,13 @@ public List shutdownNow() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { + //检测权限 checkShutdownAccess(); + // 设置线程池状态 advanceRunState(STOP); + //中断所有的worker interruptWorkers(); + //清空任务队列 tasks = drainQueue(); } finally { mainLock.unlock(); diff --git a/src/java/util/concurrent/atomic/AtomicInteger.java b/src/java/util/concurrent/atomic/AtomicInteger.java index e1c68153..7f53816f 100644 --- a/src/java/util/concurrent/atomic/AtomicInteger.java +++ b/src/java/util/concurrent/atomic/AtomicInteger.java @@ -20,19 +20,27 @@ public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates + // 内部实际上依赖于Unsafe类的方法,对 value值进行操作 private static final Unsafe unsafe = Unsafe.getUnsafe(); + // value字段的偏移量 private static final long valueOffset; static { try { + //初始化value字段的偏移量 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } + /** + * 内部的value属性,它就代表了Atomiclnteger 的当前实际取值。 + * 所有的方法都是围绕该值进行的 + */ private volatile int value; /** + * 使用给定值初始化value * Creates a new AtomicInteger with the given initial value. * * @param initialValue the initial value @@ -42,12 +50,14 @@ public AtomicInteger(int initialValue) { } /** + * 初始化value值为0 * Creates a new AtomicInteger with initial value {@code 0}. */ public AtomicInteger() { } /** + * 获取当前最新值 * Gets the current value. * * @return the current value @@ -57,6 +67,7 @@ public final int get() { } /** + * 设置给定新值 * Sets to the given value. * * @param newValue the new value @@ -66,6 +77,7 @@ public final void set(int newValue) { } /** + * 1. 最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。 * Eventually sets to the given value. * * @param newValue the new value @@ -76,6 +88,7 @@ public final void lazySet(int newValue) { } /** + * 原子性的将当前值设为给定新值,返回旧值 * Atomically sets to the given value and returns the old value. * * @param newValue the new value @@ -86,6 +99,7 @@ public final int getAndSet(int newValue) { } /** + * 如果当前值等于预期值,则以原子方式将该值设置为给定的新值 * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * @@ -115,6 +129,7 @@ public final boolean weakCompareAndSet(int expect, int update) { } /** + * 原子性的将当前值加1,返回旧值 * Atomically increments by one the current value. * * @return the previous value @@ -124,6 +139,7 @@ public final int getAndIncrement() { } /** + * 原子性的将当前值减1,返回旧值 * Atomically decrements by one the current value. * * @return the previous value @@ -133,34 +149,38 @@ public final int getAndDecrement() { } /** + * 原子性的将当前值增加delta,返回旧值 * Atomically adds the given value to the current value. * - * @param delta the value to add - * @return the previous value + * @param delta the value to add 增加的值 + * @return the previous value 旧值 */ public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } /** + * 原子性的将当前值加1,返回新值 * Atomically increments by one the current value. * - * @return the updated value + * @return the updated value 更新后的值 */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } /** + * 原子性的将当前值减1,返回新值 * Atomically decrements by one the current value. * - * @return the updated value + * @return the updated value 更新后的值 */ public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } /** + * 原子性的将当前值增加delta,返回新值 * Atomically adds the given value to the current value. * * @param delta the value to add diff --git a/src/java/util/concurrent/atomic/AtomicStampedReference.java b/src/java/util/concurrent/atomic/AtomicStampedReference.java index e33ea040..ad383b73 100644 --- a/src/java/util/concurrent/atomic/AtomicStampedReference.java +++ b/src/java/util/concurrent/atomic/AtomicStampedReference.java @@ -49,32 +49,51 @@ */ public class AtomicStampedReference { + /** + * Pair内部类,用于维护reference和stamp + * @param + */ private static class Pair { + /** + * 真正的数据 + */ final T reference; + /** + * 版本号 + */ final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } + /** + * 返回Pair实例 + */ static Pair of(T reference, int stamp) { return new Pair(reference, stamp); } } + /** + * 由于要维护两个属性,因此干脆使用一个内部类对象来维护这两个属性 + */ private volatile Pair pair; /** + * 创建具有给定初始值的新 AtomicStampedReference。 * Creates a new {@code AtomicStampedReference} with the given * initial values. * - * @param initialRef the initial reference - * @param initialStamp the initial stamp + * @param initialRef the initial reference 初始值 + * @param initialStamp the initial stamp 初始版本号 */ public AtomicStampedReference(V initialRef, int initialStamp) { + //初始化一个Pair对象,并初始化属性值 pair = Pair.of(initialRef, initialStamp); } /** + * 获得当前保存的对象引用 * Returns the current value of the reference. * * @return the current value of the reference @@ -84,6 +103,7 @@ public V getReference() { } /** + * 获得当前保存的版本号 * Returns the current value of the stamp. * * @return the current value of the stamp @@ -131,6 +151,7 @@ public boolean weakCompareAndSet(V expectedReference, } /** + * 如果当前引用 == 预期引用,并且当前版本号等于预期版本号,则以原子方式将该引用和该标志的值设置为给定的更新值。 * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference @@ -147,6 +168,8 @@ public boolean compareAndSet(V expectedReference, int expectedStamp, int newStamp) { Pair current = pair; + //一系列的判断,如果两个预期值都相等,那么尝试调用compareAndSwapObject使用新的Pair对象替代旧的Pair对象 + //这样就同时完成了reference和stamp的更新 return expectedReference == current.reference && expectedStamp == current.stamp && @@ -156,13 +179,16 @@ public boolean compareAndSet(V expectedReference, } /** + * 设置新对象引用和版本号 * Unconditionally sets the value of both the reference and stamp. * - * @param newReference the new value for the reference - * @param newStamp the new value for the stamp + * @param newReference the new value for the reference 新对象引用 + * @param newStamp the new value for the stamp 新版本号 */ public void set(V newReference, int newStamp) { Pair current = pair; + //如果新对象引用以及新版本号和之前的都一样那就不设置 + //否则就是新建一个Pair对象并设置相应的属性,替代原来的Pair对象 if (newReference != current.reference || newStamp != current.stamp) this.pair = Pair.of(newReference, newStamp); } @@ -194,6 +220,13 @@ public boolean attemptStamp(V expectedReference, int newStamp) { private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class); + /** + * CAS替换内部的Pair对象的方法 + * + * @param cmp 预期pair对象 + * @param val 新pair对象 + * @return 如果成功,则返回 true + */ private boolean casPair(Pair cmp, Pair val) { return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); } diff --git a/src/java/util/concurrent/locks/AbstractQueuedSynchronizer.java b/src/java/util/concurrent/locks/AbstractQueuedSynchronizer.java index 65270e65..7ea49f95 100644 --- a/src/java/util/concurrent/locks/AbstractQueuedSynchronizer.java +++ b/src/java/util/concurrent/locks/AbstractQueuedSynchronizer.java @@ -299,6 +299,7 @@ public abstract class AbstractQueuedSynchronizer protected AbstractQueuedSynchronizer() { } /** + * 内部类 - Node类 * Wait queue node class. * *

The wait queue is a variant of a "CLH" (Craig, Landin, and @@ -376,13 +377,23 @@ protected AbstractQueuedSynchronizer() { } * Scherer and Michael Scott, along with members of JSR-166 * expert group, for helpful ideas, discussions, and critiques * on the design of this class. + * 内部类 - Node */ static final class Node { /** Marker to indicate a node is waiting in shared mode */ + // 模式,分为共享与独占 + // 共享模式 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ + // 独占模式 static final Node EXCLUSIVE = null; + // 结点状态 + // CANCELLED,值为1,表示当前的线程被取消 + // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark + // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中 + // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行 + // 值为0,表示当前节点在sync队列中,等待着获取锁 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ @@ -429,6 +440,7 @@ static final class Node { * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ + // 结点状态 volatile int waitStatus; /** @@ -442,6 +454,7 @@ static final class Node { * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ + // 前驱结点 volatile Node prev; /** @@ -457,12 +470,14 @@ static final class Node { * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ + // 后继结点 volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ + // 结点所对应的线程 volatile Thread thread; /** @@ -475,9 +490,11 @@ static final class Node { * we save a field by using special value to indicate shared * mode. */ + // 下一个等待者 Node nextWaiter; /** + * // 结点是否在共享模式下等待 * Returns true if node is waiting in shared mode. */ final boolean isShared() { @@ -485,6 +502,7 @@ final boolean isShared() { } /** + * // 获取前驱结点,若前驱结点为空,抛出异常 * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. @@ -492,21 +510,25 @@ final boolean isShared() { * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { + // 保存前驱结点 Node p = prev; - if (p == null) + if (p == null)// 前驱结点为空,抛出异常 throw new NullPointerException(); - else + else// 前驱结点不为空,返回 return p; } + // 无参构造方法 Node() { // Used to establish initial head or SHARED marker } + // 构造方法 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } + // 构造方法 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; @@ -519,17 +541,20 @@ final Node predecessor() throws NullPointerException { * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ + // 头节点 private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ + // 尾结点 private transient volatile Node tail; /** * The synchronization state. */ + // 状态 private volatile int state; /** @@ -573,47 +598,59 @@ protected final boolean compareAndSetState(int expect, int update) { * rather than to use timed park. A rough estimate suffices * to improve responsiveness with very short timeouts. */ + // 自旋时间 static final long spinForTimeoutThreshold = 1000L; - /** + /**\ + * * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { - for (;;) { + for (;;) {// 无限循环,确保结点能够成功入队列 + // 保存尾结点 Node t = tail; - if (t == null) { // Must initialize - if (compareAndSetHead(new Node())) - tail = head; - } else { + if (t == null) {// 尾结点为空,即还没被初始化 // Must initialize + if (compareAndSetHead(new Node()))// 头节点为空,并设置头节点为新生成的结点 + tail = head;// 头节点与尾结点都指向同一个新生结点 + } else {// 尾结点不为空,即已经被初始化过 + // 将node结点的prev域连接到尾结点 node.prev = t; - if (compareAndSetTail(t, node)) { + if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node + // 设置尾结点的next域为node t.next = node; - return t; + return t;// 返回尾结点 } } } } /** + * 获取锁失败后,会执行 addWaiter(Node.EXCLUSIVE)加入等待队列,具体实现方法如下: * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { + // 新生成一个结点,默认为独占模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure + // 保存尾结点 Node pred = tail; - if (pred != null) { + if (pred != null) { // 尾结点不为空,即已经被初始化 + // 将node结点的prev域连接到尾结点 node.prev = pred; - if (compareAndSetTail(pred, node)) { + // 4. 通过compareAndSetTail方法,完成尾节点的设置。这个方法主要是对 tailOffset和 Expect进行比较, + // 如果 tailOffset的 Node和 Expect的 Node地址是相同的,那么设置 Tail的值为 Update的值。 + if (compareAndSetTail(pred, node)) {// 比较pred是否为尾结点,是则将尾结点设置为node + // 设置尾结点的next域为node pred.next = node; - return node; + return node;// 返回新生成的结点 } } - enq(node); + enq(node);// 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列 return node; } @@ -641,6 +678,7 @@ private void unparkSuccessor(Node node) { * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ + // 获取头结点waitStatus int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); @@ -651,14 +689,18 @@ private void unparkSuccessor(Node node) { * traverse backwards from tail to find the actual * non-cancelled successor. */ + // 获取当前节点的下一个节点 Node s = node.next; + // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点 if (s == null || s.waitStatus > 0) { s = null; + // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) + // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark LockSupport.unpark(s.thread); } @@ -679,20 +721,24 @@ private void doReleaseShared() { * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ + // 无限循环 for (;;) { + // 保存头节点 Node h = head; - if (h != null && h != tail) { + if (h != null && h != tail) {// 头节点不为空并且头节点不为尾结点 + // 获取头节点的等待状态 int ws = h.waitStatus; - if (ws == Node.SIGNAL) { - if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) + if (ws == Node.SIGNAL) {// 状态为SIGNAL + if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 不成功就继续 continue; // loop to recheck cases + // 释放后继结点 unparkSuccessor(h); } else if (ws == 0 && - !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续 continue; // loop on failed CAS } - if (h == head) // loop if head changed + if (h == head) // 若头节点改变,继续循环 // loop if head changed break; } } @@ -706,7 +752,9 @@ else if (ws == 0 && * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { + // 获取头节点 Node h = head; // Record old head for check below + // 设置头节点 setHead(node); /* * Try to signal next queued node if: @@ -724,10 +772,13 @@ private void setHeadAndPropagate(Node node, int propagate) { * racing acquires/releases, so most need signals now or soon * anyway. */ + // 进行判断 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { + // 获取节点的后继 Node s = node.next; - if (s == null || s.isShared()) + if (s == null || s.isShared()) // 后继为空或者为共享模式 + // 以共享模式进行释放 doReleaseShared(); } } @@ -741,12 +792,14 @@ private void setHeadAndPropagate(Node node, int propagate) { */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist + // 将无效节点过滤 if (node == null) return; - + // 设置该节点不关联任何线程,也就是虚节点 node.thread = null; // Skip cancelled predecessors + // 通过前驱节点,跳过取消状态的node Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; @@ -754,20 +807,27 @@ private void cancelAcquire(Node node) { // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. + // 获取过滤后的前驱节点的后继节点 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. + // 把当前node的状态设置为CANCELLED node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. + // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点 + // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; + // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功 + // 如果1和2中有一个为true,再判断当前节点的线程是否为null + // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && @@ -776,6 +836,7 @@ private void cancelAcquire(Node node) { if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { + // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点 unparkSuccessor(node); } @@ -792,20 +853,25 @@ private void cancelAcquire(Node node) { * @param node the node * @return {@code true} if thread should block */ + // 靠前驱节点判断当前线程是否应该被阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + // 获取头结点的节点状态 int ws = pred.waitStatus; + // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; + // 通过枚举值我们知道waitStatus>0是取消状态 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { + // 循环向前查找取消节点,把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; @@ -815,6 +881,7 @@ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ + // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; @@ -847,6 +914,8 @@ private final boolean parkAndCheckInterrupt() { */ /** + * acquireQueued方法可以对排队中的线程进行“获锁”操作。总的来说,一个线程获取锁失败了,被放入等待队列, + * acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。 * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * @@ -855,17 +924,24 @@ private final boolean parkAndCheckInterrupt() { * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { + // 标记是否成功拿到资源 boolean failed = true; try { + // 标记等待过程中是否中断过 boolean interrupted = false; + // 开始自旋,要么获取锁,要么中断 for (;;) { + // 获取当前节点的前驱节点 final Node p = node.predecessor(); + // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点) if (p == head && tryAcquire(arg)) { + // 获取锁成功,头指针移动到当前node setHead(node); p.next = null; // help GC failed = false; return interrupted; } + // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; @@ -979,22 +1055,28 @@ private void doAcquireShared(int arg) { */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { + // 添加节点至等待队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { - for (;;) { + for (;;) {// 无限循环 + // 获取node的前驱节点 final Node p = node.predecessor(); - if (p == head) { + if (p == head) { // 前驱节点为头节点 + // 试图在共享模式下获取对象状态 int r = tryAcquireShared(arg); - if (r >= 0) { + if (r >= 0) {// 获取成功 + // 设置头节点并进行繁殖 setHeadAndPropagate(node, r); + // 设置节点next域 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && - parkAndCheckInterrupt()) + parkAndCheckInterrupt())// 在获取失败后是否需要禁止线程并且进行中断检查 + // 抛出异常 throw new InterruptedException(); } } finally { @@ -1047,6 +1129,7 @@ private boolean doAcquireSharedNanos(int arg, long nanosTimeout) // Main exported methods /** + * 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 * Attempts to acquire in exclusive mode. This method should query * if the state of the object permits it to be acquired in the * exclusive mode, and if so to acquire it. @@ -1077,6 +1160,7 @@ protected boolean tryAcquire(int arg) { } /** + * 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 * Attempts to set the state to reflect a release in exclusive * mode. * @@ -1103,6 +1187,7 @@ protected boolean tryRelease(int arg) { } /** + * 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 * Attempts to acquire in shared mode. This method should query if * the state of the object permits it to be acquired in the shared * mode, and if so to acquire it. @@ -1139,6 +1224,7 @@ protected int tryAcquireShared(int arg) { } /** + * 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 * Attempts to set the state to reflect a release in shared mode. * *

This method is always invoked by the thread performing release. @@ -1164,6 +1250,7 @@ protected boolean tryReleaseShared(int arg) { } /** + * 该线程是否正在独占资源。只有用到Condition才需要去实现它。 * Returns {@code true} if synchronization is held exclusively with * respect to the current (calling) thread. This method is invoked * upon each call to a non-waiting {@link ConditionObject} method. @@ -1195,6 +1282,7 @@ protected boolean isHeldExclusively() { * can represent anything you like. */ public final void acquire(int arg) { + // 当执行Acquire(1)时,会通过tryAcquire获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter加入到等待队列中去。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); @@ -1258,8 +1346,11 @@ public final boolean tryAcquireNanos(int arg, long nanosTimeout) * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { + // 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有 if (tryRelease(arg)) { + // 获取头结点 Node h = head; + // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; @@ -1467,6 +1558,9 @@ final boolean apparentlyFirstQueuedIsExclusive() { } /** + * hasQueuedPredecessors 公平锁加锁时判断等待队列中是否存在有效节点的方法。 + * 如果返回False,说明当前线程可以争取共享资源; + * 如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。 * Queries whether any threads have been waiting to acquire longer * than the current thread. * @@ -1667,10 +1761,14 @@ private boolean findNodeFromTail(Node node) { * @return true if successfully transferred (else the node was * cancelled before signal) */ + // node节点是等待队列上的节点 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ + // 改变节点的等待状态为0 + // 0表示:当前节点在sync队列中,等待着获取锁。-2表示当前节点在等待condition,也就是在condition队列中 + // 返回false,外层的循环继续执行 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; @@ -1680,10 +1778,17 @@ final boolean transferForSignal(Node node) { * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ + // 将节点加入到同步队列中,返回node节点的前驱结点,也就是老的尾节点 Node p = enq(node); int ws = p.waitStatus; + // 大于0的状态只有1,也就是取消 + // 如果老的尾节点被取消 或者 更新老的尾节点为SIGNAL失败,可以直接轮到当前节点,直接唤醒当前节点的线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); + // 如果老的尾节点没有被取消 或者 更新老的尾节点为SIGNAL成功,则返回true + // 返回true的话,外层的do循环会直接退出 + // 所以这个方法最核心的逻辑知识把等待队列的节点转移到同步队列中去 + // 转移到同步队列后,signal()方法调用完成后紧接着应该是unlock()方法,移动同步队列的新节点等待被唤醒 return true; } @@ -1826,17 +1931,21 @@ public final Collection getWaitingThreads(ConditionObject condition) { * *

This class is Serializable, but all fields are transient, * so deserialized conditions have no waiters. + * 内部类 */ public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ + // condition队列的头节点 private transient Node firstWaiter; /** Last node of condition queue. */ + // condition队列的尾结点 private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ + // 构造方法 public ConditionObject() { } // Internal methods @@ -1845,18 +1954,26 @@ public ConditionObject() { } * Adds a new waiter to wait queue. * @return its new wait node */ + // 添加新的waiter到wait队列 private Node addConditionWaiter() { + // 保存尾结点 Node t = lastWaiter; // If lastWaiter is cancelled, clean out. - if (t != null && t.waitStatus != Node.CONDITION) { + if (t != null && t.waitStatus != Node.CONDITION) {// 尾结点不为空,并且尾结点的状态不为CONDITION + // 清除状态为CONDITION的结点 unlinkCancelledWaiters(); + // 将最后一个结点重新赋值给t t = lastWaiter; } + // 新建一个结点 Node node = new Node(Thread.currentThread(), Node.CONDITION); - if (t == null) + if (t == null)// 尾结点为空 + // 设置condition队列的头节点 firstWaiter = node; - else + else// 尾结点不为空 + // 设置为节点的nextWaiter域为node结点 t.nextWaiter = node; + // 更新condition队列的尾结点 lastWaiter = node; return node; } @@ -1868,10 +1985,15 @@ private Node addConditionWaiter() { * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { + // 循环 do { + // firstWaiter = first.nextWaiter 重新赋值等待队列头结点 if ( (firstWaiter = first.nextWaiter) == null) + // 等待队列 为空 lastWaiter = null; + // 断掉节点关系 first.nextWaiter = null; + // transferForSignal 将节点从等待队列转移到同步队列 } while (!transferForSignal(first) && (first = firstWaiter) != null); } @@ -1881,11 +2003,18 @@ private void doSignal(Node first) { * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { + // condition队列的头节点尾结点都设置为空 lastWaiter = firstWaiter = null; + // 遍历所有的等待队列 do { + // 获取first结点的nextWaiter域结点 Node next = first.nextWaiter; + // 设置first结点的nextWaiter域为空 first.nextWaiter = null; + // 等待队列转移到同步队列,signal方法也是同样转移的 + // 将first结点从condition队列转移到sync队列 transferForSignal(first); + // 重新设置first first = next; } while (first != null); } @@ -1904,22 +2033,31 @@ private void doSignalAll(Node first) { * without requiring many re-traversals during cancellation * storms. */ + // 从condition队列中清除状态为CANCEL的结点 private void unlinkCancelledWaiters() { + // 保存condition队列头节点 Node t = firstWaiter; Node trail = null; - while (t != null) { + while (t != null) { // t不为空 + // 下一个结点 Node next = t.nextWaiter; - if (t.waitStatus != Node.CONDITION) { + if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态 + // 设置t节点的nextWaiter域为空 t.nextWaiter = null; - if (trail == null) + if (trail == null) // trail为空 + // 重新设置condition队列的头节点 firstWaiter = next; - else + else// trail不为空 + // 设置trail结点的nextWaiter域为next结点 trail.nextWaiter = next; - if (next == null) + if (next == null)// next结点为空 + // 设置condition队列的尾结点 lastWaiter = trail; } - else + else// t结点的状态为CONDTION状态 + // 设置trail结点 trail = t; + // 设置t结点 t = next; } } @@ -1934,11 +2072,16 @@ private void unlinkCancelledWaiters() { * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ + // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。 public final void signal() { + // 判断是否是当前线程持有锁,不是则抛出异常 + // 说明了调用这个方法之前也必须要持有锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); + // 等待队列队头,理论上就是第一次调用await()时加入的节点线程 Node first = firstWaiter; if (first != null) + // 发信号 doSignal(first); } @@ -1949,11 +2092,14 @@ public final void signal() { * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ + // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。 public final void signalAll() { - if (!isHeldExclusively()) + if (!isHeldExclusively())// 不被当前线程独占,抛出异常 throw new IllegalMonitorStateException(); + // 保存condition队列头节点 Node first = firstWaiter; - if (first != null) + if (first != null)// 头节点不为空 + // 唤醒所有等待线程 doSignalAll(first); } @@ -1968,13 +2114,18 @@ public final void signalAll() { * {@link #acquire} with saved state as argument. * */ + // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断 public final void awaitUninterruptibly() { + // 添加一个结点到等待队列 Node node = addConditionWaiter(); + // 获取释放的状态 int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { + // 阻塞当前线程 LockSupport.park(this); - if (Thread.interrupted()) + if (Thread.interrupted())// 当前线程被中断 + // 设置interrupted状态 interrupted = true; } if (acquireQueued(node, savedState) || interrupted) @@ -2029,22 +2180,36 @@ else if (interruptMode == REINTERRUPT) *

  • If interrupted while blocked in step 4, throw InterruptedException. * */ + // 等待,当前线程在接到信号或被中断之前一直处于等待状态 public final void await() throws InterruptedException { if (Thread.interrupted()) + // 如果当前线程被中断,抛出InterruptedException throw new InterruptedException(); + // 以当前线程为节点添加到等待队列,并返回当前节点 Node node = addConditionWaiter(); + // 完全释放当前线程获得锁,并返回释放前state的值 int savedState = fullyRelease(node); + // 中断标识 int interruptMode = 0; + // 检查当前节点的是否在同步队列,注意前面的感叹号,是节点不在同步队列中,才将当前线程park while (!isOnSyncQueue(node)) { + // 调用Unsafa类底层阻塞线程,等待唤醒自己的条件信号 + // 阻塞当前线程 LockSupport.park(this); + // 当被唤醒以后,接着从下面开始执行 + // checkInterruptWhileWaiting 检查线程是否被中断 + // 发出信号之前被中断,返回-1,发出信号之后被中断,返回1,没有被中断,返回0 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } + // 再次从同步队列获得锁,获取不到锁会再次阻塞线程 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled + // 判断条件等待队列中有没有线程被取消,如果有,则将它们清除 unlinkCancelledWaiters(); if (interruptMode != 0) + // 发生了中断,抛出异常或者重新中断当前线程 reportInterruptAfterWait(interruptMode); } @@ -2061,6 +2226,7 @@ public final void await() throws InterruptedException { *
  • If interrupted while blocked in step 4, throw InterruptedException. * */ + // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) @@ -2103,6 +2269,7 @@ public final long awaitNanos(long nanosTimeout) *
  • If timed out while blocked in step 4, return false, else true. * */ + // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态 public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); @@ -2144,32 +2311,45 @@ public final boolean awaitUntil(Date deadline) *
  • If timed out while blocked in step 4, return false, else true. * */ + // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0 public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) + // 如果当前线程被中断,抛出InterruptedException throw new InterruptedException(); + // 以当前线程为节点添加到等待队列,并返回当前节点 Node node = addConditionWaiter(); + // 完全释放当前线程获得锁,并返回释放前state的值 int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; + // 中断标识 int interruptMode = 0; + // 检查当前节点的是否在同步队列,注意前面的感叹号,是节点不在同步队列中,才将当前线程park while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) + // 调用Unsafa类底层阻塞线程,等待唤醒自己的条件信号 LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + // 当被唤醒以后,接着从下面开始执行 + // checkInterruptWhileWaiting 检查线程是否被中断 + // 发出信号之前被中断,返回-1,发出信号之后被中断,返回1,没有被中断,返回0 break; nanosTimeout = deadline - System.nanoTime(); } + // 再次从同步队列获得锁,获取不到锁会再次阻塞线程 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) + // 判断条件等待队列中有没有线程被取消,如果有,则将它们清除 unlinkCancelledWaiters(); if (interruptMode != 0) + // 发生了中断,抛出异常或者重新中断当前线程 reportInterruptAfterWait(interruptMode); return !timedout; } @@ -2194,6 +2374,7 @@ final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ + // 查询是否有正在等待此条件的任何线程 protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); @@ -2213,6 +2394,7 @@ protected final boolean hasWaiters() { * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ + // 返回正在等待此条件的线程数估计值 protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); @@ -2233,6 +2415,7 @@ protected final int getWaitQueueLength() { * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ + // 返回包含那些可能正在等待此条件的线程集合 protected final Collection getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); @@ -2257,13 +2440,24 @@ protected final Collection getWaitingThreads() { * are at it, we do the same for other CASable fields (which could * otherwise be done with atomic field updaters). */ + // Unsafe类实例 private static final Unsafe unsafe = Unsafe.getUnsafe(); + // state内存偏移地址 private static final long stateOffset; + // head内存偏移地址 private static final long headOffset; + // tail内存偏移地址 private static final long tailOffset; private static final long waitStatusOffset; + // next内存偏移地址 private static final long nextOffset; + /** + * 从AQS的静态代码块可以看出,都是获取一个对象的属性相对于该对象在内存当中的偏移量, + * 这样我们就可以根据这个偏移量在对象内存当中找到这个属性。 + * tailOffset指的是 tail对应的偏移量,所以这个时候会将 new出来的 Node置为当前队列的尾节点。 + * 同时,由于是双向链表,也需要将前一个节点指向尾节点。 + */ static { try { stateOffset = unsafe.objectFieldOffset diff --git a/src/java/util/concurrent/locks/Condition.java b/src/java/util/concurrent/locks/Condition.java index c4cad05c..0130f3d2 100644 --- a/src/java/util/concurrent/locks/Condition.java +++ b/src/java/util/concurrent/locks/Condition.java @@ -228,6 +228,7 @@ public interface Condition { * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ + // 等待,当前线程在接到信号或被中断之前一直处于等待状态 void await() throws InterruptedException; /** @@ -264,6 +265,7 @@ public interface Condition { * thrown (such as {@link IllegalMonitorStateException}) and the * implementation must document that fact. */ + // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断 void awaitUninterruptibly(); /** @@ -355,6 +357,7 @@ public interface Condition { * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ + //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 long awaitNanos(long nanosTimeout) throws InterruptedException; /** @@ -370,6 +373,7 @@ public interface Condition { * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ + // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0 boolean await(long time, TimeUnit unit) throws InterruptedException; /** @@ -447,6 +451,7 @@ public interface Condition { * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ + // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态 boolean awaitUntil(Date deadline) throws InterruptedException; /** @@ -465,6 +470,7 @@ public interface Condition { * not held. Typically, an exception such as {@link * IllegalMonitorStateException} will be thrown. */ + // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。 void signal(); /** @@ -483,5 +489,6 @@ public interface Condition { * not held. Typically, an exception such as {@link * IllegalMonitorStateException} will be thrown. */ + // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。 void signalAll(); } diff --git a/src/java/util/concurrent/locks/LockSupport.java b/src/java/util/concurrent/locks/LockSupport.java index ea629ebb..d46a4d87 100644 --- a/src/java/util/concurrent/locks/LockSupport.java +++ b/src/java/util/concurrent/locks/LockSupport.java @@ -36,6 +36,8 @@ package java.util.concurrent.locks; import sun.misc.Unsafe; +import java.util.concurrent.ThreadPoolExecutor; + /** * Basic thread blocking primitives for creating locks and other * synchronization classes. @@ -122,6 +124,7 @@ private LockSupport() {} // Cannot be instantiated. private static void setBlocker(Thread t, Object arg) { // Even though volatile, hotspot doesn't need a write barrier here. + // 设置线程t的parkBlocker字段的值为arg UNSAFE.putObject(t, parkBlockerOffset, arg); } @@ -137,8 +140,9 @@ private static void setBlocker(Thread t, Object arg) { * this operation has no effect */ public static void unpark(Thread thread) { + // 线程为不空 if (thread != null) - UNSAFE.unpark(thread); + UNSAFE.unpark(thread);// 释放该线程许可 } /** @@ -170,9 +174,13 @@ public static void unpark(Thread thread) { * @since 1.6 */ public static void park(Object blocker) { + // 获取当前线程 Thread t = Thread.currentThread(); + // 设置Blocker setBlocker(t, blocker); + // 获取许可 UNSAFE.park(false, 0L); + // 重新可运行后再此设置Blocker setBlocker(t, null); } @@ -209,10 +217,15 @@ public static void park(Object blocker) { * @since 1.6 */ public static void parkNanos(Object blocker, long nanos) { + // 时间大于0 if (nanos > 0) { + // 获取当前线程 Thread t = Thread.currentThread(); + // 设置Blocker setBlocker(t, blocker); + // 获取许可,并设置了时间 UNSAFE.park(false, nanos); + // 设置许可 setBlocker(t, null); } } @@ -251,9 +264,12 @@ public static void parkNanos(Object blocker, long nanos) { * @since 1.6 */ public static void parkUntil(Object blocker, long deadline) { + // 获取当前线程 Thread t = Thread.currentThread(); + // 设置Blocker setBlocker(t, blocker); UNSAFE.park(true, deadline); + // 设置Blocker为null setBlocker(t, null); } @@ -301,6 +317,7 @@ public static Object getBlocker(Thread t) { * for example, the interrupt status of the thread upon return. */ public static void park() { + // 获取许可,设置时间为无限长,直到可以获取许可 UNSAFE.park(false, 0L); } @@ -392,20 +409,30 @@ else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0) // Hotspot implementation via intrinsics API private static final sun.misc.Unsafe UNSAFE; + // parkBlocker字段的内存偏移地址 private static final long parkBlockerOffset; + //threadLocalRandomSeed字段的内存偏移地址 private static final long SEED; + // threadLocalRandomProbe字段的内存偏移地址 private static final long PROBE; + // threadLocalRandomSecondarySeed字段的内存偏移地址 private static final long SECONDARY; static { try { + // 获取Unsafe实例 UNSAFE = sun.misc.Unsafe.getUnsafe(); + // 线程类类型 Class tk = Thread.class; + // 获取Thread的parkBlocker字段的内存偏移地址 parkBlockerOffset = UNSAFE.objectFieldOffset (tk.getDeclaredField("parkBlocker")); + // 获取Thread的threadLocalRandomSeed字段的内存偏移地址 SEED = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSeed")); + // 获取Thread的threadLocalRandomProbe字段的内存偏移地址 PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); + // 获取Thread的threadLocalRandomSecondarySeed字段的内存偏移地址 SECONDARY = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomSecondarySeed")); } catch (Exception ex) { throw new Error(ex); } diff --git a/src/java/util/concurrent/locks/ReentrantLock.java b/src/java/util/concurrent/locks/ReentrantLock.java index c3ec2eab..06c9072a 100644 --- a/src/java/util/concurrent/locks/ReentrantLock.java +++ b/src/java/util/concurrent/locks/ReentrantLock.java @@ -114,75 +114,97 @@ public class ReentrantLock implements Lock, java.io.Serializable { * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { + // 序列号 private static final long serialVersionUID = -5179523762034025860L; /** + * 获取锁 * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** + * 非公平方式获取 * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { + // 当前线程 final Thread current = Thread.currentThread(); + // 获取状态 int c = getState(); - if (c == 0) { + if (c == 0) { // 表示没有线程正在竞争该锁 if (compareAndSetState(0, acquires)) { + // 设置当前线程独占 setExclusiveOwnerThread(current); - return true; + return true; // 成功 } } - else if (current == getExclusiveOwnerThread()) { - int nextc = c + acquires; + else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁 + int nextc = c + acquires;// 增加重入次数 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); + // 设置状态 setState(nextc); + // 成功 return true; } + // 失败 return false; } + // 试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它 + // 方法返回当前锁是不是没有被线程持有 protected final boolean tryRelease(int releases) { + // 减少可重入次数 int c = getState() - releases; - if (Thread.currentThread() != getExclusiveOwnerThread()) - throw new IllegalMonitorStateException(); + // 当前线程不是持有锁的线程,抛出异常 + if (Thread.currentThread() != getExclusiveOwnerThread())// 当前线程不为独占线程 + throw new IllegalMonitorStateException();// 抛出异常 + // 释放标识 boolean free = false; + // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state if (c == 0) { free = true; + // 已经释放,清空独占 setExclusiveOwnerThread(null); } + // 设置标识 setState(c); return free; } + // 判断资源是否被当前线程占有 protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } + // 新生一个条件 final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class - + // 返回资源的占用线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } + // 返回状态 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } + // 资源是否被占用 final boolean isLocked() { return getState() != 0; } /** + * 自定义反序列化逻辑 * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) @@ -193,19 +215,27 @@ private void readObject(java.io.ObjectInputStream s) } /** + * 非公平锁 * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** + * 获得锁 * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { + /** + * 若通过CAS设置变量State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。 + * 若通过CAS设置变量State(同步状态)失败,也就是获取锁失败,则进入Acquire方法进行后续处理。 + */ if (compareAndSetState(0, 1)) + // 把当前线程设置独占了锁 setExclusiveOwnerThread(Thread.currentThread()); - else + else// 锁已经被占用,或者set失败 + // 以独占模式获取对象,忽略中断 acquire(1); } @@ -215,33 +245,43 @@ protected final boolean tryAcquire(int acquires) { } /** + * 公平锁 * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { + // 以独占模式获取对象,忽略中断 + // aqs 的 acquire 方法 acquire(1); } /** + * 尝试公平获取锁 * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { + // 获取当前线程 final Thread current = Thread.currentThread(); + // 获取状态 int c = getState(); - if (c == 0) { + if (c == 0) {// 状态为0 + // hasQueuedPredecessors()公平锁加锁时判断等待队列中是否存在有效节点的方法 if (!hasQueuedPredecessors() && - compareAndSetState(0, acquires)) { + compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功 + // 设置当前线程独占 setExclusiveOwnerThread(current); return true; } } - else if (current == getExclusiveOwnerThread()) { + else if (current == getExclusiveOwnerThread()) {// 状态不为0,即资源已经被线程占据 + // 下一个状态 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); + // 设置状态 setState(nextc); return true; } @@ -259,7 +299,7 @@ public ReentrantLock() { /** * Creates an instance of {@code ReentrantLock} with the - * given fairness policy. + * GIVEN FAIRNESS POLICY. * * @param fair {@code true} if this lock should use a fair ordering policy */