1. /*
  2. * @(#)CyclicBarrier.java 1.7 04/07/13
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. /**
  10. * A synchronization aid that allows a set of threads to all wait for
  11. * each other to reach a common barrier point. CyclicBarriers are
  12. * useful in programs involving a fixed sized party of threads that
  13. * must occasionally wait for each other. The barrier is called
  14. * <em>cyclic</em> because it can be re-used after the waiting threads
  15. * are released.
  16. *
  17. * <p>A <tt>CyclicBarrier</tt> supports an optional {@link Runnable} command
  18. * that is run once per barrier point, after the last thread in the party
  19. * arrives, but before any threads are released.
  20. * This <em>barrier action</em> is useful
  21. * for updating shared-state before any of the parties continue.
  22. *
  23. * <p><b>Sample usage:</b> Here is an example of
  24. * using a barrier in a parallel decomposition design:
  25. * <pre>
  26. * class Solver {
  27. * final int N;
  28. * final float[][] data;
  29. * final CyclicBarrier barrier;
  30. *
  31. * class Worker implements Runnable {
  32. * int myRow;
  33. * Worker(int row) { myRow = row; }
  34. * public void run() {
  35. * while (!done()) {
  36. * processRow(myRow);
  37. *
  38. * try {
  39. * barrier.await();
  40. * } catch (InterruptedException ex) {
  41. * return;
  42. * } catch (BrokenBarrierException ex) {
  43. * return;
  44. * }
  45. * }
  46. * }
  47. * }
  48. *
  49. * public Solver(float[][] matrix) {
  50. * data = matrix;
  51. * N = matrix.length;
  52. * barrier = new CyclicBarrier(N,
  53. * new Runnable() {
  54. * public void run() {
  55. * mergeRows(...);
  56. * }
  57. * });
  58. * for (int i = 0; i < N; ++i)
  59. * new Thread(new Worker(i)).start();
  60. *
  61. * waitUntilDone();
  62. * }
  63. * }
  64. * </pre>
  65. * Here, each worker thread processes a row of the matrix then waits at the
  66. * barrier until all rows have been processed. When all rows are processed
  67. * the supplied {@link Runnable} barrier action is executed and merges the
  68. * rows. If the merger
  69. * determines that a solution has been found then <tt>done()</tt> will return
  70. * <tt>true</tt> and each worker will terminate.
  71. *
  72. * <p>If the barrier action does not rely on the parties being suspended when
  73. * it is executed, then any of the threads in the party could execute that
  74. * action when it is released. To facilitate this, each invocation of
  75. * {@link #await} returns the arrival index of that thread at the barrier.
  76. * You can then choose which thread should execute the barrier action, for
  77. * example:
  78. * <pre> if (barrier.await() == 0) {
  79. * // log the completion of this iteration
  80. * }</pre>
  81. *
  82. * <p>The <tt>CyclicBarrier</tt> uses a fast-fail all-or-none breakage
  83. * model for failed synchronization attempts: If a thread leaves a
  84. * barrier point prematurely because of interruption, failure, or
  85. * timeout, all other threads, even those that have not yet resumed
  86. * from a previous {@link #await}, will also leave abnormally via
  87. * {@link BrokenBarrierException} (or <tt>InterruptedException</tt> if
  88. * they too were interrupted at about the same time).
  89. *
  90. * @since 1.5
  91. * @see CountDownLatch
  92. *
  93. * @author Doug Lea
  94. */
  95. public class CyclicBarrier {
  96. /** The lock for guarding barrier entry */
  97. private final ReentrantLock lock = new ReentrantLock();
  98. /** Condition to wait on until tripped */
  99. private final Condition trip = lock.newCondition();
  100. /** The number of parties */
  101. private final int parties;
  102. /* The command to run when tripped */
  103. private final Runnable barrierCommand;
  104. /**
  105. * The generation number. Incremented upon barrier trip.
  106. * Retracted upon reset.
  107. */
  108. private long generation;
  109. /**
  110. * Breakage indicator.
  111. */
  112. private boolean broken;
  113. /**
  114. * Number of parties still waiting. Counts down from parties to 0
  115. * on each cycle.
  116. */
  117. private int count;
  118. /**
  119. * Updates state on barrier trip and wake up everyone.
  120. */
  121. private void nextGeneration() {
  122. count = parties;
  123. ++generation;
  124. trip.signalAll();
  125. }
  126. /**
  127. * Sets barrier as broken and wake up everyone
  128. */
  129. private void breakBarrier() {
  130. broken = true;
  131. trip.signalAll();
  132. }
  133. /**
  134. * Main barrier code, covering the various policies.
  135. */
  136. private int dowait(boolean timed, long nanos)
  137. throws InterruptedException, BrokenBarrierException, TimeoutException {
  138. final ReentrantLock lock = this.lock;
  139. lock.lock();
  140. try {
  141. int index = --count;
  142. long g = generation;
  143. if (broken)
  144. throw new BrokenBarrierException();
  145. if (Thread.interrupted()) {
  146. breakBarrier();
  147. throw new InterruptedException();
  148. }
  149. if (index == 0) { // tripped
  150. nextGeneration();
  151. boolean ranAction = false;
  152. try {
  153. Runnable command = barrierCommand;
  154. if (command != null)
  155. command.run();
  156. ranAction = true;
  157. return 0;
  158. } finally {
  159. if (!ranAction)
  160. breakBarrier();
  161. }
  162. }
  163. for (;;) {
  164. try {
  165. if (!timed)
  166. trip.await();
  167. else if (nanos > 0L)
  168. nanos = trip.awaitNanos(nanos);
  169. } catch (InterruptedException ie) {
  170. breakBarrier();
  171. throw ie;
  172. }
  173. if (broken ||
  174. g > generation) // true if a reset occurred while waiting
  175. throw new BrokenBarrierException();
  176. if (g < generation)
  177. return index;
  178. if (timed && nanos <= 0L) {
  179. breakBarrier();
  180. throw new TimeoutException();
  181. }
  182. }
  183. } finally {
  184. lock.unlock();
  185. }
  186. }
  187. /**
  188. * Creates a new <tt>CyclicBarrier</tt> that will trip when the
  189. * given number of parties (threads) are waiting upon it, and which
  190. * will execute the given barrier action when the barrier is tripped,
  191. * performed by the last thread entering the barrier.
  192. *
  193. * @param parties the number of threads that must invoke {@link #await}
  194. * before the barrier is tripped.
  195. * @param barrierAction the command to execute when the barrier is
  196. * tripped, or <tt>null</tt> if there is no action.
  197. *
  198. * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
  199. */
  200. public CyclicBarrier(int parties, Runnable barrierAction) {
  201. if (parties <= 0) throw new IllegalArgumentException();
  202. this.parties = parties;
  203. this.count = parties;
  204. this.barrierCommand = barrierAction;
  205. }
  206. /**
  207. * Creates a new <tt>CyclicBarrier</tt> that will trip when the
  208. * given number of parties (threads) are waiting upon it, and
  209. * does not perform a predefined action upon each barrier.
  210. *
  211. * @param parties the number of threads that must invoke {@link #await}
  212. * before the barrier is tripped.
  213. *
  214. * @throws IllegalArgumentException if <tt>parties</tt> is less than 1.
  215. */
  216. public CyclicBarrier(int parties) {
  217. this(parties, null);
  218. }
  219. /**
  220. * Returns the number of parties required to trip this barrier.
  221. * @return the number of parties required to trip this barrier.
  222. **/
  223. public int getParties() {
  224. return parties;
  225. }
  226. /**
  227. * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
  228. * on this barrier.
  229. *
  230. * <p>If the current thread is not the last to arrive then it is
  231. * disabled for thread scheduling purposes and lies dormant until
  232. * one of following things happens:
  233. * <ul>
  234. * <li>The last thread arrives; or
  235. * <li>Some other thread {@link Thread#interrupt interrupts} the current
  236. * thread; or
  237. * <li>Some other thread {@link Thread#interrupt interrupts} one of the
  238. * other waiting threads; or
  239. * <li>Some other thread times out while waiting for barrier; or
  240. * <li>Some other thread invokes {@link #reset} on this barrier.
  241. * </ul>
  242. * <p>If the current thread:
  243. * <ul>
  244. * <li>has its interrupted status set on entry to this method; or
  245. * <li>is {@link Thread#interrupt interrupted} while waiting
  246. * </ul>
  247. * then {@link InterruptedException} is thrown and the current thread's
  248. * interrupted status is cleared.
  249. *
  250. * <p>If the barrier is {@link #reset} while any thread is waiting, or if
  251. * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
  252. * or while any thread is waiting,
  253. * then {@link BrokenBarrierException} is thrown.
  254. *
  255. * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
  256. * then all other waiting threads will throw
  257. * {@link BrokenBarrierException} and the barrier is placed in the broken
  258. * state.
  259. *
  260. * <p>If the current thread is the last thread to arrive, and a
  261. * non-null barrier action was supplied in the constructor, then the
  262. * current thread runs the action before allowing the other threads to
  263. * continue.
  264. * If an exception occurs during the barrier action then that exception
  265. * will be propagated in the current thread and the barrier is placed in
  266. * the broken state.
  267. *
  268. * @return the arrival index of the current thread, where index
  269. * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
  270. * zero indicates the last to arrive.
  271. *
  272. * @throws InterruptedException if the current thread was interrupted
  273. * while waiting
  274. * @throws BrokenBarrierException if <em>another</em> thread was
  275. * interrupted while the current thread was waiting, or the barrier was
  276. * reset, or the barrier was broken when <tt>await</tt> was called,
  277. * or the barrier action (if present) failed due an exception.
  278. */
  279. public int await() throws InterruptedException, BrokenBarrierException {
  280. try {
  281. return dowait(false, 0L);
  282. } catch (TimeoutException toe) {
  283. throw new Error(toe); // cannot happen;
  284. }
  285. }
  286. /**
  287. * Waits until all {@link #getParties parties} have invoked <tt>await</tt>
  288. * on this barrier.
  289. *
  290. * <p>If the current thread is not the last to arrive then it is
  291. * disabled for thread scheduling purposes and lies dormant until
  292. * one of the following things happens:
  293. * <ul>
  294. * <li>The last thread arrives; or
  295. * <li>The specified timeout elapses; or
  296. * <li>Some other thread {@link Thread#interrupt interrupts} the current
  297. * thread; or
  298. * <li>Some other thread {@link Thread#interrupt interrupts} one of the
  299. * other waiting threads; or
  300. * <li>Some other thread times out while waiting for barrier; or
  301. * <li>Some other thread invokes {@link #reset} on this barrier.
  302. * </ul>
  303. * <p>If the current thread:
  304. * <ul>
  305. * <li>has its interrupted status set on entry to this method; or
  306. * <li>is {@link Thread#interrupt interrupted} while waiting
  307. * </ul>
  308. * then {@link InterruptedException} is thrown and the current thread's
  309. * interrupted status is cleared.
  310. *
  311. * <p>If the specified waiting time elapses then {@link TimeoutException}
  312. * is thrown. If the time is less than or equal to zero, the
  313. * method will not wait at all.
  314. *
  315. * <p>If the barrier is {@link #reset} while any thread is waiting, or if
  316. * the barrier {@link #isBroken is broken} when <tt>await</tt> is invoked,
  317. * or while any thread is waiting,
  318. * then {@link BrokenBarrierException} is thrown.
  319. *
  320. * <p>If any thread is {@link Thread#interrupt interrupted} while waiting,
  321. * then all other waiting threads will throw
  322. * {@link BrokenBarrierException} and the barrier is placed in the broken
  323. * state.
  324. *
  325. * <p>If the current thread is the last thread to arrive, and a
  326. * non-null barrier action was supplied in the constructor, then the
  327. * current thread runs the action before allowing the other threads to
  328. * continue.
  329. * If an exception occurs during the barrier action then that exception
  330. * will be propagated in the current thread and the barrier is placed in
  331. * the broken state.
  332. *
  333. * @param timeout the time to wait for the barrier
  334. * @param unit the time unit of the timeout parameter
  335. * @return the arrival index of the current thread, where index
  336. * <tt>{@link #getParties()} - 1</tt> indicates the first to arrive and
  337. * zero indicates the last to arrive.
  338. *
  339. * @throws InterruptedException if the current thread was interrupted
  340. * while waiting
  341. * @throws TimeoutException if the specified timeout elapses.
  342. * @throws BrokenBarrierException if <em>another</em> thread was
  343. * interrupted while the current thread was waiting, or the barrier was
  344. * reset, or the barrier was broken when <tt>await</tt> was called,
  345. * or the barrier action (if present) failed due an exception.
  346. */
  347. public int await(long timeout, TimeUnit unit)
  348. throws InterruptedException,
  349. BrokenBarrierException,
  350. TimeoutException {
  351. return dowait(true, unit.toNanos(timeout));
  352. }
  353. /**
  354. * Queries if this barrier is in a broken state.
  355. * @return <tt>true</tt> if one or more parties broke out of this
  356. * barrier due to interruption or timeout since construction or
  357. * the last reset, or a barrier action failed due to an exception;
  358. * and <tt>false</tt> otherwise.
  359. */
  360. public boolean isBroken() {
  361. final ReentrantLock lock = this.lock;
  362. lock.lock();
  363. try {
  364. return broken;
  365. } finally {
  366. lock.unlock();
  367. }
  368. }
  369. /**
  370. * Resets the barrier to its initial state. If any parties are
  371. * currently waiting at the barrier, they will return with a
  372. * {@link BrokenBarrierException}. Note that resets <em>after</em>
  373. * a breakage has occurred for other reasons can be complicated to
  374. * carry out; threads need to re-synchronize in some other way,
  375. * and choose one to perform the reset. It may be preferable to
  376. * instead create a new barrier for subsequent use.
  377. */
  378. public void reset() {
  379. final ReentrantLock lock = this.lock;
  380. lock.lock();
  381. try {
  382. /*
  383. * Retract generation number enough to cover threads
  384. * currently waiting on current and still resuming from
  385. * previous generation, plus similarly accommodating spans
  386. * after the reset.
  387. */
  388. generation -= 4;
  389. broken = false;
  390. trip.signalAll();
  391. } finally {
  392. lock.unlock();
  393. }
  394. }
  395. /**
  396. * Returns the number of parties currently waiting at the barrier.
  397. * This method is primarily useful for debugging and assertions.
  398. *
  399. * @return the number of parties currently blocked in {@link #await}
  400. **/
  401. public int getNumberWaiting() {
  402. final ReentrantLock lock = this.lock;
  403. lock.lock();
  404. try {
  405. return parties - count;
  406. } finally {
  407. lock.unlock();
  408. }
  409. }
  410. }