1. /*
  2. * @(#)AbstractExecutorService.java 1.1 04/02/09
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.*;
  9. /**
  10. * Provides default implementation of {@link ExecutorService}
  11. * execution methods. This class implements the <tt>submit</tt>,
  12. * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using the default
  13. * {@link FutureTask} class provided in this package. For example,
  14. * the implementation of <tt>submit(Runnable)</tt> creates an
  15. * associated <tt>FutureTask</tt> that is executed and
  16. * returned. Subclasses overriding these methods to use different
  17. * {@link Future} implementations should do so consistently for each
  18. * of these methods.
  19. *
  20. * @since 1.5
  21. * @author Doug Lea
  22. */
  23. public abstract class AbstractExecutorService implements ExecutorService {
  24. public Future<?> submit(Runnable task) {
  25. if (task == null) throw new NullPointerException();
  26. FutureTask<Object> ftask = new FutureTask<Object>(task, null);
  27. execute(ftask);
  28. return ftask;
  29. }
  30. public <T> Future<T> submit(Runnable task, T result) {
  31. if (task == null) throw new NullPointerException();
  32. FutureTask<T> ftask = new FutureTask<T>(task, result);
  33. execute(ftask);
  34. return ftask;
  35. }
  36. public <T> Future<T> submit(Callable<T> task) {
  37. if (task == null) throw new NullPointerException();
  38. FutureTask<T> ftask = new FutureTask<T>(task);
  39. execute(ftask);
  40. return ftask;
  41. }
  42. /**
  43. * the main mechanics of invokeAny.
  44. */
  45. private <T> T doInvokeAny(Collection<Callable<T>> tasks,
  46. boolean timed, long nanos)
  47. throws InterruptedException, ExecutionException, TimeoutException {
  48. if (tasks == null)
  49. throw new NullPointerException();
  50. int ntasks = tasks.size();
  51. if (ntasks == 0)
  52. throw new IllegalArgumentException();
  53. List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
  54. ExecutorCompletionService<T> ecs =
  55. new ExecutorCompletionService<T>(this);
  56. // For efficiency, especially in executors with limited
  57. // parallelism, check to see if previously submitted tasks are
  58. // done before submitting more of them. This interleaving
  59. // plus the exception mechanics account for messiness of main
  60. // loop.
  61. try {
  62. // Record exceptions so that if we fail to obtain any
  63. // result, we can throw the last exception we got.
  64. ExecutionException ee = null;
  65. long lastTime = (timed)? System.nanoTime() : 0;
  66. Iterator<Callable<T>> it = tasks.iterator();
  67. // Start one task for sure; the rest incrementally
  68. futures.add(ecs.submit(it.next()));
  69. --ntasks;
  70. int active = 1;
  71. for (;;) {
  72. Future<T> f = ecs.poll();
  73. if (f == null) {
  74. if (ntasks > 0) {
  75. --ntasks;
  76. futures.add(ecs.submit(it.next()));
  77. ++active;
  78. }
  79. else if (active == 0)
  80. break;
  81. else if (timed) {
  82. f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
  83. if (f == null)
  84. throw new TimeoutException();
  85. long now = System.nanoTime();
  86. nanos -= now - lastTime;
  87. lastTime = now;
  88. }
  89. else
  90. f = ecs.take();
  91. }
  92. if (f != null) {
  93. --active;
  94. try {
  95. return f.get();
  96. } catch(InterruptedException ie) {
  97. throw ie;
  98. } catch(ExecutionException eex) {
  99. ee = eex;
  100. } catch(RuntimeException rex) {
  101. ee = new ExecutionException(rex);
  102. }
  103. }
  104. }
  105. if (ee == null)
  106. ee = new ExecutionException();
  107. throw ee;
  108. } finally {
  109. for (Future<T> f : futures)
  110. f.cancel(true);
  111. }
  112. }
  113. public <T> T invokeAny(Collection<Callable<T>> tasks)
  114. throws InterruptedException, ExecutionException {
  115. try {
  116. return doInvokeAny(tasks, false, 0);
  117. } catch (TimeoutException cannotHappen) {
  118. assert false;
  119. return null;
  120. }
  121. }
  122. public <T> T invokeAny(Collection<Callable<T>> tasks,
  123. long timeout, TimeUnit unit)
  124. throws InterruptedException, ExecutionException, TimeoutException {
  125. return doInvokeAny(tasks, true, unit.toNanos(timeout));
  126. }
  127. public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
  128. throws InterruptedException {
  129. if (tasks == null)
  130. throw new NullPointerException();
  131. List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  132. boolean done = false;
  133. try {
  134. for (Callable<T> t : tasks) {
  135. FutureTask<T> f = new FutureTask<T>(t);
  136. futures.add(f);
  137. execute(f);
  138. }
  139. for (Future<T> f : futures) {
  140. if (!f.isDone()) {
  141. try {
  142. f.get();
  143. } catch(CancellationException ignore) {
  144. } catch(ExecutionException ignore) {
  145. }
  146. }
  147. }
  148. done = true;
  149. return futures;
  150. } finally {
  151. if (!done)
  152. for (Future<T> f : futures)
  153. f.cancel(true);
  154. }
  155. }
  156. public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
  157. long timeout, TimeUnit unit)
  158. throws InterruptedException {
  159. if (tasks == null || unit == null)
  160. throw new NullPointerException();
  161. long nanos = unit.toNanos(timeout);
  162. List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  163. boolean done = false;
  164. try {
  165. for (Callable<T> t : tasks)
  166. futures.add(new FutureTask<T>(t));
  167. long lastTime = System.nanoTime();
  168. // Interleave time checks and calls to execute in case
  169. // executor doesn't have any/much parallelism.
  170. Iterator<Future<T>> it = futures.iterator();
  171. while (it.hasNext()) {
  172. execute((Runnable)(it.next()));
  173. long now = System.nanoTime();
  174. nanos -= now - lastTime;
  175. lastTime = now;
  176. if (nanos <= 0)
  177. return futures;
  178. }
  179. for (Future<T> f : futures) {
  180. if (!f.isDone()) {
  181. if (nanos <= 0)
  182. return futures;
  183. try {
  184. f.get(nanos, TimeUnit.NANOSECONDS);
  185. } catch(CancellationException ignore) {
  186. } catch(ExecutionException ignore) {
  187. } catch(TimeoutException toe) {
  188. return futures;
  189. }
  190. long now = System.nanoTime();
  191. nanos -= now - lastTime;
  192. lastTime = now;
  193. }
  194. }
  195. done = true;
  196. return futures;
  197. } finally {
  198. if (!done)
  199. for (Future<T> f : futures)
  200. f.cancel(true);
  201. }
  202. }
  203. }