1. /*
  2. * Copyright 2001-2004 The Apache Software Foundation
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. package org.apache.tools.ant.taskdefs;
  18. import java.lang.reflect.Method;
  19. import java.util.Enumeration;
  20. import java.util.Vector;
  21. import java.util.List;
  22. import java.util.ArrayList;
  23. import org.apache.tools.ant.BuildException;
  24. import org.apache.tools.ant.Location;
  25. import org.apache.tools.ant.Task;
  26. import org.apache.tools.ant.TaskContainer;
  27. import org.apache.tools.ant.util.StringUtils;
  28. /**
  29. * Executes the contained tasks in separate threads, continuing
  30. * once all are completed.
  31. * <p>
  32. * New behavior allows for the ant script to specify a maximum number of
  33. * threads that will be executed in parallel. One should be very careful about
  34. * using the <code>waitFor</code> task when specifying <code>threadCount</code>
  35. * as it can cause deadlocks if the number of threads is too small or if one of
  36. * the nested tasks fails to execute completely. The task selection algorithm
  37. * will insure that the tasks listed before a task have started before that
  38. * task is started, but it will not insure a successful completion of those
  39. * tasks or that those tasks will finish first (i.e. it's a classic race
  40. * condition).
  41. * </p>
  42. * @since Ant 1.4
  43. *
  44. * @ant.task category="control"
  45. */
  46. public class Parallel extends Task
  47. implements TaskContainer {
  48. /** Class which holds a list of tasks to execute */
  49. public static class TaskList implements TaskContainer {
  50. /** Collection holding the nested tasks */
  51. private List tasks = new ArrayList();
  52. /**
  53. * Add a nested task to execute parallel (asynchron).
  54. * <p>
  55. * @param nestedTask Nested task to be executed in parallel
  56. */
  57. public void addTask(Task nestedTask) throws BuildException {
  58. tasks.add(nestedTask);
  59. }
  60. }
  61. /** Collection holding the nested tasks */
  62. private Vector nestedTasks = new Vector();
  63. /** Semaphore to notify of completed threads */
  64. private final Object semaphore = new Object();
  65. /** Total number of threads to run */
  66. private int numThreads = 0;
  67. /** Total number of threads per processor to run. */
  68. private int numThreadsPerProcessor = 0;
  69. /** The timeout period in milliseconds */
  70. private long timeout;
  71. /** Indicates threads are still running and new threads can be issued */
  72. private volatile boolean stillRunning;
  73. /** Indicates that the execution timedout */
  74. private boolean timedOut;
  75. /**
  76. * Indicates whether failure of any of the nested tasks should end
  77. * execution
  78. */
  79. private boolean failOnAny;
  80. /** The dameon task list if any */
  81. private TaskList daemonTasks;
  82. /** Accumulation of exceptions messages from all nested tasks */
  83. private StringBuffer exceptionMessage;
  84. /** Number of exceptions from nested tasks */
  85. private int numExceptions = 0;
  86. /** The first exception encountered */
  87. private Throwable firstException;
  88. /** The location of the first exception */
  89. private Location firstLocation;
  90. /**
  91. * Add a group of daemon threads
  92. */
  93. public void addDaemons(TaskList daemonTasks) {
  94. if (this.daemonTasks != null) {
  95. throw new BuildException("Only one daemon group is supported");
  96. }
  97. this.daemonTasks = daemonTasks;
  98. }
  99. /**
  100. * Interval to poll for completed threads when threadCount or
  101. * threadsPerProcessor is specified. Integer in milliseconds.; optional
  102. *
  103. * @param pollInterval New value of property pollInterval.
  104. */
  105. public void setPollInterval(int pollInterval) {
  106. }
  107. /**
  108. * Control whether a failure in a nested task halts execution. Note that
  109. * the task will complete but existing threads will continue to run - they
  110. * are not stopped
  111. *
  112. * @param failOnAny if true any nested task failure causes parallel to
  113. * complete.
  114. */
  115. public void setFailOnAny(boolean failOnAny) {
  116. this.failOnAny = failOnAny;
  117. }
  118. /**
  119. * Add a nested task to execute in parallel.
  120. * @param nestedTask Nested task to be executed in parallel
  121. */
  122. public void addTask(Task nestedTask) {
  123. nestedTasks.addElement(nestedTask);
  124. }
  125. /**
  126. * Dynamically generates the number of threads to execute based on the
  127. * number of available processors (via
  128. * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
  129. * 1.4 VM, and it will overwrite the value set in threadCount.
  130. * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
  131. * <code>threadCount</code>.; optional
  132. * @param numThreadsPerProcessor Number of threads to create per available
  133. * processor.
  134. *
  135. */
  136. public void setThreadsPerProcessor(int numThreadsPerProcessor) {
  137. this.numThreadsPerProcessor = numThreadsPerProcessor;
  138. }
  139. /**
  140. * Statically determine the maximum number of tasks to execute
  141. * simultaneously. If there are less tasks than threads then all will be
  142. * executed at once, if there are more then only <code>threadCount</code>
  143. * tasks will be executed at one time. If <code>threadsPerProcessor</code>
  144. * is set and the JVM is at least a 1.4 VM then this value is
  145. * ignored.; optional
  146. *
  147. * @param numThreads total number of threads.
  148. *
  149. */
  150. public void setThreadCount(int numThreads) {
  151. this.numThreads = numThreads;
  152. }
  153. /**
  154. * Sets the timeout on this set of tasks. If the timeout is reached
  155. * before the other threads complete, the execution of this
  156. * task completes with an exception.
  157. *
  158. * Note that existing threads continue to run.
  159. *
  160. * @param timeout timeout in milliseconds.
  161. */
  162. public void setTimeout(long timeout) {
  163. this.timeout = timeout;
  164. }
  165. /**
  166. * Execute the parallel tasks
  167. *
  168. * @exception BuildException if any of the threads failed.
  169. */
  170. public void execute() throws BuildException {
  171. updateThreadCounts();
  172. if (numThreads == 0) {
  173. numThreads = nestedTasks.size();
  174. }
  175. spinThreads();
  176. }
  177. /**
  178. * Determine the number of threads based on the number of processors
  179. */
  180. private void updateThreadCounts() {
  181. if (numThreadsPerProcessor != 0) {
  182. int numProcessors = getNumProcessors();
  183. if (numProcessors != 0) {
  184. numThreads = numProcessors * numThreadsPerProcessor;
  185. }
  186. }
  187. }
  188. private void processExceptions(TaskRunnable[] runnables) {
  189. if (runnables == null) {
  190. return;
  191. }
  192. for (int i = 0; i < runnables.length; ++i) {
  193. Throwable t = runnables[i].getException();
  194. if (t != null) {
  195. numExceptions++;
  196. if (firstException == null) {
  197. firstException = t;
  198. }
  199. if (t instanceof BuildException
  200. && firstLocation == Location.UNKNOWN_LOCATION) {
  201. firstLocation = ((BuildException) t).getLocation();
  202. }
  203. exceptionMessage.append(StringUtils.LINE_SEP);
  204. exceptionMessage.append(t.getMessage());
  205. }
  206. }
  207. }
  208. /**
  209. * Spin up required threads with a maximum number active at any given time.
  210. *
  211. * @exception BuildException if any of the threads failed.
  212. */
  213. private void spinThreads() throws BuildException {
  214. final int numTasks = nestedTasks.size();
  215. TaskRunnable[] runnables = new TaskRunnable[numTasks];
  216. stillRunning = true;
  217. timedOut = false;
  218. int threadNumber = 0;
  219. for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
  220. threadNumber++) {
  221. Task nestedTask = (Task) e.nextElement();
  222. runnables[threadNumber]
  223. = new TaskRunnable(nestedTask);
  224. }
  225. final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
  226. TaskRunnable[] running = new TaskRunnable[maxRunning];
  227. threadNumber = 0;
  228. ThreadGroup group = new ThreadGroup("parallel");
  229. TaskRunnable[] daemons = null;
  230. if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
  231. daemons = new TaskRunnable[daemonTasks.tasks.size()];
  232. }
  233. synchronized (semaphore) {
  234. // start any daemon threads
  235. if (daemons != null) {
  236. for (int i = 0; i < daemons.length; ++i) {
  237. daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
  238. Thread daemonThread = new Thread(group, daemons[i]);
  239. daemonThread.setDaemon(true);
  240. daemonThread.start();
  241. }
  242. }
  243. // now run main threads in limited numbers...
  244. // start initial batch of threads
  245. for (int i = 0; i < maxRunning; ++i) {
  246. running[i] = runnables[threadNumber++];
  247. Thread thread = new Thread(group, running[i]);
  248. thread.start();
  249. }
  250. if (timeout != 0) {
  251. // start the timeout thread
  252. Thread timeoutThread = new Thread() {
  253. public synchronized void run() {
  254. try {
  255. wait(timeout);
  256. synchronized (semaphore) {
  257. stillRunning = false;
  258. timedOut = true;
  259. semaphore.notifyAll();
  260. }
  261. } catch (InterruptedException e) {
  262. // ignore
  263. }
  264. }
  265. };
  266. timeoutThread.start();
  267. }
  268. // now find available running slots for the remaining threads
  269. outer:
  270. while (threadNumber < numTasks && stillRunning) {
  271. for (int i = 0; i < maxRunning; i++) {
  272. if (running[i] == null || running[i].finished) {
  273. running[i] = runnables[threadNumber++];
  274. Thread thread = new Thread(group, running[i]);
  275. thread.start();
  276. // continue on outer while loop to get another
  277. // available slot
  278. continue outer;
  279. }
  280. }
  281. // if we got here all slots in use, so sleep until
  282. // something happens
  283. try {
  284. semaphore.wait();
  285. } catch (InterruptedException ie) {
  286. // doesn't java know interruptions are rude?
  287. // just pretend it didn't happen and go about out business.
  288. // sheesh!
  289. }
  290. }
  291. // are all threads finished
  292. outer2:
  293. while (stillRunning) {
  294. for (int i = 0; i < maxRunning; ++i) {
  295. if (running[i] != null && !running[i].finished) {
  296. //System.out.println("Thread " + i + " is still alive ");
  297. // still running - wait for it
  298. try {
  299. semaphore.wait();
  300. } catch (InterruptedException ie) {
  301. // who would interrupt me at a time like this?
  302. }
  303. continue outer2;
  304. }
  305. }
  306. stillRunning = false;
  307. }
  308. }
  309. if (timedOut) {
  310. throw new BuildException("Parallel execution timed out");
  311. }
  312. // now did any of the threads throw an exception
  313. exceptionMessage = new StringBuffer();
  314. numExceptions = 0;
  315. firstException = null;
  316. firstLocation = Location.UNKNOWN_LOCATION;
  317. processExceptions(daemons);
  318. processExceptions(runnables);
  319. if (numExceptions == 1) {
  320. if (firstException instanceof BuildException) {
  321. throw (BuildException) firstException;
  322. } else {
  323. throw new BuildException(firstException);
  324. }
  325. } else if (numExceptions > 1) {
  326. throw new BuildException(exceptionMessage.toString(),
  327. firstLocation);
  328. }
  329. }
  330. /**
  331. * Determine the number of processors. Only effective on later VMs
  332. *
  333. * @return the number of processors available or 0 if not determinable.
  334. */
  335. private int getNumProcessors() {
  336. try {
  337. Class[] paramTypes = {};
  338. Method availableProcessors =
  339. Runtime.class.getMethod("availableProcessors", paramTypes);
  340. Object[] args = {};
  341. Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
  342. return ret.intValue();
  343. } catch (Exception e) {
  344. // return a bogus number
  345. return 0;
  346. }
  347. }
  348. /**
  349. * thread that execs a task
  350. */
  351. private class TaskRunnable implements Runnable {
  352. private Throwable exception;
  353. private Task task;
  354. boolean finished;
  355. /**
  356. * Construct a new TaskRunnable.<p>
  357. *
  358. * @param task the Task to be executed in a separate thread
  359. */
  360. TaskRunnable(Task task) {
  361. this.task = task;
  362. }
  363. /**
  364. * Executes the task within a thread and takes care about
  365. * Exceptions raised within the task.
  366. */
  367. public void run() {
  368. try {
  369. task.perform();
  370. } catch (Throwable t) {
  371. exception = t;
  372. if (failOnAny) {
  373. stillRunning = false;
  374. }
  375. } finally {
  376. synchronized (semaphore) {
  377. finished = true;
  378. semaphore.notifyAll();
  379. }
  380. }
  381. }
  382. /**
  383. * get any exception that got thrown during execution;
  384. * @return an exception or null for no exception/not yet finished
  385. */
  386. public Throwable getException() {
  387. return exception;
  388. }
  389. }
  390. }