1. /*
  2. * @(#)SynchronousQueue.java 1.8 04/06/11
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. import java.util.*;
  10. /**
  11. * A {@linkplain BlockingQueue blocking queue} in which each
  12. * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
  13. * synchronous queue does not have any internal capacity, not even a
  14. * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
  15. * because an element is only present when you try to take it; you
  16. * cannot add an element (using any method) unless another thread is
  17. * trying to remove it; you cannot iterate as there is nothing to
  18. * iterate. The <em>head</em> of the queue is the element that the
  19. * first queued thread is trying to add to the queue; if there are no
  20. * queued threads then no element is being added and the head is
  21. * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
  22. * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
  23. * as an empty collection. This queue does not permit <tt>null</tt>
  24. * elements.
  25. *
  26. * <p>Synchronous queues are similar to rendezvous channels used in
  27. * CSP and Ada. They are well suited for handoff designs, in which an
  28. * object running in one thread must sync up with an object running
  29. * in another thread in order to hand it some information, event, or
  30. * task.
  31. *
  32. * <p> This class supports an optional fairness policy for ordering
  33. * waiting producer and consumer threads. By default, this ordering
  34. * is not guaranteed. However, a queue constructed with fairness set
  35. * to <tt>true</tt> grants threads access in FIFO order. Fairness
  36. * generally decreases throughput but reduces variability and avoids
  37. * starvation.
  38. *
  39. * <p>This class and its iterator implement all of the
  40. * <em>optional</em> methods of the {@link Collection} and {@link
  41. * Iterator} interfaces.
  42. *
  43. * <p>This class is a member of the
  44. * <a href="{@docRoot}/../guide/collections/index.html">
  45. * Java Collections Framework</a>.
  46. *
  47. * @since 1.5
  48. * @author Doug Lea
  49. * @param <E> the type of elements held in this collection
  50. */
  51. public class SynchronousQueue<E> extends AbstractQueue<E>
  52. implements BlockingQueue<E>, java.io.Serializable {
  53. private static final long serialVersionUID = -3223113410248163686L;
  54. /*
  55. This implementation divides actions into two cases for puts:
  56. * An arriving producer that does not already have a waiting consumer
  57. creates a node holding item, and then waits for a consumer to take it.
  58. * An arriving producer that does already have a waiting consumer fills
  59. the slot node created by the consumer, and notifies it to continue.
  60. And symmetrically, two for takes:
  61. * An arriving consumer that does not already have a waiting producer
  62. creates an empty slot node, and then waits for a producer to fill it.
  63. * An arriving consumer that does already have a waiting producer takes
  64. item from the node created by the producer, and notifies it to continue.
  65. When a put or take waiting for the actions of its counterpart
  66. aborts due to interruption or timeout, it marks the node
  67. it created as "CANCELLED", which causes its counterpart to retry
  68. the entire put or take sequence.
  69. This requires keeping two simple queues, waitingProducers and
  70. waitingConsumers. Each of these can be FIFO (preserves fairness)
  71. or LIFO (improves throughput).
  72. */
  73. /** Lock protecting both wait queues */
  74. private final ReentrantLock qlock;
  75. /** Queue holding waiting puts */
  76. private final WaitQueue waitingProducers;
  77. /** Queue holding waiting takes */
  78. private final WaitQueue waitingConsumers;
  79. /**
  80. * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
  81. */
  82. public SynchronousQueue() {
  83. this(false);
  84. }
  85. /**
  86. * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
  87. * @param fair if true, threads contend in FIFO order for access;
  88. * otherwise the order is unspecified.
  89. */
  90. public SynchronousQueue(boolean fair) {
  91. if (fair) {
  92. qlock = new ReentrantLock(true);
  93. waitingProducers = new FifoWaitQueue();
  94. waitingConsumers = new FifoWaitQueue();
  95. }
  96. else {
  97. qlock = new ReentrantLock();
  98. waitingProducers = new LifoWaitQueue();
  99. waitingConsumers = new LifoWaitQueue();
  100. }
  101. }
  102. /**
  103. * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
  104. * These queues have all transient fields, but are serializable
  105. * in order to recover fairness settings when deserialized.
  106. */
  107. static abstract class WaitQueue implements java.io.Serializable {
  108. /** Create, add, and return node for x */
  109. abstract Node enq(Object x);
  110. /** Remove and return node, or null if empty */
  111. abstract Node deq();
  112. }
  113. /**
  114. * FIFO queue to hold waiting puts/takes.
  115. */
  116. static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
  117. private static final long serialVersionUID = -3623113410248163686L;
  118. private transient Node head;
  119. private transient Node last;
  120. Node enq(Object x) {
  121. Node p = new Node(x);
  122. if (last == null)
  123. last = head = p;
  124. else
  125. last = last.next = p;
  126. return p;
  127. }
  128. Node deq() {
  129. Node p = head;
  130. if (p != null) {
  131. if ((head = p.next) == null)
  132. last = null;
  133. p.next = null;
  134. }
  135. return p;
  136. }
  137. }
  138. /**
  139. * LIFO queue to hold waiting puts/takes.
  140. */
  141. static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
  142. private static final long serialVersionUID = -3633113410248163686L;
  143. private transient Node head;
  144. Node enq(Object x) {
  145. return head = new Node(x, head);
  146. }
  147. Node deq() {
  148. Node p = head;
  149. if (p != null) {
  150. head = p.next;
  151. p.next = null;
  152. }
  153. return p;
  154. }
  155. }
  156. /**
  157. * Nodes each maintain an item and handle waits and signals for
  158. * getting and setting it. The class extends
  159. * AbstractQueuedSynchronizer to manage blocking, using AQS state
  160. * 0 for waiting, 1 for ack, -1 for cancelled.
  161. */
  162. static final class Node extends AbstractQueuedSynchronizer {
  163. /** Synchronization state value representing that node acked */
  164. private static final int ACK = 1;
  165. /** Synchronization state value representing that node cancelled */
  166. private static final int CANCEL = -1;
  167. /** The item being transferred */
  168. Object item;
  169. /** Next node in wait queue */
  170. Node next;
  171. /** Creates a node with initial item */
  172. Node(Object x) { item = x; }
  173. /** Creates a node with initial item and next */
  174. Node(Object x, Node n) { item = x; next = n; }
  175. /**
  176. * Implements AQS base acquire to succeed if not in WAITING state
  177. */
  178. protected boolean tryAcquire(int ignore) {
  179. return getState() != 0;
  180. }
  181. /**
  182. * Implements AQS base release to signal if state changed
  183. */
  184. protected boolean tryRelease(int newState) {
  185. return compareAndSetState(0, newState);
  186. }
  187. /**
  188. * Takes item and nulls out field (for sake of GC)
  189. */
  190. private Object extract() {
  191. Object x = item;
  192. item = null;
  193. return x;
  194. }
  195. /**
  196. * Tries to cancel on interrupt; if so rethrowing,
  197. * else setting interrupt state
  198. */
  199. private void checkCancellationOnInterrupt(InterruptedException ie)
  200. throws InterruptedException {
  201. if (release(CANCEL))
  202. throw ie;
  203. Thread.currentThread().interrupt();
  204. }
  205. /**
  206. * Fills in the slot created by the consumer and signal consumer to
  207. * continue.
  208. */
  209. boolean setItem(Object x) {
  210. item = x; // can place in slot even if cancelled
  211. return release(ACK);
  212. }
  213. /**
  214. * Removes item from slot created by producer and signal producer
  215. * to continue.
  216. */
  217. Object getItem() {
  218. return (release(ACK))? extract() : null;
  219. }
  220. /**
  221. * Waits for a consumer to take item placed by producer.
  222. */
  223. void waitForTake() throws InterruptedException {
  224. try {
  225. acquireInterruptibly(0);
  226. } catch (InterruptedException ie) {
  227. checkCancellationOnInterrupt(ie);
  228. }
  229. }
  230. /**
  231. * Waits for a producer to put item placed by consumer.
  232. */
  233. Object waitForPut() throws InterruptedException {
  234. try {
  235. acquireInterruptibly(0);
  236. } catch (InterruptedException ie) {
  237. checkCancellationOnInterrupt(ie);
  238. }
  239. return extract();
  240. }
  241. /**
  242. * Waits for a consumer to take item placed by producer or time out.
  243. */
  244. boolean waitForTake(long nanos) throws InterruptedException {
  245. try {
  246. if (!tryAcquireNanos(0, nanos) &&
  247. release(CANCEL))
  248. return false;
  249. } catch (InterruptedException ie) {
  250. checkCancellationOnInterrupt(ie);
  251. }
  252. return true;
  253. }
  254. /**
  255. * Waits for a producer to put item placed by consumer, or time out.
  256. */
  257. Object waitForPut(long nanos) throws InterruptedException {
  258. try {
  259. if (!tryAcquireNanos(0, nanos) &&
  260. release(CANCEL))
  261. return null;
  262. } catch (InterruptedException ie) {
  263. checkCancellationOnInterrupt(ie);
  264. }
  265. return extract();
  266. }
  267. }
  268. /**
  269. * Adds the specified element to this queue, waiting if necessary for
  270. * another thread to receive it.
  271. * @param o the element to add
  272. * @throws InterruptedException if interrupted while waiting.
  273. * @throws NullPointerException if the specified element is <tt>null</tt>.
  274. */
  275. public void put(E o) throws InterruptedException {
  276. if (o == null) throw new NullPointerException();
  277. final ReentrantLock qlock = this.qlock;
  278. for (;;) {
  279. Node node;
  280. boolean mustWait;
  281. if (Thread.interrupted()) throw new InterruptedException();
  282. qlock.lock();
  283. try {
  284. node = waitingConsumers.deq();
  285. if ( (mustWait = (node == null)) )
  286. node = waitingProducers.enq(o);
  287. } finally {
  288. qlock.unlock();
  289. }
  290. if (mustWait) {
  291. node.waitForTake();
  292. return;
  293. }
  294. else if (node.setItem(o))
  295. return;
  296. // else consumer cancelled, so retry
  297. }
  298. }
  299. /**
  300. * Inserts the specified element into this queue, waiting if necessary
  301. * up to the specified wait time for another thread to receive it.
  302. * @param o the element to add
  303. * @param timeout how long to wait before giving up, in units of
  304. * <tt>unit</tt>
  305. * @param unit a <tt>TimeUnit</tt> determining how to interpret the
  306. * <tt>timeout</tt> parameter
  307. * @return <tt>true</tt> if successful, or <tt>false</tt> if
  308. * the specified waiting time elapses before a consumer appears.
  309. * @throws InterruptedException if interrupted while waiting.
  310. * @throws NullPointerException if the specified element is <tt>null</tt>.
  311. */
  312. public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
  313. if (o == null) throw new NullPointerException();
  314. long nanos = unit.toNanos(timeout);
  315. final ReentrantLock qlock = this.qlock;
  316. for (;;) {
  317. Node node;
  318. boolean mustWait;
  319. if (Thread.interrupted()) throw new InterruptedException();
  320. qlock.lock();
  321. try {
  322. node = waitingConsumers.deq();
  323. if ( (mustWait = (node == null)) )
  324. node = waitingProducers.enq(o);
  325. } finally {
  326. qlock.unlock();
  327. }
  328. if (mustWait)
  329. return node.waitForTake(nanos);
  330. else if (node.setItem(o))
  331. return true;
  332. // else consumer cancelled, so retry
  333. }
  334. }
  335. /**
  336. * Retrieves and removes the head of this queue, waiting if necessary
  337. * for another thread to insert it.
  338. * @throws InterruptedException if interrupted while waiting.
  339. * @return the head of this queue
  340. */
  341. public E take() throws InterruptedException {
  342. final ReentrantLock qlock = this.qlock;
  343. for (;;) {
  344. Node node;
  345. boolean mustWait;
  346. if (Thread.interrupted()) throw new InterruptedException();
  347. qlock.lock();
  348. try {
  349. node = waitingProducers.deq();
  350. if ( (mustWait = (node == null)) )
  351. node = waitingConsumers.enq(null);
  352. } finally {
  353. qlock.unlock();
  354. }
  355. if (mustWait) {
  356. Object x = node.waitForPut();
  357. return (E)x;
  358. }
  359. else {
  360. Object x = node.getItem();
  361. if (x != null)
  362. return (E)x;
  363. // else cancelled, so retry
  364. }
  365. }
  366. }
  367. /**
  368. * Retrieves and removes the head of this queue, waiting
  369. * if necessary up to the specified wait time, for another thread
  370. * to insert it.
  371. * @param timeout how long to wait before giving up, in units of
  372. * <tt>unit</tt>
  373. * @param unit a <tt>TimeUnit</tt> determining how to interpret the
  374. * <tt>timeout</tt> parameter
  375. * @return the head of this queue, or <tt>null</tt> if the
  376. * specified waiting time elapses before an element is present.
  377. * @throws InterruptedException if interrupted while waiting.
  378. */
  379. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  380. long nanos = unit.toNanos(timeout);
  381. final ReentrantLock qlock = this.qlock;
  382. for (;;) {
  383. Node node;
  384. boolean mustWait;
  385. if (Thread.interrupted()) throw new InterruptedException();
  386. qlock.lock();
  387. try {
  388. node = waitingProducers.deq();
  389. if ( (mustWait = (node == null)) )
  390. node = waitingConsumers.enq(null);
  391. } finally {
  392. qlock.unlock();
  393. }
  394. if (mustWait) {
  395. Object x = node.waitForPut(nanos);
  396. return (E)x;
  397. }
  398. else {
  399. Object x = node.getItem();
  400. if (x != null)
  401. return (E)x;
  402. // else cancelled, so retry
  403. }
  404. }
  405. }
  406. // Untimed nonblocking versions
  407. /**
  408. * Inserts the specified element into this queue, if another thread is
  409. * waiting to receive it.
  410. *
  411. * @param o the element to add.
  412. * @return <tt>true</tt> if it was possible to add the element to
  413. * this queue, else <tt>false</tt>
  414. * @throws NullPointerException if the specified element is <tt>null</tt>
  415. */
  416. public boolean offer(E o) {
  417. if (o == null) throw new NullPointerException();
  418. final ReentrantLock qlock = this.qlock;
  419. for (;;) {
  420. Node node;
  421. qlock.lock();
  422. try {
  423. node = waitingConsumers.deq();
  424. } finally {
  425. qlock.unlock();
  426. }
  427. if (node == null)
  428. return false;
  429. else if (node.setItem(o))
  430. return true;
  431. // else retry
  432. }
  433. }
  434. /**
  435. * Retrieves and removes the head of this queue, if another thread
  436. * is currently making an element available.
  437. *
  438. * @return the head of this queue, or <tt>null</tt> if no
  439. * element is available.
  440. */
  441. public E poll() {
  442. final ReentrantLock qlock = this.qlock;
  443. for (;;) {
  444. Node node;
  445. qlock.lock();
  446. try {
  447. node = waitingProducers.deq();
  448. } finally {
  449. qlock.unlock();
  450. }
  451. if (node == null)
  452. return null;
  453. else {
  454. Object x = node.getItem();
  455. if (x != null)
  456. return (E)x;
  457. // else retry
  458. }
  459. }
  460. }
  461. /**
  462. * Always returns <tt>true</tt>.
  463. * A <tt>SynchronousQueue</tt> has no internal capacity.
  464. * @return <tt>true</tt>
  465. */
  466. public boolean isEmpty() {
  467. return true;
  468. }
  469. /**
  470. * Always returns zero.
  471. * A <tt>SynchronousQueue</tt> has no internal capacity.
  472. * @return zero.
  473. */
  474. public int size() {
  475. return 0;
  476. }
  477. /**
  478. * Always returns zero.
  479. * A <tt>SynchronousQueue</tt> has no internal capacity.
  480. * @return zero.
  481. */
  482. public int remainingCapacity() {
  483. return 0;
  484. }
  485. /**
  486. * Does nothing.
  487. * A <tt>SynchronousQueue</tt> has no internal capacity.
  488. */
  489. public void clear() {}
  490. /**
  491. * Always returns <tt>false</tt>.
  492. * A <tt>SynchronousQueue</tt> has no internal capacity.
  493. * @param o the element
  494. * @return <tt>false</tt>
  495. */
  496. public boolean contains(Object o) {
  497. return false;
  498. }
  499. /**
  500. * Always returns <tt>false</tt>.
  501. * A <tt>SynchronousQueue</tt> has no internal capacity.
  502. *
  503. * @param o the element to remove
  504. * @return <tt>false</tt>
  505. */
  506. public boolean remove(Object o) {
  507. return false;
  508. }
  509. /**
  510. * Returns <tt>false</tt> unless given collection is empty.
  511. * A <tt>SynchronousQueue</tt> has no internal capacity.
  512. * @param c the collection
  513. * @return <tt>false</tt> unless given collection is empty
  514. */
  515. public boolean containsAll(Collection<?> c) {
  516. return c.isEmpty();
  517. }
  518. /**
  519. * Always returns <tt>false</tt>.
  520. * A <tt>SynchronousQueue</tt> has no internal capacity.
  521. * @param c the collection
  522. * @return <tt>false</tt>
  523. */
  524. public boolean removeAll(Collection<?> c) {
  525. return false;
  526. }
  527. /**
  528. * Always returns <tt>false</tt>.
  529. * A <tt>SynchronousQueue</tt> has no internal capacity.
  530. * @param c the collection
  531. * @return <tt>false</tt>
  532. */
  533. public boolean retainAll(Collection<?> c) {
  534. return false;
  535. }
  536. /**
  537. * Always returns <tt>null</tt>.
  538. * A <tt>SynchronousQueue</tt> does not return elements
  539. * unless actively waited on.
  540. * @return <tt>null</tt>
  541. */
  542. public E peek() {
  543. return null;
  544. }
  545. static class EmptyIterator<E> implements Iterator<E> {
  546. public boolean hasNext() {
  547. return false;
  548. }
  549. public E next() {
  550. throw new NoSuchElementException();
  551. }
  552. public void remove() {
  553. throw new IllegalStateException();
  554. }
  555. }
  556. /**
  557. * Returns an empty iterator in which <tt>hasNext</tt> always returns
  558. * <tt>false</tt>.
  559. *
  560. * @return an empty iterator
  561. */
  562. public Iterator<E> iterator() {
  563. return new EmptyIterator<E>();
  564. }
  565. /**
  566. * Returns a zero-length array.
  567. * @return a zero-length array
  568. */
  569. public Object[] toArray() {
  570. return new Object[0];
  571. }
  572. /**
  573. * Sets the zeroeth element of the specified array to <tt>null</tt>
  574. * (if the array has non-zero length) and returns it.
  575. * @param a the array
  576. * @return the specified array
  577. */
  578. public <T> T[] toArray(T[] a) {
  579. if (a.length > 0)
  580. a[0] = null;
  581. return a;
  582. }
  583. public int drainTo(Collection<? super E> c) {
  584. if (c == null)
  585. throw new NullPointerException();
  586. if (c == this)
  587. throw new IllegalArgumentException();
  588. int n = 0;
  589. E e;
  590. while ( (e = poll()) != null) {
  591. c.add(e);
  592. ++n;
  593. }
  594. return n;
  595. }
  596. public int drainTo(Collection<? super E> c, int maxElements) {
  597. if (c == null)
  598. throw new NullPointerException();
  599. if (c == this)
  600. throw new IllegalArgumentException();
  601. int n = 0;
  602. E e;
  603. while (n < maxElements && (e = poll()) != null) {
  604. c.add(e);
  605. ++n;
  606. }
  607. return n;
  608. }
  609. }