Skip to content

Commit 0a9d0b9

Browse files
committed
增加 ArrayBlockingQueue和LinkedBlockingQueue的实现原理
1 parent 368a2e7 commit 0a9d0b9

File tree

5 files changed

+373
-0
lines changed

5 files changed

+373
-0
lines changed
Loading
Loading
Loading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
2+
# 1. BlockingQueue简介 #
3+
在实际编程中,会经常使用到JDK中Collection集合框架中的各种容器类如实现List,Map,Queue接口的容器类,但是这些容器类基本上不是线程安全的,除了使用Collections可以将其转换为线程安全的容器,Doug Lea大师为我们都准备了对应的线程安全的容器,如实现List接口的CopyOnWriteArrayList([关于CopyOnWriteArrayList可以看这篇文章](https://juejin.im/post/5aeeb55f5188256715478c21)),实现Map接口的ConcurrentHashMap([关于ConcurrentHashMap可以看这篇文章](https://juejin.im/post/5aeeaba8f265da0b9d781d16)),实现Queue接口的ConcurrentLinkedQueue([关于ConcurrentLinkedQueue可以看这篇文章](https://juejin.im/post/5aeeae756fb9a07ab11112af))。
4+
5+
最常用的"**生产者-消费者**"问题中,队列通常被视作线程间操作的数据容器,这样,可以对各个模块的业务功能进行解耦,生产者将“生产”出来的数据放置在数据容器中,而消费者仅仅只需要在“数据容器”中进行获取数据即可,这样生产者线程和消费者线程就能够进行解耦,只专注于自己的业务功能即可。阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。**当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。**
6+
7+
# 2. 基本操作 #
8+
9+
BlockingQueue基本操作总结如下(此图来源于JAVA API文档):
10+
11+
![BlockingQueue基本操作.png](http://upload-images.jianshu.io/upload_images/2615789-19d06e0ba334fe52.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
12+
13+
BlockingQueue继承于Queue接口,因此,对数据元素的基本操作有:
14+
15+
> 插入元素
16+
17+
1. add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
18+
2. offer(E e):当往队列插入数据时,插入成功返回`true`,否则则返回`false`。当队列满时不会抛出异常;
19+
20+
> 删除元素
21+
22+
1. remove(Object o):从队列中删除数据,成功则返回`true`,否则为`false`
23+
2. poll:删除数据,当队列为空时,返回null;
24+
25+
> 查看元素
26+
27+
1. element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
28+
2. peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常
29+
30+
BlockingQueue具有的特殊操作:
31+
32+
> 插入数据:
33+
34+
1. put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用;
35+
2. offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;
36+
37+
> 删除数据
38+
39+
1. take():当阻塞队列为空时,获取队头数据的线程会被阻塞;
40+
2. poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出
41+
42+
43+
# 3. 常用的BlockingQueue #
44+
实现BlockingQueue接口的有`ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue`,而这几种常见的阻塞队列也是在实际编程中会常用的,下面对这几种常见的阻塞队列进行说明:
45+
46+
> 1.ArrayBlockingQueue
47+
48+
**ArrayBlockingQueue**是由数组实现的有界阻塞队列。该队列命令元素FIFO(先进先出)。因此,对头元素时队列中存在时间最长的数据元素,而对尾数据则是当前队列最新的数据元素。ArrayBlockingQueue可作为“有界数据缓冲区”,生产者插入数据到队列容器中,并由消费者提取。ArrayBlockingQueue一旦创建,容量不能改变。
49+
50+
当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
51+
52+
ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。**如果保证公平性,通常会降低吞吐量**。如果需要获得公平性的ArrayBlockingQueue,可采用如下代码:
53+
54+
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
55+
56+
关于ArrayBlockingQueue的实现原理,可以[看这篇文章](https://juejin.im/post/5aeebdb26fb9a07aa83ea17e)
57+
58+
59+
> 2.LinkedBlockingQueue
60+
61+
LinkedBlockingQueue是用链表实现的有界阻塞队列,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE
62+
63+
64+
> 3.PriorityBlockingQueue
65+
66+
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。
67+
68+
> 4.SynchronousQueue
69+
70+
SynchronousQueue每个插入操作必须等待另一个线程进行相应的删除操作,因此,SynchronousQueue实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。SynchronousQueue也可以通过构造器参数来为其指定公平性。
71+
72+
73+
> 5.LinkedTransferQueue
74+
75+
LinkedTransferQueue是一个由链表数据结构构成的无界阻塞队列,由于该队列实现了TransferQueue接口,与其他阻塞队列相比主要有以下不同的方法:
76+
77+
**transfer(E e)**
78+
如果当前有线程(消费者)正在调用take()方法或者可延时的poll()方法进行消费数据时,生产者线程可以调用transfer方法将数据传递给消费者线程。如果当前没有消费者线程的话,生产者线程就会将数据插入到队尾,直到有消费者能够进行消费才能退出;
79+
80+
**tryTransfer(E e)**
81+
tryTransfer方法如果当前有消费者线程(调用take方法或者具有超时特性的poll方法)正在消费数据的话,该方法可以将数据立即传送给消费者线程,如果当前没有消费者线程消费数据的话,就立即返回`false`。因此,与transfer方法相比,transfer方法是必须等到有消费者线程消费数据时,生产者线程才能够返回。而tryTransfer方法能够立即返回结果退出。
82+
83+
**tryTransfer(E e,long timeout,imeUnit unit)**</br>
84+
与transfer基本功能一样,只是增加了超时特性,如果数据才规定的超时时间内没有消费者进行消费的话,就返回`false`
85+
86+
87+
> 6.LinkedBlockingDeque
88+
89+
LinkedBlockingDeque是基于链表数据结构的有界阻塞双端队列,如果在创建对象时为指定大小时,其默认大小为Integer.MAX_VALUE。与LinkedBlockingQueue相比,主要的不同点在于,LinkedBlockingDeque具有双端队列的特性。LinkedBlockingDeque基本操作如下图所示(来源于java文档)
90+
91+
![LinkedBlockingDeque的基本操作.png](http://upload-images.jianshu.io/upload_images/2615789-d51d940d30786e32.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/600)
92+
93+
94+
95+
如上图所示,LinkedBlockingDeque的基本操作可以分为四种类型:1.特殊情况,抛出异常;2.特殊情况,返回特殊值如null或者false;3.当线程不满足操作条件时,线程会被阻塞直至条件满足;4. 操作具有超时特性。
96+
97+
另外,LinkedBlockingDeque实现了BlockingDueue接口而LinkedBlockingQueue实现的是BlockingQueue,这两个接口的主要区别如下图所示(来源于java文档):
98+
99+
100+
![BlockingQueue和BlockingDeque的区别.png](http://upload-images.jianshu.io/upload_images/2615789-7316a2543b99caa2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/600)
101+
102+
从上图可以看出,两个接口的功能是可以等价使用的,比如BlockingQueue的add方法和BlockingDeque的addLast方法的功能是一样的。
103+
104+
> 7.DelayQueue
105+
106+
DelayQueue是一个存放实现Delayed接口的数据的无界阻塞队列,只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,则队列没有队头,并且线程通过poll等方法获取数据元素则返回null。所谓数据延时期满时,则是通过Delayed接口的`getDelay(TimeUnit.NANOSECONDS)`来进行判定,如果该方法返回的是小于等于0则说明该数据元素的延时期已满。
107+
108+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
2+
# 1. ArrayBlockingQueue简介 #
3+
4+
在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在“生产者-消费者”问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于BlockingQueue可以[看这篇文章](https://juejin.im/post/5aeebd02518825672f19c546)。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看ArrayBlockingQueue和LinkedBlockingQueue的实现原理。
5+
6+
# 2. ArrayBlockingQueue实现原理 #
7+
8+
阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是lock的condition机制,关于condition可以[看这篇文章的详细介绍](https://juejin.im/post/5aeea5e951882506a36c67f0)。那么ArrayBlockingQueue的实现是不是也会采用Condition的通知机制呢?下面来看看。
9+
10+
## 2.1 ArrayBlockingQueue的主要属性
11+
12+
ArrayBlockingQueue的主要属性如下:
13+
14+
/** The queued items */
15+
final Object[] items;
16+
17+
/** items index for next take, poll, peek or remove */
18+
int takeIndex;
19+
20+
/** items index for next put, offer, or add */
21+
int putIndex;
22+
23+
/** Number of elements in the queue */
24+
int count;
25+
26+
/*
27+
* Concurrency control uses the classic two-condition algorithm
28+
* found in any textbook.
29+
*/
30+
31+
/** Main lock guarding all access */
32+
final ReentrantLock lock;
33+
34+
/** Condition for waiting takes */
35+
private final Condition notEmpty;
36+
37+
/** Condition for waiting puts */
38+
private final Condition notFull;
39+
40+
从源码中可以看出ArrayBlockingQueue内部是采用数组进行数据存储的(`属性items`),为了保证线程安全,采用的是`ReentrantLock lock`,为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。而notEmpty和notFull等中要属性在构造方法中进行创建:
41+
42+
public ArrayBlockingQueue(int capacity, boolean fair) {
43+
if (capacity <= 0)
44+
throw new IllegalArgumentException();
45+
this.items = new Object[capacity];
46+
lock = new ReentrantLock(fair);
47+
notEmpty = lock.newCondition();
48+
notFull = lock.newCondition();
49+
}
50+
51+
接下来,主要看看可阻塞式的put和take方法是怎样实现的。
52+
53+
## 2.2 put方法详解
54+
55+
` put(E e)`方法源码如下:
56+
57+
public void put(E e) throws InterruptedException {
58+
checkNotNull(e);
59+
final ReentrantLock lock = this.lock;
60+
lock.lockInterruptibly();
61+
try {
62+
//如果当前队列已满,将线程移入到notFull等待队列中
63+
while (count == items.length)
64+
notFull.await();
65+
//满足插入数据的要求,直接进行入队操作
66+
enqueue(e);
67+
} finally {
68+
lock.unlock();
69+
}
70+
}
71+
72+
73+
该方法的逻辑很简单,当队列已满时(`count == items.length`)将线程移入到notFull等待队列中,如果当前满足插入数据的条件,就可以直接调用` enqueue(e)`插入数据元素。enqueue方法源码为:
74+
75+
private void enqueue(E x) {
76+
// assert lock.getHoldCount() == 1;
77+
// assert items[putIndex] == null;
78+
final Object[] items = this.items;
79+
//插入数据
80+
items[putIndex] = x;
81+
if (++putIndex == items.length)
82+
putIndex = 0;
83+
count++;
84+
//通知消费者线程,当前队列中有数据可供消费
85+
notEmpty.signal();
86+
}
87+
88+
enqueue方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(`items[putIndex] = x`),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(`notEmpty.signal()`)。
89+
90+
## 2.3 take方法详解
91+
92+
take方法源码如下:
93+
94+
95+
public E take() throws InterruptedException {
96+
final ReentrantLock lock = this.lock;
97+
lock.lockInterruptibly();
98+
try {
99+
//如果队列为空,没有数据,将消费者线程移入等待队列中
100+
while (count == 0)
101+
notEmpty.await();
102+
//获取数据
103+
return dequeue();
104+
} finally {
105+
lock.unlock();
106+
}
107+
}
108+
109+
take方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作`dequeue`。dequeue方法源码为:
110+
111+
private E dequeue() {
112+
// assert lock.getHoldCount() == 1;
113+
// assert items[takeIndex] != null;
114+
final Object[] items = this.items;
115+
@SuppressWarnings("unchecked")
116+
//获取数据
117+
E x = (E) items[takeIndex];
118+
items[takeIndex] = null;
119+
if (++takeIndex == items.length)
120+
takeIndex = 0;
121+
count--;
122+
if (itrs != null)
123+
itrs.elementDequeued();
124+
//通知被阻塞的生产者线程
125+
notFull.signal();
126+
return x;
127+
}
128+
129+
dequeue方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素(`(E) items[takeIndex]`);2. 通知notFull等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得lock,并执行完成功退出。
130+
131+
从以上分析,可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。在理解ArrayBlockingQueue后再去理解LinkedBlockingQueue就很容易了。
132+
133+
134+
# 3. LinkedBlockingQueue实现原理 #
135+
LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为`Integer.MAX_VALUE`。从它的构造方法可以看出:
136+
137+
public LinkedBlockingQueue() {
138+
this(Integer.MAX_VALUE);
139+
}
140+
141+
142+
# 3.1 LinkedBlockingQueue的主要属性 #
143+
144+
145+
LinkedBlockingQueue的主要属性有:
146+
147+
/** Current number of elements */
148+
private final AtomicInteger count = new AtomicInteger();
149+
150+
/**
151+
* Head of linked list.
152+
* Invariant: head.item == null
153+
*/
154+
transient Node<E> head;
155+
156+
/**
157+
* Tail of linked list.
158+
* Invariant: last.next == null
159+
*/
160+
private transient Node<E> last;
161+
162+
/** Lock held by take, poll, etc */
163+
private final ReentrantLock takeLock = new ReentrantLock();
164+
165+
/** Wait queue for waiting takes */
166+
private final Condition notEmpty = takeLock.newCondition();
167+
168+
/** Lock held by put, offer, etc */
169+
private final ReentrantLock putLock = new ReentrantLock();
170+
171+
/** Wait queue for waiting puts */
172+
private final Condition notFull = putLock.newCondition();
173+
174+
可以看出与ArrayBlockingQueue主要的区别是,LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的lock(`takeLock``putLock`)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(`notEmpty``notFull`)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node结点的定义为:
175+
176+
static class Node<E> {
177+
E item;
178+
179+
/**
180+
* One of:
181+
* - the real successor Node
182+
* - this Node, meaning the successor is head.next
183+
* - null, meaning there is no successor (this is the last node)
184+
*/
185+
Node<E> next;
186+
187+
Node(E x) { item = x; }
188+
}
189+
190+
接下来,我们也同样来看看put方法和take方法的实现。
191+
192+
## 3.2 put方法详解 ##
193+
194+
put方法源码为:
195+
196+
public void put(E e) throws InterruptedException {
197+
if (e == null) throw new NullPointerException();
198+
// Note: convention in all put/take/etc is to preset local var
199+
// holding count negative to indicate failure unless set.
200+
int c = -1;
201+
Node<E> node = new Node<E>(e);
202+
final ReentrantLock putLock = this.putLock;
203+
final AtomicInteger count = this.count;
204+
putLock.lockInterruptibly();
205+
try {
206+
/*
207+
* Note that count is used in wait guard even though it is
208+
* not protected by lock. This works because count can
209+
* only decrease at this point (all other puts are shut
210+
* out by lock), and we (or some other waiting put) are
211+
* signalled if it ever changes from capacity. Similarly
212+
* for all other uses of count in other wait guards.
213+
*/
214+
//如果队列已满,则阻塞当前线程,将其移入等待队列
215+
while (count.get() == capacity) {
216+
notFull.await();
217+
}
218+
//入队操作,插入数据
219+
enqueue(node);
220+
c = count.getAndIncrement();
221+
//若队列满足插入数据的条件,则通知被阻塞的生产者线程
222+
if (c + 1 < capacity)
223+
notFull.signal();
224+
} finally {
225+
putLock.unlock();
226+
}
227+
if (c == 0)
228+
signalNotEmpty();
229+
}
230+
231+
put方法的逻辑也同样很容易理解,可见注释。基本上和ArrayBlockingQueue的put方法一样。take方法的源码如下:
232+
233+
public E take() throws InterruptedException {
234+
E x;
235+
int c = -1;
236+
final AtomicInteger count = this.count;
237+
final ReentrantLock takeLock = this.takeLock;
238+
takeLock.lockInterruptibly();
239+
try {
240+
//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
241+
while (count.get() == 0) {
242+
notEmpty.await();
243+
}
244+
//移除队头元素,获取数据
245+
x = dequeue();
246+
c = count.getAndDecrement();
247+
//如果当前满足移除元素的条件,则通知被阻塞的消费者线程
248+
if (c > 1)
249+
notEmpty.signal();
250+
} finally {
251+
takeLock.unlock();
252+
}
253+
if (c == capacity)
254+
signalNotFull();
255+
return x;
256+
}
257+
258+
take方法的主要逻辑请见于注释,也很容易理解。
259+
260+
# 4. ArrayBlockingQueue与LinkedBlockingQueue的比较 #
261+
262+
**相同点**:ArrayBlockingQueue和LinkedBlockingQueue都是通过condition通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;
263+
264+
**不同点**:1. ArrayBlockingQueue底层是采用的数组进行实现,而LinkedBlockingQueue则是采用链表数据结构;
265+
2. ArrayBlockingQueue插入和删除数据,只采用了一个lock,而LinkedBlockingQueue则是在插入和删除分别采用了`putLock``takeLock`,这样可以降低线程由于线程无法获取到lock而进入WAITING状态的可能性,从而提高了线程并发执行的效率。

0 commit comments

Comments
 (0)