• ** 非阻塞**的链表队列
  • 通过CAS实现无锁非阻塞

    基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。

Compare and swap(CAS)

  • CAS是一种替代锁存在的无锁并发机制。
  • 该种无锁机制也是乐观锁的体现。
  • Why need CAS:一个修改操作可以分为:从内存中读取值->修改值->写回内存,在高并发的过程中,这一个过程失去了其原子性,所以修改值的原始值可能已经被其他线程修改了。 i != i 这种操作在单线程中一定是true,在多线程中i的值可能在别的线程被更改。
  • 如何实现CAS:在循环中读取这次循环的内存中存储值,进行修改,在写回内存中时比较内存中的值是否还是第一步取出的值,不是的话说明这次循环失败,重新开始,是的话写入新的值。
    private static final Unsafe unsafe = Unsafe.getUnsafe(); //JNI实现的Unsafe类。
    public final boolean compareAndSet(long expect, long update) {
      return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }
    

Node

  • ConcurrentLinkedQueue的内部私有类
  • 使用了通过C++编写的Unsafe类
     private static class Node<E> {
          volatile E item;
          volatile Node<E> next;
          /**
           * Constructs a new node.  Uses relaxed write because item can
           * only be seen after publication via casNext.
           */
          Node(E item) {
              UNSAFE.putObject(this, itemOffset, item);
          }
          //通过CAS保证了操作的原子性。
          //CAS应用:表示设置当前Node的item值。第一个参数为期望值,第二个参数为设置目标值。当当前值等于期望值时(就是没有被其他人改过),就会将目标设置为val。
          boolean casItem(E cmp, E val) {
              return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
          }
          void lazySetNext(Node<E> val) {
              UNSAFE.putOrderedObject(this, nextOffset, val);
          }
          //CAS 下一个结点
          boolean casNext(Node<E> cmp, Node<E> val) {
              return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
          }
          // Unsafe mechanics
          private static final sun.misc.Unsafe UNSAFE;
          private static final long itemOffset;
          private static final long nextOffset;
          static {
              try {
                  UNSAFE = sun.misc.Unsafe.getUnsafe();
                  Class<?> k = Node.class;
                  itemOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("item"));
                  nextOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("next"));
              } catch (Exception e) {
                  throw new Error(e);
              }
          }
      }
    

ConcurrentLinkedQueue’s API

插入元素

  • add() 通过offer方法添加元素。
      public boolean add(E e) {
          return offer(e);
      }
    
  • offer() clq
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // 当尾结点就是指向链表的最后一个元素,尝试CAS新的结点
                if (p.casNext(null, newNode)) {	//将新的结点添加到链表的底部
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // 如上图所示,如果当前p不是tail结点,就将新接入的结点当做尾结点。相当于尾结点一次性跳跃两个结点。
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)	//哨兵结点情况
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
				//如果在读取中,t的值被更新了,“打赌”t被更新正确了,不然重新从头开始。
                p = (t != (t = tail)) ? t : head;
            else	//q不是最后一个结点
                // Check for tail updates after two hops.
				//如果在该操作过程中,t的值被更新了,我们“打赌”t的值被更新正确
				//不然我们将p指针向后移动一位,重新进入循环
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

高效读写的队列:深度剖析ConcurrentLinkedQueue

出队列

cdl

  • poll() 出队列不会阻塞,如果队列为空则返回null。
      public E poll() {
          restartFromHead:
          for (;;) {
              for (Node<E> h = head, p = h, q;;) {
                  E item = p.item;
                  //取出头元素的值,并将其CAS成null。
                  if (item != null && p.casItem(item, null)) {
                      // Successful CAS is the linearization point
                      // for item to be removed from this queue.
                      //"打赌",假设头结点head已经被更新,则沿用h,不然更新为下一个结点
                      if (p != h) // hop two nodes at a time
                          updateHead(h, ((q = p.next) != null) ? q : p);
                      return item;
                  }
                  //当前结点为空,下一个结点也为空,队列为空,退出循环,返回null。
                  else if ((q = p.next) == null) {
                      updateHead(h, p);
                      return null;
                  }
                  //遇到哨兵结点,从新从头开始。
                  else if (p == q)
                      continue restartFromHead;
                  //此时q == p.next, 将指针向右移动一位。
                  else
                      p = q;
              }
          }
      }
    

其他方法

  • size() 检查队列大小,O(n),为了保证并发性不得不每次都遍历链表。
      public int size() {	//返回检查队列的大小
          int count = 0;
          for (Node<E> p = first(); p != null; p = succ(p))
              if (p.item != null)
                  // Collection.size() spec says to max out
                  if (++count == Integer.MAX_VALUE)
                      break;
          return count;
      }
    
  • isEmpty() 判断链表非空,O(1)级别

Test

  • 生产者
    public class ConcurrentLinkQueueProducer implements Runnable{
      private ConcurrentLinkedQueue<Integer> q;
      public ConcurrentLinkQueueProducer(ConcurrentLinkedQueue<Integer> q) {
          super();
          this.q = q;
      }
      @Override
      public void run() {
          AtomicInteger al = new AtomicInteger(0);//通过AtomicInteger保证原子性,避免锁。
          while(true){
              q.offer(al.get());
              System.out.println("Producer: put " + al.getAndIncrement() + " into queue...");
              try {
                  Thread.sleep(10);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      }
    }
    
  • 消费者
    public class ConcurrentLinkedQueueConsumer implements Runnable {
      private ConcurrentLinkedQueue<Integer> q;
      public ConcurrentLinkedQueueConsumer(ConcurrentLinkedQueue<Integer> q) {
          super();
          this.q = q;
      }
      @Override
      public void run() {
          try {
              while(true){
                  System.out.println("Consumer: get " + q.poll() + " from queue...");
                  Thread.sleep(10);
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    
      public static void main(String[] args) throws InterruptedException {
          ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
          Thread producer = new Thread(new ConcurrentLinkQueueProducer(q));
          producer.start();
          producer.join(100);
          new Thread(new ConcurrentLinkedQueueConsumer(q)).start();
      }
    }