1. /*
  2. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  3. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  4. */
  5. package com.sun.corba.se.impl.orbutil.threadpool;
  6. import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  7. import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
  8. import com.sun.corba.se.spi.orbutil.threadpool.Work;
  9. import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
  10. import com.sun.corba.se.impl.orbutil.ORBConstants;
  11. import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl;
  12. import com.sun.corba.se.spi.monitoring.MonitoringConstants;
  13. import com.sun.corba.se.spi.monitoring.MonitoredObject;
  14. import com.sun.corba.se.spi.monitoring.MonitoringFactories;
  15. import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
  16. public class ThreadPoolImpl implements ThreadPool
  17. {
  18. private static int threadCounter = 0; // serial counter useful for debugging
  19. private WorkQueue workQueue;
  20. // Stores the number of available worker threads
  21. private int availableWorkerThreads = 0;
  22. // Stores the number of threads in the threadpool currently
  23. private int currentThreadCount = 0;
  24. // Minimum number of worker threads created at instantiation of the threadpool
  25. private int minWorkerThreads = 0;
  26. // Maximum number of worker threads in the threadpool
  27. private int maxWorkerThreads = 0;
  28. // Inactivity timeout value for worker threads to exit and stop running
  29. private long inactivityTimeout;
  30. // Indicates if the threadpool is bounded or unbounded
  31. private boolean boundedThreadPool = false;
  32. // Running count of the work items processed
  33. // Set the value to 1 so that divide by zero is avoided in
  34. // averageWorkCompletionTime()
  35. private long processedCount = 1;
  36. // Running aggregate of the time taken in millis to execute work items
  37. // processed by the threads in the threadpool
  38. private long totalTimeTaken = 0;
  39. // Lock for protecting state when required
  40. private Object lock = new Object();
  41. // Name of the ThreadPool
  42. private String name;
  43. // MonitoredObject for ThreadPool
  44. private MonitoredObject threadpoolMonitoredObject;
  45. // ThreadGroup in which threads should be created
  46. private ThreadGroup threadGroup ;
  47. /**
  48. * This constructor is used to create an unbounded threadpool
  49. */
  50. public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) {
  51. inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT;
  52. maxWorkerThreads = Integer.MAX_VALUE;
  53. workQueue = new WorkQueueImpl(this);
  54. threadGroup = tg ;
  55. name = threadpoolName;
  56. initializeMonitoring();
  57. }
  58. /**
  59. * This constructor is used to create an unbounded threadpool
  60. * in the ThreadGroup of the current thread
  61. */
  62. public ThreadPoolImpl(String threadpoolName) {
  63. this( Thread.currentThread().getThreadGroup(), threadpoolName ) ;
  64. }
  65. /**
  66. * This constructor is used to create bounded threadpool
  67. */
  68. public ThreadPoolImpl(int minSize, int maxSize, long timeout,
  69. String threadpoolName)
  70. {
  71. minWorkerThreads = minSize;
  72. maxWorkerThreads = maxSize;
  73. inactivityTimeout = timeout;
  74. boundedThreadPool = true;
  75. workQueue = new WorkQueueImpl(this);
  76. name = threadpoolName;
  77. for (int i = 0; i < minWorkerThreads; i++) {
  78. createWorkerThread();
  79. }
  80. initializeMonitoring();
  81. }
  82. // Setup monitoring for this threadpool
  83. private void initializeMonitoring() {
  84. // Get root monitored object
  85. MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory().
  86. createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null).
  87. getRootMonitoredObject();
  88. // Create the threadpool monitoring root
  89. MonitoredObject threadPoolMonitoringObjectRoot = root.getChild(
  90. MonitoringConstants.THREADPOOL_MONITORING_ROOT);
  91. if (threadPoolMonitoringObjectRoot == null) {
  92. threadPoolMonitoringObjectRoot = MonitoringFactories.
  93. getMonitoredObjectFactory().createMonitoredObject(
  94. MonitoringConstants.THREADPOOL_MONITORING_ROOT,
  95. MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION);
  96. root.addChild(threadPoolMonitoringObjectRoot);
  97. }
  98. threadpoolMonitoredObject = MonitoringFactories.
  99. getMonitoredObjectFactory().
  100. createMonitoredObject(name,
  101. MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION);
  102. threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject);
  103. LongMonitoredAttributeBase b1 = new
  104. LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS,
  105. MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
  106. public Object getValue() {
  107. return new Long(ThreadPoolImpl.this.currentNumberOfThreads());
  108. }
  109. };
  110. threadpoolMonitoredObject.addAttribute(b1);
  111. LongMonitoredAttributeBase b2 = new
  112. LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS,
  113. MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) {
  114. public Object getValue() {
  115. return new Long(ThreadPoolImpl.this.numberOfAvailableThreads());
  116. }
  117. };
  118. threadpoolMonitoredObject.addAttribute(b2);
  119. LongMonitoredAttributeBase b3 = new
  120. LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS,
  121. MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) {
  122. public Object getValue() {
  123. return new Long(ThreadPoolImpl.this.numberOfBusyThreads());
  124. }
  125. };
  126. threadpoolMonitoredObject.addAttribute(b3);
  127. LongMonitoredAttributeBase b4 = new
  128. LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME,
  129. MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) {
  130. public Object getValue() {
  131. return new Long(ThreadPoolImpl.this.averageWorkCompletionTime());
  132. }
  133. };
  134. threadpoolMonitoredObject.addAttribute(b4);
  135. LongMonitoredAttributeBase b5 = new
  136. LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT,
  137. MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) {
  138. public Object getValue() {
  139. return new Long(ThreadPoolImpl.this.currentProcessedCount());
  140. }
  141. };
  142. threadpoolMonitoredObject.addAttribute(b5);
  143. // Add the monitored object for the WorkQueue
  144. threadpoolMonitoredObject.addChild(
  145. ((WorkQueueImpl)workQueue).getMonitoredObject());
  146. }
  147. // Package private method to get the monitored object for this
  148. // class
  149. MonitoredObject getMonitoredObject() {
  150. return threadpoolMonitoredObject;
  151. }
  152. public WorkQueue getAnyWorkQueue()
  153. {
  154. return workQueue;
  155. }
  156. public WorkQueue getWorkQueue(int queueId)
  157. throws NoSuchWorkQueueException
  158. {
  159. if (queueId != 0)
  160. throw new NoSuchWorkQueueException();
  161. return workQueue;
  162. }
  163. /**
  164. * To be called from the workqueue when work is added to the
  165. * workQueue. This method would create new threads if required
  166. * or notify waiting threads on the queue for available work
  167. */
  168. void notifyForAvailableWork(WorkQueue aWorkQueue) {
  169. synchronized (lock) {
  170. if (availableWorkerThreads == 0) {
  171. createWorkerThread();
  172. } else {
  173. aWorkQueue.notify();
  174. }
  175. }
  176. }
  177. /**
  178. * To be called from the workqueue to create worker threads when none
  179. * available.
  180. */
  181. void createWorkerThread() {
  182. WorkerThread thread;
  183. synchronized (lock) {
  184. if (boundedThreadPool) {
  185. if (currentThreadCount < maxWorkerThreads) {
  186. thread = new WorkerThread(threadGroup, getName());
  187. currentThreadCount++;
  188. } else {
  189. // REVIST - Need to create a thread to monitor the
  190. // the state for deadlock i.e. all threads waiting for
  191. // something which can be got from the item in the
  192. // workqueue, but there is no thread available to
  193. // process that work item - DEADLOCK !!
  194. return;
  195. }
  196. } else {
  197. thread = new WorkerThread(threadGroup, getName());
  198. currentThreadCount++;
  199. }
  200. }
  201. // The thread must be set to a daemon thread so the
  202. // VM can exit if the only threads left are PooledThreads
  203. // or other daemons. We don't want to rely on the
  204. // calling thread always being a daemon.
  205. // Catch exceptions since setDaemon can cause a
  206. // security exception to be thrown under netscape
  207. // in the Applet mode
  208. try {
  209. thread.setDaemon(true);
  210. } catch (Exception e) {
  211. // REVISIT - need to do some logging here
  212. }
  213. thread.start();
  214. }
  215. /**
  216. * This method will return the minimum number of threads maintained
  217. * by the threadpool.
  218. */
  219. public int minimumNumberOfThreads() {
  220. return minWorkerThreads;
  221. }
  222. /**
  223. * This method will return the maximum number of threads in the
  224. * threadpool at any point in time, for the life of the threadpool
  225. */
  226. public int maximumNumberOfThreads() {
  227. return maxWorkerThreads;
  228. }
  229. /**
  230. * This method will return the time in milliseconds when idle
  231. * threads in the threadpool are removed.
  232. */
  233. public long idleTimeoutForThreads() {
  234. return inactivityTimeout;
  235. }
  236. /**
  237. * This method will return the total number of threads currently in the
  238. * threadpool. This method returns a value which is not synchronized.
  239. */
  240. public int currentNumberOfThreads() {
  241. synchronized (lock) {
  242. return currentThreadCount;
  243. }
  244. }
  245. /**
  246. * This method will return the number of available threads in the
  247. * threadpool which are waiting for work. This method returns a
  248. * value which is not synchronized.
  249. */
  250. public int numberOfAvailableThreads() {
  251. synchronized (lock) {
  252. return availableWorkerThreads;
  253. }
  254. }
  255. /**
  256. * This method will return the number of busy threads in the threadpool
  257. * This method returns a value which is not synchronized.
  258. */
  259. public int numberOfBusyThreads() {
  260. synchronized (lock) {
  261. return (currentThreadCount - availableWorkerThreads);
  262. }
  263. }
  264. /**
  265. * This method returns the average elapsed time taken to complete a Work
  266. * item in milliseconds.
  267. */
  268. public long averageWorkCompletionTime() {
  269. synchronized (lock) {
  270. return (totalTimeTaken / processedCount);
  271. }
  272. }
  273. /**
  274. * This method returns the number of Work items processed by the threadpool
  275. */
  276. public long currentProcessedCount() {
  277. synchronized (lock) {
  278. return processedCount;
  279. }
  280. }
  281. public String getName() {
  282. return name;
  283. }
  284. /**
  285. * This method will return the number of WorkQueues serviced by the threadpool.
  286. */
  287. public int numberOfWorkQueues() {
  288. return 1;
  289. }
  290. private static synchronized int getUniqueThreadId() {
  291. return ThreadPoolImpl.threadCounter++;
  292. }
  293. private class WorkerThread extends Thread
  294. {
  295. private Work currentWork;
  296. private int threadId = 0; // unique id for the thread
  297. // thread pool this WorkerThread belongs too
  298. private String threadPoolName;
  299. // name seen by Thread.getName()
  300. private StringBuffer workerThreadName = new StringBuffer();
  301. WorkerThread(ThreadGroup tg, String threadPoolName) {
  302. super(tg, "Idle");
  303. this.threadId = ThreadPoolImpl.getUniqueThreadId();
  304. this.threadPoolName = threadPoolName;
  305. setName(composeWorkerThreadName(threadPoolName, "Idle"));
  306. }
  307. public void run() {
  308. while (true) {
  309. try {
  310. synchronized (lock) {
  311. availableWorkerThreads++;
  312. }
  313. // Get some work to do
  314. currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout);
  315. synchronized (lock) {
  316. availableWorkerThreads--;
  317. // It is possible in notifyForAvailableWork that the
  318. // check for availableWorkerThreads = 0 may return
  319. // false, because the availableWorkerThreads has not been
  320. // decremented to zero before the producer thread added
  321. // work to the queue. This may create a deadlock, if the
  322. // executing thread needs information which is in the work
  323. // item queued in the workqueue, but has no thread to work
  324. // on it since none was created because availableWorkerThreads = 0
  325. // returned false.
  326. // The following code will ensure that a thread is always available
  327. // in those situations
  328. if ((availableWorkerThreads == 0) &&
  329. (workQueue.workItemsInQueue() > 0)) {
  330. createWorkerThread();
  331. }
  332. }
  333. // Set the thread name for debugging.
  334. setName(composeWorkerThreadName(threadPoolName,
  335. Integer.toString(this.threadId)));
  336. long start = System.currentTimeMillis();
  337. try {
  338. // Do the work
  339. currentWork.doWork();
  340. } catch (Throwable t) {
  341. // Ignore all errors.
  342. ;
  343. }
  344. long end = System.currentTimeMillis();
  345. synchronized (lock) {
  346. totalTimeTaken += (end - start);
  347. processedCount++;
  348. }
  349. // set currentWork to null so that the work item can be
  350. // garbage collected
  351. currentWork = null;
  352. setName(composeWorkerThreadName(threadPoolName, "Idle"));
  353. } catch (TimeoutException e) {
  354. // This thread timed out waiting for something to do.
  355. synchronized (lock) {
  356. availableWorkerThreads--;
  357. // This should for both bounded and unbounded case
  358. if (currentThreadCount > minWorkerThreads) {
  359. currentThreadCount--;
  360. // This thread can exit.
  361. return;
  362. } else {
  363. // Go back to waiting on workQueue
  364. continue;
  365. }
  366. }
  367. } catch (InterruptedException ie) {
  368. // InterruptedExceptions are
  369. // caught here. Thus, threads can be forced out of
  370. // requestWork and so they have to reacquire the lock.
  371. // Other options include ignoring or
  372. // letting this thread die.
  373. // Ignoring for now. REVISIT
  374. synchronized (lock) {
  375. availableWorkerThreads--;
  376. }
  377. } catch (Throwable e) {
  378. // Ignore any exceptions that currentWork.process
  379. // accidently lets through, but let Errors pass.
  380. // Add debugging output? REVISIT
  381. synchronized (lock) {
  382. availableWorkerThreads--;
  383. }
  384. }
  385. }
  386. }
  387. private String composeWorkerThreadName(String poolName, String workerName) {
  388. workerThreadName.setLength(0);
  389. workerThreadName.append("p: ").append(poolName);
  390. workerThreadName.append("; w: ").append(workerName);
  391. return workerThreadName.toString();
  392. }
  393. } // End of WorkerThread class
  394. }
  395. // End of file.