Java容器类(4)-Queue

Posted by zhidaliao on October 22, 2016

This document is not completed and will be updated anytime.

点击查看大图

普通容器类

Queue

队列与栈一样是一种线性结构,因此以常见的线性表如数组、链表作为底层的数据结构。

虽然 List 可以模拟 Queue ,但是需要Queue这种数据接口,去掉List随机访问需求,从而实现高效的并发操作

通过接口的定义可以看出,没有随机访问,提供了针对FIFO的方法。

public interface Queue<E> extends Collection<E> {
 
    boolean add(E e);
 
    boolean offer(E e);
 
    E remove();
 
    E poll();
 
    E element();
 
    E peek();
}

队列(queue)原理

PriorityQueue

底层为数组,使用二叉堆算法,优先级队列

使用方式:

PriorityQueue<Object> queue = new PriorityQueue<Object>(6, new Comparator<Object>() {
    @Override
    public int compare(Object t1, Object t2) {
    }
});

源码:

public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {

    private static final long serialVersionUID = -7720805057305804111L;

    private static final int DEFAULT_INITIAL_CAPACITY = 11;
 
    private transient Object[] queue;
 
    private int size = 0;
 
    private final Comparator<? super E> comparator;
 
    private transient int modCount = 0;
 
    public PriorityQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }

    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }
}

jdk PriorityQueue优先队列工作原理分析

JDK源码研究PriorityQueue(优先队列)

Deque

继承了Queue,增加了额外的双向接口方法。

public interface Deque<E> extends Queue<E> {
    
    void addFirst(E e);
 
    void addLast(E e);
 
    boolean offerFirst(E e);
 
    boolean offerLast(E e);
}

AbstractQueue

抽象类,使用数组,新增了通用方法(对接口中的方法进行封装,业务处理)

public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {
 
    protected AbstractQueue() {
    }

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
    ..
}

ArrayDeque

public class ArrayDeque<E> extends AbstractCollection<E>
                           implements Deque<E>, Cloneable, Serializable
{ 
    private transient E[] elements;
 
    private transient int head;
 
    private transient int tail;

    private static final int MIN_INITIAL_CAPACITY = 8;

    private void allocateElements(int numElements) {
        int initialCapacity = MIN_INITIAL_CAPACITY;
        if (numElements >= initialCapacity) {
            initialCapacity = numElements;
            initialCapacity |= (initialCapacity >>>  1);
            initialCapacity |= (initialCapacity >>>  2);
            initialCapacity |= (initialCapacity >>>  4);
            initialCapacity |= (initialCapacity >>>  8);
            initialCapacity |= (initialCapacity >>> 16);
            initialCapacity++;

            if (initialCapacity < 0)   // Too many elements, must back off
                initialCapacity >>>= 1;// Good luck allocating 2 ^ 30 elements
        }
        elements = (E[]) new Object[initialCapacity];
    }

    
    private void doubleCapacity() {
        assert head == tail;
        int p = head;
        int n = elements.length;
        int r = n - p; // number of elements to the right of p
        int newCapacity = n << 1;
        if (newCapacity < 0)
            throw new IllegalStateException("Sorry, deque too big");
        Object[] a = new Object[newCapacity];
        System.arraycopy(elements, p, a, 0, r);
        System.arraycopy(elements, 0, a, r, p);
        elements = (E[])a;
        head = 0;
        tail = n;
    }

    
    private <T> T[] copyElements(T[] a) {
        if (head < tail) {
            System.arraycopy(elements, head, a, 0, size());
        } else if (head > tail) {
            int headPortionLen = elements.length - head;
            System.arraycopy(elements, head, a, 0, headPortionLen);
            System.arraycopy(elements, 0, a, headPortionLen, tail);
        }
        return a;
    }

    public ArrayDeque() {
        elements = (E[]) new Object[16];
    }

    public ArrayDeque(int numElements) {
        allocateElements(numElements);
    }

    public ArrayDeque(Collection<? extends E> c) {
        allocateElements(c.size());
        addAll(c);
    }

    public void addFirst(E e) {
        if (e == null)
            throw new NullPointerException();
        elements[head = (head - 1) & (elements.length - 1)] = e;
        if (head == tail)
            doubleCapacity();
    }

    public void addLast(E e) {
        if (e == null)
            throw new NullPointerException();
        elements[tail] = e;
        if ( (tail = (tail + 1) & (elements.length - 1)) == head)
            doubleCapacity();
    }

    public boolean offerFirst(E e) {
        addFirst(e);
        return true;
    }

    ...
}

allocateElements 此方法是给数组分配初始容量,初始容量并不是numElements,而是大于指定长度的最小的2的幂正数

假设a=1xxxxxxxxxxxx…(base 2, x代表该位任意为0或1) 首先a |= (a »> 1)之后,a => 11xxxxxxxx…(最高两位是1) 然后a |= (a»> 2): a => 1111xxxxxxxxx…(最高四位是1) 再a |= (a»> 4): a => 11111111xxxxxxx…(最高八位是1) …….. 最终,a的所有低位也都变成了1,即11111111…111(全是1) 再a++ 就变成了10000000000…000(加一之后进位,比原来的二进制串多了一位,且第一位是1,其它位都是0),

得到的结果大于等于任意一个操作数 结果有趋向每个位都为1的趋势 所以这样运算下来,运算得到的结果的二进制一定是每个位都是1,再加一个,就刚好是2的整数幂了

jdk源码分析ArrayDeque

ArrayDeque源码分析

Deque & ArrayDeque

Deque的实现类

ArrayDeque, ConcurrentLinkedDeque, LinkedBlockingDeque, LinkedList

双向队列

Deque is an interface and has two implementations: LinkedList and ArrayDeque.

Deque dq = new LinkedList();
Deque dq = new ArrayDeque();
When to use ArrayList and when to use ArrayDeque?

ArrayDeque has the ability to add or remove the elements from both ends (head or tail)

on the other hand removing last element from ArrayList takes O(n) time as it traverses the whole list to reach the end.

So if you want to add or remove elements from both ends choose ArrayDeque over ArrayList, however if you only want to perform the opreation on the tail (at the end) then you should choose ArrayList.


同步容器类

BlockingQueue 接口

有界阻塞队列,定义了方法没有具体实现,相对于Queue 增加了阻塞超时参数,批量几个处理方法。

BlockingQueue 接口实现类:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue

public interface BlockingQueue<E> extends Queue<E> {

    boolean add(E e);
    
    boolean offer(E e);
    
    void put(E e) throws InterruptedException;
    
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    E take() throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);
    
    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);
    
    int drainTo(Collection<? super E> c, int maxElements);
}

TransferQueue

TransferQueue 的设计主要是避免队列中产生过多的数据,消费者无法及时消费完成。

public interface TransferQueue<E> extends BlockingQueue<E> {
    
    boolean tryTransfer(E e);

    void transfer(E e) throws InterruptedException;
    
    // 等待消费者接收,如果在指定的时间没有消费者接收将会返回false
    boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    
    boolean hasWaitingConsumer();

    int getWaitingConsumerCount();
}

LinkedTransferQueue

利用链表FIFO实现无界队列,底层数据结构为Node

JUC源码分析25-队列-LinkedTransferQueue

ArrayBlockingQueue

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    final Object[] items;

    int takeIndex;

    int putIndex;

    int count;

    /** Main lock guarding all access */
    final ReentrantLock lock;
    
    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

    public boolean add(E e) {
        return super.add(e);
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    ...
}

ConcurrentLinkedQueue

并发的顺序队列

SynchronousQueue

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

Java并发包中的同步队列SynchronousQueue实现原理

ArrayBlockingQueue

DelayQueue

LinkedBlockingQueue

PriorityBlockingQueue

LinkedTransferQueue

LinkedBlockingDeque


同步工具

AbstractQueuedSynchronizer

深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)

背景

Java中的FutureTask作为可异步执行任务并可获取执行结果而被大家所熟知。通常可以使用future.get()来获取线程的执行结果,在线程执行结束之前,get方法会一直阻塞状态,直到call()返回,其优点是使用线程异步执行任务的情况下还可以获取到线程的执行结果,但是FutureTask的以上功能却是依靠通过一个叫AbstractQueuedSynchronizer的类来实现。

至少在JDK 1.5、JDK1.6版本是这样的(从1.7开始FutureTask已经被其作者Doug Lea修改为不再依赖AbstractQueuedSynchronizer实现了,这是JDK1.7的变化之一)。但是AbstractQueuedSynchronizer在JDK1.8中还有如下图所示的众多子类:ReentrantLock、Semaphore、CountDownLatch、AbstractFuture

在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架

AbstractQueuedSynchronizer类底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列

  • 其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。
  • 而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

AQS的功能可以分为两类:独占功能和共享功能

它的所有子类中,要么实现并使用了它独占功能的API,要么使用了共享锁的功能,而不会同时使用两套API,即便是它最有名的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的,为什么这么做,后面我们再分析,到目前为止,我们只需要明白AQS在功能上有独占控制和共享控制两种功能即可。

如FutureTask(JDK1.6)一样,ReentrantLock内部有代理类完成具体操作,ReentrantLock只是封装了统一的一套API而已

独占:

有那么一个被volatile修饰的标志位叫做key,用来表示有没有线程拿走了锁,或者说,锁还存不存在,还需要一个线程安全的队列,维护一堆被挂起的线程,以至于当锁被归还时,能通知到这些被挂起的线程,可以来竞争获取锁了。

共享锁。

代表:CountDownLatch. 共享的状态是可以被共享的,也就是意味着其他AQS队列中的其他节点也应能第一时间知道state状态的变化。1

ReentrantLock

公平锁

先判断队列的头指针是否为NULL,为NULL就是获取锁,修改state值。没有就排队。

final void lock() {
    acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
非公平锁

直接修改状态位 state,成功直接返回,失败入队列

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

深度解析Java 8:AbstractQueuedSynchronizer的实现分析(上)

CountDownLatch

深度解析Java 8:AbstractQueuedSynchronizer 的实现分析(下)

Semaphore

本质是 AbstractQueueSynchronizer 共享锁。 分析同步工具 Semaphore 和 CyclicBarrier 的实现原理

Barier

CyclicBarrier是通过ReentrantLock(独占锁)和Condition来实现的

分析同步工具 Semaphore 和 CyclicBarrier 的实现原理

其他

ThreadLocal

类似于栈封闭

  • ThreadLocal 避免修改到其他线程的变量,所有操作局限在本线程,没有数据入库的操作,因为是全局变量 ; T1 A=5 ,T2 修改A=6 ,导致错误的 T1 A=6
  • 使用ThreadLocal,能使线程中的某个值所有改变只在该当前线程中变动,不会被其他线程操作,保证了当前线程的数据不共享,避免竞态条件的发生。

内部实现:

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null)
            return (T)e.value;
    }
    return setInitialValue();
}


public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}   


static class ThreadLocalMap {

    static class Entry extends WeakReference<ThreadLocal> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal k, Object v) {
            super(k);
            value = v;
        }
    }
   
    private static final int INITIAL_CAPACITY = 16;
   
    private Entry[] table;
    
    private int size = 0;
 
    private int threshold; // Default to 0

    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }
}

其他

RandomAccess

1) 有两个类实现了 RandomAccess 接口:ArrayList Vector

2) RandomAccess 是一个声明接口,表示集合内的元素获取的时间是一样的,第一个和第一万个元素获取的时间一致。用来表明其支持快速(通常是固定时间)随机访问。此接口的主要目的是允许一般的算法更改其行为,从而在将其应用到随机或连续访问列表时能提供良好的性能。

3) RandomAccess 没有声明任何方法,所以也成为标记接口.用来向编译器暗示

RandomAccess 接口是List 实现所使用的标记接口,

在对List特别的遍历算法中,要尽量来判断是属于RandomAccess(如ArrayList)还是SequenceAccess(如LinkedList),因为适合RandomAccess List的遍历算法,用在SequenceAccess List上就差别很大

即对于实现了RandomAccess接口的类实例而言,遍历的效率依次递减:

for (int i=0, i<list.size(); i++)
    list.get(i);

for (int i=0, n=list.size(); i < n;!i++)
    itr.next();

for (Iterator i=list.iterator(); i.hasNext(); )
    i.next();

对于容器类的接口,要使用什么方式进行遍历需要自己指定:

Object o;
if (listObject instanceof RandomAccess)
{
  for (int i=0, n=list.size(); i < n; i++)
  {
    o = list.get(i);
    //do something with object o
  }

}
else
{
  Iterator itr = list.iterator();
  for (int i=0, n=list.size(); i < n; i++)
  {
    o = itr.next();
    //do something with object o

  }
}

参考网址

Java Collections

http://www.falkhausen.de/Java-8/