Just relax, take it easy..

0%

Java BlockingQueue 学习小记

最近一周作息发生了一些微妙的变化,晚上 11 点睡觉清晨 7 点自然醒,身体状况明显好转的同时,也明白了什么叫做一天之际在于晨:起床后去门口吃麦当劳早餐的同时,看一会书,感觉一天多活了一个多小时。

这篇文章简单记录今早 java BlockingQueue 学习小记~

前言

java.util.concurrent.BlockingQueue

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

参考官方注释,BlockingQueuejava.util.concurrent 中的一个接口。顾名思义:Queue 表示先进先出的队列,多个线程同时放入对象而其他线程获取对象(解耦输入与输出),Blocking 代表当队列满了或者为空时,尝试放入或获取元素的线程会进入阻塞状态。

举个例子

有点抽象,举个例子:当队列为空时,获取元素

1
2
3
4
5
ArrayBlockingQueue#take
└── AQS#await(挂起线程进入 WATTING 状态,直到被 singal 通知或者线程中断)
└── LockSupport#park
└── sun.misc.Unsafe#park(native 方法)
└── 调用操作系统具体实现

park 底层实现?

native 方法可以理解为另一个层面的接口,供非 java 代码实现底层逻辑。

首先根据 sun.misc.Unsafe#park 搜索源代码

我们发现 Unsafe#park 实际调用当前线程 Parker 对象的 park 方法

继续寻找 Parker::park 方法..

以 linux 实现为例,当超时时间为 0 时,Parker::park 方法最终调用标准库 pthread_cond_wait# include <pthread.h>),挂起线程,等待被唤醒。

接口定义

官方文档解释的很清楚,以获取队列第一个元素为例(如果队列为空):

  • remove(): 立即抛出异常 java.util.NoSuchElementException
  • poll(): 立即返回 null
  • take(): waiting if necessary
  • pull(timeout, unit): waiting + timeout


p.s. Special value 特殊值指的 false/null 等..

-

为了更好理解,参考博主绘制的 uml 图,BlockingQueue 接口在 Queue 的基础之上,扩展了 take&put 两个阻塞方法:
blockingqueue

接口实现

一图胜千言,简单绘制常见几种官方队列数据结构(下面将根据源码一一说明):

java_queue_diff

1. LinkedBlockingQueue

顾名思义底层是由链表实现,特性为先入先出,同时没有长度限制

1.1 数据结构

三个部分组成:链表 + 锁 + 迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 一、链表
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//链表头 & 链表尾(注意 head 不是真实的元素,i.e. 虚拟节点)
transient Node<E> head;
private transient Node<E> last;

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 参考这个赋值,初始化时,head 指向一个空节点
last = head = new Node<E>(null);
}

// 二、锁
// 两把锁:put/take,两种操作同时进行,互不影响。
//take 时的锁
private final ReentrantLock takeLock = new ReentrantLock();
// take 的条件队列,condition 可以简单理解为基于 ASQ 同步机制建立的条件队列
private final Condition notEmpty = takeLock.newCondition();

// 三、迭代器
private class Itr implements Iterator<E> {}

1.2 新增操作

put 操作触发链表 enqueue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// java.util.concurrent.LinkedBlockingQueue#put
public void put(E e) throws InterruptedException {
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 等待其他线程 take 成功后,被信号唤醒
while (count.get() == capacity) {
notFull.await();
}
// 真正执行出列(见下面的方法)
enqueue(node);
// 如果还没有满 -> 发送通知
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 不等于 -1 表示 put 成功 -> 唤醒待 take 的线程
if (c == 0)
signalNotEmpty();
}

private void enqueue(Node<E> node) {
// 很巧妙:下面这行代码从右往左执行
last = last.next = node;
}

1.3 获取并删除

1
2
3
4
5
6
7
8
9
10
11
12
13
// java.util.concurrent.LinkedBlockingQueue#dequeue
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
// 关键一步:
// 还记得上文 head 为虚拟节点,此时将 head 指向下一个节点
// 同时将 item 置为空(帮助垃圾回收?)
head = first;
E x = first.item;
first.item = null;
return x;
}

2 ArrayBlockingQueue

一句话:有界阻塞数组(队列满后,继续放入阻塞),容量不变化。

2.1 数据结构

底层数据结构,仅简单一个数组(参数1控制大小)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 底层数据结构,仅简单一个数组(参数1控制大小)
this.items = new Object[capacity];
// 可重入锁:通过参数2控制是否公平
// - 公平:锁的获取符合绝对时间(保证先来先到)
// - 否则:降低一定线程切换,提升性能(随机获取)
lock = new ReentrantLock(fair);

// 利用 await & signal 两个条件作为红绿灯
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

2.2 获取元素

新增与获取操作比较类似,以 take 获取元素为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// java.util.concurrent.ArrayBlockingQueue#take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// ReadLock#lock 优先考虑获取锁,待获取锁成功后,才响应中断
// 而 lockInterruptibly 直接中断线程
lock.lockInterruptibly();
try {
while (count == 0)
// AQS 的 await 方法(阻塞当前线程直到被 singal 通知或者线程中断)
// `LockSupport.park()` -> `sun.misc.Unsafe#park` - native 方法
// 调用linux系统,标准的阻塞接口(^^原理???^^)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

// 阻塞后何时被唤醒? 当队列入列后:
private void enqueue(E x) {
final Object[] items = this.items;
// 不难理解,计算放入的 index 并赋值
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 发送队列不为空信号
notEmpty.signal();
}

3. SynchronousQueue

一句话:放入数据队列的行为是阻塞的,只有等消费后才会同步返回结果。

如果有困惑可以运行下面的 demo 代码试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueTest {
private final static SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

Thread t1 = new Thread(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("t1 take..");
Integer take1 = null;
try {
take1 = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log("t2 toke: " + take1);
});

Thread t2 = new Thread(() -> {
log("t2 put..");
try {
queue.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("t2 put done");
});

t1.start();
t2.start();
// 2022-02-03 12:32:36 t2 put..
// 2022-02-03 12:32:39 t1 take..
// 2022-02-03 12:32:39 t2 toke: 1
// 2022-02-03 12:32:39 t2 put done
}

private static void log(Object content) {
System.out.println(s.format(new Date()) + " " + content.toString());
}
}

4 DelayQueue

一句话:延迟执行

4.1 定义延迟执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedEvent implements Delayed {
private long id;
private long executeTime;

public DelayedEvent(long id, long delayTime, TimeUnit unit) {
super();
this.id = id;
this.executeTime = System.currentTimeMillis() + unit.toMillis(delayTime);
}

public long getId() {
return id;
}

@Override
public int compareTo(Delayed that) {
long result = this.getDelay(TimeUnit.NANOSECONDS) - that.getDelay(TimeUnit.NANOSECONDS);
if (result < 0) {
return -1;
} else if (result > 0) {
return 1;
}
return 0;
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public String toString() {
return "DelayedEvent [id=" + getId() + "]";
}
}

4.2 执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayedQueueEvent {

private final static SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedEvent> queue = new DelayQueue<>();

queue.put(new DelayedEvent(1, 1, TimeUnit.SECONDS));
queue.put(new DelayedEvent(2, 3, TimeUnit.SECONDS));

s.format(new Date());

log("taking...");
DelayedEvent take1 = queue.take();
log(take1);
DelayedEvent take2 = queue.take();
log(take2);

// 2022-02-03 12:27:01 taking...
// 2022-02-03 12:27:02 DelayedEvent [id=1]
// 2022-02-03 12:27:04 DelayedEvent [id=2]
}

private static void log(Object content) {
System.out.println(s.format(new Date()) + " " + content.toString());
}
}

总结

以上 BlockingQueue 针对不同场景的复杂实现,背后都是灵活使用继承与组合后,基于非常简单的数据结构。希望自己有一天也能写出优雅的面向对象代码。

参考

  1. https://gorden5566.com/post/1027.html
  2. https://kkewwei.github.io/elasticsearch_learning/2018/11/10/LockSupport%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB
  3. https://zeral.cn/java/unsafe.park-vs-object.wait/