by Botao Xiao
- Queue家族的继承关系:
Queue interface
boolean add(E e);
Add and offer are used to add element into the queue.boolean offer(E e);
E remove();
Retrieve and remove the head of the queue.E poll();
E element();
Retrieves, but does not remove the head of this queue.E peek();
Blocking and non-blocking
- Concurrent queue有两种实现方法,阻塞和非阻塞。
- 阻塞队列是通过锁实现。
- 非阻塞队列通过AQS实现。
- ArrayBlockingQueue :一个由数组支持的有界队列。
- 如果到达了上界,将无法添加新的元素进入。
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); //在写入的过程中获取锁 try { if (count == items.length) return false; else { enqueue(e); //调用私有的enqueue方法 return true; } } finally { lock.unlock(); //释放锁 } }
```Java /**
- Inserts element at current put position, advances, and signals.
- Call only when holding lock.
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
notEmpty.signal(); //取消notEmpty的await.
```Java public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); //判断当前队列有没有元素。有的话调用deqeueu方法。 } finally { lock.unlock(); } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
- 一个由链接节点支持的可选有界队列。
- 内部维护了一个Node类
LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。 ```Java static class Node
{ E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } } ``` ```Java public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //此处的count为AtomicInteger,维护了原子性 if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
private void enqueue(Node
node) { // assert putLock.isHeldByCurrentThread(); // assert == null; last = = node; //在链表的结尾,添加要插入的结点。 } ```Java public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
- 一个由优先级堆支持的无界优先级队列。
PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
