1. /*
  2. * @(#)Exchanger.java 1.5 04/01/12
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. /**
  10. * A synchronization point at which two threads can exchange objects.
  11. * Each thread presents some object on entry to the {@link #exchange
  12. * exchange} method, and receives the object presented by the other
  13. * thread on return.
  14. *
  15. * <p><b>Sample Usage:</b>
  16. * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
  17. * swap buffers between threads so that the thread filling the
  18. * buffer gets a freshly
  19. * emptied one when it needs it, handing off the filled one to
  20. * the thread emptying the buffer.
  21. * <pre>
  22. * class FillAndEmpty {
  23. * Exchanger<DataBuffer> exchanger = new Exchanger();
  24. * DataBuffer initialEmptyBuffer = ... a made-up type
  25. * DataBuffer initialFullBuffer = ...
  26. *
  27. * class FillingLoop implements Runnable {
  28. * public void run() {
  29. * DataBuffer currentBuffer = initialEmptyBuffer;
  30. * try {
  31. * while (currentBuffer != null) {
  32. * addToBuffer(currentBuffer);
  33. * if (currentBuffer.full())
  34. * currentBuffer = exchanger.exchange(currentBuffer);
  35. * }
  36. * } catch (InterruptedException ex) { ... handle ... }
  37. * }
  38. * }
  39. *
  40. * class EmptyingLoop implements Runnable {
  41. * public void run() {
  42. * DataBuffer currentBuffer = initialFullBuffer;
  43. * try {
  44. * while (currentBuffer != null) {
  45. * takeFromBuffer(currentBuffer);
  46. * if (currentBuffer.empty())
  47. * currentBuffer = exchanger.exchange(currentBuffer);
  48. * }
  49. * } catch (InterruptedException ex) { ... handle ...}
  50. * }
  51. * }
  52. *
  53. * void start() {
  54. * new Thread(new FillingLoop()).start();
  55. * new Thread(new EmptyingLoop()).start();
  56. * }
  57. * }
  58. * </pre>
  59. *
  60. * @since 1.5
  61. * @author Doug Lea
  62. * @param <V> The type of objects that may be exchanged
  63. */
  64. public class Exchanger<V> {
  65. private final ReentrantLock lock = new ReentrantLock();
  66. private final Condition taken = lock.newCondition();
  67. /** Holder for the item being exchanged */
  68. private V item;
  69. /**
  70. * Arrival count transitions from 0 to 1 to 2 then back to 0
  71. * during an exchange.
  72. */
  73. private int arrivalCount;
  74. /**
  75. * Main exchange function, handling the different policy variants.
  76. */
  77. private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
  78. lock.lock();
  79. try {
  80. V other;
  81. // If arrival count already at two, we must wait for
  82. // a previous pair to finish and reset the count;
  83. while (arrivalCount == 2) {
  84. if (!timed)
  85. taken.await();
  86. else if (nanos > 0)
  87. nanos = taken.awaitNanos(nanos);
  88. else
  89. throw new TimeoutException();
  90. }
  91. int count = ++arrivalCount;
  92. // If item is already waiting, replace it and signal other thread
  93. if (count == 2) {
  94. other = item;
  95. item = x;
  96. taken.signal();
  97. return other;
  98. }
  99. // Otherwise, set item and wait for another thread to
  100. // replace it and signal us.
  101. item = x;
  102. InterruptedException interrupted = null;
  103. try {
  104. while (arrivalCount != 2) {
  105. if (!timed)
  106. taken.await();
  107. else if (nanos > 0)
  108. nanos = taken.awaitNanos(nanos);
  109. else
  110. break; // timed out
  111. }
  112. } catch (InterruptedException ie) {
  113. interrupted = ie;
  114. }
  115. // Get and reset item and count after the wait.
  116. // (We need to do this even if wait was aborted.)
  117. other = item;
  118. item = null;
  119. count = arrivalCount;
  120. arrivalCount = 0;
  121. taken.signal();
  122. // If the other thread replaced item, then we must
  123. // continue even if cancelled.
  124. if (count == 2) {
  125. if (interrupted != null)
  126. Thread.currentThread().interrupt();
  127. return other;
  128. }
  129. // If no one is waiting for us, we can back out
  130. if (interrupted != null)
  131. throw interrupted;
  132. else // must be timeout
  133. throw new TimeoutException();
  134. } finally {
  135. lock.unlock();
  136. }
  137. }
  138. /**
  139. * Create a new Exchanger.
  140. **/
  141. public Exchanger() {
  142. }
  143. /**
  144. * Waits for another thread to arrive at this exchange point (unless
  145. * it is {@link Thread#interrupt interrupted}),
  146. * and then transfers the given object to it, receiving its object
  147. * in return.
  148. * <p>If another thread is already waiting at the exchange point then
  149. * it is resumed for thread scheduling purposes and receives the object
  150. * passed in by the current thread. The current thread returns immediately,
  151. * receiving the object passed to the exchange by that other thread.
  152. * <p>If no other thread is already waiting at the exchange then the
  153. * current thread is disabled for thread scheduling purposes and lies
  154. * dormant until one of two things happens:
  155. * <ul>
  156. * <li>Some other thread enters the exchange; or
  157. * <li>Some other thread {@link Thread#interrupt interrupts} the current
  158. * thread.
  159. * </ul>
  160. * <p>If the current thread:
  161. * <ul>
  162. * <li>has its interrupted status set on entry to this method; or
  163. * <li>is {@link Thread#interrupt interrupted} while waiting
  164. * for the exchange,
  165. * </ul>
  166. * then {@link InterruptedException} is thrown and the current thread's
  167. * interrupted status is cleared.
  168. *
  169. * @param x the object to exchange
  170. * @return the object provided by the other thread.
  171. * @throws InterruptedException if current thread was interrupted
  172. * while waiting
  173. **/
  174. public V exchange(V x) throws InterruptedException {
  175. try {
  176. return doExchange(x, false, 0);
  177. } catch (TimeoutException cannotHappen) {
  178. throw new Error(cannotHappen);
  179. }
  180. }
  181. /**
  182. * Waits for another thread to arrive at this exchange point (unless
  183. * it is {@link Thread#interrupt interrupted}, or the specified waiting
  184. * time elapses),
  185. * and then transfers the given object to it, receiving its object
  186. * in return.
  187. *
  188. * <p>If another thread is already waiting at the exchange point then
  189. * it is resumed for thread scheduling purposes and receives the object
  190. * passed in by the current thread. The current thread returns immediately,
  191. * receiving the object passed to the exchange by that other thread.
  192. *
  193. * <p>If no other thread is already waiting at the exchange then the
  194. * current thread is disabled for thread scheduling purposes and lies
  195. * dormant until one of three things happens:
  196. * <ul>
  197. * <li>Some other thread enters the exchange; or
  198. * <li>Some other thread {@link Thread#interrupt interrupts} the current
  199. * thread; or
  200. * <li>The specified waiting time elapses.
  201. * </ul>
  202. * <p>If the current thread:
  203. * <ul>
  204. * <li>has its interrupted status set on entry to this method; or
  205. * <li>is {@link Thread#interrupt interrupted} while waiting
  206. * for the exchange,
  207. * </ul>
  208. * then {@link InterruptedException} is thrown and the current thread's
  209. * interrupted status is cleared.
  210. *
  211. * <p>If the specified waiting time elapses then {@link TimeoutException}
  212. * is thrown.
  213. * If the time is
  214. * less than or equal to zero, the method will not wait at all.
  215. *
  216. * @param x the object to exchange
  217. * @param timeout the maximum time to wait
  218. * @param unit the time unit of the <tt>timeout</tt> argument.
  219. * @return the object provided by the other thread.
  220. * @throws InterruptedException if current thread was interrupted
  221. * while waiting
  222. * @throws TimeoutException if the specified waiting time elapses before
  223. * another thread enters the exchange.
  224. **/
  225. public V exchange(V x, long timeout, TimeUnit unit)
  226. throws InterruptedException, TimeoutException {
  227. return doExchange(x, true, unit.toNanos(timeout));
  228. }
  229. }