/*
 * @(#)ExecutorCompletionService.java 1.1 04/02/09
 *
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */
package java.util.concurrent;
  /**
   * A {@link CompletionService} that uses a supplied {@link Executor}
   * to execute tasks. This class arranges that submitted tasks are,
   * upon completion, placed on a queue accessible using {@code take}.
   * The class is lightweight enough to be suitable for transient use
   * when processing groups of tasks.
   *
   * <p>
   *
   * <b>Usage Examples.</b>
   *
   * Suppose you have a set of solvers for a certain problem, each
   * returning a value of some type {@code Result}, and would like to
   * run them concurrently, processing the results of each of them that
   * return a non-null value, in some method {@code use(Result r)}. You
   * could write this as:
   *
   * <pre> {@code
   * void solve(Executor e, Collection<Callable<Result>> solvers)
   *     throws InterruptedException, ExecutionException {
   *     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
   *     for (Callable<Result> s : solvers)
   *         ecs.submit(s);
   *     int n = solvers.size();
   *     for (int i = 0; i < n; ++i) {
   *         Result r = ecs.take().get();
   *         if (r != null)
   *             use(r);
   *     }
   * }}</pre>
   *
   * Suppose instead that you would like to use the first non-null result
   * of the set of tasks, ignoring any that encounter exceptions,
   * and cancelling all other tasks when the first one is ready:
   *
   * <pre> {@code
   * void solve(Executor e, Collection<Callable<Result>> solvers)
   *     throws InterruptedException {
   *     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
   *     int n = solvers.size();
   *     List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
   *     Result result = null;
   *     try {
   *         for (Callable<Result> s : solvers)
   *             futures.add(ecs.submit(s));
   *         for (int i = 0; i < n; ++i) {
   *             try {
   *                 Result r = ecs.take().get();
   *                 if (r != null) {
   *                     result = r;
   *                     break;
   *                 }
   *             } catch(ExecutionException ignore) {}
   *         }
   *     }
   *     finally {
   *         for (Future<Result> f : futures)
   *             f.cancel(true);
   *     }
   *
   *     if (result != null)
   *         use(result);
   * }}</pre>
   */
  public class ExecutorCompletionService<V> implements CompletionService<V> {
      /** Executor that runs every task handed to this service. */
      private final Executor executor;

      /** Holds the futures of finished tasks until a caller retrieves them. */
      private final BlockingQueue<Future<V>> completionQueue;

      /**
       * FutureTask extension to enqueue upon completion.
       */
      private class QueueingFuture extends FutureTask<V> {
          QueueingFuture(Callable<V> c) {
              super(c);
          }

          QueueingFuture(Runnable t, V r) {
              super(t, r);
          }

          /**
           * Invoked by {@link FutureTask} once this task is done (normal
           * completion, failure or cancellation); publishes the task on the
           * enclosing service's completion queue.
           */
          @Override
          protected void done() {
              // add() never blocks the completing thread. With the default
              // LinkedBlockingQueue it always succeeds; with a caller-supplied
              // bounded queue that is full it throws IllegalStateException,
              // as specified by BlockingQueue.add.
              completionQueue.add(this);
          }
      }

      /**
       * Creates an ExecutorCompletionService using the supplied
       * executor for base task execution and a
       * {@link LinkedBlockingQueue} as a completion queue.
       *
       * @param executor the executor to use
       * @throws NullPointerException if executor is {@code null}
       */
      public ExecutorCompletionService(Executor executor) {
          if (executor == null) {
              throw new NullPointerException();
          }
          this.executor = executor;
          this.completionQueue = new LinkedBlockingQueue<Future<V>>();
      }

      /**
       * Creates an ExecutorCompletionService using the supplied
       * executor for base task execution and the supplied queue as its
       * completion queue.
       *
       * @param executor the executor to use
       * @param completionQueue the queue to use as the completion queue,
       *        normally one dedicated for use by this service
       * @throws NullPointerException if executor or completionQueue are {@code null}
       */
      public ExecutorCompletionService(Executor executor,
                                       BlockingQueue<Future<V>> completionQueue) {
          if (executor == null || completionQueue == null) {
              throw new NullPointerException();
          }
          this.executor = executor;
          this.completionQueue = completionQueue;
      }

      /**
       * Submits a value-returning task for execution and returns a Future
       * representing it. Once the task is done, that same Future is placed
       * on the completion queue.
       *
       * @param task the task to submit
       * @return a Future representing the submitted task
       * @throws NullPointerException if task is {@code null}
       * @throws RejectedExecutionException if the executor cannot accept
       *         the task (propagated unchanged from {@link Executor#execute})
       */
      public Future<V> submit(Callable<V> task) {
          if (task == null) {
              throw new NullPointerException();
          }
          QueueingFuture f = new QueueingFuture(task);
          executor.execute(f);
          return f;
      }

      /**
       * Submits a Runnable task for execution and returns a Future
       * representing it. Once the task is done, that same Future is placed
       * on the completion queue; its {@code get} yields {@code result} on
       * successful completion.
       *
       * @param task the task to submit
       * @param result the value the Future returns upon successful completion
       * @return a Future representing the submitted task
       * @throws NullPointerException if task is {@code null}
       * @throws RejectedExecutionException if the executor cannot accept
       *         the task (propagated unchanged from {@link Executor#execute})
       */
      public Future<V> submit(Runnable task, V result) {
          if (task == null) {
              throw new NullPointerException();
          }
          QueueingFuture f = new QueueingFuture(task, result);
          executor.execute(f);
          return f;
      }

      /**
       * Retrieves and removes the Future at the head of the completion
       * queue, waiting if the queue is currently empty.
       *
       * @return the Future taken from the completion queue
       * @throws InterruptedException if interrupted while waiting
       */
      public Future<V> take() throws InterruptedException {
          return completionQueue.take();
      }

      /**
       * Retrieves and removes the Future at the head of the completion
       * queue without waiting.
       *
       * @return the Future taken from the completion queue, or
       *         {@code null} if the queue is empty
       */
      public Future<V> poll() {
          return completionQueue.poll();
      }

      /**
       * Retrieves and removes the Future at the head of the completion
       * queue, waiting up to the given time if the queue is empty.
       *
       * @param timeout how long to wait before giving up, in units of
       *        {@code unit}
       * @param unit the time unit of the {@code timeout} argument
       * @return the Future taken from the completion queue, or
       *         {@code null} if the wait time elapses first
       * @throws InterruptedException if interrupted while waiting
       */
      public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
          return completionQueue.poll(timeout, unit);
      }
  }