1. /*
  2. * @(#)file CommunicatorServer.java
  3. * @(#)author Sun Microsystems, Inc.
  4. * @(#)version 1.58
  5. * @(#)lastedit 04/02/19
  6. *
  7. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  8. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  9. *
  10. */
  11. package com.sun.jmx.snmp.daemon;
  12. // java import
  13. //
  14. import java.io.ObjectInputStream;
  15. import java.io.IOException;
  16. import java.net.InetAddress;
  17. import java.util.Date;
  18. import java.util.Vector;
  19. import java.util.Enumeration;
  20. // jmx import
  21. //
  22. import javax.management.MBeanServer;
  23. import javax.management.MBeanRegistration;
  24. import javax.management.ObjectName;
  25. import javax.management.NotificationListener;
  26. import javax.management.NotificationFilter;
  27. import javax.management.NotificationBroadcaster;
  28. import javax.management.NotificationBroadcasterSupport;
  29. import javax.management.MBeanNotificationInfo;
  30. import javax.management.AttributeChangeNotification;
  31. import javax.management.ListenerNotFoundException;
  32. import javax.management.loading.ClassLoaderRepository;
  33. import javax.management.MBeanServerFactory;
  34. // jmx RI import
  35. //
  36. import com.sun.jmx.trace.Trace;
  37. // JSR 160 import
  38. //
  39. // XXX Revisit:
  40. // used to import com.sun.jmx.snmp.MBeanServerForwarder
  41. // Now using JSR 160 instead. => this is an additional
  42. // dependency to JSR 160.
  43. //
  44. import javax.management.remote.MBeanServerForwarder;
  45. /**
  46. * Defines generic behavior for the server part of a connector or an adaptor.
  47. * Most connectors or adaptors extend <CODE>CommunicatorServer</CODE>
  48. * and inherit this behavior. Connectors or adaptors that do not fit into
  49. * this model do not extend <CODE>CommunicatorServer</CODE>.
  50. * <p>
  51. * A <CODE>CommunicatorServer</CODE> is an active object, it listens for
  52. * client requests and processes them in its own thread. When necessary, a
  53. * <CODE>CommunicatorServer</CODE> creates other threads to process multiple
  54. * requests concurrently.
  55. * <p>
  56. * A <CODE>CommunicatorServer</CODE> object can be stopped by calling the
  57. * <CODE>stop</CODE> method. When it is stopped, the
  58. * <CODE>CommunicatorServer</CODE> no longer listens to client requests and
  59. * no longer holds any thread or communication resources.
  60. * It can be started again by calling the <CODE>start</CODE> method.
  61. * <p>
  62. * A <CODE>CommunicatorServer</CODE> has a <CODE>State</CODE> attribute
  63. * which reflects its activity.
  64. * <p>
  65. * <TABLE>
  66. * <TR><TH>CommunicatorServer</TH> <TH>State</TH></TR>
  67. * <TR><TD><CODE>stopped</CODE></TD> <TD><CODE>OFFLINE</CODE></TD></TR>
  68. * <TR><TD><CODE>starting</CODE></TD> <TD><CODE>STARTING</CODE></TD></TR>
  69. * <TR><TD><CODE>running</CODE></TD> <TD><CODE>ONLINE</CODE></TD></TR>
  70. * <TR><TD><CODE>stopping</CODE></TD> <TD><CODE>STOPPING</CODE></TD></TR>
  71. * </TABLE>
  72. * <p>
  73. * The <CODE>STARTING</CODE> state marks the transition
  74. * from <CODE>OFFLINE</CODE> to <CODE>ONLINE</CODE>.
  75. * <p>
  76. * The <CODE>STOPPING</CODE> state marks the transition from
  77. * <CODE>ONLINE</CODE> to <CODE>OFFLINE</CODE>. This occurs when the
  78. * <CODE>CommunicatorServer</CODE> is finishing or interrupting active
  79. * requests.
  80. * <p>
  81. * When a <CODE>CommunicatorServer</CODE> is unregistered from the MBeanServer,
  82. * it is stopped automatically.
  83. * <p>
  84. * When the value of the <CODE>State</CODE> attribute changes the
  85. * <CODE>CommunicatorServer</CODE> sends a
  86. * <tt>{@link javax.management.AttributeChangeNotification}</tt> to the
  87. * registered listeners, if any.
  88. *
  89. * <p><b>This API is a Sun Microsystems internal API and is subject
  90. * to change without notice.</b></p>
  91. * @version 1.58 02/19/04
  92. * @author Sun Microsystems, Inc
  93. */
  94. public abstract class CommunicatorServer
  95. implements Runnable, MBeanRegistration, NotificationBroadcaster,
  96. CommunicatorServerMBean {
  97. //
  98. // States of a CommunicatorServer
  99. //
  100. /**
  101. * Represents an <CODE>ONLINE</CODE> state.
  102. */
  103. public static final int ONLINE = 0 ;
  104. /**
  105. * Represents an <CODE>OFFLINE</CODE> state.
  106. */
  107. public static final int OFFLINE = 1 ;
  108. /**
  109. * Represents a <CODE>STOPPING</CODE> state.
  110. */
  111. public static final int STOPPING = 2 ;
  112. /**
  113. * Represents a <CODE>STARTING</CODE> state.
  114. */
  115. public static final int STARTING = 3 ;
  116. //
  117. // Types of connectors.
  118. //
  119. /**
  120. * Indicates that it is an RMI connector type.
  121. */
  122. //public static final int RMI_TYPE = 1 ;
  123. /**
  124. * Indicates that it is an HTTP connector type.
  125. */
  126. //public static final int HTTP_TYPE = 2 ;
  127. /**
  128. * Indicates that it is an HTML connector type.
  129. */
  130. //public static final int HTML_TYPE = 3 ;
  131. /**
  132. * Indicates that it is an SNMP connector type.
  133. */
  134. public static final int SNMP_TYPE = 4 ;
  135. /**
  136. * Indicates that it is an HTTPS connector type.
  137. */
  138. //public static final int HTTPS_TYPE = 5 ;
  139. //
  140. // Package variables
  141. //
  142. /**
  143. * The state of the connector server.
  144. */
  145. transient volatile int state = OFFLINE ;
  146. /**
  147. * The object name of the connector server.
  148. * @serial
  149. */
  150. ObjectName objectName ;
  151. MBeanServer topMBS;
  152. MBeanServer bottomMBS;
  153. /**
  154. */
  155. transient String dbgTag = null ;
  156. /**
  157. * The maximum number of clients that the CommunicatorServer can
  158. * process concurrently.
  159. * @serial
  160. */
  161. int maxActiveClientCount = 1 ;
  162. /**
  163. */
  164. transient int servedClientCount = 0 ;
  165. /**
  166. * The host name used by this CommunicatorServer.
  167. * @serial
  168. */
  169. String host = null ;
  170. /**
  171. * The port number used by this CommunicatorServer.
  172. * @serial
  173. */
  174. int port = -1 ;
  175. //
  176. // Private fields
  177. //
  178. /* This object controls access to the "state" and "interrupted" variables.
  179. If held at the same time as the lock on "this", the "this" lock must
  180. be taken first. */
  181. private transient Object stateLock = new Object();
  182. private transient Vector clientHandlerVector = new Vector() ;
  183. private transient Thread fatherThread = Thread.currentThread() ;
  184. private transient Thread mainThread = null ;
  185. private volatile boolean stopRequested = false ;
  186. private boolean interrupted = false;
  187. private transient Exception startException = null;
  188. // Notifs count, broadcaster and info
  189. private transient long notifCount = 0;
  190. private transient NotificationBroadcasterSupport notifBroadcaster =
  191. new NotificationBroadcasterSupport();
  192. private transient MBeanNotificationInfo[] notifInfos = null;
  193. /**
  194. * Instantiates a <CODE>CommunicatorServer</CODE>.
  195. *
  196. * @param connectorType Indicates the connector type. Possible values are:
  197. * SNMP_TYPE.
  198. *
  199. * @exception <CODE>java.lang.IllegalArgumentException</CODE>
  200. * This connector type is not correct.
  201. */
  202. public CommunicatorServer(int connectorType)
  203. throws IllegalArgumentException {
  204. switch (connectorType) {
  205. case SNMP_TYPE :
  206. infoType = Trace.INFO_ADAPTOR_SNMP ;
  207. break;
  208. default:
  209. throw new IllegalArgumentException("Invalid connector Type") ;
  210. }
  211. dbgTag = makeDebugTag() ;
  212. }
  213. protected Thread createMainThread() {
  214. return new Thread (this, makeThreadName());
  215. }
  216. /**
  217. * Starts this <CODE>CommunicatorServer</CODE>.
  218. * <p>
  219. * Has no effect if this <CODE>CommunicatorServer</CODE> is
  220. * <CODE>ONLINE</CODE> or <CODE>STOPPING</CODE>.
  221. * @param timeout Time in ms to wait for the connector to start.
  222. * If <code>timeout</code> is positive, wait for at most
  223. * the specified time. An infinite timeout can be specified
  224. * by passing a <code>timeout</code> value equals
  225. * <code>Long.MAX_VALUE</code>. In that case the method
  226. * will wait until the connector starts or fails to start.
  227. * If timeout is negative or zero, returns as soon as possible
  228. * without waiting.
  229. * @exception CommunicationException if the connectors fails to start.
  230. * @exception InterruptedException if the thread is interrupted or the
  231. * timeout expires.
  232. */
  233. public void start(long timeout)
  234. throws CommunicationException, InterruptedException {
  235. boolean start;
  236. synchronized (stateLock) {
  237. if (state == STOPPING) {
  238. // Fix for bug 4352451:
  239. // "java.net.BindException: Address in use".
  240. waitState(OFFLINE, 60000);
  241. }
  242. start = (state == OFFLINE);
  243. if (start) {
  244. changeState(STARTING);
  245. stopRequested = false;
  246. interrupted = false;
  247. startException = null;
  248. }
  249. }
  250. if (!start) {
  251. if (isTraceOn())
  252. trace("start","Connector is not OFFLINE") ;
  253. return;
  254. }
  255. if (isTraceOn())
  256. trace("start","--> Start connector ") ;
  257. mainThread = createMainThread();
  258. mainThread.start() ;
  259. if (timeout > 0) waitForStart(timeout);
  260. }
  261. /**
  262. * Starts this <CODE>CommunicatorServer</CODE>.
  263. * <p>
  264. * Has no effect if this <CODE>CommunicatorServer</CODE> is
  265. * <CODE>ONLINE</CODE> or <CODE>STOPPING</CODE>.
  266. */
  267. public void start() {
  268. try {
  269. start(0);
  270. } catch (InterruptedException x) {
  271. // can not happen because of `0'
  272. trace("start","interrupted: " + x);
  273. }
  274. }
  275. /**
  276. * Stops this <CODE>CommunicatorServer</CODE>.
  277. * <p>
  278. * Has no effect if this <CODE>CommunicatorServer</CODE> is
  279. * <CODE>OFFLINE</CODE> or <CODE>STOPPING</CODE>.
  280. */
  281. public void stop() {
  282. synchronized (stateLock) {
  283. if (state == OFFLINE || state == STOPPING) {
  284. if (isTraceOn())
  285. trace("stop","Connector is not ONLINE") ;
  286. return;
  287. }
  288. changeState(STOPPING);
  289. //
  290. // Stop the connector thread
  291. //
  292. if (isTraceOn())
  293. trace("stop","Interrupt main thread") ;
  294. stopRequested = true ;
  295. if (!interrupted) {
  296. interrupted = true;
  297. mainThread.interrupt();
  298. }
  299. }
  300. //
  301. // Call terminate on each active client handler
  302. //
  303. if (isTraceOn()) {
  304. trace("stop","terminateAllClient") ;
  305. }
  306. terminateAllClient() ;
  307. // ----------------------
  308. // changeState
  309. // ----------------------
  310. synchronized (stateLock) {
  311. if (state == STARTING)
  312. changeState(OFFLINE);
  313. }
  314. }
  315. /**
  316. * Tests whether the <CODE>CommunicatorServer</CODE> is active.
  317. *
  318. * @return True if connector is <CODE>ONLINE</CODE> false otherwise.
  319. */
  320. public boolean isActive() {
  321. synchronized (stateLock) {
  322. return (state == ONLINE);
  323. }
  324. }
  325. /**
  326. * <p>Waits until either the State attribute of this MBean equals the
  327. * specified <VAR>wantedState</VAR> parameter,
  328. * or the specified <VAR>timeOut</VAR> has elapsed.
  329. * The method <CODE>waitState</CODE> returns with a boolean value
  330. * indicating whether the specified <VAR>wantedState</VAR> parameter
  331. * equals the value of this MBean's State attribute at the time the method
  332. * terminates.</p>
  333. *
  334. * <p>Two special cases for the <VAR>timeOut</VAR> parameter value are:</p>
  335. * <UL><LI> if <VAR>timeOut</VAR> is negative then <CODE>waitState</CODE>
  336. * returns immediately (i.e. does not wait at all),</LI>
  337. * <LI> if <VAR>timeOut</VAR> equals zero then <CODE>waitState</CODE>
  338. * waits untill the value of this MBean's State attribute
  339. * is the same as the <VAR>wantedState</VAR> parameter (i.e. will wait
  340. * indefinitely if this condition is never met).</LI></UL>
  341. *
  342. * @param wantedState The value of this MBean's State attribute to wait
  343. * for. <VAR>wantedState</VAR> can be one of:
  344. * <ul>
  345. * <li><CODE>CommunicatorServer.OFFLINE</CODE>,</li>
  346. * <li><CODE>CommunicatorServer.ONLINE</CODE>,</li>
  347. * <li><CODE>CommunicatorServer.STARTING</CODE>,</li>
  348. * <li><CODE>CommunicatorServer.STOPPING</CODE>.</li>
  349. * </ul>
  350. * @param timeOut The maximum time to wait for, in milliseconds,
  351. * if positive.
  352. * Infinite time out if 0, or no waiting at all if negative.
  353. *
  354. * @return true if the value of this MBean's State attribute is the
  355. * same as the <VAR>wantedState</VAR> parameter; false otherwise.
  356. */
  357. public boolean waitState(int wantedState, long timeOut) {
  358. if (isTraceOn())
  359. trace("waitState", wantedState + "(0on,1off,2st) TO=" + timeOut +
  360. " ; current state = " + getStateString());
  361. long endTime = 0;
  362. if (timeOut > 0)
  363. endTime = System.currentTimeMillis() + timeOut;
  364. synchronized (stateLock) {
  365. while (state != wantedState) {
  366. if (timeOut < 0) {
  367. if (isTraceOn())
  368. trace("waitState", "timeOut < 0, return without wait");
  369. return false;
  370. } else {
  371. try {
  372. if (timeOut > 0) {
  373. long toWait = endTime - System.currentTimeMillis();
  374. if (toWait <= 0) {
  375. if (isTraceOn())
  376. trace("waitState", "timed out");
  377. return false;
  378. }
  379. stateLock.wait(toWait);
  380. } else { // timeOut == 0
  381. stateLock.wait();
  382. }
  383. } catch (InterruptedException e) {
  384. if (isTraceOn())
  385. trace("waitState", "wait interrupted");
  386. return (state == wantedState);
  387. }
  388. }
  389. }
  390. if (isTraceOn())
  391. trace("waitState", "returning in desired state");
  392. return true;
  393. }
  394. }
  395. /**
  396. * <p>Waits until the communicator is started or timeout expires.
  397. *
  398. * @param timeout Time in ms to wait for the connector to start.
  399. * If <code>timeout</code> is positive, wait for at most
  400. * the specified time. An infinite timeout can be specified
  401. * by passing a <code>timeout</code> value equals
  402. * <code>Long.MAX_VALUE</code>. In that case the method
  403. * will wait until the connector starts or fails to start.
  404. * If timeout is negative or zero, returns as soon as possible
  405. * without waiting.
  406. *
  407. * @exception CommunicationException if the connectors fails to start.
  408. * @exception InterruptedException if the thread is interrupted or the
  409. * timeout expires.
  410. *
  411. */
  412. private void waitForStart(long timeout)
  413. throws CommunicationException, InterruptedException {
  414. if (isTraceOn())
  415. trace("waitForStart", "Timeout=" + timeout +
  416. " ; current state = " + getStateString());
  417. final long startTime = System.currentTimeMillis();
  418. synchronized (stateLock) {
  419. while (state == STARTING) {
  420. // Time elapsed since startTime...
  421. //
  422. final long elapsed = System.currentTimeMillis() - startTime;
  423. // wait for timeout - elapsed.
  424. // A timeout of Long.MAX_VALUE is equivalent to something
  425. // like 292271023 years - which is pretty close to
  426. // forever as far as we are concerned ;-)
  427. //
  428. final long remainingTime = timeout-elapsed;
  429. // If remainingTime is negative, the timeout has elapsed.
  430. //
  431. if (remainingTime < 0) {
  432. if (isTraceOn())
  433. trace("waitForStart",
  434. "timeout < 0, return without wait");
  435. throw new InterruptedException("Timeout expired");
  436. }
  437. // We're going to wait until someone notifies on the
  438. // the stateLock object, or until the timeout expires,
  439. // or until the thread is interrupted.
  440. //
  441. try {
  442. stateLock.wait(remainingTime);
  443. } catch (InterruptedException e) {
  444. if (isTraceOn())
  445. trace("waitForStart", "wait interrupted");
  446. // If we are now ONLINE, then no need to rethrow the
  447. // exception... we're simply going to exit the while
  448. // loop. Otherwise, throw the InterruptedException.
  449. //
  450. if (state != ONLINE) throw e;
  451. }
  452. }
  453. // We're no longer in STARTING state
  454. //
  455. if (state == ONLINE) {
  456. // OK, we're started, everything went fine, just return
  457. //
  458. if (isTraceOn()) trace("waitForStart", "started");
  459. return;
  460. } else if (startException instanceof CommunicationException) {
  461. // There was some exception during the starting phase.
  462. // Cast and throw...
  463. //
  464. throw (CommunicationException)startException;
  465. } else if (startException instanceof InterruptedException) {
  466. // There was some exception during the starting phase.
  467. // Cast and throw...
  468. //
  469. throw (InterruptedException)startException;
  470. } else if (startException != null) {
  471. // There was some exception during the starting phase.
  472. // Wrap and throw...
  473. //
  474. throw new CommunicationException(startException,
  475. "Failed to start: "+
  476. startException);
  477. } else {
  478. // We're not ONLINE, and there's no exception...
  479. // Something went wrong but we don't know what...
  480. //
  481. throw new CommunicationException("Failed to start: state is "+
  482. getStringForState(state));
  483. }
  484. }
  485. }
  486. /**
  487. * Gets the state of this <CODE>CommunicatorServer</CODE> as an integer.
  488. *
  489. * @return <CODE>ONLINE</CODE>, <CODE>OFFLINE</CODE>,
  490. * <CODE>STARTING</CODE> or <CODE>STOPPING</CODE>.
  491. */
  492. public int getState() {
  493. synchronized (stateLock) {
  494. return state ;
  495. }
  496. }
  497. /**
  498. * Gets the state of this <CODE>CommunicatorServer</CODE> as a string.
  499. *
  500. * @return One of the strings "ONLINE", "OFFLINE", "STARTING" or
  501. * "STOPPING".
  502. */
  503. public String getStateString() {
  504. return getStringForState(state) ;
  505. }
  506. /**
  507. * Gets the host name used by this <CODE>CommunicatorServer</CODE>.
  508. *
  509. * @return The host name used by this <CODE>CommunicatorServer</CODE>.
  510. */
  511. public String getHost() {
  512. try {
  513. host = InetAddress.getLocalHost().getHostName();
  514. } catch (Exception e) {
  515. host = "Unknown host";
  516. }
  517. return host ;
  518. }
  519. /**
  520. * Gets the port number used by this <CODE>CommunicatorServer</CODE>.
  521. *
  522. * @return The port number used by this <CODE>CommunicatorServer</CODE>.
  523. */
  524. public int getPort() {
  525. synchronized (stateLock) {
  526. return port ;
  527. }
  528. }
  529. /**
  530. * Sets the port number used by this <CODE>CommunicatorServer</CODE>.
  531. *
  532. * @param port The port number used by this
  533. * <CODE>CommunicatorServer</CODE>.
  534. *
  535. * @exception java.lang.IllegalStateException This method has been invoked
  536. * while the communicator was ONLINE or STARTING.
  537. */
  538. public void setPort(int port) throws java.lang.IllegalStateException {
  539. synchronized (stateLock) {
  540. if ((state == ONLINE) || (state == STARTING))
  541. throw new IllegalStateException("Stop server before " +
  542. "carrying out this operation");
  543. this.port = port;
  544. dbgTag = makeDebugTag();
  545. }
  546. }
  547. /**
  548. * Gets the protocol being used by this <CODE>CommunicatorServer</CODE>.
  549. * @return The protocol as a string.
  550. */
  551. public abstract String getProtocol() ;
  552. /**
  553. * Gets the number of clients that have been processed by this
  554. * <CODE>CommunicatorServer</CODE> since its creation.
  555. *
  556. * @return The number of clients handled by this
  557. * <CODE>CommunicatorServer</CODE>
  558. * since its creation. This counter is not reset by the
  559. * <CODE>stop</CODE> method.
  560. */
  561. int getServedClientCount() {
  562. return servedClientCount ;
  563. }
  564. /**
  565. * Gets the number of clients currently being processed by this
  566. * <CODE>CommunicatorServer</CODE>.
  567. *
  568. * @return The number of clients currently being processed by this
  569. * <CODE>CommunicatorServer</CODE>.
  570. */
  571. int getActiveClientCount() {
  572. int result = clientHandlerVector.size() ;
  573. return result ;
  574. }
  575. /**
  576. * Gets the maximum number of clients that this
  577. * <CODE>CommunicatorServer</CODE> can process concurrently.
  578. *
  579. * @return The maximum number of clients that this
  580. * <CODE>CommunicatorServer</CODE> can
  581. * process concurrently.
  582. */
  583. int getMaxActiveClientCount() {
  584. return maxActiveClientCount ;
  585. }
  586. /**
  587. * Sets the maximum number of clients this
  588. * <CODE>CommunicatorServer</CODE> can process concurrently.
  589. *
  590. * @param c The number of clients.
  591. *
  592. * @exception java.lang.IllegalStateException This method has been invoked
  593. * while the communicator was ONLINE or STARTING.
  594. */
  595. void setMaxActiveClientCount(int c)
  596. throws java.lang.IllegalStateException {
  597. synchronized (stateLock) {
  598. if ((state == ONLINE) || (state == STARTING)) {
  599. throw new IllegalStateException(
  600. "Stop server before carrying out this operation");
  601. }
  602. maxActiveClientCount = c ;
  603. }
  604. }
  605. /**
  606. * For SNMP Runtime internal use only.
  607. */
  608. void notifyClientHandlerCreated(ClientHandler h) {
  609. clientHandlerVector.addElement(h) ;
  610. }
  611. /**
  612. * For SNMP Runtime internal use only.
  613. */
  614. synchronized void notifyClientHandlerDeleted(ClientHandler h) {
  615. clientHandlerVector.removeElement(h);
  616. notifyAll();
  617. }
  618. /**
  619. * The number of times the communicator server will attempt
  620. * to bind before giving up.
  621. **/
  622. protected int getBindTries() {
  623. return 50;
  624. }
  625. /**
  626. * The delay, in ms, during which the communicator server will sleep before
  627. * attempting to bind again.
  628. **/
  629. protected long getBindSleepTime() {
  630. return 100;
  631. }
  632. /**
  633. * For SNMP Runtime internal use only.
  634. * <p>
  635. * The <CODE>run</CODE> method executed by this connector's main thread.
  636. */
  637. public void run() {
  638. // Fix jaw.00667.B
  639. // It seems that the init of "i" and "success"
  640. // need to be done outside the "try" clause...
  641. // A bug in Java 2 production release ?
  642. //
  643. int i = 0;
  644. boolean success = false;
  645. // ----------------------
  646. // Bind
  647. // ----------------------
  648. try {
  649. // Fix for bug 4352451: "java.net.BindException: Address in use".
  650. //
  651. final int bindRetries = getBindTries();
  652. final long sleepTime = getBindSleepTime();
  653. while (i < bindRetries && !success) {
  654. try {
  655. // Try socket connection.
  656. //
  657. doBind();
  658. success = true;
  659. } catch (CommunicationException ce) {
  660. i++;
  661. try {
  662. Thread.sleep(sleepTime);
  663. } catch (InterruptedException ie) {
  664. throw ie;
  665. }
  666. }
  667. }
  668. // Retry last time to get correct exception.
  669. //
  670. if (!success) {
  671. // Try socket connection.
  672. //
  673. doBind();
  674. }
  675. } catch(Exception x) {
  676. if (isDebugOn()) {
  677. debug("run","Unexpected exception = "+x) ;
  678. }
  679. synchronized(stateLock) {
  680. startException = x;
  681. changeState(OFFLINE);
  682. }
  683. if (isTraceOn()) {
  684. trace("run","State is OFFLINE") ;
  685. }
  686. doError(x);
  687. return;
  688. }
  689. try {
  690. // ----------------------
  691. // State change
  692. // ----------------------
  693. changeState(ONLINE) ;
  694. if (isTraceOn()) {
  695. trace("run","State is ONLINE") ;
  696. }
  697. // ----------------------
  698. // Main loop
  699. // ----------------------
  700. while (!stopRequested) {
  701. servedClientCount++;
  702. doReceive() ;
  703. waitIfTooManyClients() ;
  704. doProcess() ;
  705. }
  706. if (isTraceOn()) {
  707. trace("run","Stop has been requested") ;
  708. }
  709. } catch(InterruptedException x) {
  710. if (isTraceOn()) {
  711. trace("run","Interrupt caught") ;
  712. }
  713. changeState(STOPPING);
  714. } catch(Exception x) {
  715. if (isDebugOn()) {
  716. debug("run","Unexpected exception = "+x) ;
  717. }
  718. changeState(STOPPING);
  719. } finally {
  720. synchronized (stateLock) {
  721. interrupted = true;
  722. Thread.currentThread().interrupted();
  723. }
  724. // ----------------------
  725. // unBind
  726. // ----------------------
  727. try {
  728. doUnbind() ;
  729. waitClientTermination() ;
  730. changeState(OFFLINE);
  731. if (isTraceOn()) {
  732. trace("run","State is OFFLINE") ;
  733. }
  734. } catch(Exception x) {
  735. if (isDebugOn()) {
  736. debug("run","Unexpected exception = "+x) ;
  737. }
  738. changeState(OFFLINE);
  739. }
  740. }
  741. }
  742. /**
  743. */
  744. protected abstract void doError(Exception e) throws CommunicationException;
  745. //
  746. // To be defined by the subclass.
  747. //
  748. // Each method below is called by run() and must be subclassed.
  749. // If the method sends an exception (Communication or Interrupt), this
  750. // will end up the run() method and switch the connector offline.
  751. //
  752. // If it is a CommunicationException, run() will call
  753. // Debug.printException().
  754. //
  755. // All these methods should propagate the InterruptedException to inform
  756. // run() that the connector must be switch OFFLINE.
  757. //
  758. //
  759. //
  760. // doBind() should do all what is needed before calling doReceive().
  761. // If doBind() throws an exception, doUnbind() is not to be called
  762. // and run() ends up.
  763. //
  764. /**
  765. */
  766. protected abstract void doBind()
  767. throws CommunicationException, InterruptedException ;
  768. /**
  769. * <CODE>doReceive()</CODE> should block until a client is available.
  770. * If this method throws an exception, <CODE>doProcess()</CODE> is not
  771. * called but <CODE>doUnbind()</CODE> is called then <CODE>run()</CODE>
  772. * stops.
  773. */
  774. protected abstract void doReceive()
  775. throws CommunicationException, InterruptedException ;
  776. /**
  777. * <CODE>doProcess()</CODE> is called after <CODE>doReceive()</CODE>:
  778. * it should process the requests of the incoming client.
  779. * If it throws an exception, <CODE>doUnbind()</CODE> is called and
  780. * <CODE>run()</CODE> stops.
  781. */
  782. protected abstract void doProcess()
  783. throws CommunicationException, InterruptedException ;
  784. /**
  785. * <CODE>doUnbind()</CODE> is called whenever the connector goes
  786. * <CODE>OFFLINE</CODE>, except if <CODE>doBind()</CODE> has thrown an
  787. * exception.
  788. */
  789. protected abstract void doUnbind()
  790. throws CommunicationException, InterruptedException ;
  791. /**
  792. * Get the <code>MBeanServer</code> object to which incoming requests are
  793. * sent. This is either the MBean server in which this connector is
  794. * registered, or an <code>MBeanServerForwarder</code> leading to that
  795. * server.
  796. */
  797. public synchronized MBeanServer getMBeanServer() {
  798. return topMBS;
  799. }
  800. /**
  801. * Set the <code>MBeanServer</code> object to which incoming
  802. * requests are sent. This must be either the MBean server in
  803. * which this connector is registered, or an
  804. * <code>MBeanServerForwarder</code> leading to that server. An
  805. * <code>MBeanServerForwarder</code> <code>mbsf</code> leads to an
  806. * MBean server <code>mbs</code> if
  807. * <code>mbsf.getMBeanServer()</code> is either <code>mbs</code>
  808. * or an <code>MBeanServerForwarder</code> leading to
  809. * <code>mbs</code>.
  810. *
  811. * @exception IllegalArgumentException if <code>newMBS</code> is neither
  812. * the MBean server in which this connector is registered nor an
  813. * <code>MBeanServerForwarder</code> leading to that server.
  814. *
  815. * @exception IllegalStateException This method has been invoked
  816. * while the communicator was ONLINE or STARTING.
  817. */
  818. public synchronized void setMBeanServer(MBeanServer newMBS)
  819. throws IllegalArgumentException, IllegalStateException {
  820. synchronized (stateLock) {
  821. if (state == ONLINE || state == STARTING)
  822. throw new IllegalStateException("Stop server before " +
  823. "carrying out this operation");
  824. }
  825. final String error =
  826. "MBeanServer argument must be MBean server where this " +
  827. "server is registered, or an MBeanServerForwarder " +
  828. "leading to that server";
  829. Vector seenMBS = new Vector();
  830. for (MBeanServer mbs = newMBS;
  831. mbs != bottomMBS;
  832. mbs = ((MBeanServerForwarder) mbs).getMBeanServer()) {
  833. if (!(mbs instanceof MBeanServerForwarder))
  834. throw new IllegalArgumentException(error);
  835. if (seenMBS.contains(mbs))
  836. throw new IllegalArgumentException("MBeanServerForwarder " +
  837. "loop");
  838. seenMBS.addElement(mbs);
  839. }
  840. topMBS = newMBS;
  841. }
  842. //
  843. // To be called by the subclass if needed
  844. //
  845. /**
  846. * For internal use only.
  847. */
  848. ObjectName getObjectName() {
  849. return objectName ;
  850. }
  851. /**
  852. * For internal use only.
  853. */
  854. void changeState(int newState) {
  855. int oldState;
  856. synchronized (stateLock) {
  857. if (state == newState)
  858. return;
  859. oldState = state;
  860. state = newState;
  861. stateLock.notifyAll();
  862. }
  863. sendStateChangeNotification(oldState, newState);
  864. }
  865. /**
  866. * Returns the string used in debug traces.
  867. */
  868. String makeDebugTag() {
  869. return "CommunicatorServer["+ getProtocol() + ":" + getPort() + "]" ;
  870. }
  871. /**
  872. * Returns the string used to name the connector thread.
  873. */
  874. String makeThreadName() {
  875. String result ;
  876. if (objectName == null)
  877. result = "CommunicatorServer" ;
  878. else
  879. result = objectName.toString() ;
  880. return result ;
  881. }
  882. /**
  883. * This method blocks if there are too many active clients.
  884. * Call to <CODE>wait()</CODE> is terminated when a client handler
  885. * thread calls <CODE>notifyClientHandlerDeleted(this)</CODE> ;
  886. */
  887. private synchronized void waitIfTooManyClients()
  888. throws InterruptedException {
  889. while (getActiveClientCount() >= maxActiveClientCount) {
  890. if (isTraceOn()) {
  891. trace("waitIfTooManyClients",
  892. "Waiting for a client to terminate") ;
  893. }
  894. wait();
  895. }
  896. }
  897. /**
  898. * This method blocks until there is no more active client.
  899. */
  900. private void waitClientTermination() {
  901. int s = clientHandlerVector.size() ;
  902. if (isTraceOn()) {
  903. if (s >= 1) {
  904. trace("waitClientTermination","waiting for " +
  905. s + " clients to terminate") ;
  906. }
  907. }
  908. for (Enumeration e = clientHandlerVector.elements() ;
  909. e.hasMoreElements();){
  910. ClientHandler h = (ClientHandler)e.nextElement() ;
  911. h.join() ;
  912. }
  913. if (isTraceOn()) {
  914. if (s >= 1) {
  915. trace("waitClientTermination","Ok, let's go...") ;
  916. }
  917. }
  918. }
  919. /**
  920. * Call <CODE>interrupt()</CODE> on each pending client.
  921. */
  922. private void terminateAllClient() {
  923. int s = clientHandlerVector.size() ;
  924. if (isTraceOn()) {
  925. if (s >= 1) {
  926. trace("terminateAllClient","Interrupting " + s + " clients") ;
  927. }
  928. }
  929. for (Enumeration e = clientHandlerVector.elements() ;
  930. e.hasMoreElements();){
  931. ClientHandler h = (ClientHandler)e.nextElement() ;
  932. h.interrupt() ;
  933. }
  934. }
  935. /**
  936. * Controls the way the CommunicatorServer service is deserialized.
  937. */
  938. private void readObject(ObjectInputStream stream)
  939. throws IOException, ClassNotFoundException {
  940. // Call the default deserialization of the object.
  941. //
  942. stream.defaultReadObject();
  943. // Call the specific initialization for the CommunicatorServer service.
  944. // This is for transient structures to be initialized to specific
  945. // default values.
  946. //
  947. stateLock = new Object();
  948. state = OFFLINE;
  949. stopRequested = false;
  950. servedClientCount = 0;
  951. clientHandlerVector = new Vector();
  952. fatherThread = Thread.currentThread();
  953. mainThread = null;
  954. notifCount = 0;
  955. notifInfos = null;
  956. notifBroadcaster = new NotificationBroadcasterSupport();
  957. dbgTag = makeDebugTag();
  958. }
  959. //
  960. // NotificationBroadcaster
  961. //
  962. /**
  963. * Adds a listener for the notifications emitted by this
  964. * CommunicatorServer.
  965. * There is only one type of notifications sent by the CommunicatorServer:
  966. * they are <tt>{@link javax.management.AttributeChangeNotification}</tt>,
  967. * sent when the <tt>State</tt> attribute of this CommunicatorServer
  968. * changes.
  969. *
  970. * @param listener The listener object which will handle the emitted
  971. * notifications.
  972. * @param filter The filter object. If filter is null, no filtering
  973. * will be performed before handling notifications.
  974. * @param handback An object which will be sent back unchanged to the
  975. * listener when a notification is emitted.
  976. *
  977. * @exception IllegalArgumentException Listener parameter is null.
  978. */
  979. public void addNotificationListener(NotificationListener listener,
  980. NotificationFilter filter,
  981. Object handback)
  982. throws java.lang.IllegalArgumentException {
  983. if (isDebugOn()) {
  984. debug("addNotificationListener","Adding listener "+ listener +
  985. " with filter "+ filter + " and handback "+ handback);
  986. }
  987. notifBroadcaster.addNotificationListener(listener, filter, handback);
  988. }
  989. /**
  990. * Removes the specified listener from this CommunicatorServer.
  991. * Note that if the listener has been registered with different
  992. * handback objects or notification filters, all entries corresponding
  993. * to the listener will be removed.
  994. *
  995. * @param listener The listener object to be removed.
  996. *
  997. * @exception ListenerNotFoundException The listener is not registered.
  998. */
  999. public void removeNotificationListener(NotificationListener listener)
  1000. throws ListenerNotFoundException {
  1001. if (isDebugOn()) {
  1002. debug("removeNotificationListener","Removing listener "+ listener);
  1003. }
  1004. notifBroadcaster.removeNotificationListener(listener);
  1005. }
  1006. /**
  1007. * Returns an array of MBeanNotificationInfo objects describing
  1008. * the notification types sent by this CommunicatorServer.
  1009. * There is only one type of notifications sent by the CommunicatorServer:
  1010. * it is <tt>{@link javax.management.AttributeChangeNotification}</tt>,
  1011. * sent when the <tt>State</tt> attribute of this CommunicatorServer
  1012. * changes.
  1013. */
  1014. public MBeanNotificationInfo[] getNotificationInfo() {
  1015. // Initialize notifInfos on first call to getNotificationInfo()
  1016. //
  1017. if (notifInfos == null) {
  1018. notifInfos = new MBeanNotificationInfo[1];
  1019. String[] notifTypes = {
  1020. AttributeChangeNotification.ATTRIBUTE_CHANGE};
  1021. notifInfos[0] = new MBeanNotificationInfo( notifTypes,
  1022. AttributeChangeNotification.class.getName(),
  1023. "Sent to notify that the value of the State attribute "+
  1024. "of this CommunicatorServer instance has changed.");
  1025. }
  1026. return notifInfos;
  1027. }
  1028. /**
  1029. *
  1030. */
  1031. private void sendStateChangeNotification(int oldState, int newState) {
  1032. String oldStateString = getStringForState(oldState);
  1033. String newStateString = getStringForState(newState);
  1034. String message = new StringBuffer().append(dbgTag)
  1035. .append(" The value of attribute State has changed from ")
  1036. .append(oldState).append(" (").append(oldStateString)
  1037. .append(") to ").append(newState).append(" (")
  1038. .append(newStateString).append(").").toString();
  1039. notifCount++;
  1040. AttributeChangeNotification notif =
  1041. new AttributeChangeNotification(this, // source
  1042. notifCount, // sequence number
  1043. System.currentTimeMillis(), // time stamp
  1044. message, // message
  1045. "State", // attribute name
  1046. "int", // attribute type
  1047. new Integer(oldState), // old value
  1048. new Integer(newState) ); // new value
  1049. if (isDebugOn()) {
  1050. debug("sendStateChangeNotification",
  1051. "Sending AttributeChangeNotification #"+ notifCount +
  1052. " with message: "+ message);
  1053. }
  1054. notifBroadcaster.sendNotification(notif);
  1055. }
  1056. /**
  1057. *
  1058. */
  1059. private static String getStringForState(int s) {
  1060. switch (s) {
  1061. case ONLINE: return "ONLINE";
  1062. case STARTING: return "STARTING";
  1063. case OFFLINE: return "OFFLINE";
  1064. case STOPPING: return "STOPPING";
  1065. default: return "UNDEFINED";
  1066. }
  1067. }
  1068. //
  1069. // MBeanRegistration
  1070. //
  1071. /**
  1072. * Preregister method of connector.
  1073. *
  1074. *@param server The <CODE>MBeanServer</CODE> in which the MBean will
  1075. * be registered.
  1076. *@param name The object name of the MBean.
  1077. *
  1078. *@return The name of the MBean registered.
  1079. *
  1080. *@exception java.langException This exception should be caught by
  1081. * the <CODE>MBeanServer</CODE> and re-thrown
  1082. * as an <CODE>MBeanRegistrationException</CODE>.
  1083. */
  1084. public ObjectName preRegister(MBeanServer server, ObjectName name)
  1085. throws java.lang.Exception {
  1086. objectName = name;
  1087. synchronized (this) {
  1088. if (bottomMBS != null) {
  1089. throw new IllegalArgumentException("connector already " +
  1090. "registered in an MBean " +
  1091. "server");
  1092. }
  1093. topMBS = bottomMBS = server;
  1094. }
  1095. dbgTag = makeDebugTag();
  1096. return name;
  1097. }
  1098. /**
  1099. *
  1100. *@param registrationDone Indicates whether or not the MBean has been
  1101. * successfully registered in the <CODE>MBeanServer</CODE>.
  1102. * The value false means that the registration phase has failed.
  1103. */
  1104. public void postRegister(Boolean registrationDone) {
  1105. if (!registrationDone.booleanValue()) {
  1106. synchronized (this) {
  1107. topMBS = bottomMBS = null;
  1108. }
  1109. }
  1110. }
  1111. /**
  1112. * Stop the connector.
  1113. *
  1114. * @exception java.langException This exception should be caught by
  1115. * the <CODE>MBeanServer</CODE> and re-thrown
  1116. * as an <CODE>MBeanRegistrationException</CODE>.
  1117. */
  1118. public void preDeregister() throws java.lang.Exception {
  1119. synchronized (this) {
  1120. topMBS = bottomMBS = null;
  1121. }
  1122. objectName = null ;
  1123. final int cstate = getState();
  1124. if ((cstate == ONLINE) || ( cstate == STARTING)) {
  1125. stop() ;
  1126. }
  1127. }
  1128. /**
  1129. * Do nothing.
  1130. */
  1131. public void postDeregister(){
  1132. }
  1133. /**
  1134. * Load a class using the default loader repository
  1135. **/
  1136. Class loadClass(String className)
  1137. throws ClassNotFoundException {
  1138. try {
  1139. return Class.forName(className);
  1140. } catch (ClassNotFoundException e) {
  1141. final ClassLoaderRepository clr =
  1142. MBeanServerFactory.getClassLoaderRepository(bottomMBS);
  1143. if (clr == null) throw new ClassNotFoundException(className);
  1144. return clr.loadClass(className);
  1145. }
  1146. }
  1147. //
  1148. // Debug stuff
  1149. //
  1150. /**
  1151. */
  1152. int infoType;
  1153. /**
  1154. */
  1155. boolean isTraceOn() {
  1156. return Trace.isSelected(Trace.LEVEL_TRACE, infoType);
  1157. }
  1158. /**
  1159. */
  1160. void trace(String clz, String func, String info) {
  1161. Trace.send(Trace.LEVEL_TRACE, infoType, clz, func, info);
  1162. }
  1163. /**
  1164. */
  1165. boolean isDebugOn() {
  1166. return Trace.isSelected(Trace.LEVEL_DEBUG, infoType);
  1167. }
  1168. /**
  1169. */
  1170. void debug(String clz, String func, String info) {
  1171. Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, info);
  1172. }
  1173. /**
  1174. */
  1175. void debug(String clz, String func, Throwable exception) {
  1176. Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, exception);
  1177. }
  1178. /**
  1179. */
  1180. void trace(String func, String info) {
  1181. trace(dbgTag, func, info);
  1182. }
  1183. /**
  1184. */
  1185. void debug(String func, String info) {
  1186. debug(dbgTag, func, info);
  1187. }
  1188. /**
  1189. */
  1190. void debug(String func, Throwable exception) {
  1191. debug(dbgTag, func, exception);
  1192. }
  1193. }