1. /*
  2. * @(#)ConcurrentLinkedQueue.java 1.7 04/06/11
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.*;
  9. import java.util.concurrent.atomic.*;
/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when
 * many threads will share access to a common collection.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>This implementation employs an efficient <em>non-blocking</em>
 * (lock-free) algorithm based on one described in <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 * Operations retry with compare-and-set instead of taking locks, so
 * some thread always makes progress, although an individual operation
 * may have to retry under contention.
 *
 * <p>Beware that, unlike in most collections, the <tt>size</tt> method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
  47. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  48. implements Queue<E>, java.io.Serializable {
  49. private static final long serialVersionUID = 196745693267521676L;
  50. /*
  51. * This is a straight adaptation of Michael & Scott algorithm.
  52. * For explanation, read the paper. The only (minor) algorithmic
  53. * difference is that this version supports lazy deletion of
  54. * internal nodes (method remove(Object)) -- remove CAS'es item
  55. * fields to null. The normal queue operations unlink but then
  56. * pass over nodes with null item fields. Similarly, iteration
  57. * methods ignore those with nulls.
  58. */
  59. private static class Node<E> {
  60. private volatile E item;
  61. private volatile Node<E> next;
  62. private static final
  63. AtomicReferenceFieldUpdater<Node, Node>
  64. nextUpdater =
  65. AtomicReferenceFieldUpdater.newUpdater
  66. (Node.class, Node.class, "next");
  67. private static final
  68. AtomicReferenceFieldUpdater<Node, Object>
  69. itemUpdater =
  70. AtomicReferenceFieldUpdater.newUpdater
  71. (Node.class, Object.class, "item");
  72. Node(E x) { item = x; }
  73. Node(E x, Node<E> n) { item = x; next = n; }
  74. E getItem() {
  75. return item;
  76. }
  77. boolean casItem(E cmp, E val) {
  78. return itemUpdater.compareAndSet(this, cmp, val);
  79. }
  80. void setItem(E val) {
  81. itemUpdater.set(this, val);
  82. }
  83. Node<E> getNext() {
  84. return next;
  85. }
  86. boolean casNext(Node<E> cmp, Node<E> val) {
  87. return nextUpdater.compareAndSet(this, cmp, val);
  88. }
  89. void setNext(Node<E> val) {
  90. nextUpdater.set(this, val);
  91. }
  92. }
  93. private static final
  94. AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
  95. tailUpdater =
  96. AtomicReferenceFieldUpdater.newUpdater
  97. (ConcurrentLinkedQueue.class, Node.class, "tail");
  98. private static final
  99. AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
  100. headUpdater =
  101. AtomicReferenceFieldUpdater.newUpdater
  102. (ConcurrentLinkedQueue.class, Node.class, "head");
  103. private boolean casTail(Node<E> cmp, Node<E> val) {
  104. return tailUpdater.compareAndSet(this, cmp, val);
  105. }
  106. private boolean casHead(Node<E> cmp, Node<E> val) {
  107. return headUpdater.compareAndSet(this, cmp, val);
  108. }
  109. /**
  110. * Pointer to header node, initialized to a dummy node. The first
  111. * actual node is at head.getNext().
  112. */
  113. private transient volatile Node<E> head = new Node<E>(null, null);
  114. /** Pointer to last node on list **/
  115. private transient volatile Node<E> tail = head;
  116. /**
  117. * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty.
  118. */
  119. public ConcurrentLinkedQueue() {}
  120. /**
  121. * Creates a <tt>ConcurrentLinkedQueue</tt>
  122. * initially containing the elements of the given collection,
  123. * added in traversal order of the collection's iterator.
  124. * @param c the collection of elements to initially contain
  125. * @throws NullPointerException if <tt>c</tt> or any element within it
  126. * is <tt>null</tt>
  127. */
  128. public ConcurrentLinkedQueue(Collection<? extends E> c) {
  129. for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
  130. add(it.next());
  131. }
  132. // Have to override just to update the javadoc
  133. /**
  134. * Adds the specified element to the tail of this queue.
  135. * @param o the element to add.
  136. * @return <tt>true</tt> (as per the general contract of
  137. * <tt>Collection.add</tt>).
  138. *
  139. * @throws NullPointerException if the specified element is <tt>null</tt>
  140. */
  141. public boolean add(E o) {
  142. return offer(o);
  143. }
  144. /**
  145. * Inserts the specified element to the tail of this queue.
  146. *
  147. * @param o the element to add.
  148. * @return <tt>true</tt> (as per the general contract of
  149. * <tt>Queue.offer</tt>).
  150. * @throws NullPointerException if the specified element is <tt>null</tt>
  151. */
  152. public boolean offer(E o) {
  153. if (o == null) throw new NullPointerException();
  154. Node<E> n = new Node<E>(o, null);
  155. for(;;) {
  156. Node<E> t = tail;
  157. Node<E> s = t.getNext();
  158. if (t == tail) {
  159. if (s == null) {
  160. if (t.casNext(s, n)) {
  161. casTail(t, n);
  162. return true;
  163. }
  164. } else {
  165. casTail(t, s);
  166. }
  167. }
  168. }
  169. }
  170. public E poll() {
  171. for (;;) {
  172. Node<E> h = head;
  173. Node<E> t = tail;
  174. Node<E> first = h.getNext();
  175. if (h == head) {
  176. if (h == t) {
  177. if (first == null)
  178. return null;
  179. else
  180. casTail(t, first);
  181. } else if (casHead(h, first)) {
  182. E item = first.getItem();
  183. if (item != null) {
  184. first.setItem(null);
  185. return item;
  186. }
  187. // else skip over deleted item, continue loop,
  188. }
  189. }
  190. }
  191. }
  192. public E peek() { // same as poll except don't remove item
  193. for (;;) {
  194. Node<E> h = head;
  195. Node<E> t = tail;
  196. Node<E> first = h.getNext();
  197. if (h == head) {
  198. if (h == t) {
  199. if (first == null)
  200. return null;
  201. else
  202. casTail(t, first);
  203. } else {
  204. E item = first.getItem();
  205. if (item != null)
  206. return item;
  207. else // remove deleted node and continue
  208. casHead(h, first);
  209. }
  210. }
  211. }
  212. }
  213. /**
  214. * Returns the first actual (non-header) node on list. This is yet
  215. * another variant of poll/peek; here returning out the first
  216. * node, not element (so we cannot collapse with peek() without
  217. * introducing race.)
  218. */
  219. Node<E> first() {
  220. for (;;) {
  221. Node<E> h = head;
  222. Node<E> t = tail;
  223. Node<E> first = h.getNext();
  224. if (h == head) {
  225. if (h == t) {
  226. if (first == null)
  227. return null;
  228. else
  229. casTail(t, first);
  230. } else {
  231. if (first.getItem() != null)
  232. return first;
  233. else // remove deleted node and continue
  234. casHead(h, first);
  235. }
  236. }
  237. }
  238. }
  239. public boolean isEmpty() {
  240. return first() == null;
  241. }
  242. /**
  243. * Returns the number of elements in this queue. If this queue
  244. * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
  245. * <tt>Integer.MAX_VALUE</tt>.
  246. *
  247. * <p>Beware that, unlike in most collections, this method is
  248. * <em>NOT</em> a constant-time operation. Because of the
  249. * asynchronous nature of these queues, determining the current
  250. * number of elements requires an O(n) traversal.
  251. *
  252. * @return the number of elements in this queue.
  253. */
  254. public int size() {
  255. int count = 0;
  256. for (Node<E> p = first(); p != null; p = p.getNext()) {
  257. if (p.getItem() != null) {
  258. // Collections.size() spec says to max out
  259. if (++count == Integer.MAX_VALUE)
  260. break;
  261. }
  262. }
  263. return count;
  264. }
  265. public boolean contains(Object o) {
  266. if (o == null) return false;
  267. for (Node<E> p = first(); p != null; p = p.getNext()) {
  268. E item = p.getItem();
  269. if (item != null &&
  270. o.equals(item))
  271. return true;
  272. }
  273. return false;
  274. }
  275. public boolean remove(Object o) {
  276. if (o == null) return false;
  277. for (Node<E> p = first(); p != null; p = p.getNext()) {
  278. E item = p.getItem();
  279. if (item != null &&
  280. o.equals(item) &&
  281. p.casItem(item, null))
  282. return true;
  283. }
  284. return false;
  285. }
  286. public Object[] toArray() {
  287. // Use ArrayList to deal with resizing.
  288. ArrayList<E> al = new ArrayList<E>();
  289. for (Node<E> p = first(); p != null; p = p.getNext()) {
  290. E item = p.getItem();
  291. if (item != null)
  292. al.add(item);
  293. }
  294. return al.toArray();
  295. }
  296. public <T> T[] toArray(T[] a) {
  297. // try to use sent-in array
  298. int k = 0;
  299. Node<E> p;
  300. for (p = first(); p != null && k < a.length; p = p.getNext()) {
  301. E item = p.getItem();
  302. if (item != null)
  303. a[k++] = (T)item;
  304. }
  305. if (p == null) {
  306. if (k < a.length)
  307. a[k] = null;
  308. return a;
  309. }
  310. // If won't fit, use ArrayList version
  311. ArrayList<E> al = new ArrayList<E>();
  312. for (Node<E> q = first(); q != null; q = q.getNext()) {
  313. E item = q.getItem();
  314. if (item != null)
  315. al.add(item);
  316. }
  317. return (T[])al.toArray(a);
  318. }
  319. /**
  320. * Returns an iterator over the elements in this queue in proper sequence.
  321. * The returned iterator is a "weakly consistent" iterator that
  322. * will never throw {@link java.util.ConcurrentModificationException},
  323. * and guarantees to traverse elements as they existed upon
  324. * construction of the iterator, and may (but is not guaranteed to)
  325. * reflect any modifications subsequent to construction.
  326. *
  327. * @return an iterator over the elements in this queue in proper sequence.
  328. */
  329. public Iterator<E> iterator() {
  330. return new Itr();
  331. }
  332. private class Itr implements Iterator<E> {
  333. /**
  334. * Next node to return item for.
  335. */
  336. private Node<E> nextNode;
  337. /**
  338. * nextItem holds on to item fields because once we claim
  339. * that an element exists in hasNext(), we must return it in
  340. * the following next() call even if it was in the process of
  341. * being removed when hasNext() was called.
  342. **/
  343. private E nextItem;
  344. /**
  345. * Node of the last returned item, to support remove.
  346. */
  347. private Node<E> lastRet;
  348. Itr() {
  349. advance();
  350. }
  351. /**
  352. * Moves to next valid node and returns item to return for
  353. * next(), or null if no such.
  354. */
  355. private E advance() {
  356. lastRet = nextNode;
  357. E x = nextItem;
  358. Node<E> p = (nextNode == null)? first() : nextNode.getNext();
  359. for (;;) {
  360. if (p == null) {
  361. nextNode = null;
  362. nextItem = null;
  363. return x;
  364. }
  365. E item = p.getItem();
  366. if (item != null) {
  367. nextNode = p;
  368. nextItem = item;
  369. return x;
  370. } else // skip over nulls
  371. p = p.getNext();
  372. }
  373. }
  374. public boolean hasNext() {
  375. return nextNode != null;
  376. }
  377. public E next() {
  378. if (nextNode == null) throw new NoSuchElementException();
  379. return advance();
  380. }
  381. public void remove() {
  382. Node<E> l = lastRet;
  383. if (l == null) throw new IllegalStateException();
  384. // rely on a future traversal to relink.
  385. l.setItem(null);
  386. lastRet = null;
  387. }
  388. }
  389. /**
  390. * Save the state to a stream (that is, serialize it).
  391. *
  392. * @serialData All of the elements (each an <tt>E</tt>) in
  393. * the proper order, followed by a null
  394. * @param s the stream
  395. */
  396. private void writeObject(java.io.ObjectOutputStream s)
  397. throws java.io.IOException {
  398. // Write out any hidden stuff
  399. s.defaultWriteObject();
  400. // Write out all elements in the proper order.
  401. for (Node<E> p = first(); p != null; p = p.getNext()) {
  402. Object item = p.getItem();
  403. if (item != null)
  404. s.writeObject(item);
  405. }
  406. // Use trailing null as sentinel
  407. s.writeObject(null);
  408. }
  409. /**
  410. * Reconstitute the Queue instance from a stream (that is,
  411. * deserialize it).
  412. * @param s the stream
  413. */
  414. private void readObject(java.io.ObjectInputStream s)
  415. throws java.io.IOException, ClassNotFoundException {
  416. // Read in capacity, and any hidden stuff
  417. s.defaultReadObject();
  418. head = new Node<E>(null, null);
  419. tail = head;
  420. // Read in all elements and place in queue
  421. for (;;) {
  422. E item = (E)s.readObject();
  423. if (item == null)
  424. break;
  425. else
  426. offer(item);
  427. }
  428. }
  429. }