1. /*
  2. * @(#)LinkedBlockingQueue.java 1.7 04/06/11
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.atomic.*;
  9. import java.util.concurrent.locks.*;
  10. import java.util.*;
  11. /**
  12. * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
  13. * linked nodes.
  14. * This queue orders elements FIFO (first-in-first-out).
  15. * The <em>head</em> of the queue is that element that has been on the
  16. * queue the longest time.
  17. * The <em>tail</em> of the queue is that element that has been on the
  18. * queue the shortest time. New elements
  19. * are inserted at the tail of the queue, and the queue retrieval
  20. * operations obtain elements at the head of the queue.
  21. * Linked queues typically have higher throughput than array-based queues but
  22. * less predictable performance in most concurrent applications.
  23. *
  24. * <p> The optional capacity bound constructor argument serves as a
  25. * way to prevent excessive queue expansion. The capacity, if unspecified,
  26. * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
  27. * dynamically created upon each insertion unless this would bring the
  28. * queue above capacity.
  29. *
  30. * <p>This class and its iterator implement all of the
  31. * <em>optional</em> methods of the {@link Collection} and {@link
  32. * Iterator} interfaces.
  33. *
  34. * <p>This class is a member of the
  35. * <a href="{@docRoot}/../guide/collections/index.html">
  36. * Java Collections Framework</a>.
  37. *
  38. * @since 1.5
  39. * @author Doug Lea
  40. * @param <E> the type of elements held in this collection
  41. *
  42. **/
  43. public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  44. implements BlockingQueue<E>, java.io.Serializable {
  45. private static final long serialVersionUID = -6903933977591709194L;
  46. /*
  47. * A variant of the "two lock queue" algorithm. The putLock gates
  48. * entry to put (and offer), and has an associated condition for
  49. * waiting puts. Similarly for the takeLock. The "count" field
  50. * that they both rely on is maintained as an atomic to avoid
  51. * needing to get both locks in most cases. Also, to minimize need
  52. * for puts to get takeLock and vice-versa, cascading notifies are
  53. * used. When a put notices that it has enabled at least one take,
  54. * it signals taker. That taker in turn signals others if more
  55. * items have been entered since the signal. And symmetrically for
  56. * takes signalling puts. Operations such as remove(Object) and
  57. * iterators acquire both locks.
  58. */
  59. /**
  60. * Linked list node class
  61. */
  62. static class Node<E> {
  63. /** The item, volatile to ensure barrier separating write and read */
  64. volatile E item;
  65. Node<E> next;
  66. Node(E x) { item = x; }
  67. }
  68. /** The capacity bound, or Integer.MAX_VALUE if none */
  69. private final int capacity;
  70. /** Current number of elements */
  71. private final AtomicInteger count = new AtomicInteger(0);
  72. /** Head of linked list */
  73. private transient Node<E> head;
  74. /** Tail of linked list */
  75. private transient Node<E> last;
  76. /** Lock held by take, poll, etc */
  77. private final ReentrantLock takeLock = new ReentrantLock();
  78. /** Wait queue for waiting takes */
  79. private final Condition notEmpty = takeLock.newCondition();
  80. /** Lock held by put, offer, etc */
  81. private final ReentrantLock putLock = new ReentrantLock();
  82. /** Wait queue for waiting puts */
  83. private final Condition notFull = putLock.newCondition();
  84. /**
  85. * Signal a waiting take. Called only from put/offer (which do not
  86. * otherwise ordinarily lock takeLock.)
  87. */
  88. private void signalNotEmpty() {
  89. final ReentrantLock takeLock = this.takeLock;
  90. takeLock.lock();
  91. try {
  92. notEmpty.signal();
  93. } finally {
  94. takeLock.unlock();
  95. }
  96. }
  97. /**
  98. * Signal a waiting put. Called only from take/poll.
  99. */
  100. private void signalNotFull() {
  101. final ReentrantLock putLock = this.putLock;
  102. putLock.lock();
  103. try {
  104. notFull.signal();
  105. } finally {
  106. putLock.unlock();
  107. }
  108. }
  109. /**
  110. * Create a node and link it at end of queue
  111. * @param x the item
  112. */
  113. private void insert(E x) {
  114. last = last.next = new Node<E>(x);
  115. }
  116. /**
  117. * Remove a node from head of queue,
  118. * @return the node
  119. */
  120. private E extract() {
  121. Node<E> first = head.next;
  122. head = first;
  123. E x = first.item;
  124. first.item = null;
  125. return x;
  126. }
  127. /**
  128. * Lock to prevent both puts and takes.
  129. */
  130. private void fullyLock() {
  131. putLock.lock();
  132. takeLock.lock();
  133. }
  134. /**
  135. * Unlock to allow both puts and takes.
  136. */
  137. private void fullyUnlock() {
  138. takeLock.unlock();
  139. putLock.unlock();
  140. }
  141. /**
  142. * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
  143. * {@link Integer#MAX_VALUE}.
  144. */
  145. public LinkedBlockingQueue() {
  146. this(Integer.MAX_VALUE);
  147. }
  148. /**
  149. * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
  150. *
  151. * @param capacity the capacity of this queue.
  152. * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
  153. * than zero.
  154. */
  155. public LinkedBlockingQueue(int capacity) {
  156. if (capacity <= 0) throw new IllegalArgumentException();
  157. this.capacity = capacity;
  158. last = head = new Node<E>(null);
  159. }
  160. /**
  161. * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
  162. * {@link Integer#MAX_VALUE}, initially containing the elements of the
  163. * given collection,
  164. * added in traversal order of the collection's iterator.
  165. * @param c the collection of elements to initially contain
  166. * @throws NullPointerException if <tt>c</tt> or any element within it
  167. * is <tt>null</tt>
  168. */
  169. public LinkedBlockingQueue(Collection<? extends E> c) {
  170. this(Integer.MAX_VALUE);
  171. for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
  172. add(it.next());
  173. }
  174. // this doc comment is overridden to remove the reference to collections
  175. // greater in size than Integer.MAX_VALUE
  176. /**
  177. * Returns the number of elements in this queue.
  178. *
  179. * @return the number of elements in this queue.
  180. */
  181. public int size() {
  182. return count.get();
  183. }
  184. // this doc comment is a modified copy of the inherited doc comment,
  185. // without the reference to unlimited queues.
  186. /**
  187. * Returns the number of elements that this queue can ideally (in
  188. * the absence of memory or resource constraints) accept without
  189. * blocking. This is always equal to the initial capacity of this queue
  190. * less the current <tt>size</tt> of this queue.
  191. * <p>Note that you <em>cannot</em> always tell if
  192. * an attempt to <tt>add</tt> an element will succeed by
  193. * inspecting <tt>remainingCapacity</tt> because it may be the
  194. * case that a waiting consumer is ready to <tt>take</tt> an
  195. * element out of an otherwise full queue.
  196. */
  197. public int remainingCapacity() {
  198. return capacity - count.get();
  199. }
  200. /**
  201. * Adds the specified element to the tail of this queue, waiting if
  202. * necessary for space to become available.
  203. * @param o the element to add
  204. * @throws InterruptedException if interrupted while waiting.
  205. * @throws NullPointerException if the specified element is <tt>null</tt>.
  206. */
  207. public void put(E o) throws InterruptedException {
  208. if (o == null) throw new NullPointerException();
  209. // Note: convention in all put/take/etc is to preset
  210. // local var holding count negative to indicate failure unless set.
  211. int c = -1;
  212. final ReentrantLock putLock = this.putLock;
  213. final AtomicInteger count = this.count;
  214. putLock.lockInterruptibly();
  215. try {
  216. /*
  217. * Note that count is used in wait guard even though it is
  218. * not protected by lock. This works because count can
  219. * only decrease at this point (all other puts are shut
  220. * out by lock), and we (or some other waiting put) are
  221. * signalled if it ever changes from
  222. * capacity. Similarly for all other uses of count in
  223. * other wait guards.
  224. */
  225. try {
  226. while (count.get() == capacity)
  227. notFull.await();
  228. } catch (InterruptedException ie) {
  229. notFull.signal(); // propagate to a non-interrupted thread
  230. throw ie;
  231. }
  232. insert(o);
  233. c = count.getAndIncrement();
  234. if (c + 1 < capacity)
  235. notFull.signal();
  236. } finally {
  237. putLock.unlock();
  238. }
  239. if (c == 0)
  240. signalNotEmpty();
  241. }
  242. /**
  243. * Inserts the specified element at the tail of this queue, waiting if
  244. * necessary up to the specified wait time for space to become available.
  245. * @param o the element to add
  246. * @param timeout how long to wait before giving up, in units of
  247. * <tt>unit</tt>
  248. * @param unit a <tt>TimeUnit</tt> determining how to interpret the
  249. * <tt>timeout</tt> parameter
  250. * @return <tt>true</tt> if successful, or <tt>false</tt> if
  251. * the specified waiting time elapses before space is available.
  252. * @throws InterruptedException if interrupted while waiting.
  253. * @throws NullPointerException if the specified element is <tt>null</tt>.
  254. */
  255. public boolean offer(E o, long timeout, TimeUnit unit)
  256. throws InterruptedException {
  257. if (o == null) throw new NullPointerException();
  258. long nanos = unit.toNanos(timeout);
  259. int c = -1;
  260. final ReentrantLock putLock = this.putLock;
  261. final AtomicInteger count = this.count;
  262. putLock.lockInterruptibly();
  263. try {
  264. for (;;) {
  265. if (count.get() < capacity) {
  266. insert(o);
  267. c = count.getAndIncrement();
  268. if (c + 1 < capacity)
  269. notFull.signal();
  270. break;
  271. }
  272. if (nanos <= 0)
  273. return false;
  274. try {
  275. nanos = notFull.awaitNanos(nanos);
  276. } catch (InterruptedException ie) {
  277. notFull.signal(); // propagate to a non-interrupted thread
  278. throw ie;
  279. }
  280. }
  281. } finally {
  282. putLock.unlock();
  283. }
  284. if (c == 0)
  285. signalNotEmpty();
  286. return true;
  287. }
  288. /**
  289. * Inserts the specified element at the tail of this queue if possible,
  290. * returning immediately if this queue is full.
  291. *
  292. * @param o the element to add.
  293. * @return <tt>true</tt> if it was possible to add the element to
  294. * this queue, else <tt>false</tt>
  295. * @throws NullPointerException if the specified element is <tt>null</tt>
  296. */
  297. public boolean offer(E o) {
  298. if (o == null) throw new NullPointerException();
  299. final AtomicInteger count = this.count;
  300. if (count.get() == capacity)
  301. return false;
  302. int c = -1;
  303. final ReentrantLock putLock = this.putLock;
  304. putLock.lock();
  305. try {
  306. if (count.get() < capacity) {
  307. insert(o);
  308. c = count.getAndIncrement();
  309. if (c + 1 < capacity)
  310. notFull.signal();
  311. }
  312. } finally {
  313. putLock.unlock();
  314. }
  315. if (c == 0)
  316. signalNotEmpty();
  317. return c >= 0;
  318. }
  319. public E take() throws InterruptedException {
  320. E x;
  321. int c = -1;
  322. final AtomicInteger count = this.count;
  323. final ReentrantLock takeLock = this.takeLock;
  324. takeLock.lockInterruptibly();
  325. try {
  326. try {
  327. while (count.get() == 0)
  328. notEmpty.await();
  329. } catch (InterruptedException ie) {
  330. notEmpty.signal(); // propagate to a non-interrupted thread
  331. throw ie;
  332. }
  333. x = extract();
  334. c = count.getAndDecrement();
  335. if (c > 1)
  336. notEmpty.signal();
  337. } finally {
  338. takeLock.unlock();
  339. }
  340. if (c == capacity)
  341. signalNotFull();
  342. return x;
  343. }
  344. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  345. E x = null;
  346. int c = -1;
  347. long nanos = unit.toNanos(timeout);
  348. final AtomicInteger count = this.count;
  349. final ReentrantLock takeLock = this.takeLock;
  350. takeLock.lockInterruptibly();
  351. try {
  352. for (;;) {
  353. if (count.get() > 0) {
  354. x = extract();
  355. c = count.getAndDecrement();
  356. if (c > 1)
  357. notEmpty.signal();
  358. break;
  359. }
  360. if (nanos <= 0)
  361. return null;
  362. try {
  363. nanos = notEmpty.awaitNanos(nanos);
  364. } catch (InterruptedException ie) {
  365. notEmpty.signal(); // propagate to a non-interrupted thread
  366. throw ie;
  367. }
  368. }
  369. } finally {
  370. takeLock.unlock();
  371. }
  372. if (c == capacity)
  373. signalNotFull();
  374. return x;
  375. }
  376. public E poll() {
  377. final AtomicInteger count = this.count;
  378. if (count.get() == 0)
  379. return null;
  380. E x = null;
  381. int c = -1;
  382. final ReentrantLock takeLock = this.takeLock;
  383. takeLock.lock();
  384. try {
  385. if (count.get() > 0) {
  386. x = extract();
  387. c = count.getAndDecrement();
  388. if (c > 1)
  389. notEmpty.signal();
  390. }
  391. } finally {
  392. takeLock.unlock();
  393. }
  394. if (c == capacity)
  395. signalNotFull();
  396. return x;
  397. }
  398. public E peek() {
  399. if (count.get() == 0)
  400. return null;
  401. final ReentrantLock takeLock = this.takeLock;
  402. takeLock.lock();
  403. try {
  404. Node<E> first = head.next;
  405. if (first == null)
  406. return null;
  407. else
  408. return first.item;
  409. } finally {
  410. takeLock.unlock();
  411. }
  412. }
  413. /**
  414. * Removes a single instance of the specified element from this
  415. * queue, if it is present.
  416. */
  417. public boolean remove(Object o) {
  418. if (o == null) return false;
  419. boolean removed = false;
  420. fullyLock();
  421. try {
  422. Node<E> trail = head;
  423. Node<E> p = head.next;
  424. while (p != null) {
  425. if (o.equals(p.item)) {
  426. removed = true;
  427. break;
  428. }
  429. trail = p;
  430. p = p.next;
  431. }
  432. if (removed) {
  433. p.item = null;
  434. trail.next = p.next;
  435. if (count.getAndDecrement() == capacity)
  436. notFull.signalAll();
  437. }
  438. } finally {
  439. fullyUnlock();
  440. }
  441. return removed;
  442. }
  443. public Object[] toArray() {
  444. fullyLock();
  445. try {
  446. int size = count.get();
  447. Object[] a = new Object[size];
  448. int k = 0;
  449. for (Node<E> p = head.next; p != null; p = p.next)
  450. a[k++] = p.item;
  451. return a;
  452. } finally {
  453. fullyUnlock();
  454. }
  455. }
  456. public <T> T[] toArray(T[] a) {
  457. fullyLock();
  458. try {
  459. int size = count.get();
  460. if (a.length < size)
  461. a = (T[])java.lang.reflect.Array.newInstance
  462. (a.getClass().getComponentType(), size);
  463. int k = 0;
  464. for (Node p = head.next; p != null; p = p.next)
  465. a[k++] = (T)p.item;
  466. return a;
  467. } finally {
  468. fullyUnlock();
  469. }
  470. }
  471. public String toString() {
  472. fullyLock();
  473. try {
  474. return super.toString();
  475. } finally {
  476. fullyUnlock();
  477. }
  478. }
  479. /**
  480. * Atomically removes all of the elements from this queue.
  481. * The queue will be empty after this call returns.
  482. */
  483. public void clear() {
  484. fullyLock();
  485. try {
  486. head.next = null;
  487. if (count.getAndSet(0) == capacity)
  488. notFull.signalAll();
  489. } finally {
  490. fullyUnlock();
  491. }
  492. }
  493. public int drainTo(Collection<? super E> c) {
  494. if (c == null)
  495. throw new NullPointerException();
  496. if (c == this)
  497. throw new IllegalArgumentException();
  498. Node first;
  499. fullyLock();
  500. try {
  501. first = head.next;
  502. head.next = null;
  503. if (count.getAndSet(0) == capacity)
  504. notFull.signalAll();
  505. } finally {
  506. fullyUnlock();
  507. }
  508. // Transfer the elements outside of locks
  509. int n = 0;
  510. for (Node<E> p = first; p != null; p = p.next) {
  511. c.add(p.item);
  512. p.item = null;
  513. ++n;
  514. }
  515. return n;
  516. }
  517. public int drainTo(Collection<? super E> c, int maxElements) {
  518. if (c == null)
  519. throw new NullPointerException();
  520. if (c == this)
  521. throw new IllegalArgumentException();
  522. if (maxElements <= 0)
  523. return 0;
  524. fullyLock();
  525. try {
  526. int n = 0;
  527. Node<E> p = head.next;
  528. while (p != null && n < maxElements) {
  529. c.add(p.item);
  530. p.item = null;
  531. p = p.next;
  532. ++n;
  533. }
  534. if (n != 0) {
  535. head.next = p;
  536. if (count.getAndAdd(-n) == capacity)
  537. notFull.signalAll();
  538. }
  539. return n;
  540. } finally {
  541. fullyUnlock();
  542. }
  543. }
  544. /**
  545. * Returns an iterator over the elements in this queue in proper sequence.
  546. * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
  547. * will never throw {@link java.util.ConcurrentModificationException},
  548. * and guarantees to traverse elements as they existed upon
  549. * construction of the iterator, and may (but is not guaranteed to)
  550. * reflect any modifications subsequent to construction.
  551. *
  552. * @return an iterator over the elements in this queue in proper sequence.
  553. */
  554. public Iterator<E> iterator() {
  555. return new Itr();
  556. }
  557. private class Itr implements Iterator<E> {
  558. /*
  559. * Basic weak-consistent iterator. At all times hold the next
  560. * item to hand out so that if hasNext() reports true, we will
  561. * still have it to return even if lost race with a take etc.
  562. */
  563. private Node<E> current;
  564. private Node<E> lastRet;
  565. private E currentElement;
  566. Itr() {
  567. final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
  568. final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
  569. putLock.lock();
  570. takeLock.lock();
  571. try {
  572. current = head.next;
  573. if (current != null)
  574. currentElement = current.item;
  575. } finally {
  576. takeLock.unlock();
  577. putLock.unlock();
  578. }
  579. }
  580. public boolean hasNext() {
  581. return current != null;
  582. }
  583. public E next() {
  584. final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
  585. final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
  586. putLock.lock();
  587. takeLock.lock();
  588. try {
  589. if (current == null)
  590. throw new NoSuchElementException();
  591. E x = currentElement;
  592. lastRet = current;
  593. current = current.next;
  594. if (current != null)
  595. currentElement = current.item;
  596. return x;
  597. } finally {
  598. takeLock.unlock();
  599. putLock.unlock();
  600. }
  601. }
  602. public void remove() {
  603. if (lastRet == null)
  604. throw new IllegalStateException();
  605. final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
  606. final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
  607. putLock.lock();
  608. takeLock.lock();
  609. try {
  610. Node<E> node = lastRet;
  611. lastRet = null;
  612. Node<E> trail = head;
  613. Node<E> p = head.next;
  614. while (p != null && p != node) {
  615. trail = p;
  616. p = p.next;
  617. }
  618. if (p == node) {
  619. p.item = null;
  620. trail.next = p.next;
  621. int c = count.getAndDecrement();
  622. if (c == capacity)
  623. notFull.signalAll();
  624. }
  625. } finally {
  626. takeLock.unlock();
  627. putLock.unlock();
  628. }
  629. }
  630. }
  631. /**
  632. * Save the state to a stream (that is, serialize it).
  633. *
  634. * @serialData The capacity is emitted (int), followed by all of
  635. * its elements (each an <tt>Object</tt>) in the proper order,
  636. * followed by a null
  637. * @param s the stream
  638. */
  639. private void writeObject(java.io.ObjectOutputStream s)
  640. throws java.io.IOException {
  641. fullyLock();
  642. try {
  643. // Write out any hidden stuff, plus capacity
  644. s.defaultWriteObject();
  645. // Write out all elements in the proper order.
  646. for (Node<E> p = head.next; p != null; p = p.next)
  647. s.writeObject(p.item);
  648. // Use trailing null as sentinel
  649. s.writeObject(null);
  650. } finally {
  651. fullyUnlock();
  652. }
  653. }
  654. /**
  655. * Reconstitute this queue instance from a stream (that is,
  656. * deserialize it).
  657. * @param s the stream
  658. */
  659. private void readObject(java.io.ObjectInputStream s)
  660. throws java.io.IOException, ClassNotFoundException {
  661. // Read in capacity, and any hidden stuff
  662. s.defaultReadObject();
  663. count.set(0);
  664. last = head = new Node<E>(null);
  665. // Read in all elements and place in queue
  666. for (;;) {
  667. E item = (E)s.readObject();
  668. if (item == null)
  669. break;
  670. add(item);
  671. }
  672. }
  673. }