1. /*
  2. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  3. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  4. */
  5. package com.sun.corba.se.impl.orbutil.threadpool;
  6. import java.util.LinkedList;
  7. import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool;
  8. import com.sun.corba.se.spi.orbutil.threadpool.Work;
  9. import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue;
  10. import com.sun.corba.se.impl.orbutil.ORBConstants;
  11. import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl;
  12. import com.sun.corba.se.spi.monitoring.MonitoringConstants;
  13. import com.sun.corba.se.spi.monitoring.MonitoringFactories;
  14. import com.sun.corba.se.spi.monitoring.MonitoredObject;
  15. import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
  16. public class WorkQueueImpl implements WorkQueue
  17. {
  18. private ThreadPool workerThreadPool;
  19. private LinkedList theWorkQueue = new LinkedList();
  20. private long workItemsAdded = 0;
  21. // Initialized to 1 to avoid divide by zero in averageTimeInQueue()
  22. private long workItemsDequeued = 1;
  23. private long totalTimeInQueue = 0;
  24. // Name of the work queue
  25. private String name;
  26. // MonitoredObject for work queue
  27. private MonitoredObject workqueueMonitoredObject;
  28. public WorkQueueImpl() {
  29. name=ORBConstants.WORKQUEUE_DEFAULT_NAME;
  30. initializeMonitoring();
  31. }
  32. public WorkQueueImpl(ThreadPool workerThreadPool) {
  33. this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME);
  34. }
  35. public WorkQueueImpl(ThreadPool workerThreadPool, String name) {
  36. this.workerThreadPool = workerThreadPool;
  37. this.name = name;
  38. initializeMonitoring();
  39. }
  40. // Setup monitoring for this workqueue
  41. private void initializeMonitoring() {
  42. workqueueMonitoredObject = MonitoringFactories.
  43. getMonitoredObjectFactory().
  44. createMonitoredObject(name,
  45. MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION);
  46. LongMonitoredAttributeBase b1 = new
  47. LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED,
  48. MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) {
  49. public Object getValue() {
  50. return new Long(WorkQueueImpl.this.totalWorkItemsAdded());
  51. }
  52. };
  53. workqueueMonitoredObject.addAttribute(b1);
  54. LongMonitoredAttributeBase b2 = new
  55. LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE,
  56. MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) {
  57. public Object getValue() {
  58. return new Long(WorkQueueImpl.this.workItemsInQueue());
  59. }
  60. };
  61. workqueueMonitoredObject.addAttribute(b2);
  62. LongMonitoredAttributeBase b3 = new
  63. LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE,
  64. MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) {
  65. public Object getValue() {
  66. return new Long(WorkQueueImpl.this.averageTimeInQueue());
  67. }
  68. };
  69. workqueueMonitoredObject.addAttribute(b3);
  70. }
  71. // Package private method to get the monitored object for this
  72. // class
  73. MonitoredObject getMonitoredObject() {
  74. return workqueueMonitoredObject;
  75. }
  76. public void addWork(Work work) {
  77. synchronized (this) {
  78. workItemsAdded++;
  79. work.setEnqueueTime(System.currentTimeMillis());
  80. theWorkQueue.addLast(work);
  81. ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this);
  82. }
  83. }
  84. Work requestWork(long waitTime)
  85. throws TimeoutException, InterruptedException
  86. {
  87. Work workItem;
  88. synchronized (this) {
  89. if (theWorkQueue.size() != 0) {
  90. workItem = (Work)theWorkQueue.removeFirst();
  91. totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
  92. workItemsDequeued++;
  93. return workItem;
  94. }
  95. try {
  96. long remainingWaitTime = waitTime;
  97. long finishTime = System.currentTimeMillis() + waitTime;
  98. do {
  99. this.wait(remainingWaitTime);
  100. if (theWorkQueue.size() != 0) {
  101. workItem = (Work)theWorkQueue.removeFirst();
  102. totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime();
  103. workItemsDequeued++;
  104. return workItem;
  105. }
  106. remainingWaitTime = finishTime - System.currentTimeMillis();
  107. } while (remainingWaitTime > 0);
  108. throw new TimeoutException();
  109. } catch (InterruptedException ie) {
  110. throw ie;
  111. }
  112. }
  113. }
  114. public void setThreadPool(ThreadPool workerThreadPool) {
  115. this.workerThreadPool = workerThreadPool;
  116. }
  117. public ThreadPool getThreadPool() {
  118. return workerThreadPool;
  119. }
  120. /**
  121. * Returns the total number of Work items added to the Queue.
  122. * This method is unsynchronized and only gives a snapshot of the
  123. * state when it is called
  124. */
  125. public long totalWorkItemsAdded() {
  126. return workItemsAdded;
  127. }
  128. /**
  129. * Returns the total number of Work items in the Queue to be processed
  130. * This method is unsynchronized and only gives a snapshot of the
  131. * state when it is called
  132. */
  133. public int workItemsInQueue() {
  134. return theWorkQueue.size();
  135. }
  136. public synchronized long averageTimeInQueue() {
  137. return (totalTimeInQueueworkItemsDequeued);
  138. }
  139. public String getName() {
  140. return name;
  141. }
  142. }
  143. // End of file.