最近一周作息发生了一些微妙的变化,晚上 11 点睡觉清晨 7 点自然醒,身体状况明显好转的同时,也明白了什么叫做一天之际在于晨:起床后去门口吃麦当劳早餐的同时,看一会书,感觉一天多活了一个多小时。
这篇文章简单记录今早 java BlockingQueue 学习小记~
前言
java.util.concurrent.BlockingQueue
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
参考官方注释,BlockingQueue
是 java.util.concurrent
中的一个接口。顾名思义:Queue 表示先进先出的队列,多个线程同时放入对象而其他线程获取对象(解耦输入与输出),Blocking 代表当队列满了或者为空时,尝试放入或获取元素的线程会进入阻塞状态。
举个例子 有点抽象,举个例子:当队列为空时,获取元素
1 2 3 4 5 ArrayBlockingQueue#take └── AQS#await(挂起线程进入 WATTING 状态,直到被 singal 通知或者线程中断) └── LockSupport#park └── sun.misc.Unsafe#park(native 方法) └── 调用操作系统具体实现
park 底层实现? native 方法可以理解为另一个层面的接口,供非 java 代码实现底层逻辑。
首先根据 sun.misc.Unsafe#park
搜索源代码 :
我们发现 Unsafe#park
实际调用当前线程 Parker
对象的 park
方法
继续寻找 Parker::park
方法..
以 linux 实现为例,当超时时间为 0 时,Parker::park 方法最终调用标准库 pthread_cond_wait
(# include <pthread.h>
),挂起线程,等待被唤醒。
接口定义 官方文档解释的很清楚,以获取队列第一个元素为例(如果队列为空):
remove()
: 立即抛出异常 java.util.NoSuchElementException
poll()
: 立即返回 null
take()
: waiting if necessary
pull(timeout, unit)
: waiting + timeout
p.s. Special value 特殊值指的 false/null 等..
-
为了更好理解,参考博主绘制的 uml 图,BlockingQueue
接口在 Queue
的基础之上,扩展了 take
&put
两个阻塞方法:
接口实现 一图胜千言,简单绘制常见几种官方队列数据结构(下面将根据源码一一说明):
1. LinkedBlockingQueue 顾名思义底层是由链表实现,特性为先入先出,同时没有长度限制
1.1 数据结构 三个部分组成:链表 + 锁 + 迭代器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static class Node <E > { E item; Node<E> next; Node(E x) { item = x; } } transient Node<E> head;private transient Node<E> last;public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .capacity = capacity; last = head = new Node<E>(null ); } private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private class Itr implements Iterator <E > {}
1.2 新增操作 put 操作触发链表 enqueue:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void put (E e) throws InterruptedException { int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); } private void enqueue (Node<E> node) { last = last.next = node; }
1.3 获取并删除 1 2 3 4 5 6 7 8 9 10 11 12 13 private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
2 ArrayBlockingQueue 一句话:有界阻塞数组(队列满后,继续放入阻塞),容量不变化。
2.1 数据结构 底层数据结构,仅简单一个数组(参数1控制大小)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
2.2 获取元素 新增与获取操作比较类似,以 take 获取元素为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
3. SynchronousQueue 一句话:放入数据队列的行为 是阻塞的,只有等消费后才会同步返回结果。
如果有困惑可以运行下面的 demo 代码试试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.SynchronousQueue;public class SynchronousQueueTest { private final static SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) { SynchronousQueue<Integer> queue = new SynchronousQueue<>(); Thread t1 = new Thread(() -> { try { Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } log("t1 take.." ); Integer take1 = null ; try { take1 = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } log("t2 toke: " + take1); }); Thread t2 = new Thread(() -> { log("t2 put.." ); try { queue.put(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log("t2 put done" ); }); t1.start(); t2.start(); } private static void log (Object content) { System.out.println(s.format(new Date()) + " " + content.toString()); } }
4 DelayQueue 一句话:延迟执行
4.1 定义延迟执行任务: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;class DelayedEvent implements Delayed { private long id; private long executeTime; public DelayedEvent (long id, long delayTime, TimeUnit unit) { super (); this .id = id; this .executeTime = System.currentTimeMillis() + unit.toMillis(delayTime); } public long getId () { return id; } @Override public int compareTo (Delayed that) { long result = this .getDelay(TimeUnit.NANOSECONDS) - that.getDelay(TimeUnit.NANOSECONDS); if (result < 0 ) { return -1 ; } else if (result > 0 ) { return 1 ; } return 0 ; } @Override public long getDelay (TimeUnit unit) { return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString () { return "DelayedEvent [id=" + getId() + "]" ; } }
4.2 执行任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.DelayQueue;import java.util.concurrent.TimeUnit;public class DelayedQueueEvent { private final static SimpleDateFormat s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ); public static void main (String[] args) throws InterruptedException { DelayQueue<DelayedEvent> queue = new DelayQueue<>(); queue.put(new DelayedEvent(1 , 1 , TimeUnit.SECONDS)); queue.put(new DelayedEvent(2 , 3 , TimeUnit.SECONDS)); s.format(new Date()); log("taking..." ); DelayedEvent take1 = queue.take(); log(take1); DelayedEvent take2 = queue.take(); log(take2); } private static void log (Object content) { System.out.println(s.format(new Date()) + " " + content.toString()); } }
总结 以上 BlockingQueue 针对不同场景的复杂实现,背后都是灵活使用继承与组合后,基于非常简单的数据结构。希望自己有一天也能写出优雅的面向对象代码。
参考
https://gorden5566.com/post/1027.html
https://kkewwei.github.io/elasticsearch_learning/2018/11/10/LockSupport%E6%BA%90%E7%A0%81%E8%A7%A3%E8%AF%BB
https://zeral.cn/java/unsafe.park-vs-object.wait/