Data structure | Array Blocking Queue
by Botao Xiao
- ArrayBlockingQueue : 一个由数组支持的有界队列。
- 如果到达了上界,将无法添加新的元素进入。
- FIFO
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
ArrayBlockingQueue的操作
- 阻塞方法: put() <—> take()
- 非阻塞方法: offer() <—> poll()
添加元素
- put() put是ArrayBlockingQueue的方法(不是从Queue接口中继承来),在该方法中获取全局锁,如果队列满,将会阻塞直到有空间可以插入元素。条件(队列空,满)是通过Condition接口实现的。
```Java
/**
- Inserts the specified element at the tail of this queue, waiting
- for space to become available if the queue is full. *
- @throws InterruptedException {@inheritDoc}
- @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //获取全局的锁 lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //如果当前的队列满了,则一直阻塞 enqueue(e); //调用全局的私有入队列方法 } finally { lock.unlock(); } } ```
- offer() offer是Queue接口要求实现的方法,如果队列仍有位置允许插入,插入元素,如果队列已满,直接返回false,不会阻塞。
public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); //在写入的过程中获取锁 try { if (count == items.length) //如果queue中的元素已经到达了上限,直接返回false return false; else { enqueue(e); //调用私有的enqueue方法 return true; } } finally { lock.unlock(); //释放锁 } }
- add() add是Queue接口要求实现的方法,内部调用了offer(),如果队列仍有位置允许插入,插入元素,如果队列已满,抛出异常,不会阻塞。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
- enqueue() enqueue是插入队列的核心方法,维护了一个读指针,一个写指针。
```Java
/**
- Inserts element at current put position, advances, and signals.
- Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); //取消notEmpty的await(put方法).
}
```
读取
- take() take和put对应,是ArrayBlockingQueue私有的阻塞方法,在读取的过程中,如果发现队列为空,则会阻塞直到有元素可以读取。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
- poll() 和offer()对应,如果没有元素,不会阻塞,返回false
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); //判断当前队列有没有元素。有的话调用deqeueu方法。 } finally { lock.unlock(); } }
- dequeue()
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) //循环读取数组 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
- peek() 并不会删除队列的第一个元素,单纯的读取值。调用的是itemAt方法
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } }
Test
- Producer
Producer continuously put data into queue.
public class ArrayBlockingQueueProducer implements Runnable { private ArrayBlockingQueue<Integer> queue; public ArrayBlockingQueueProducer(ArrayBlockingQueue<Integer> queue) { super(); this.queue = queue; } private volatile AtomicInteger ai = new AtomicInteger(0); @Override public void run() { while(true){ try { queue.put(ai.getAndIncrement()); System.out.println("Producer: put " + ai.get() + " into queue..."); Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }
- Consumer
Consumer get take data from queue 100 times
public class ArrayBlockingQueueConsumer implements Runnable { private ArrayBlockingQueue<Integer> queue; public ArrayBlockingQueueConsumer(ArrayBlockingQueue<Integer> queue) { super(); this.queue = queue; } @Override public void run() { try { for(int i = 0; i < 100; i++){ Thread.currentThread().join(10); System.out.println("Consumer: get " + queue.take() + " from queue..."); Thread.sleep(10); } } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); new Thread(new ArrayBlockingQueueConsumer(queue)).start(); new Thread(new ArrayBlockingQueueProducer(queue)).start(); Thread.currentThread().join(); System.out.println("Finish..."); } }
- Result
… Producer: put 108 into queue… Consumer: get 98 from queue… Producer: put 109 into queue… Consumer: get 99 from queue… Producer: put 110 into queue…
We can find that Producer is blocked.
Subscribe via RSS