1. /*
  2. * @(#)file SnmpQManager.java
  3. * @(#)author Sun Microsystems, Inc.
  4. * @(#)version 1.6
  5. * @(#)date 04/09/15
  6. *
  7. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  8. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  9. *
  10. */
  11. package com.sun.jmx.snmp.daemon;
  12. import java.util.Vector;
  13. import java.io.Serializable;
  14. // import debug stuff
  15. //
  16. import com.sun.jmx.trace.Trace;
  17. /**
  18. * This class implements a server queue manager.
  19. * This class is for internal use.
  20. */
  21. final class SnmpQManager implements Serializable {
  22. // VARIABLES
  23. //----------
  24. private SendQ newq ;
  25. private WaitQ waitq ;
  26. private ThreadGroup queueThreadGroup = null ;
  27. private Thread requestQThread = null ;
  28. private Thread timerQThread = null ;
  29. static String dbgTag = "SnmpQManager";
  30. // CONSTRUCTORS
  31. //-------------
  32. SnmpQManager() {
  33. newq = new SendQ(20, 5) ;
  34. waitq = new WaitQ(20, 5) ;
  35. queueThreadGroup = new ThreadGroup("Qmanager Thread Group") ;
  36. // TIME BOMB HERE
  37. startQThreads() ;
  38. }
  39. public void startQThreads() {
  40. if (timerQThread == null || timerQThread.isAlive() == false) {
  41. timerQThread = new SnmpTimerServer(queueThreadGroup, this) ;
  42. }
  43. if (requestQThread == null || requestQThread.isAlive() == false) {
  44. requestQThread = new SnmpSendServer(queueThreadGroup, this) ;
  45. }
  46. }
  47. public void stopQThreads() {
  48. ((SnmpTimerServer)timerQThread).isBeingDestroyed = true;
  49. waitq.isBeingDestroyed = true;
  50. ((SnmpSendServer)requestQThread).isBeingDestroyed = true;
  51. newq.isBeingDestroyed = true;
  52. if (timerQThread != null && timerQThread.isAlive() == true) {
  53. ((SnmpTimerServer)timerQThread).stopTimerServer();
  54. }
  55. waitq = null;
  56. timerQThread = null;
  57. if (requestQThread != null && requestQThread.isAlive() == true) {
  58. ((SnmpSendServer)requestQThread).stopSendServer();
  59. }
  60. newq = null;
  61. requestQThread = null;
  62. }
  63. public void addRequest(SnmpInformRequest reqc) {
  64. newq.addRequest(reqc) ;
  65. return ;
  66. }
  67. public void addWaiting(SnmpInformRequest reqc) {
  68. waitq.addWaiting(reqc) ;
  69. return ;
  70. }
  71. public Vector getAllOutstandingRequest(long range) {
  72. return newq.getAllOutstandingRequest(range) ;
  73. }
  74. public SnmpInformRequest getTimeoutRequests() {
  75. return waitq.getTimeoutRequests() ;
  76. }
  77. public void removeRequest(SnmpInformRequest reqc) {
  78. newq.removeElement(reqc) ;
  79. waitq.removeElement(reqc) ;
  80. }
  81. public SnmpInformRequest removeRequest(long reqid) {
  82. SnmpInformRequest reqc = null ;
  83. if ((reqc = newq.removeRequest(reqid)) == null)
  84. reqc = waitq.removeRequest(reqid) ;
  85. return reqc ;
  86. }
  87. // TRACES & DEBUG
  88. //---------------
  89. static boolean isTraceOn() {
  90. return Trace.isSelected(Trace.LEVEL_TRACE, Trace.INFO_ADAPTOR_SNMP);
  91. }
  92. static void trace(String clz, String func, String info) {
  93. Trace.send(Trace.LEVEL_TRACE, Trace.INFO_ADAPTOR_SNMP, clz, func, info);
  94. }
  95. static void trace(String func, String info) {
  96. SnmpQManager.trace(dbgTag, func, info);
  97. }
  98. static boolean isDebugOn() {
  99. return Trace.isSelected(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP);
  100. }
  101. static void debug(String clz, String func, String info) {
  102. Trace.send(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP, clz, func, info);
  103. }
  104. static void debug(String clz, String func, Throwable exception) {
  105. Trace.send(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP, clz, func, exception);
  106. }
  107. static void debug(String func, String info) {
  108. SnmpQManager.debug(dbgTag, func, info);
  109. }
  110. static void debug(String func, Throwable exception) {
  111. SnmpQManager.debug(dbgTag, func, exception);
  112. }
  113. }
  114. /**
  115. * This vector manages the inform requests to be sent to the manager.
  116. */
  117. class SendQ extends Vector {
  118. SendQ(int initialCapacity, int capacityIncr) {
  119. super(initialCapacity , capacityIncr) ;
  120. }
  121. private synchronized void notifyClients() {
  122. this.notifyAll() ;
  123. }
  124. public synchronized void addRequest(SnmpInformRequest req) {
  125. long nextPoll = req.getAbsNextPollTime() ;
  126. int i ;
  127. for (i = size() ; i > 0 ; i--) {
  128. if (nextPoll < getRequestAt(i-1).getAbsNextPollTime())
  129. break ;
  130. }
  131. if (i == size()) {
  132. addElement(req) ;
  133. notifyClients() ;
  134. } else
  135. insertElementAt(req, i) ;
  136. return ;
  137. }
  138. public synchronized boolean waitUntilReady() {
  139. while (true) {
  140. if (isBeingDestroyed == true)
  141. return false;
  142. long tmp = 0 ;
  143. if (isEmpty() == false) {
  144. long currTime = System.currentTimeMillis() ;
  145. SnmpInformRequest req = (SnmpInformRequest) lastElement() ;
  146. tmp = req.getAbsNextPollTime() - currTime ;
  147. if (tmp <= 0) {
  148. return true ;
  149. }
  150. }
  151. waitOnThisQueue(tmp) ;
  152. }
  153. }
  154. public synchronized Vector getAllOutstandingRequest(long margin) {
  155. int i ;
  156. Vector outreq = new Vector() ;
  157. while (true) {
  158. if (waitUntilReady() == true) {
  159. long refTime = System.currentTimeMillis() + margin ;
  160. for (i = size() ; i > 0 ; i--) {
  161. SnmpInformRequest req = getRequestAt(i-1) ;
  162. if (req.getAbsNextPollTime() > refTime)
  163. break ;
  164. outreq.addElement(req) ;
  165. }
  166. if (! outreq.isEmpty()) {
  167. elementCount -= outreq.size() ;
  168. return outreq ;
  169. }
  170. }
  171. else
  172. return null;
  173. }
  174. }
  175. public synchronized void waitOnThisQueue(long time) {
  176. if (time == 0 && !isEmpty()) {
  177. if (SnmpQManager.isDebugOn()) {
  178. SnmpQManager.debug("waitOnThisQueue", "[" + Thread.currentThread().toString() + "]:" +
  179. "Fatal BUG :: Blocking on newq permenantly. But size = " + size());
  180. }
  181. }
  182. try {
  183. this.wait(time) ;
  184. } catch (InterruptedException e) {
  185. }
  186. }
  187. public SnmpInformRequest getRequestAt(int idx) {
  188. return (SnmpInformRequest)elementAt(idx) ;
  189. }
  190. public synchronized SnmpInformRequest removeRequest(long reqid) {
  191. int max= size() ;
  192. for (int i = 0 ; i < max ; i++) {
  193. SnmpInformRequest reqc = getRequestAt(i) ;
  194. if (reqid == reqc.getRequestId()) {
  195. removeElementAt(i) ;
  196. return reqc ;
  197. }
  198. }
  199. return null ;
  200. }
  201. // This boolean is used to stop handling requests while the corresponding SnmpQManager
  202. // is being destroyed.
  203. //
  204. boolean isBeingDestroyed = false;
  205. }
  206. /**
  207. * This vector manages the inform requests to be retried to the manager.
  208. */
  209. class WaitQ extends Vector {
  210. WaitQ(int initialCapacity, int capacityIncr) {
  211. super(initialCapacity , capacityIncr) ;
  212. }
  213. public synchronized void addWaiting(SnmpInformRequest req) {
  214. long waitTime = req.getAbsMaxTimeToWait() ;
  215. int i ;
  216. for (i = size() ; i > 0 ; i--) {
  217. if (waitTime < getRequestAt(i-1).getAbsMaxTimeToWait())
  218. break ;
  219. }
  220. if (i == size()) {
  221. addElement(req) ;
  222. notifyClients() ;
  223. } else
  224. insertElementAt(req, i) ;
  225. return ;
  226. }
  227. public synchronized boolean waitUntilReady() {
  228. while (true) {
  229. if (isBeingDestroyed == true)
  230. return false;
  231. long tmp = 0 ;
  232. if (isEmpty() == false) {
  233. long currTime = System.currentTimeMillis() ;
  234. SnmpInformRequest req = (SnmpInformRequest) lastElement() ;
  235. tmp = req.getAbsMaxTimeToWait() - currTime ;
  236. if (tmp <= 0) {
  237. return true ;
  238. }
  239. }
  240. waitOnThisQueue(tmp) ;
  241. }
  242. }
  243. public synchronized SnmpInformRequest getTimeoutRequests() {
  244. if (waitUntilReady() == true) {
  245. SnmpInformRequest req = (SnmpInformRequest) lastElement() ;
  246. elementCount-- ;
  247. return req ;
  248. }
  249. else {
  250. return null;
  251. }
  252. }
  253. private synchronized void notifyClients() {
  254. this.notifyAll() ;
  255. }
  256. public synchronized void waitOnThisQueue(long time) {
  257. if (time == 0 && !isEmpty()) {
  258. if (SnmpQManager.isDebugOn()) {
  259. SnmpQManager.debug("waitOnThisQueue", "[" + Thread.currentThread().toString() + "]:" +
  260. "Fatal BUG :: Blocking on waitq permenantly. But size = " + size());
  261. }
  262. }
  263. try {
  264. this.wait(time) ;
  265. } catch (InterruptedException e) {
  266. }
  267. }
  268. public SnmpInformRequest getRequestAt(int idx) {
  269. return (SnmpInformRequest)elementAt(idx) ;
  270. }
  271. public synchronized SnmpInformRequest removeRequest(long reqid) {
  272. int max= size();
  273. for (int i = 0 ; i < max ; i++) {
  274. SnmpInformRequest reqc = getRequestAt(i) ;
  275. if (reqid == reqc.getRequestId()) {
  276. removeElementAt(i) ;
  277. return reqc ;
  278. }
  279. }
  280. return null ;
  281. }
  282. // This boolean is used to stop handling requests while the corresponding SnmpQManager
  283. // is being destroyed.
  284. //
  285. boolean isBeingDestroyed = false;
  286. }