Skip to content

ReentrantLock源码分析 #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@

<a href="https://github.com/wupeixuan/JDKSourceCode1.8/blob/master/src/java/util/concurrent/ThreadPoolExecutor.java">ThreadPoolExecutor 源码解析</a>

<a href="https://github.com/wupeixuan/JDKSourceCode1.8/blob/master/src/java/util/concurrent/locks/ReentrantLock.java">ReentrantLock源码解析</a>

- 参考文章 [ReentrantLock源码分析](https://java.isture.com/thread/concurrent/ReentrantLock/source/ReentrantLock%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90.html)

<a href="https://github.com/wupeixuan/JDKSourceCode1.8/blob/master/src/java/util/concurrent/locks/AbstractQueuedSynchronizer.java">AbstractQueuedSynchronizer(AQS源码分析)</a>

- 参考文章 [锁核心类AQS详解](https://java.isture.com/thread/JUCLock/锁核心类AQS详解.html)

## 其他

<a href="https://github.com/wupeixuan/JDKSourceCode1.8/blob/master/src/java/lang/Object.java">Object 源码解析</a>
Expand Down
13 changes: 11 additions & 2 deletions src/java/util/concurrent/CountDownLatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,25 +161,33 @@ public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;


// 构造器
Sync(int count) {
setState(count);
}

// 返回当前计数
int getCount() {
return getState();
}

// 试图在共享模式下获取对象状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 试图设置状态来反映共享模式下的一个释放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 无限循环
for (;;) {
// 获取状态
int c = getState();
if (c == 0)
if (c == 0)// 没有被线程占有
return false;
// 下一个状态
int nextc = c-1;
// 比较并且设置成功
if (compareAndSetState(c, nextc))
return nextc == 0;
}
Expand All @@ -197,6 +205,7 @@ protected boolean tryReleaseShared(int releases) {
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化状态数
this.sync = new Sync(count);
}

Expand Down
47 changes: 31 additions & 16 deletions src/java/util/concurrent/FutureTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class FutureTask<V> implements RunnableFuture<V> {
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
//任务状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
Expand All @@ -99,15 +100,20 @@ public class FutureTask<V> implements RunnableFuture<V> {
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
//内部持有的callable任务,运行完毕后置空
private Callable<V> callable;
/** The result to return or exception to throw from get() */
//从get()中返回的结果或抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
//运行callable的线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
//使用Treiber栈保存等待线程
private volatile WaitNode waiters;

/**
* //返回执行结果或抛出异常
* Returns result or throws exception for completed task.
*
* @param s completed state value
Expand All @@ -129,6 +135,8 @@ private V report(int s) throws ExecutionException {
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
//这个构造函数会把传入的Callable变量保存在this.callable字段中,
// 该字段定义为private Callable<V> callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
Expand All @@ -148,6 +156,7 @@ public FutureTask(Callable<V> callable) {
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
Expand All @@ -162,12 +171,13 @@ public boolean isDone() {
}

public boolean cancel(boolean mayInterruptIfRunning) {
//如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
if (mayInterruptIfRunning) {//可以在运行时中断
try {
Thread t = runner;
if (t != null)
Expand All @@ -177,7 +187,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}
}
} finally {
finishCompletion();
finishCompletion();//移除并唤醒所有等待线程
}
return true;
}
Expand All @@ -193,6 +203,7 @@ public V get() throws InterruptedException, ExecutionException {
}

/**
* //获取执行结果
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
Expand Down Expand Up @@ -230,7 +241,7 @@ protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
finishCompletion(); //执行完毕,唤醒等待线程
}
}

Expand All @@ -253,6 +264,7 @@ protected void setException(Throwable t) {
}

public void run() {
//新建任务,CAS替换runner为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
Expand All @@ -271,7 +283,7 @@ public void run() {
setException(ex);
}
if (ran)
set(result);
set(result);//设置执行结果
}
} finally {
// runner must be non-null until state is settled to
Expand All @@ -281,7 +293,7 @@ public void run() {
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
handlePossibleCancellationInterrupt(s);//处理中断逻辑
}
}

Expand Down Expand Up @@ -331,6 +343,7 @@ protected boolean runAndReset() {
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
//在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
Expand Down Expand Up @@ -364,12 +377,12 @@ static final class WaitNode {
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待线程
for (;;) {//自旋遍历等待线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
LockSupport.unpark(t);//唤醒等待线程
}
WaitNode next = q.next;
if (next == null)
Expand All @@ -380,7 +393,7 @@ private void finishCompletion() {
break;
}
}

//任务完成后调用函数,自定义扩展
done();

callable = null; // to reduce footprint
Expand All @@ -398,35 +411,36 @@ private int awaitDone(boolean timed, long nanos)
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
for (;;) {//自旋
if (Thread.interrupted()) {//获取并清除中断状态
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
q.thread = null;//置空等待节点的线程
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//CAS修改waiter
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
removeWaiter(q);//超时,移除等待节点
return state;
}
LockSupport.parkNanos(this, nanos);
LockSupport.parkNanos(this, nanos);//阻塞当前线程
}
else
LockSupport.park(this);
LockSupport.park(this);//阻塞当前线程
}
}

Expand All @@ -442,9 +456,10 @@ else if (timed) {
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
node.thread = null;//首先置空线程
retry:
for (;;) { // restart on removeWaiter race
//依次遍历查找
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
Expand Down
Loading