Data structure | Linked Blocking Queue
by Botao Xiao
- 一个由链接节点支持的可选有界队列。
- 内部维护了一个Node类,是一个单向链表。第一个元素为空(dummy node)。
LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
插入元素
- put() LinkedBlockingQueue的阻塞插入方法,如果队列已满,则阻塞并等待。
```Java
/**
- Inserts the specified element at the tail of this queue, waiting if
- necessary for space to become available. *
- @throws InterruptedException {@inheritDoc}
- @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node
node = new Node (e); final ReentrantLock putLock = this.putLock; //写锁 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); //如果队列已满,阻塞。 } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } ```
- offer()实现Queue接口的方法,如果当前队列已满,直接返回false。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //此处的count为AtomicInteger,维护了原子性 if (count.get() == capacity) //如果队列已满,直接返回 return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
- enqueue() 添加方法的核心
private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; //在链表的结尾,添加要插入的结点。 }
取出元素
- take() 和put()相对应的方法,从队列中取出元素,如果队列为空则会阻塞。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { //判断当前队列为空,condition方法开始阻塞。 notEmpty.await(); } x = dequeue(); //调用统一的dequeue方法从队列中读取元素。 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
- poll() 从队列中获取元素,如果队列中为空则返回null。
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) //如果队列为空直接返回null, point 1 return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { //再次判断,如果在point 1 到当前点之间队列已经变成空,直接跳过读取阶段,返回空。 x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
- dequeue() 统一的从队列头读取元素的方法
Avoid create a new node, just set value to null. ```Java /**
- Removes a node from head of queue. *
- @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node
h = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } ```
Test
- 向队列中放置100个元素不断读取,发现读取的进程最终阻塞
public class LinkedBlockingQueueProducer implements Runnable { private LinkedBlockingQueue<Integer> q; private AtomicInteger ai = new AtomicInteger(); public LinkedBlockingQueueProducer(LinkedBlockingQueue<Integer> q) { super(); this.q = q; } @Override public void run() { for(int i = 0; i < 100; i++){ try { q.put(ai.get()); System.out.println("Producer: put " + ai.getAndIncrement() + " into queue..."); Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class LinkedBlockingQueueConsumer implements Runnable {
private LinkedBlockingQueue<Integer> q;
public LinkedBlockingQueueConsumer(LinkedBlockingQueue<Integer> q) {
super();
this.q = q;
}
@Override
public void run() {
try {
while(true){
System.out.println("Consumer: take " + q.take() + " from queue...");
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
new Thread(new LinkedBlockingQueueConsumer(queue)).start();
new Thread(new LinkedBlockingQueueProducer(queue)).start();
Thread.currentThread().join();
System.out.println("Finish...");
}
}
- 结果
Producer: put 98 into queue… Consumer: take 98 from queue… Producer: put 99 into queue… Consumer: take 99 from queue… 读取进程最终阻塞。
Subscribe via RSS