You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

Parallel.java 17 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package org.apache.tools.ant.taskdefs;
  19. import java.lang.reflect.Method;
  20. import java.util.Enumeration;
  21. import java.util.Vector;
  22. import java.util.List;
  23. import java.util.ArrayList;
  24. import org.apache.tools.ant.BuildException;
  25. import org.apache.tools.ant.Location;
  26. import org.apache.tools.ant.Task;
  27. import org.apache.tools.ant.TaskContainer;
  28. import org.apache.tools.ant.property.LocalProperties;
  29. import org.apache.tools.ant.util.StringUtils;
  30. /**
  31. * Executes the contained tasks in separate threads, continuing
  32. * once all are completed.
  33. * <p>
  34. * New behavior allows for the ant script to specify a maximum number of
  35. * threads that will be executed in parallel. One should be very careful about
  36. * using the <code>waitFor</code> task when specifying <code>threadCount</code>
  37. * as it can cause deadlocks if the number of threads is too small or if one of
  38. * the nested tasks fails to execute completely. The task selection algorithm
  39. * will insure that the tasks listed before a task have started before that
  40. * task is started, but it will not insure a successful completion of those
  41. * tasks or that those tasks will finish first (i.e. it's a classic race
  42. * condition).
  43. * </p>
  44. * @since Ant 1.4
  45. *
  46. * @ant.task category="control"
  47. */
  48. public class Parallel extends Task
  49. implements TaskContainer {
  50. private static final int NUMBER_TRIES = 100;
  51. /** Class which holds a list of tasks to execute */
  52. public static class TaskList implements TaskContainer {
  53. /** Collection holding the nested tasks */
  54. private List tasks = new ArrayList();
  55. /**
  56. * Add a nested task to execute parallel (asynchron).
  57. * <p>
  58. * @param nestedTask Nested task to be executed in parallel.
  59. * must not be null.
  60. */
  61. public void addTask(Task nestedTask) {
  62. tasks.add(nestedTask);
  63. }
  64. }
  65. /** Collection holding the nested tasks */
  66. private Vector nestedTasks = new Vector();
  67. /** Semaphore to notify of completed threads */
  68. private final Object semaphore = new Object();
  69. /** Total number of threads to run */
  70. private int numThreads = 0;
  71. /** Total number of threads per processor to run. */
  72. private int numThreadsPerProcessor = 0;
  73. /** The timeout period in milliseconds */
  74. private long timeout;
  75. /** Indicates threads are still running and new threads can be issued */
  76. private volatile boolean stillRunning;
  77. /** Indicates that the execution timedout */
  78. private boolean timedOut;
  79. /**
  80. * Indicates whether failure of any of the nested tasks should end
  81. * execution
  82. */
  83. private boolean failOnAny;
  84. /** The dameon task list if any */
  85. private TaskList daemonTasks;
  86. /** Accumulation of exceptions messages from all nested tasks */
  87. private StringBuffer exceptionMessage;
  88. /** Number of exceptions from nested tasks */
  89. private int numExceptions = 0;
  90. /** The first exception encountered */
  91. private Throwable firstException;
  92. /** The location of the first exception */
  93. private Location firstLocation;
  94. /**
  95. * Add a group of daemon threads
  96. * @param daemonTasks The tasks to be executed as daemon.
  97. */
  98. public void addDaemons(TaskList daemonTasks) {
  99. if (this.daemonTasks != null) {
  100. throw new BuildException("Only one daemon group is supported");
  101. }
  102. this.daemonTasks = daemonTasks;
  103. }
  104. /**
  105. * Interval to poll for completed threads when threadCount or
  106. * threadsPerProcessor is specified. Integer in milliseconds.; optional
  107. *
  108. * @param pollInterval New value of property pollInterval.
  109. */
  110. public void setPollInterval(int pollInterval) {
  111. }
  112. /**
  113. * Control whether a failure in a nested task halts execution. Note that
  114. * the task will complete but existing threads will continue to run - they
  115. * are not stopped
  116. *
  117. * @param failOnAny if true any nested task failure causes parallel to
  118. * complete.
  119. */
  120. public void setFailOnAny(boolean failOnAny) {
  121. this.failOnAny = failOnAny;
  122. }
  123. /**
  124. * Add a nested task to execute in parallel.
  125. * @param nestedTask Nested task to be executed in parallel
  126. */
  127. public void addTask(Task nestedTask) {
  128. nestedTasks.addElement(nestedTask);
  129. }
  130. /**
  131. * Dynamically generates the number of threads to execute based on the
  132. * number of available processors (via
  133. * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
  134. * 1.4 VM, and it will overwrite the value set in threadCount.
  135. * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
  136. * <code>threadCount</code>.; optional
  137. * @param numThreadsPerProcessor Number of threads to create per available
  138. * processor.
  139. *
  140. */
  141. public void setThreadsPerProcessor(int numThreadsPerProcessor) {
  142. this.numThreadsPerProcessor = numThreadsPerProcessor;
  143. }
  144. /**
  145. * Statically determine the maximum number of tasks to execute
  146. * simultaneously. If there are less tasks than threads then all will be
  147. * executed at once, if there are more then only <code>threadCount</code>
  148. * tasks will be executed at one time. If <code>threadsPerProcessor</code>
  149. * is set and the JVM is at least a 1.4 VM then this value is
  150. * ignored.; optional
  151. *
  152. * @param numThreads total number of threads.
  153. *
  154. */
  155. public void setThreadCount(int numThreads) {
  156. this.numThreads = numThreads;
  157. }
  158. /**
  159. * Sets the timeout on this set of tasks. If the timeout is reached
  160. * before the other threads complete, the execution of this
  161. * task completes with an exception.
  162. *
  163. * Note that existing threads continue to run.
  164. *
  165. * @param timeout timeout in milliseconds.
  166. */
  167. public void setTimeout(long timeout) {
  168. this.timeout = timeout;
  169. }
  170. /**
  171. * Execute the parallel tasks
  172. *
  173. * @exception BuildException if any of the threads failed.
  174. */
  175. public void execute() throws BuildException {
  176. updateThreadCounts();
  177. if (numThreads == 0) {
  178. numThreads = nestedTasks.size();
  179. }
  180. spinThreads();
  181. }
  182. /**
  183. * Determine the number of threads based on the number of processors
  184. */
  185. private void updateThreadCounts() {
  186. if (numThreadsPerProcessor != 0) {
  187. int numProcessors = getNumProcessors();
  188. if (numProcessors != 0) {
  189. numThreads = numProcessors * numThreadsPerProcessor;
  190. }
  191. }
  192. }
  193. private void processExceptions(TaskRunnable[] runnables) {
  194. if (runnables == null) {
  195. return;
  196. }
  197. for (int i = 0; i < runnables.length; ++i) {
  198. Throwable t = runnables[i].getException();
  199. if (t != null) {
  200. numExceptions++;
  201. if (firstException == null) {
  202. firstException = t;
  203. }
  204. if (t instanceof BuildException
  205. && firstLocation == Location.UNKNOWN_LOCATION) {
  206. firstLocation = ((BuildException) t).getLocation();
  207. }
  208. exceptionMessage.append(StringUtils.LINE_SEP);
  209. exceptionMessage.append(t.getMessage());
  210. }
  211. }
  212. }
  213. /**
  214. * Spin up required threads with a maximum number active at any given time.
  215. *
  216. * @exception BuildException if any of the threads failed.
  217. */
  218. private void spinThreads() throws BuildException {
  219. final int numTasks = nestedTasks.size();
  220. TaskRunnable[] runnables = new TaskRunnable[numTasks];
  221. stillRunning = true;
  222. timedOut = false;
  223. boolean interrupted = false;
  224. int threadNumber = 0;
  225. for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
  226. threadNumber++) {
  227. Task nestedTask = (Task) e.nextElement();
  228. runnables[threadNumber]
  229. = new TaskRunnable(nestedTask);
  230. }
  231. final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
  232. TaskRunnable[] running = new TaskRunnable[maxRunning];
  233. threadNumber = 0;
  234. ThreadGroup group = new ThreadGroup("parallel");
  235. TaskRunnable[] daemons = null;
  236. if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
  237. daemons = new TaskRunnable[daemonTasks.tasks.size()];
  238. }
  239. synchronized (semaphore) {
  240. // When we leave this block we can be sure all data is really
  241. // stored in main memory before the new threads start, the new
  242. // threads will for sure load the data from main memory.
  243. //
  244. // This probably is slightly paranoid.
  245. }
  246. synchronized (semaphore) {
  247. // start any daemon threads
  248. if (daemons != null) {
  249. for (int i = 0; i < daemons.length; ++i) {
  250. daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
  251. Thread daemonThread = new Thread(group, daemons[i]);
  252. daemonThread.setDaemon(true);
  253. daemonThread.start();
  254. }
  255. }
  256. // now run main threads in limited numbers...
  257. // start initial batch of threads
  258. for (int i = 0; i < maxRunning; ++i) {
  259. running[i] = runnables[threadNumber++];
  260. Thread thread = new Thread(group, running[i]);
  261. thread.start();
  262. }
  263. if (timeout != 0) {
  264. // start the timeout thread
  265. Thread timeoutThread = new Thread() {
  266. public synchronized void run() {
  267. try {
  268. wait(timeout);
  269. synchronized (semaphore) {
  270. stillRunning = false;
  271. timedOut = true;
  272. semaphore.notifyAll();
  273. }
  274. } catch (InterruptedException e) {
  275. // ignore
  276. }
  277. }
  278. };
  279. timeoutThread.start();
  280. }
  281. try {
  282. // now find available running slots for the remaining threads
  283. outer: while (threadNumber < numTasks && stillRunning) {
  284. for (int i = 0; i < maxRunning; i++) {
  285. if (running[i] == null || running[i].isFinished()) {
  286. running[i] = runnables[threadNumber++];
  287. Thread thread = new Thread(group, running[i]);
  288. thread.start();
  289. // continue on outer while loop to get another
  290. // available slot
  291. continue outer;
  292. }
  293. }
  294. // if we got here all slots in use, so sleep until
  295. // something happens
  296. semaphore.wait();
  297. }
  298. // are all threads finished
  299. outer2: while (stillRunning) {
  300. for (int i = 0; i < maxRunning; ++i) {
  301. if (running[i] != null && !running[i].isFinished()) {
  302. // System.out.println("Thread " + i + " is still
  303. // alive ");
  304. // still running - wait for it
  305. semaphore.wait();
  306. continue outer2;
  307. }
  308. }
  309. stillRunning = false;
  310. }
  311. } catch (InterruptedException ie) {
  312. interrupted = true;
  313. }
  314. killAll(running);
  315. }
  316. if (interrupted) {
  317. throw new BuildException("Parallel execution interrupted.");
  318. }
  319. if (timedOut) {
  320. throw new BuildException("Parallel execution timed out");
  321. }
  322. // now did any of the threads throw an exception
  323. exceptionMessage = new StringBuffer();
  324. numExceptions = 0;
  325. firstException = null;
  326. firstLocation = Location.UNKNOWN_LOCATION;
  327. processExceptions(daemons);
  328. processExceptions(runnables);
  329. if (numExceptions == 1) {
  330. if (firstException instanceof BuildException) {
  331. throw (BuildException) firstException;
  332. } else {
  333. throw new BuildException(firstException);
  334. }
  335. } else if (numExceptions > 1) {
  336. throw new BuildException(exceptionMessage.toString(),
  337. firstLocation);
  338. }
  339. }
  340. /**
  341. * Doesn't do anything if all threads where already gone,
  342. * else it tries to interrupt the threads 100 times.
  343. * @param running The list of tasks that may currently be running.
  344. */
  345. private void killAll(TaskRunnable[] running) {
  346. boolean oneAlive;
  347. int tries = 0;
  348. do {
  349. oneAlive = false;
  350. for (int i = 0; i < running.length; i++) {
  351. if (running[i] != null && !running[i].isFinished()) {
  352. running[i].interrupt();
  353. Thread.yield();
  354. oneAlive = true;
  355. }
  356. }
  357. if (oneAlive) {
  358. tries++;
  359. Thread.yield();
  360. }
  361. } while (oneAlive && tries < NUMBER_TRIES);
  362. }
  363. /**
  364. * Determine the number of processors. Only effective on Java 1.4+
  365. *
  366. * @return the number of processors available or 0 if not determinable.
  367. */
  368. private int getNumProcessors() {
  369. try {
  370. Class[] paramTypes = {};
  371. Method availableProcessors =
  372. Runtime.class.getMethod("availableProcessors", paramTypes);
  373. Object[] args = {};
  374. Integer ret = (Integer) availableProcessors.invoke(Runtime.getRuntime(), args);
  375. return ret.intValue();
  376. } catch (Exception e) {
  377. // return a bogus number
  378. return 0;
  379. }
  380. }
  381. /**
  382. * thread that execs a task
  383. */
  384. private class TaskRunnable implements Runnable {
  385. private Throwable exception;
  386. private Task task;
  387. private boolean finished;
  388. private volatile Thread thread;
  389. /**
  390. * Construct a new TaskRunnable.<p>
  391. *
  392. * @param task the Task to be executed in a separate thread
  393. */
  394. TaskRunnable(Task task) {
  395. this.task = task;
  396. }
  397. /**
  398. * Executes the task within a thread and takes care about
  399. * Exceptions raised within the task.
  400. */
  401. public void run() {
  402. try {
  403. LocalProperties.get(getProject()).copy();
  404. thread = Thread.currentThread();
  405. task.perform();
  406. } catch (Throwable t) {
  407. exception = t;
  408. if (failOnAny) {
  409. stillRunning = false;
  410. }
  411. } finally {
  412. synchronized (semaphore) {
  413. finished = true;
  414. semaphore.notifyAll();
  415. }
  416. }
  417. }
  418. /**
  419. * get any exception that got thrown during execution;
  420. * @return an exception or null for no exception/not yet finished
  421. */
  422. public Throwable getException() {
  423. return exception;
  424. }
  425. /**
  426. * Provides the indicator that the task has been finished.
  427. * @return Returns true when the task is finished.
  428. */
  429. boolean isFinished() {
  430. return finished;
  431. }
  432. void interrupt() {
  433. thread.interrupt();
  434. }
  435. }
  436. }