• ArrayBlockingQueue : 一个由数组支持的有界队列。
  • 如果到达了上界,将无法添加新的元素进入。
  • FIFO

    ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。

ArrayBlockingQueue的操作

  • 阻塞方法: put() <—> take()
  • 非阻塞方法: offer() <—> poll()

添加元素

  • put() put是ArrayBlockingQueue的方法(不是从Queue接口中继承来),在该方法中获取全局锁,如果队列满,将会阻塞直到有空间可以插入元素。条件(队列空,满)是通过Condition接口实现的。 ```Java /**
    • Inserts the specified element at the tail of this queue, waiting
    • for space to become available if the queue is full. *
    • @throws InterruptedException {@inheritDoc}
    • @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //获取全局的锁 lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //如果当前的队列满了,则一直阻塞 enqueue(e); //调用全局的私有入队列方法 } finally { lock.unlock(); } } ```
  • offer() offer是Queue接口要求实现的方法,如果队列仍有位置允许插入,插入元素,如果队列已满,直接返回false,不会阻塞。
      public boolean offer(E e) {
          checkNotNull(e);
          final ReentrantLock lock = this.lock;
          lock.lock();		//在写入的过程中获取锁
          try {
              if (count == items.length)	//如果queue中的元素已经到达了上限,直接返回false
                  return false;
              else {
                  enqueue(e);	//调用私有的enqueue方法
                  return true;
              }
          } finally {
              lock.unlock();	//释放锁
          }
      }
    
  • add() add是Queue接口要求实现的方法,内部调用了offer(),如果队列仍有位置允许插入,插入元素,如果队列已满,抛出异常,不会阻塞。
      public boolean add(E e) {
          if (offer(e))
              return true;
          else
              throw new IllegalStateException("Queue full");
      }
    
  • enqueue() enqueue是插入队列的核心方法,维护了一个读指针,一个写指针。 ```Java /**
    • Inserts element at current put position, advances, and signals.
    • Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); //取消notEmpty的await(put方法). } ```

      读取

  • take() take和put对应,是ArrayBlockingQueue私有的阻塞方法,在读取的过程中,如果发现队列为空,则会阻塞直到有元素可以读取。
      public E take() throws InterruptedException {
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              while (count == 0)
                  notEmpty.await();
              return dequeue();
          } finally {
              lock.unlock();
          }
      }
    
  • poll() 和offer()对应,如果没有元素,不会阻塞,返回false
      public E poll() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return (count == 0) ? null : dequeue();	//判断当前队列有没有元素。有的话调用deqeueu方法。
          } finally {
              lock.unlock();
          }
      }
    
  • dequeue()
      private E dequeue() {
          // assert lock.getHoldCount() == 1;
          // assert items[takeIndex] != null;
          final Object[] items = this.items;
          @SuppressWarnings("unchecked")
          E x = (E) items[takeIndex];
          items[takeIndex] = null;
          if (++takeIndex == items.length) //循环读取数组
              takeIndex = 0;
          count--;
          if (itrs != null)
              itrs.elementDequeued();
          notFull.signal();
          return x;
      }
    
  • peek() 并不会删除队列的第一个元素,单纯的读取值。调用的是itemAt方法
      public E peek() {
          final ReentrantLock lock = this.lock;
          lock.lock();
          try {
              return itemAt(takeIndex); // null when queue is empty
          } finally {
              lock.unlock();
          }
      }
    

Test

  • Producer Producer continuously put data into queue.
    public class ArrayBlockingQueueProducer implements Runnable {
      private ArrayBlockingQueue<Integer> queue;
      public ArrayBlockingQueueProducer(ArrayBlockingQueue<Integer> queue) {
          super();
          this.queue = queue;
      }
      private volatile AtomicInteger ai = new AtomicInteger(0);
      @Override
      public void run() {
          while(true){
              try {
                  queue.put(ai.getAndIncrement());
                  System.out.println("Producer: put " + ai.get() + " into queue...");
                  Thread.sleep(10);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
    }
    
  • Consumer Consumer get take data from queue 100 times
    public class ArrayBlockingQueueConsumer implements Runnable {
      private ArrayBlockingQueue<Integer> queue;
      public ArrayBlockingQueueConsumer(ArrayBlockingQueue<Integer> queue) {
          super();
          this.queue = queue;
      }
      @Override
      public void run() {
          try {
              for(int i = 0; i < 100; i++){
                  Thread.currentThread().join(10);
                  System.out.println("Consumer: get " + queue.take() + " from queue...");
                  Thread.sleep(10);
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    
      public static void main(String[] args) throws InterruptedException {
          ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
          new Thread(new ArrayBlockingQueueConsumer(queue)).start();
          new Thread(new ArrayBlockingQueueProducer(queue)).start();
          Thread.currentThread().join();
          System.out.println("Finish...");
      }
    }
    
  • Result

    … Producer: put 108 into queue… Consumer: get 98 from queue… Producer: put 109 into queue… Consumer: get 99 from queue… Producer: put 110 into queue…

We can find that Producer is blocked.