- /*
- * @(#)LinkedBlockingQueue.java 1.7 04/06/11
- *
- * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
- * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
- */
-
- package java.util.concurrent;
- import java.util.concurrent.atomic.*;
- import java.util.concurrent.locks.*;
- import java.util.*;
-
- /**
- * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
- * linked nodes.
- * This queue orders elements FIFO (first-in-first-out).
- * The <em>head</em> of the queue is that element that has been on the
- * queue the longest time.
- * The <em>tail</em> of the queue is that element that has been on the
- * queue the shortest time. New elements
- * are inserted at the tail of the queue, and the queue retrieval
- * operations obtain elements at the head of the queue.
- * Linked queues typically have higher throughput than array-based queues but
- * less predictable performance in most concurrent applications.
- *
- * <p> The optional capacity bound constructor argument serves as a
- * way to prevent excessive queue expansion. The capacity, if unspecified,
- * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
- * dynamically created upon each insertion unless this would bring the
- * queue above capacity.
- *
- * <p>This class and its iterator implement all of the
- * <em>optional</em> methods of the {@link Collection} and {@link
- * Iterator} interfaces.
- *
- * <p>This class is a member of the
- * <a href="{@docRoot}/../guide/collections/index.html">
- * Java Collections Framework</a>.
- *
- * @since 1.5
- * @author Doug Lea
- * @param <E> the type of elements held in this collection
- *
- **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Invariants maintained by every operation:
     *  - head is a dummy node whose item is always null;
     *  - the first real element, if any, is head.next;
     *  - last is the final node reachable from head (last == head
     *    exactly when the queue is empty), and last.next == null.
     * Any operation that unlinks nodes from the interior or the end of
     * the list must therefore repair "last", otherwise the next insert
     * would append to a node that is no longer reachable from head.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list (equal to head when the queue is empty) */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove a node from head of queue. The removed node becomes the
     * new dummy head, so its item is cleared.
     * Caller must hold takeLock and have checked that the queue is
     * non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlinks interior (or tail) node <tt>p</tt>, whose predecessor is
     * <tt>trail</tt>, decrements the count, and wakes waiting puts if
     * the queue was full. Caller must hold both locks.
     * @param trail the node immediately preceding <tt>p</tt>
     * @param p the node to unlink
     */
    private void unlink(Node<E> trail, Node<E> p) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, its predecessor becomes the tail;
        // otherwise the next insert would link behind an orphaned node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return  the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the capacity of this queue minus its current size
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            // Cascade: still room after this insert, so wake another put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insert: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            // Cascade: more elements remain, so wake another take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is available.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> immediately if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove; <tt>null</tt> is never present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(trail, p);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence (head first).
     *
     * @return a newly allocated array holding the queue's elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence (head first); the runtime type of the returned
     * array is that of the specified array. If the queue fits in the
     * specified array it is returned therein, and if there is room to
     * spare the element immediately following the last queue element
     * is set to <tt>null</tt>. Otherwise a new array of the same
     * runtime type and of the queue's size is allocated.
     *
     * @param a the array into which the elements are to be stored, if
     *          it is big enough
     * @return an array containing the elements of this queue
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if an element cannot be stored in an
     *         array of the given runtime type
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Collection.toArray(T[]) contract: mark the end of the data
            // when the caller's array is larger than the queue.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, computed while
     * both locks are held so that the contents cannot change midway.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            // The list is empty again, so the dummy head is also the tail.
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection. The elements are detached from the queue
     * atomically and then transferred outside of the locks; if
     * <tt>c.add</tt> throws, elements not yet transferred are no longer
     * in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            // The whole chain was detached: the dummy head is the tail.
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection. If <tt>c.add</tt>
     * throws, the elements transferred so far are removed from this
     * queue and the remaining ones are left in place.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
                return n;
            } finally {
                // Runs on both normal and exceptional exit so that the
                // list and count always agree with what was transferred.
                if (n != 0) {
                    head.next = p;
                    // Drained through the tail: the dummy head is the tail.
                    if (p == null)
                        last = head;
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
      return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        /** Node whose element the next call to next() will return */
        private Node<E> current;
        /** Node returned by the most recent next(), or null if removed */
        private Node<E> lastRet;
        /** Element of current, captured early (see comment above) */
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Re-locate the node from head: it may already have been
                // removed by another thread, in which case do nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(trail, p);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The serialized count is discarded; it is rebuilt by the adds below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}