博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——PriorityBlockingQueue
阅读量:4181 次
发布时间:2019-05-26

本文共 10969 字,大约阅读时间需要 36 分钟。

PriorityBlockingQueue 是一个无界优先级阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。它是 PriorityQueue 的线程安全版本。

PriorityQueue 是一个基于优先级堆的无界优先级队列。构造队列时如不指定 Comparator,则模式按照其自然顺序进行排序。此实现为入队和出队方法(offer、poll、remove() 和 add)提供 O(log(n)) 时间;为 remove(Object) 和 contains(Object) 方法提供线性时间;为获取方法(peek、element 和 size)提供固定时间。

先看下类属性

public class PriorityBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable {
//内部数组默认长度 private static final int DEFAULT_INITIAL_CAPACITY = 11; //内部数组最大长度 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //平衡二叉堆、完全二叉树,队列存储元素的数据结构 private transient Object[] queue; //优先级队列中实际存储的元素 private transient int size; //排序方式,默认自然排序 private transient Comparator
comparator; //为所有的公共方法加锁,保证线程安全 private final ReentrantLock lock; //队列为空,阻塞读线程到等待队列 private final Condition notEmpty; //扩容的时候使用此自旋锁,CAS成功代表获取了锁 private transient volatile int allocationSpinLock;}

如何构造一个 PriorityBlockingQueue

PriorityBlockingQueue
blockingQueue = new PriorityBlockingQueue<>();public PriorityBlockingQueue() {
//默认初始化容量为11,Comparable 默认为自然排序 this(DEFAULT_INITIAL_CAPACITY, null);}/** * Creates a {@code PriorityBlockingQueue} with the specified initial * capacity that orders its elements according to the specified * comparator. * * @param initialCapacity the initial capacity for this priority queue * @param comparator the comparator that will be used to order this * priority queue. If {@code null}, the {@linkplain Comparable * natural ordering} of the elements will be used. * @throws IllegalArgumentException if {@code initialCapacity} is less * than 1 */public PriorityBlockingQueue(int initialCapacity, Comparator
comparator) {
//初始化容量不能小于1 if (initialCapacity < 1) throw new IllegalArgumentException(); //内部持有一个重入锁 this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; //队列的存储方式为数组,数组的特点是顺序存储,插入需要移动元素 this.queue = new Object[initialCapacity];}

也可通过指定一个集合来创建

//如果集合类型为 SortedSet 或者 PriorityBlockingQueue 那么排序方式设置成和他们一样public PriorityBlockingQueue(Collection
c) {
this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet
) {
SortedSet
ss = (SortedSet
) c; //设置成 SortedSet 的排序方式 this.comparator = (Comparator
) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue
) {
PriorityBlockingQueue
pq = (PriorityBlockingQueue
) c; //设置成 PriorityBlockingQueue 的排序方式 this.comparator = (Comparator
) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); //不能存储空的元素 if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; //未指定排序方式,需要调整堆 if (heapify) //之后分析怎么调整堆 heapify();}

向队列中插入一个元素

/** * Inserts the specified element into this priority queue. * As the queue is unbounded, this method will never return {@code false}. * * @param e the element to add * @return {@code true} (as specified by {@link Queue#offer}) * @throws ClassCastException if the specified element cannot be compared *         with elements currently in the priority queue according to the *         priority queue's ordering * @throws NullPointerException if the specified element is null */public boolean offer(E e) {
//插入元素为空,抛异常 if (e == null) throw new NullPointerException(); //将全局变量重赋给局部变量,从栈读变量比从堆读变量会更 cache-friendly final ReentrantLock lock = this.lock; //获取锁 lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) //当前容量达到数组最大值,扩容数组 tryGrow(array, cap); //保证数组有空闲位置,从队列尾部插入 try {
Comparator
cmp = comparator; if (cmp == null) //自然排序 siftUpComparable(n, e, array); else //按照指定的排序方式排序 siftUpUsingComparator(n, e, array, cmp); //如队列完成,总数加一 size = n + 1; //因有元素进来,唤醒阻塞在等待队列的线程 notEmpty.signal(); } finally {
//最后释放锁 lock.unlock(); } //因为是无界队列,插入肯定能成功 return true;}/** * Inserts item x at position k, maintaining heap invariant by * promoting x up the tree until it is greater than or equal to * its parent, or is the root. * * To simplify and speed up coercions and comparisons. the * Comparable and Comparator versions are separated into different * methods that are otherwise identical. (Similarly for siftDown.) * These methods are static, with heap state as arguments, to * simplify use in light of possible comparator exceptions. * * @param k the position to fill * @param x the item to insert * @param array the heap array *///将元素x插入到堆 array 中,插入开始位置为k,保证堆为小顶堆private static
void siftUpComparable(int k, T x, Object[] array) {
//以元素的内部排序作比较 Comparable
key = (Comparable
) x; //k==0,说明是首节点,直接设置值 while (k > 0) {
//按照完全二叉树的特点,parent 为在队列中的下标 int parent = (k - 1) >>> 1; //找到父节点 Object e = array[parent]; //和父节点比较,大于父节点代表位置k满足条件 if (key.compareTo((T) e) >= 0) break; //元素x小于父节点需要将父节点下移 array[k] = e; //x待插入的位置向上调整,继续循环,直到找到合适的位置 k = parent; } array[k] = key;}//和上面的一样,只是调用了不同的排序策略,这里是指定的排序private static
void siftUpUsingComparator(int k, T x, Object[] array, Comparator
cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x;}

当插入的元素个数比数组的容量大时,需要调整数组的容量

/** * Tries to grow array to accommodate at least one more element * (but normally expand by about 50%), giving up (allowing retry) * on contention (which we expect to be rare). Call only while * holding lock. * * @param array the heap array * @param oldCap the length of the array */private void tryGrow(Object[] array, int oldCap) {
//先释放锁,其他读线程能够访问队列 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && //将 allocationSpinLock 状态从0修改成1代表成功获取到锁 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
try {
//设置扩容后的容量 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); //超过最大值,设置成 MAX_ARRAY_SIZE if (newCap - MAX_ARRAY_SIZE > 0) {
// possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } //需要扩容才初始化新的数组 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally {
//释放自旋锁 allocationSpinLock = 0; } } //数组为空代表上面获取自旋锁失败 if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); //若线程获取锁失败后,还是看不到扩容之后的数组,退出方法 //注意 tryGrow() 在 offer() 方法中是循环调用的 if (newArray != null && queue == array) {
//获取到自旋锁的线程会进入到这里进行扩容 queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); }}

从队列中取出元素,先看看非阻塞的 pool() 方法

Integer poll = blockingQueue.poll();public E poll() {
//取也需要获取锁 final ReentrantLock lock = this.lock; lock.lock(); try {
//出队列 return dequeue(); } finally {
lock.unlock(); }} /** * Mechanics for poll(). Call only while holding lock. */private E dequeue() {
int n = size - 1; if (n < 0) //队列中没有元素,返回空 return null; else {
Object[] array = queue; //每次移出队列的头部元素 E result = (E) array[0]; //获取队列尾部的元素 E x = (E) array[n]; array[n] = null; Comparator
cmp = comparator; if (cmp == null) //头出队列,将尾节点取出放到堆合适的位置,调整最小堆 siftDownComparable(0, x, array, n); else //和 siftDownComparable 逻辑类似 siftDownUsingComparator(0, x, array, n, cmp); //重设总大小 size = n; //返回队列的头节点 return result; }}/** * Inserts item x at position k, maintaining heap invariant by * demoting x down the tree repeatedly until it is less than or * equal to its children or is a leaf. * * @param k the position to fill 需要填充的位置 * @param x the item to insert 待填入的元素 * @param array the heap array 堆数组 * @param n heap size 目标堆大小 */private static
void siftDownComparable(int k, T x, Object[] array, int n) {
if (n > 0) {
Comparable
key = (Comparable
)x; //定义一个边界,half一定是分支节点 int half = n >>> 1; // loop while a non-leaf //k会指向 while (k < half) {
//能进入到这里说明分支节点k不满足条件,继续往下寻找 //获取分支节点k的左子节点在数组中的下标位置 //k
<< 1) + 1; // assume left child is least //c代表较小的子节点 Object c = array[child]; //获取分支节点k的又子节点在数组中的下标位置 int right = child + 1; //确保 right 不会越界 if (right < n && ((Comparable
) c).compareTo((T) array[right]) > 0) //如果左子节点大于右子节点,c指向较小的那个节点 c = array[child = right]; //如果目标小于等于左右子节点,则退出循环,将目标放在位置k中 if (key.compareTo((T) c) <= 0) break; //目标还是大,将较小的节点放到位置k中 array[k] = c; //分支节点k向下移动,指向左右子节点较小的那个节点位置 k = child; } //找到目标的位置K,赋值了事 array[k] = key; }}

再来看看阻塞的 take() 方法

Integer take = blockingQueue.take();public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //说明 take 方法响应中断 lock.lockInterruptibly(); E result; try {
//dequeue 方法在上面分析了,队列没有元素返回空 while ( (result = dequeue()) == null) //如果没获取到元素,则此线程阻塞到等待队列 //有其他线程将元素放入队列后,会调用 lock.unlock() 唤醒等待队列中的节点 notEmpty.await(); //唤醒后,此线程一定是获取了锁,继续循环 //因可能假唤醒,固使用 while } finally {
lock.unlock(); } return result;}

本文涉及知识点:

  • 并发编程基础 CAS
  • 数据结构 完全二叉树

转载地址:http://kqrai.baihongyu.com/

你可能感兴趣的文章
安卓微信 8.0 内测版来啦!
查看>>
我劝你不要再留QQ邮箱了
查看>>
真香!用 4K 高清显示器写代码!(包邮送一台)
查看>>
神器!各行业必备!低调使用
查看>>
B 站,牛逼!
查看>>
微信状态视频、图片素材来啦!
查看>>
再见了!锤子!!!
查看>>
LeetCode 全站第一,牛逼!
查看>>
为什么全网都在劝你学Java、Python,而不是C++?
查看>>
卧槽!阿里巴巴《算法笔记》开源了,完整版PDF开放下载!
查看>>
百度的骚操作。。。
查看>>
蔚来,牛X!
查看>>
微信悄悄新出了个野心很大的App
查看>>
微信红包封面制作小程序开放,人人都可免费制作了!!!
查看>>
13000亿!目瞪口呆!
查看>>
腾讯,搞了一个大事!
查看>>
入职腾讯第九年,我辞职了
查看>>
17 张程序员壁纸(使用频率很高)
查看>>
送一台全新手机,手慢无~
查看>>
全球顶级的14位程序员!膜拜!
查看>>