1. /*
  2. * @(#)ThreadPoolExecutor.java 1.9 04/07/12
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.util.concurrent;
  8. import java.util.concurrent.locks.*;
  9. import java.util.*;
  10. /**
  11. * An {@link ExecutorService} that executes each submitted task using
  12. * one of possibly several pooled threads, normally configured
  13. * using {@link Executors} factory methods.
  14. *
  15. * <p>Thread pools address two different problems: they usually
  16. * provide improved performance when executing large numbers of
  17. * asynchronous tasks, due to reduced per-task invocation overhead,
  18. * and they provide a means of bounding and managing the resources,
  19. * including threads, consumed when executing a collection of tasks.
  20. * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
  21. * statistics, such as the number of completed tasks.
  22. *
  23. * <p>To be useful across a wide range of contexts, this class
  24. * provides many adjustable parameters and extensibility
  25. * hooks. However, programmers are urged to use the more convenient
  26. * {@link Executors} factory methods {@link
  27. * Executors#newCachedThreadPool} (unbounded thread pool, with
  28. * automatic thread reclamation), {@link Executors#newFixedThreadPool}
  29. * (fixed size thread pool) and {@link
  30. * Executors#newSingleThreadExecutor} (single background thread), that
  31. * preconfigure settings for the most common usage
  32. * scenarios. Otherwise, use the following guide when manually
  33. * configuring and tuning this class:
  34. *
  35. * <dl>
  36. *
  37. * <dt>Core and maximum pool sizes</dt>
  38. *
  39. * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
  40. * pool size
  41. * (see {@link ThreadPoolExecutor#getPoolSize})
  42. * according to the bounds set by corePoolSize
  43. * (see {@link ThreadPoolExecutor#getCorePoolSize})
  44. * and
  45. * maximumPoolSize
  46. * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
  47. * When a new task is submitted in method {@link
  48. * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
  49. * are running, a new thread is created to handle the request, even if
  50. * other worker threads are idle. If there are more than
  51. * corePoolSize but less than maximumPoolSize threads running, a new
  52. * thread will be created only if the queue is full. By setting
  53. * corePoolSize and maximumPoolSize the same, you create a fixed-size
  54. * thread pool. By setting maximumPoolSize to an essentially unbounded
  55. * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
  56. * accommodate an arbitrary number of concurrent tasks. Most typically,
  57. * core and maximum pool sizes are set only upon construction, but they
  58. * may also be changed dynamically using {@link
  59. * ThreadPoolExecutor#setCorePoolSize} and {@link
  60. * ThreadPoolExecutor#setMaximumPoolSize}. </dd>
  61. *
  62. * <dt>On-demand construction</dt>
  63. *
  64. * <dd> By default, even core threads are initially created and
  65. * started only when needed by new tasks, but this can be overridden
  66. * dynamically using method {@link
  67. * ThreadPoolExecutor#prestartCoreThread} or
  68. * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
  69. *
  70. * <dt>Creating new threads</dt>
  71. *
  72. * <dd>New threads are created using a {@link
  73. * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
  74. * {@link Executors#defaultThreadFactory} is used, which creates threads that
  75. * are all in the same {@link ThreadGroup} and with the same
  76. * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
  77. * a different ThreadFactory, you can alter the thread's name, thread
  78. * group, priority, daemon status, etc. If a <tt>ThreadFactory</tt> fails to create
  79. * a thread when asked by returning null from <tt>newThread</tt>,
  80. * the executor will continue, but might
  81. * not be able to execute any tasks. </dd>
  82. *
  83. * <dt>Keep-alive times</dt>
  84. *
  85. * <dd>If the pool currently has more than corePoolSize threads,
  86. * excess threads will be terminated if they have been idle for more
  87. * than the keepAliveTime (see {@link
  88. * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
  89. * reducing resource consumption when the pool is not being actively
  90. * used. If the pool becomes more active later, new threads will be
  91. * constructed. This parameter can also be changed dynamically
  92. * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
  93. * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
  94. * effectively disables idle threads from ever terminating prior
  95. * to shut down.
  96. * </dd>
  97. *
  98. * <dt>Queuing</dt>
  99. *
  100. * <dd>Any {@link BlockingQueue} may be used to transfer and hold
  101. * submitted tasks. The use of this queue interacts with pool sizing:
  102. *
  103. * <ul>
  104. *
  105. * <li> If fewer than corePoolSize threads are running, the Executor
  106. * always prefers adding a new thread
  107. * rather than queuing.</li>
  108. *
  109. * <li> If corePoolSize or more threads are running, the Executor
  110. * always prefers queuing a request rather than adding a new
  111. * thread.</li>
  112. *
  113. * <li> If a request cannot be queued, a new thread is created unless
  114. * this would exceed maximumPoolSize, in which case, the task will be
  115. * rejected.</li>
  116. *
  117. * </ul>
  118. *
  119. * There are three general strategies for queuing:
  120. * <ol>
  121. *
  122. * <li> <em> Direct handoffs.</em> A good default choice for a work
  123. * queue is a {@link SynchronousQueue} that hands off tasks to threads
  124. * without otherwise holding them. Here, an attempt to queue a task
  125. * will fail if no threads are immediately available to run it, so a
  126. * new thread will be constructed. This policy avoids lockups when
  127. * handling sets of requests that might have internal dependencies.
  128. * Direct handoffs generally require unbounded maximumPoolSizes to
  129. * avoid rejection of new submitted tasks. This in turn admits the
  130. * possibility of unbounded thread growth when commands continue to
  131. * arrive on average faster than they can be processed. </li>
  132. *
  133. * <li><em> Unbounded queues.</em> Using an unbounded queue (for
  134. * example a {@link LinkedBlockingQueue} without a predefined
  135. * capacity) will cause new tasks to be queued in cases where all
  136. * corePoolSize threads are busy. Thus, no more than corePoolSize
  137. * threads will ever be created. (And the value of the maximumPoolSize
  138. * therefore doesn't have any effect.) This may be appropriate when
  139. * each task is completely independent of others, so tasks cannot
  140. * affect each other's execution; for example, in a web page server.
  141. * While this style of queuing can be useful in smoothing out
  142. * transient bursts of requests, it admits the possibility of
  143. * unbounded work queue growth when commands continue to arrive on
  144. * average faster than they can be processed. </li>
  145. *
  146. * <li><em>Bounded queues.</em> A bounded queue (for example, an
  147. * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
  148. * used with finite maximumPoolSizes, but can be more difficult to
  149. * tune and control. Queue sizes and maximum pool sizes may be traded
  150. * off for each other: Using large queues and small pools minimizes
  151. * CPU usage, OS resources, and context-switching overhead, but can
  152. * lead to artificially low throughput. If tasks frequently block (for
  153. * example if they are I/O bound), a system may be able to schedule
  154. * time for more threads than you otherwise allow. Use of small queues
  155. * generally requires larger pool sizes, which keeps CPUs busier but
  156. * may encounter unacceptable scheduling overhead, which also
  157. * decreases throughput. </li>
  158. *
  159. * </ol>
  160. *
  161. * </dd>
  162. *
  163. * <dt>Rejected tasks</dt>
  164. *
  165. * <dd> New tasks submitted in method {@link
  166. * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
  167. * Executor has been shut down, and also when the Executor uses finite
  168. * bounds for both maximum threads and work queue capacity, and is
  169. * saturated. In either case, the <tt>execute</tt> method invokes the
  170. * {@link RejectedExecutionHandler#rejectedExecution} method of its
  171. * {@link RejectedExecutionHandler}. Four predefined handler policies
  172. * are provided:
  173. *
  174. * <ol>
  175. *
  176. * <li> In the
  177. * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
  178. * runtime {@link RejectedExecutionException} upon rejection. </li>
  179. *
  180. * <li> In {@link
  181. * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
  182. * <tt>execute</tt> itself runs the task. This provides a simple
  183. * feedback control mechanism that will slow down the rate that new
  184. * tasks are submitted. </li>
  185. *
  186. * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
  187. * a task that cannot be executed is simply dropped. </li>
  188. *
  189. * <li>In {@link
  190. * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
  191. * shut down, the task at the head of the work queue is dropped, and
  192. * then execution is retried (which can fail again, causing this to be
  193. * repeated.) </li>
  194. *
  195. * </ol>
  196. *
  197. * It is possible to define and use other kinds of {@link
  198. * RejectedExecutionHandler} classes. Doing so requires some care
  199. * especially when policies are designed to work only under particular
  200. * capacity or queuing policies. </dd>
  201. *
  202. * <dt>Hook methods</dt>
  203. *
  204. * <dd>This class provides <tt>protected</tt> overridable {@link
  205. * ThreadPoolExecutor#beforeExecute} and {@link
  206. * ThreadPoolExecutor#afterExecute} methods that are called before and
  207. * after execution of each task. These can be used to manipulate the
  208. * execution environment; for example, reinitializing ThreadLocals,
  209. * gathering statistics, or adding log entries. Additionally, method
  210. * {@link ThreadPoolExecutor#terminated} can be overridden to perform
  211. * any special processing that needs to be done once the Executor has
  212. * fully terminated.
  213. *
  214. * <p>If hook or callback methods throw
  215. * exceptions, internal worker threads may in turn fail and
  216. * abruptly terminate.</dd>
  217. *
  218. * <dt>Queue maintenance</dt>
  219. *
  220. * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
  221. * the work queue for purposes of monitoring and debugging. Use of
  222. * this method for any other purpose is strongly discouraged. Two
  223. * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
  224. * ThreadPoolExecutor#purge} are available to assist in storage
  225. * reclamation when large numbers of queued tasks become
  226. * cancelled.</dd> </dl>
  227. *
  228. * <p> <b>Extension example</b>. Most extensions of this class
  229. * override one or more of the protected hook methods. For example,
  230. * here is a subclass that adds a simple pause/resume feature:
  231. *
  232. * <pre>
  233. * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
  234. * private boolean isPaused;
  235. * private ReentrantLock pauseLock = new ReentrantLock();
  236. * private Condition unpaused = pauseLock.newCondition();
  237. *
  238. * public PausableThreadPoolExecutor(...) { super(...); }
  239. *
  240. * protected void beforeExecute(Thread t, Runnable r) {
  241. * super.beforeExecute(t, r);
  242. * pauseLock.lock();
  243. * try {
  244. * while (isPaused) unpaused.await();
  245. * } catch(InterruptedException ie) {
  246. * t.interrupt();
  247. * } finally {
  248. * pauseLock.unlock();
  249. * }
  250. * }
  251. *
  252. * public void pause() {
  253. * pauseLock.lock();
  254. * try {
  255. * isPaused = true;
  256. * } finally {
  257. * pauseLock.unlock();
  258. * }
  259. * }
  260. *
  261. * public void resume() {
  262. * pauseLock.lock();
  263. * try {
  264. * isPaused = false;
  265. * unpaused.signalAll();
  266. * } finally {
  267. * pauseLock.unlock();
  268. * }
  269. * }
  270. * }
  271. * </pre>
  272. * @since 1.5
  273. * @author Doug Lea
  274. */
  275. public class ThreadPoolExecutor extends AbstractExecutorService {
  276. /**
  277. * Only used to force toArray() to produce a Runnable[].
  278. */
  279. private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
  280. /**
  281. * Permission ("modifyThread") that shutdown checks its caller for when a security manager is installed.
  282. */
  283. private static final RuntimePermission shutdownPerm =
  284. new RuntimePermission("modifyThread");
  285. /**
  286. * Queue used for holding tasks and handing off to worker threads.
  287. */
  288. private final BlockingQueue<Runnable> workQueue;
  289. /**
  290. * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
  291. * workers set. addThread and workerDone also update largestPoolSize and completedTaskCount under it.
  292. */
  293. private final ReentrantLock mainLock = new ReentrantLock();
  294. /**
  295. * Wait condition to support awaitTermination; signalled by workerDone when the pool becomes TERMINATED.
  296. */
  297. private final Condition termination = mainLock.newCondition();
  298. /**
  299. * Set containing all worker threads in pool.
  300. */
  301. private final HashSet<Worker> workers = new HashSet<Worker>();
  302. /**
  303. * Timeout in nanoseconds for idle threads waiting for work.
  304. * Threads use this timeout only when there are more than
  305. * corePoolSize present. Otherwise they wait forever for new work.
  306. */
  307. private volatile long keepAliveTime;
  308. /**
  309. * Core pool size, updated only while holding mainLock,
  310. * but volatile to allow concurrent readability even
  311. * during updates.
  312. */
  313. private volatile int corePoolSize;
  314. /**
  315. * Maximum pool size, updated only while holding mainLock
  316. * but volatile to allow concurrent readability even
  317. * during updates.
  318. */
  319. private volatile int maximumPoolSize;
  320. /**
  321. * Current pool size, updated only while holding mainLock
  322. * but volatile to allow concurrent readability even
  323. * during updates.
  324. */
  325. private volatile int poolSize;
  326. /**
  327. * Lifecycle state: one of RUNNING, SHUTDOWN, STOP or TERMINATED (declared just below).
  328. */
  329. volatile int runState;
  330. // Special values for runState
  331. /** Normal, not-shutdown mode */
  332. static final int RUNNING = 0;
  333. /** Controlled shutdown mode */
  334. static final int SHUTDOWN = 1;
  335. /** Immediate shutdown mode */
  336. static final int STOP = 2;
  337. /** Final state */
  338. static final int TERMINATED = 3;
  339. /**
  340. * Handler called when saturated or shutdown in execute.
  341. */
  342. private volatile RejectedExecutionHandler handler;
  343. /**
  344. * Factory for new threads.
  345. */
  346. private volatile ThreadFactory threadFactory;
  347. /**
  348. * Tracks largest attained pool size. Not volatile; raised in addThread, which is called with mainLock held.
  349. */
  350. private int largestPoolSize;
  351. /**
  352. * Counter for completed tasks. Updated only on termination of
  353. * worker threads (each worker's own count is added in workerDone, under mainLock).
  354. */
  355. private long completedTaskCount;
  356. /**
  357. * The default rejected execution handler (an AbortPolicy), used by the constructors that take no handler.
  358. */
  359. private static final RejectedExecutionHandler defaultHandler =
  360. new AbortPolicy();
  361. /**
  362. * Invoke the rejected execution handler for the given command.
  363. */
  364. void reject(Runnable command) {
  365. handler.rejectedExecution(command, this);
  366. }
  367. /**
  368. * Create and return a new thread running firstTask as its first
  369. * task. Call only while holding mainLock
  370. * @param firstTask the task the new thread should run first (or
  371. * null if none)
  372. * @return the new thread, or null if threadFactory fails to create thread
  373. */
  374. private Thread addThread(Runnable firstTask) {
  375. Worker w = new Worker(firstTask);
  376. Thread t = threadFactory.newThread(w);
  377. if (t != null) {
  378. w.thread = t;
  379. workers.add(w);
  380. int nt = ++poolSize;
  381. if (nt > largestPoolSize)
  382. largestPoolSize = nt;
  383. }
  384. return t;
  385. }
  386. /**
  387. * Create and start a new thread running firstTask as its first
  388. * task, only if fewer than corePoolSize threads are running.
  389. * @param firstTask the task the new thread should run first (or
  390. * null if none)
  391. * @return true if successful.
  392. */
  393. private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  394. Thread t = null;
  395. final ReentrantLock mainLock = this.mainLock;
  396. mainLock.lock();
  397. try {
  398. if (poolSize < corePoolSize)
  399. t = addThread(firstTask);
  400. } finally {
  401. mainLock.unlock();
  402. }
  403. if (t == null)
  404. return false;
  405. t.start();
  406. return true;
  407. }
  408. /**
  409. * Create and start a new thread only if fewer than maximumPoolSize
  410. * threads are running. The new thread runs as its first task the
  411. * next task in queue, or if there is none, the given task.
  412. * @param firstTask the task the new thread should run first (or
  413. * null if none)
  414. * @return null on failure, else the first task to be run by new thread.
  415. */
  416. private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
  417. Thread t = null;
  418. Runnable next = null;
  419. final ReentrantLock mainLock = this.mainLock;
  420. mainLock.lock();
  421. try {
  422. if (poolSize < maximumPoolSize) {
  423. next = workQueue.poll();
  424. if (next == null)
  425. next = firstTask;
  426. t = addThread(next);
  427. }
  428. } finally {
  429. mainLock.unlock();
  430. }
  431. if (t == null)
  432. return null;
  433. t.start();
  434. return next;
  435. }
  436. /**
  437. * Get the next task for a worker thread to run.
  438. * @return the task
  439. * @throws InterruptedException if interrupted while waiting for task
  440. */
  441. Runnable getTask() throws InterruptedException {
  442. for (;;) {
  443. switch(runState) {
  444. case RUNNING: {
  445. if (poolSize <= corePoolSize) // untimed wait if core
  446. return workQueue.take();
  447. long timeout = keepAliveTime;
  448. if (timeout <= 0) // die immediately for 0 timeout
  449. return null;
  450. Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
  451. if (r != null)
  452. return r;
  453. if (poolSize > corePoolSize) // timed out
  454. return null;
  455. // else, after timeout, pool shrank so shouldn't die, so retry
  456. break;
  457. }
  458. case SHUTDOWN: {
  459. // Help drain queue
  460. Runnable r = workQueue.poll();
  461. if (r != null)
  462. return r;
  463. // Check if can terminate
  464. if (workQueue.isEmpty()) {
  465. interruptIdleWorkers();
  466. return null;
  467. }
  468. // There could still be delayed tasks in queue.
  469. // Wait for one, re-checking state upon interruption
  470. try {
  471. return workQueue.take();
  472. } catch(InterruptedException ignore) {}
  473. break;
  474. }
  475. case STOP:
  476. return null;
  477. default:
  478. assert false;
  479. }
  480. }
  481. }
  482. /**
  483. * Wake up all threads that might be waiting for tasks.
  484. */
  485. void interruptIdleWorkers() {
  486. final ReentrantLock mainLock = this.mainLock;
  487. mainLock.lock();
  488. try {
  489. for (Worker w : workers)
  490. w.interruptIfIdle();
  491. } finally {
  492. mainLock.unlock();
  493. }
  494. }
  495. /**
  496. * Perform bookkeeping for a terminated worker thread.
  497. * @param w the worker
  498. */
  499. void workerDone(Worker w) {
  500. final ReentrantLock mainLock = this.mainLock;
  501. mainLock.lock();
  502. try {
  503. completedTaskCount += w.completedTasks;
  504. workers.remove(w);
  505. if (--poolSize > 0)
  506. return;
  507. // Else, this is the last thread. Deal with potential shutdown.
  508. int state = runState;
  509. assert state != TERMINATED;
  510. if (state != STOP) {
  511. // If there are queued tasks but no threads, create
  512. // replacement thread. We must create it initially
  513. // idle to avoid orphaned tasks in case addThread
  514. // fails. This also handles case of delayed tasks
  515. // that will sometime later become runnable.
  516. if (!workQueue.isEmpty()) {
  517. Thread t = addThread(null);
  518. if (t != null)
  519. t.start();
  520. return;
  521. }
  522. // Otherwise, we can exit without replacement
  523. if (state == RUNNING)
  524. return;
  525. }
  526. // Either state is STOP, or state is SHUTDOWN and there is
  527. // no work to do. So we can terminate.
  528. termination.signalAll();
  529. runState = TERMINATED;
  530. // fall through to call terminate() outside of lock.
  531. } finally {
  532. mainLock.unlock();
  533. }
  534. assert runState == TERMINATED;
  535. terminated();
  536. }
  537. /**
  538. * Worker threads
  539. */
  540. private class Worker implements Runnable {
  541. /**
  542. * The runLock is acquired and released surrounding each task
  543. * execution. It mainly protects against interrupts that are
  544. * intended to cancel the worker thread from instead
  545. * interrupting the task being run.
  546. */
  547. private final ReentrantLock runLock = new ReentrantLock();
  548. /**
  549. * Initial task to run before entering run loop
  550. */
  551. private Runnable firstTask;
  552. /**
  553. * Per thread completed task counter; accumulated
  554. * into completedTaskCount upon termination.
  555. */
  556. volatile long completedTasks;
  557. /**
  558. * Thread this worker is running in. Acts as a final field,
  559. * but cannot be set until thread is created.
  560. */
  561. Thread thread;
  562. Worker(Runnable firstTask) {
  563. this.firstTask = firstTask;
  564. }
  565. boolean isActive() {
  566. return runLock.isLocked();
  567. }
  568. /**
  569. * Interrupt thread if not running a task
  570. */
  571. void interruptIfIdle() {
  572. final ReentrantLock runLock = this.runLock;
  573. if (runLock.tryLock()) {
  574. try {
  575. thread.interrupt();
  576. } finally {
  577. runLock.unlock();
  578. }
  579. }
  580. }
  581. /**
  582. * Cause thread to die even if running a task.
  583. */
  584. void interruptNow() {
  585. thread.interrupt();
  586. }
  587. /**
  588. * Run a single task between before/after methods.
  589. */
  590. private void runTask(Runnable task) {
  591. final ReentrantLock runLock = this.runLock;
  592. runLock.lock();
  593. try {
  594. // Abort now if immediate cancel. Otherwise, we have
  595. // committed to run this task.
  596. if (runState == STOP)
  597. return;
  598. Thread.interrupted(); // clear interrupt status on entry
  599. boolean ran = false;
  600. beforeExecute(thread, task);
  601. try {
  602. task.run();
  603. ran = true;
  604. afterExecute(task, null);
  605. ++completedTasks;
  606. } catch(RuntimeException ex) {
  607. if (!ran)
  608. afterExecute(task, ex);
  609. // Else the exception occurred within
  610. // afterExecute itself in which case we don't
  611. // want to call it again.
  612. throw ex;
  613. }
  614. } finally {
  615. runLock.unlock();
  616. }
  617. }
  618. /**
  619. * Main run loop
  620. */
  621. public void run() {
  622. try {
  623. Runnable task = firstTask;
  624. firstTask = null;
  625. while (task != null || (task = getTask()) != null) {
  626. runTask(task);
  627. task = null; // unnecessary but can help GC
  628. }
  629. } catch(InterruptedException ie) {
  630. // fall through
  631. } finally {
  632. workerDone(this);
  633. }
  634. }
  635. }
  636. // Public methods
  /**
   * Creates a new <tt>ThreadPoolExecutor</tt> that uses
   * {@link Executors#defaultThreadFactory} to create threads and the
   * default rejected execution handler. The {@link Executors} factory
   * methods are often a more convenient way to obtain an executor than
   * this general purpose constructor.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   * if they are idle.
   * @param maximumPoolSize the maximum number of threads to allow in the
   * pool.
   * @param keepAliveTime when the pool holds more than corePoolSize
   * threads, the longest time an excess idle thread waits for a new task
   * before terminating.
   * @param unit the time unit of the keepAliveTime argument.
   * @param workQueue the queue that holds tasks before they are executed.
   * It will hold only the <tt>Runnable</tt> tasks submitted through the
   * <tt>execute</tt> method.
   * @throws IllegalArgumentException if corePoolSize or keepAliveTime is
   * less than zero, if maximumPoolSize is less than or equal to zero, or
   * if corePoolSize is greater than maximumPoolSize.
   * @throws NullPointerException if <tt>workQueue</tt> or <tt>unit</tt>
   * is null
   */
  public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                            long keepAliveTime, TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) {
    this(corePoolSize,
         maximumPoolSize,
         keepAliveTime,
         unit,
         workQueue,
         Executors.defaultThreadFactory(),
         defaultHandler);
  }
  /**
   * Creates a new <tt>ThreadPoolExecutor</tt> that creates its threads
   * with the given factory and uses the default rejected execution
   * handler.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   * if they are idle.
   * @param maximumPoolSize the maximum number of threads to allow in the
   * pool.
   * @param keepAliveTime when the pool holds more than corePoolSize
   * threads, the longest time an excess idle thread waits for a new task
   * before terminating.
   * @param unit the time unit of the keepAliveTime argument.
   * @param workQueue the queue that holds tasks before they are executed.
   * It will hold only the <tt>Runnable</tt> tasks submitted through the
   * <tt>execute</tt> method.
   * @param threadFactory the factory asked for a thread whenever the
   * executor needs a new one.
   * @throws IllegalArgumentException if corePoolSize or keepAliveTime is
   * less than zero, if maximumPoolSize is less than or equal to zero, or
   * if corePoolSize is greater than maximumPoolSize.
   * @throws NullPointerException if <tt>workQueue</tt>,
   * <tt>threadFactory</tt> or <tt>unit</tt> is null.
   */
  public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                            long keepAliveTime, TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory) {
    this(corePoolSize,
         maximumPoolSize,
         keepAliveTime,
         unit,
         workQueue,
         threadFactory,
         defaultHandler);
  }
  /**
   * Creates a new <tt>ThreadPoolExecutor</tt> that uses
   * {@link Executors#defaultThreadFactory} to create threads and the
   * given handler for tasks it cannot accept.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   * if they are idle.
   * @param maximumPoolSize the maximum number of threads to allow in the
   * pool.
   * @param keepAliveTime when the pool holds more than corePoolSize
   * threads, the longest time an excess idle thread waits for a new task
   * before terminating.
   * @param unit the time unit of the keepAliveTime argument.
   * @param workQueue the queue that holds tasks before they are executed.
   * It will hold only the <tt>Runnable</tt> tasks submitted through the
   * <tt>execute</tt> method.
   * @param handler the handler invoked when a task cannot be executed
   * because the thread bounds and queue capacities are reached.
   * @throws IllegalArgumentException if corePoolSize or keepAliveTime is
   * less than zero, if maximumPoolSize is less than or equal to zero, or
   * if corePoolSize is greater than maximumPoolSize.
   * @throws NullPointerException if <tt>workQueue</tt>,
   * <tt>handler</tt> or <tt>unit</tt> is null.
   */
  public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                            long keepAliveTime, TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler) {
    this(corePoolSize,
         maximumPoolSize,
         keepAliveTime,
         unit,
         workQueue,
         Executors.defaultThreadFactory(),
         handler);
  }
  734. /**
  735. * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
  736. * parameters.
  737. *
  738. * @param corePoolSize the number of threads to keep in the
  739. * pool, even if they are idle.
  740. * @param maximumPoolSize the maximum number of threads to allow in the
  741. * pool.
  742. * @param keepAliveTime when the number of threads is greater than
  743. * the core, this is the maximum time that excess idle threads
  744. * will wait for new tasks before terminating.
  745. * @param unit the time unit for the keepAliveTime
  746. * argument.
  747. * @param workQueue the queue to use for holding tasks before they
  748. * are executed. This queue will hold only the <tt>Runnable</tt>
  749. * tasks submitted by the <tt>execute</tt> method.
  750. * @param threadFactory the factory to use when the executor
  751. * creates a new thread.
  752. * @param handler the handler to use when execution is blocked
  753. * because the thread bounds and queue capacities are reached.
  754. * @throws IllegalArgumentException if corePoolSize, or
  755. * keepAliveTime less than zero, or if maximumPoolSize less than or
  756. * equal to zero, or if corePoolSize greater than maximumPoolSize.
  757. * @throws NullPointerException if <tt>workQueue</tt>
  758. * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
  759. */
  760. public ThreadPoolExecutor(int corePoolSize,
  761. int maximumPoolSize,
  762. long keepAliveTime,
  763. TimeUnit unit,
  764. BlockingQueue<Runnable> workQueue,
  765. ThreadFactory threadFactory,
  766. RejectedExecutionHandler handler) {
  767. if (corePoolSize < 0 ||
  768. maximumPoolSize <= 0 ||
  769. maximumPoolSize < corePoolSize ||
  770. keepAliveTime < 0)
  771. throw new IllegalArgumentException();
  772. if (workQueue == null || threadFactory == null || handler == null)
  773. throw new NullPointerException();
  774. this.corePoolSize = corePoolSize;
  775. this.maximumPoolSize = maximumPoolSize;
  776. this.workQueue = workQueue;
  777. this.keepAliveTime = unit.toNanos(keepAliveTime);
  778. this.threadFactory = threadFactory;
  779. this.handler = handler;
  780. }
  781. /**
  782. * Executes the given task sometime in the future. The task
  783. * may execute in a new thread or in an existing pooled thread.
  784. *
  785. * If the task cannot be submitted for execution, either because this
  786. * executor has been shutdown or because its capacity has been reached,
  787. * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
  788. *
  789. * @param command the task to execute
  790. * @throws RejectedExecutionException at discretion of
  791. * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
  792. * for execution
  793. * @throws NullPointerException if command is null
  794. */
  795. public void execute(Runnable command) {
  796. if (command == null)
  797. throw new NullPointerException();
  798. for (;;) {
  799. if (runState != RUNNING) {
  800. reject(command);
  801. return;
  802. }
  803. if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
  804. return;
  805. if (workQueue.offer(command))
  806. return;
  807. Runnable r = addIfUnderMaximumPoolSize(command);
  808. if (r == command)
  809. return;
  810. if (r == null) {
  811. reject(command);
  812. return;
  813. }
  814. // else retry
  815. }
  816. }
  817. /**
  818. * Initiates an orderly shutdown in which previously submitted
  819. * tasks are executed, but no new tasks will be
  820. * accepted. Invocation has no additional effect if already shut
  821. * down.
  822. * @throws SecurityException if a security manager exists and
  823. * shutting down this ExecutorService may manipulate threads that
  824. * the caller is not permitted to modify because it does not hold
  825. * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
  826. * or the security manager's <tt>checkAccess</tt> method denies access.
  827. */
  828. public void shutdown() {
  829. // Fail if caller doesn't have modifyThread permission. We
  830. // explicitly check permissions directly because we can't trust
  831. // implementations of SecurityManager to correctly override
  832. // the "check access" methods such that our documented
  833. // security policy is implemented.
  834. SecurityManager security = System.getSecurityManager();
  835. if (security != null)
  836. java.security.AccessController.checkPermission(shutdownPerm);
  837. boolean fullyTerminated = false;
  838. final ReentrantLock mainLock = this.mainLock;
  839. mainLock.lock();
  840. try {
  841. if (workers.size() > 0) {
  842. // Check if caller can modify worker threads. This
  843. // might not be true even if passed above check, if
  844. // the SecurityManager treats some threads specially.
  845. if (security != null) {
  846. for (Worker w: workers)
  847. security.checkAccess(w.thread);
  848. }
  849. int state = runState;
  850. if (state == RUNNING) // don't override shutdownNow
  851. runState = SHUTDOWN;
  852. try {
  853. for (Worker w: workers)
  854. w.interruptIfIdle();
  855. } catch(SecurityException se) {
  856. // If SecurityManager allows above checks, but
  857. // then unexpectedly throws exception when
  858. // interrupting threads (which it ought not do),
  859. // back out as cleanly as we can. Some threads may
  860. // have been killed but we remain in non-shutdown
  861. // state.
  862. runState = state;
  863. throw se;
  864. }
  865. }
  866. else { // If no workers, trigger full termination now
  867. fullyTerminated = true;
  868. runState = TERMINATED;
  869. termination.signalAll();
  870. }
  871. } finally {
  872. mainLock.unlock();
  873. }
  874. if (fullyTerminated)
  875. terminated();
  876. }
  877. /**
  878. * Attempts to stop all actively executing tasks, halts the
  879. * processing of waiting tasks, and returns a list of the tasks that were
  880. * awaiting execution.
  881. *
  882. * <p>This implementation cancels tasks via {@link
  883. * Thread#interrupt}, so if any tasks mask or fail to respond to
  884. * interrupts, they may never terminate.
  885. *
  886. * @return list of tasks that never commenced execution
  887. * @throws SecurityException if a security manager exists and
  888. * shutting down this ExecutorService may manipulate threads that
  889. * the caller is not permitted to modify because it does not hold
  890. * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
  891. * or the security manager's <tt>checkAccess</tt> method denies access.
  892. */
  893. public List<Runnable> shutdownNow() {
  894. // Almost the same code as shutdown()
  895. SecurityManager security = System.getSecurityManager();
  896. if (security != null)
  897. java.security.AccessController.checkPermission(shutdownPerm);
  898. boolean fullyTerminated = false;
  899. final ReentrantLock mainLock = this.mainLock;
  900. mainLock.lock();
  901. try {
  902. if (workers.size() > 0) {
  903. if (security != null) {
  904. for (Worker w: workers)
  905. security.checkAccess(w.thread);
  906. }
  907. int state = runState;
  908. if (state != TERMINATED)
  909. runState = STOP;
  910. try {
  911. for (Worker w : workers)
  912. w.interruptNow();
  913. } catch(SecurityException se) {
  914. runState = state; // back out;
  915. throw se;
  916. }
  917. }
  918. else { // If no workers, trigger full termination now
  919. fullyTerminated = true;
  920. runState = TERMINATED;
  921. termination.signalAll();
  922. }
  923. } finally {
  924. mainLock.unlock();
  925. }
  926. if (fullyTerminated)
  927. terminated();
  928. return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
  929. }
  930. public boolean isShutdown() {
  931. return runState != RUNNING;
  932. }
  933. /**
  934. * Returns true if this executor is in the process of terminating
  935. * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
  936. * completely terminated. This method may be useful for
  937. * debugging. A return of <tt>true</tt> reported a sufficient
  938. * period after shutdown may indicate that submitted tasks have
  939. * ignored or suppressed interruption, causing this executor not
  940. * to properly terminate.
  941. * @return true if terminating but not yet terminated.
  942. */
  943. public boolean isTerminating() {
  944. return runState == STOP;
  945. }
  946. public boolean isTerminated() {
  947. return runState == TERMINATED;
  948. }
  949. public boolean awaitTermination(long timeout, TimeUnit unit)
  950. throws InterruptedException {
  951. long nanos = unit.toNanos(timeout);
  952. final ReentrantLock mainLock = this.mainLock;
  953. mainLock.lock();
  954. try {
  955. for (;;) {
  956. if (runState == TERMINATED)
  957. return true;
  958. if (nanos <= 0)
  959. return false;
  960. nanos = termination.awaitNanos(nanos);
  961. }
  962. } finally {
  963. mainLock.unlock();
  964. }
  965. }
  966. /**
  967. * Invokes <tt>shutdown</tt> when this executor is no longer
  968. * referenced.
  969. */
  970. protected void finalize() {
  971. shutdown();
  972. }
  973. /**
  974. * Sets the thread factory used to create new threads.
  975. *
  976. * @param threadFactory the new thread factory
  977. * @throws NullPointerException if threadFactory is null
  978. * @see #getThreadFactory
  979. */
  980. public void setThreadFactory(ThreadFactory threadFactory) {
  981. if (threadFactory == null)
  982. throw new NullPointerException();
  983. this.threadFactory = threadFactory;
  984. }
  985. /**
  986. * Returns the thread factory used to create new threads.
  987. *
  988. * @return the current thread factory
  989. * @see #setThreadFactory
  990. */
  991. public ThreadFactory getThreadFactory() {
  992. return threadFactory;
  993. }
  994. /**
  995. * Sets a new handler for unexecutable tasks.
  996. *
  997. * @param handler the new handler
  998. * @throws NullPointerException if handler is null
  999. * @see #getRejectedExecutionHandler
  1000. */
  1001. public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
  1002. if (handler == null)
  1003. throw new NullPointerException();
  1004. this.handler = handler;
  1005. }
  1006. /**
  1007. * Returns the current handler for unexecutable tasks.
  1008. *
  1009. * @return the current handler
  1010. * @see #setRejectedExecutionHandler
  1011. */
  1012. public RejectedExecutionHandler getRejectedExecutionHandler() {
  1013. return handler;
  1014. }
  1015. /**
  1016. * Returns the task queue used by this executor. Access to the
  1017. * task queue is intended primarily for debugging and monitoring.
  1018. * This queue may be in active use. Retrieving the task queue
  1019. * does not prevent queued tasks from executing.
  1020. *
  1021. * @return the task queue
  1022. */
  1023. public BlockingQueue<Runnable> getQueue() {
  1024. return workQueue;
  1025. }
  1026. /**
  1027. * Removes this task from the executor's internal queue if it is
  1028. * present, thus causing it not to be run if it has not already
  1029. * started.
  1030. *
  1031. * <p> This method may be useful as one part of a cancellation
  1032. * scheme. It may fail to remove tasks that have been converted
  1033. * into other forms before being placed on the internal queue. For
  1034. * example, a task entered using <tt>submit</tt> might be
  1035. * converted into a form that maintains <tt>Future</tt> status.
  1036. * However, in such cases, method {@link ThreadPoolExecutor#purge}
  1037. * may be used to remove those Futures that have been cancelled.
  1038. *
  1039. *
  1040. * @param task the task to remove
  1041. * @return true if the task was removed
  1042. */
  1043. public boolean remove(Runnable task) {
  1044. return getQueue().remove(task);
  1045. }
  1046. /**
  1047. * Tries to remove from the work queue all {@link Future}
  1048. * tasks that have been cancelled. This method can be useful as a
  1049. * storage reclamation operation, that has no other impact on
  1050. * functionality. Cancelled tasks are never executed, but may
  1051. * accumulate in work queues until worker threads can actively
  1052. * remove them. Invoking this method instead tries to remove them now.
  1053. * However, this method may fail to remove tasks in
  1054. * the presence of interference by other threads.
  1055. */
  1056. public void purge() {
  1057. // Fail if we encounter interference during traversal
  1058. try {
  1059. Iterator<Runnable> it = getQueue().iterator();
  1060. while (it.hasNext()) {
  1061. Runnable r = it.next();
  1062. if (r instanceof Future<?>) {
  1063. Future<?> c = (Future<?>)r;
  1064. if (c.isCancelled())
  1065. it.remove();
  1066. }
  1067. }
  1068. }
  1069. catch(ConcurrentModificationException ex) {
  1070. return;
  1071. }
  1072. }
  1073. /**
  1074. * Sets the core number of threads. This overrides any value set
  1075. * in the constructor. If the new value is smaller than the
  1076. * current value, excess existing threads will be terminated when
  1077. * they next become idle. If larger, new threads will, if needed,
  1078. * be started to execute any queued tasks.
  1079. *
  1080. * @param corePoolSize the new core size
  1081. * @throws IllegalArgumentException if <tt>corePoolSize</tt>
  1082. * less than zero
  1083. * @see #getCorePoolSize
  1084. */
  1085. public void setCorePoolSize(int corePoolSize) {
  1086. if (corePoolSize < 0)
  1087. throw new IllegalArgumentException();
  1088. final ReentrantLock mainLock = this.mainLock;
  1089. mainLock.lock();
  1090. try {
  1091. int extra = this.corePoolSize - corePoolSize;
  1092. this.corePoolSize = corePoolSize;
  1093. if (extra < 0) {
  1094. int n = workQueue.size();
  1095. // We have to create initially-idle threads here
  1096. // because we otherwise have no recourse about
  1097. // what to do with a dequeued task if addThread fails.
  1098. while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) {
  1099. Thread t = addThread(null);
  1100. if (t != null)
  1101. t.start();
  1102. else
  1103. break;
  1104. }
  1105. }
  1106. else if (extra > 0 && poolSize > corePoolSize) {
  1107. Iterator<Worker> it = workers.iterator();
  1108. while (it.hasNext() &&
  1109. extra-- > 0 &&
  1110. poolSize > corePoolSize &&
  1111. workQueue.remainingCapacity() == 0)
  1112. it.next().interruptIfIdle();
  1113. }
  1114. } finally {
  1115. mainLock.unlock();
  1116. }
  1117. }
  1118. /**
  1119. * Returns the core number of threads.
  1120. *
  1121. * @return the core number of threads
  1122. * @see #setCorePoolSize
  1123. */
  1124. public int getCorePoolSize() {
  1125. return corePoolSize;
  1126. }
  1127. /**
  1128. * Starts a core thread, causing it to idly wait for work. This
  1129. * overrides the default policy of starting core threads only when
  1130. * new tasks are executed. This method will return <tt>false</tt>
  1131. * if all core threads have already been started.
  1132. * @return true if a thread was started
  1133. */
  1134. public boolean prestartCoreThread() {
  1135. return addIfUnderCorePoolSize(null);
  1136. }
  1137. /**
  1138. * Starts all core threads, causing them to idly wait for work. This
  1139. * overrides the default policy of starting core threads only when
  1140. * new tasks are executed.
  1141. * @return the number of threads started.
  1142. */
  1143. public int prestartAllCoreThreads() {
  1144. int n = 0;
  1145. while (addIfUnderCorePoolSize(null))
  1146. ++n;
  1147. return n;
  1148. }
  1149. /**
  1150. * Sets the maximum allowed number of threads. This overrides any
  1151. * value set in the constructor. If the new value is smaller than
  1152. * the current value, excess existing threads will be
  1153. * terminated when they next become idle.
  1154. *
  1155. * @param maximumPoolSize the new maximum
  1156. * @throws IllegalArgumentException if maximumPoolSize less than zero or
  1157. * the {@link #getCorePoolSize core pool size}
  1158. * @see #getMaximumPoolSize
  1159. */
  1160. public void setMaximumPoolSize(int maximumPoolSize) {
  1161. if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
  1162. throw new IllegalArgumentException();
  1163. final ReentrantLock mainLock = this.mainLock;
  1164. mainLock.lock();
  1165. try {
  1166. int extra = this.maximumPoolSize - maximumPoolSize;
  1167. this.maximumPoolSize = maximumPoolSize;
  1168. if (extra > 0 && poolSize > maximumPoolSize) {
  1169. Iterator<Worker> it = workers.iterator();
  1170. while (it.hasNext() &&
  1171. extra > 0 &&
  1172. poolSize > maximumPoolSize) {
  1173. it.next().interruptIfIdle();
  1174. --extra;
  1175. }
  1176. }
  1177. } finally {
  1178. mainLock.unlock();
  1179. }
  1180. }
  1181. /**
  1182. * Returns the maximum allowed number of threads.
  1183. *
  1184. * @return the maximum allowed number of threads
  1185. * @see #setMaximumPoolSize
  1186. */
  1187. public int getMaximumPoolSize() {
  1188. return maximumPoolSize;
  1189. }
  1190. /**
  1191. * Sets the time limit for which threads may remain idle before
  1192. * being terminated. If there are more than the core number of
  1193. * threads currently in the pool, after waiting this amount of
  1194. * time without processing a task, excess threads will be
  1195. * terminated. This overrides any value set in the constructor.
  1196. * @param time the time to wait. A time value of zero will cause
  1197. * excess threads to terminate immediately after executing tasks.
  1198. * @param unit the time unit of the time argument
  1199. * @throws IllegalArgumentException if time less than zero
  1200. * @see #getKeepAliveTime
  1201. */
  1202. public void setKeepAliveTime(long time, TimeUnit unit) {
  1203. if (time < 0)
  1204. throw new IllegalArgumentException();
  1205. this.keepAliveTime = unit.toNanos(time);
  1206. }
  1207. /**
  1208. * Returns the thread keep-alive time, which is the amount of time
  1209. * which threads in excess of the core pool size may remain
  1210. * idle before being terminated.
  1211. *
  1212. * @param unit the desired time unit of the result
  1213. * @return the time limit
  1214. * @see #setKeepAliveTime
  1215. */
  1216. public long getKeepAliveTime(TimeUnit unit) {
  1217. return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
  1218. }
  1219. /* Statistics */
  1220. /**
  1221. * Returns the current number of threads in the pool.
  1222. *
  1223. * @return the number of threads
  1224. */
  1225. public int getPoolSize() {
  1226. return poolSize;
  1227. }
  1228. /**
  1229. * Returns the approximate number of threads that are actively
  1230. * executing tasks.
  1231. *
  1232. * @return the number of threads
  1233. */
  1234. public int getActiveCount() {
  1235. final ReentrantLock mainLock = this.mainLock;
  1236. mainLock.lock();
  1237. try {
  1238. int n = 0;
  1239. for (Worker w : workers) {
  1240. if (w.isActive())
  1241. ++n;
  1242. }
  1243. return n;
  1244. } finally {
  1245. mainLock.unlock();
  1246. }
  1247. }
  1248. /**
  1249. * Returns the largest number of threads that have ever
  1250. * simultaneously been in the pool.
  1251. *
  1252. * @return the number of threads
  1253. */
  1254. public int getLargestPoolSize() {
  1255. final ReentrantLock mainLock = this.mainLock;
  1256. mainLock.lock();
  1257. try {
  1258. return largestPoolSize;
  1259. } finally {
  1260. mainLock.unlock();
  1261. }
  1262. }
  1263. /**
  1264. * Returns the approximate total number of tasks that have been
  1265. * scheduled for execution. Because the states of tasks and
  1266. * threads may change dynamically during computation, the returned
  1267. * value is only an approximation, but one that does not ever
  1268. * decrease across successive calls.
  1269. *
  1270. * @return the number of tasks
  1271. */
  1272. public long getTaskCount() {
  1273. final ReentrantLock mainLock = this.mainLock;
  1274. mainLock.lock();
  1275. try {
  1276. long n = completedTaskCount;
  1277. for (Worker w : workers) {
  1278. n += w.completedTasks;
  1279. if (w.isActive())
  1280. ++n;
  1281. }
  1282. return n + workQueue.size();
  1283. } finally {
  1284. mainLock.unlock();
  1285. }
  1286. }
  1287. /**
  1288. * Returns the approximate total number of tasks that have
  1289. * completed execution. Because the states of tasks and threads
  1290. * may change dynamically during computation, the returned value
  1291. * is only an approximation, but one that does not ever decrease
  1292. * across successive calls.
  1293. *
  1294. * @return the number of tasks
  1295. */
  1296. public long getCompletedTaskCount() {
  1297. final ReentrantLock mainLock = this.mainLock;
  1298. mainLock.lock();
  1299. try {
  1300. long n = completedTaskCount;
  1301. for (Worker w : workers)
  1302. n += w.completedTasks;
  1303. return n;
  1304. } finally {
  1305. mainLock.unlock();
  1306. }
  1307. }
  1308. /**
  1309. * Method invoked prior to executing the given Runnable in the
  1310. * given thread. This method is invoked by thread <tt>t</tt> that
  1311. * will execute task <tt>r</tt>, and may be used to re-initialize
  1312. * ThreadLocals, or to perform logging. Note: To properly nest
  1313. * multiple overridings, subclasses should generally invoke
  1314. * <tt>super.beforeExecute</tt> at the end of this method.
  1315. *
  1316. * @param t the thread that will run task r.
  1317. * @param r the task that will be executed.
  1318. */
  1319. protected void beforeExecute(Thread t, Runnable r) { }
  1320. /**
  1321. * Method invoked upon completion of execution of the given
  1322. * Runnable. This method is invoked by the thread that executed
  1323. * the task. If non-null, the Throwable is the uncaught exception
  1324. * that caused execution to terminate abruptly. Note: To properly
  1325. * nest multiple overridings, subclasses should generally invoke
  1326. * <tt>super.afterExecute</tt> at the beginning of this method.
  1327. *
  1328. * @param r the runnable that has completed.
  1329. * @param t the exception that caused termination, or null if
  1330. * execution completed normally.
  1331. */
  1332. protected void afterExecute(Runnable r, Throwable t) { }
  1333. /**
  1334. * Method invoked when the Executor has terminated. Default
  1335. * implementation does nothing. Note: To properly nest multiple
  1336. * overridings, subclasses should generally invoke
  1337. * <tt>super.terminated</tt> within this method.
  1338. */
  1339. protected void terminated() { }
  1340. /**
  1341. * A handler for rejected tasks that runs the rejected task
  1342. * directly in the calling thread of the <tt>execute</tt> method,
  1343. * unless the executor has been shut down, in which case the task
  1344. * is discarded.
  1345. */
  1346. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  1347. /**
  1348. * Creates a <tt>CallerRunsPolicy</tt>.
  1349. */
  1350. public CallerRunsPolicy() { }
  1351. /**
  1352. * Executes task r in the caller's thread, unless the executor
  1353. * has been shut down, in which case the task is discarded.
  1354. * @param r the runnable task requested to be executed
  1355. * @param e the executor attempting to execute this task
  1356. */
  1357. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  1358. if (!e.isShutdown()) {
  1359. r.run();
  1360. }
  1361. }
  1362. }
  1363. /**
  1364. * A handler for rejected tasks that throws a
  1365. * <tt>RejectedExecutionException</tt>.
  1366. */
  1367. public static class AbortPolicy implements RejectedExecutionHandler {
  1368. /**
  1369. * Creates an <tt>AbortPolicy</tt>.
  1370. */
  1371. public AbortPolicy() { }
  1372. /**
  1373. * Always throws RejectedExecutionException.
  1374. * @param r the runnable task requested to be executed
  1375. * @param e the executor attempting to execute this task
  1376. * @throws RejectedExecutionException always.
  1377. */
  1378. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  1379. throw new RejectedExecutionException();
  1380. }
  1381. }
  1382. /**
  1383. * A handler for rejected tasks that silently discards the
  1384. * rejected task.
  1385. */
  1386. public static class DiscardPolicy implements RejectedExecutionHandler {
  1387. /**
  1388. * Creates a <tt>DiscardPolicy</tt>.
  1389. */
  1390. public DiscardPolicy() { }
  1391. /**
  1392. * Does nothing, which has the effect of discarding task r.
  1393. * @param r the runnable task requested to be executed
  1394. * @param e the executor attempting to execute this task
  1395. */
  1396. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  1397. }
  1398. }
  1399. /**
  1400. * A handler for rejected tasks that discards the oldest unhandled
  1401. * request and then retries <tt>execute</tt>, unless the executor
  1402. * is shut down, in which case the task is discarded.
  1403. */
  1404. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  1405. /**
  1406. * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
  1407. */
  1408. public DiscardOldestPolicy() { }
  1409. /**
  1410. * Obtains and ignores the next task that the executor
  1411. * would otherwise execute, if one is immediately available,
  1412. * and then retries execution of task r, unless the executor
  1413. * is shut down, in which case task r is instead discarded.
  1414. * @param r the runnable task requested to be executed
  1415. * @param e the executor attempting to execute this task
  1416. */
  1417. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  1418. if (!e.isShutdown()) {
  1419. e.getQueue().poll();
  1420. e.execute(r);
  1421. }
  1422. }
  1423. }
  1424. }