1. /*
  2. * @(#)ClientNotifForwarder.java 1.34 04/04/23
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.jmx.remote.internal;
  8. import java.io.IOException;
  9. import java.io.NotSerializableException;
  10. import java.io.Serializable;
  11. import java.util.ArrayList;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. import java.security.AccessController;
  15. import java.security.PrivilegedAction;
  16. import javax.security.auth.Subject;
  17. import javax.management.Notification;
  18. import javax.management.NotificationListener;
  19. import javax.management.NotificationFilter;
  20. import javax.management.ObjectName;
  21. import javax.management.ListenerNotFoundException;
  22. import javax.management.MBeanServerNotification;
  23. import javax.management.InstanceNotFoundException;
  24. import javax.management.ListenerNotFoundException;
  25. import javax.management.remote.NotificationResult;
  26. import javax.management.remote.TargetedNotification;
  27. import javax.management.remote.JMXConnectionNotification;
  28. import com.sun.jmx.remote.util.ClassLogger;
  29. import com.sun.jmx.remote.util.EnvHelp;
  30. public abstract class ClientNotifForwarder {
  31. public ClientNotifForwarder(Map env) {
  32. this(null, env);
  33. }
  34. public ClientNotifForwarder(ClassLoader defaultClassLoader, Map env) {
  35. maxNotifications = EnvHelp.getMaxFetchNotifNumber(env);
  36. timeout = EnvHelp.getFetchTimeout(env);
  37. this.defaultClassLoader = defaultClassLoader;
  38. }
  39. /**
  40. * Called to to fetch notifications from a server.
  41. */
  42. abstract protected NotificationResult fetchNotifs(long clientSequenceNumber,
  43. int maxNotifications,
  44. long timeout)
  45. throws IOException, ClassNotFoundException;
  46. abstract protected Integer addListenerForMBeanRemovedNotif()
  47. throws IOException, InstanceNotFoundException;
  48. abstract protected void removeListenerForMBeanRemovedNotif(Integer id)
  49. throws IOException, InstanceNotFoundException,
  50. ListenerNotFoundException;
  51. /**
  52. * Used to send out a notification about lost notifs
  53. */
  54. abstract protected void lostNotifs(String message, long number);
  55. public synchronized void addNotificationListener(Integer listenerID,
  56. ObjectName name,
  57. NotificationListener listener,
  58. NotificationFilter filter,
  59. Object handback,
  60. Subject delegationSubject)
  61. throws IOException, InstanceNotFoundException {
  62. if (logger.traceOn()) {
  63. logger.trace("addNotificationListener",
  64. "Add the listener "+listener+" at "+name);
  65. }
  66. infoList.put(listenerID,
  67. new ClientListenerInfo(listenerID,
  68. name,
  69. listener,
  70. filter,
  71. handback,
  72. delegationSubject));
  73. init(false);
  74. }
  75. public synchronized Integer[]
  76. removeNotificationListener(ObjectName name,
  77. NotificationListener listener)
  78. throws ListenerNotFoundException, IOException {
  79. beforeRemove();
  80. if (logger.traceOn()) {
  81. logger.trace("removeNotificationListener",
  82. "Remove the listener "+listener+" from "+name);
  83. }
  84. ArrayList ids = new ArrayList();
  85. ArrayList values = new ArrayList(infoList.values());
  86. for (int i=values.size()-1; i>=0; i--) {
  87. ClientListenerInfo li = (ClientListenerInfo)values.get(i);
  88. if (li.sameAs(name, listener)) {
  89. ids.add(li.getListenerID());
  90. infoList.remove(li.getListenerID());
  91. }
  92. }
  93. if (ids.isEmpty())
  94. throw new ListenerNotFoundException("Listener not found");
  95. return (Integer[])ids.toArray(new Integer[0]);
  96. }
  97. public synchronized Integer
  98. removeNotificationListener(ObjectName name,
  99. NotificationListener listener,
  100. NotificationFilter filter,
  101. Object handback)
  102. throws ListenerNotFoundException, IOException {
  103. if (logger.traceOn()) {
  104. logger.trace("removeNotificationListener",
  105. "Remove the listener "+listener+" from "+name);
  106. }
  107. beforeRemove();
  108. Integer id = null;
  109. ArrayList values = new ArrayList(infoList.values());
  110. for (int i=values.size()-1; i>=0; i--) {
  111. ClientListenerInfo li = (ClientListenerInfo)values.get(i);
  112. if (li.sameAs(name, listener, filter, handback)) {
  113. id=li.getListenerID();
  114. infoList.remove(id);
  115. break;
  116. }
  117. }
  118. if (id == null)
  119. throw new ListenerNotFoundException("Listener not found");
  120. return id;
  121. }
  122. public synchronized Integer[] removeNotificationListener(ObjectName name) {
  123. if (logger.traceOn()) {
  124. logger.trace("removeNotificationListener",
  125. "Remove all listeners registered at "+name);
  126. }
  127. ArrayList ids = new ArrayList();
  128. ArrayList values = new ArrayList(infoList.values());
  129. for (int i=values.size()-1; i>=0; i--) {
  130. ClientListenerInfo li = (ClientListenerInfo)values.get(i);
  131. if (li.sameAs(name)) {
  132. ids.add(li.getListenerID());
  133. infoList.remove(li.getListenerID());
  134. }
  135. }
  136. return (Integer[]) ids.toArray(new Integer[0]);
  137. }
  138. public synchronized ListenerInfo[] getListenerInfo() {
  139. return (ListenerInfo[])infoList.values().toArray(new ListenerInfo[0]);
  140. }
  141. /*
  142. * Called when a connector is doing reconnection. Like <code>postReconnection</code>,
  143. * this method is intended to be called only by a client connetor:
  144. * <code>RMIConnector</code/> and <code/>ClientIntermediary</code>.
  145. * Call this method will set the flag beingReconnection to <code>true</code>,
  146. * and the thread used to fetch notifis will be stopped, a new thread can be
  147. * created only after the method <code>postReconnection</code> is called.
  148. *
  149. * It is caller's responsiblity to not re-call this method before calling
  150. * <code>postReconnection.
  151. */
  152. public synchronized ClientListenerInfo[] preReconnection() throws IOException {
  153. if (state == TERMINATED || beingReconnected) { // should never
  154. throw new IOException("Illegal state.");
  155. }
  156. final ClientListenerInfo[] tmp = (ClientListenerInfo[])
  157. infoList.values().toArray(new ClientListenerInfo[0]);
  158. beingReconnected = true;
  159. infoList.clear();
  160. while (state == STARTING) {
  161. try {
  162. wait();
  163. } catch (InterruptedException ire) {
  164. IOException ioe = new IOException(ire.toString());
  165. EnvHelp.initCause(ioe, ire);
  166. throw ioe;
  167. }
  168. }
  169. if (state == STARTED) {
  170. setState(STOPPING);
  171. // we remove Thread.innterrup because of thebug 4956080:
  172. // BrokenConnectionTest.java fails under jtreg; runs OK standalone
  173. // if (Thread.currentThread() != notifFetcher.fetchThread) {
  174. // notifFetcher.fetchThread.interrupt();
  175. // }
  176. }
  177. return tmp;
  178. }
  179. /**
  180. * Called after reconnection is finished.
  181. * This method is intended to be called only by a client connetor:
  182. * <code>RMIConnector</code/> and <code/>ClientIntermediary</code>.
  183. */
  184. public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
  185. throws IOException {
  186. if (state == TERMINATED) {
  187. return;
  188. }
  189. while (state == STOPPING) {
  190. try {
  191. wait();
  192. } catch (InterruptedException ire) {
  193. IOException ioe = new IOException(ire.toString());
  194. EnvHelp.initCause(ioe, ire);
  195. throw ioe;
  196. }
  197. }
  198. final boolean trace = logger.traceOn();
  199. final int len = listenerInfos.length;
  200. for (int i=0; i<len; i++) {
  201. if (trace) {
  202. logger.trace("addNotificationListeners",
  203. "Add a listener at "+
  204. listenerInfos[i].getListenerID());
  205. }
  206. infoList.put(listenerInfos[i].getListenerID(), listenerInfos[i]);
  207. }
  208. beingReconnected = false;
  209. notifyAll();
  210. if (listenerInfos.length > 0) { // old listeners re-registered
  211. init(true);
  212. } else if (infoList.size() > 0) {
  213. // but new listeners registered during reconnection
  214. init(false);
  215. }
  216. }
  217. public synchronized void terminate() {
  218. if (state == TERMINATED) {
  219. return;
  220. }
  221. if (logger.traceOn()) {
  222. logger.trace("terminate", "Terminating...");
  223. }
  224. if (state == STARTED) {
  225. infoList.clear();
  226. }
  227. setState(TERMINATED);
  228. }
  229. // -------------------------------------------------
  230. // private classes
  231. // -------------------------------------------------
  232. //
  233. private class NotifFetcher implements Runnable {
  234. public void run() {
  235. fetchThread = Thread.currentThread();
  236. setState(STARTED);
  237. if (defaultClassLoader != null) {
  238. AccessController.doPrivileged(new PrivilegedAction() {
  239. public Object run() {
  240. fetchThread.
  241. setContextClassLoader(defaultClassLoader);
  242. return null;
  243. }
  244. });
  245. }
  246. while (!shouldStop()) {
  247. NotificationResult nr = fetchNotifs();
  248. if (nr == null) break; // nr == null means got exception
  249. final TargetedNotification[] notifs =
  250. nr.getTargetedNotifications();
  251. final int len = notifs.length;
  252. final HashMap listeners;
  253. final Integer myListenerID;
  254. long missed = 0;
  255. synchronized(ClientNotifForwarder.this) {
  256. // check sequence number.
  257. //
  258. if (clientSequenceNumber >= 0) {
  259. missed = nr.getEarliestSequenceNumber() -
  260. clientSequenceNumber;
  261. }
  262. clientSequenceNumber = nr.getNextSequenceNumber();
  263. final int size = infoList.size();
  264. listeners = new HashMap(((size>len)?len:size));
  265. for (int i = 0 ; i < len ; i++) {
  266. final TargetedNotification tn = notifs[i];
  267. final Integer listenerID = tn.getListenerID();
  268. // check if an mbean unregistration notif
  269. if (!listenerID.equals(mbeanRemovedNotifID)) {
  270. final ListenerInfo li =
  271. (ListenerInfo) infoList.get(listenerID);
  272. if (li != null)
  273. listeners.put(listenerID,li);
  274. continue;
  275. }
  276. final Notification notif = tn.getNotification();
  277. final String unreg =
  278. MBeanServerNotification.UNREGISTRATION_NOTIFICATION;
  279. if (notif instanceof MBeanServerNotification &&
  280. notif.getType().equals(unreg)) {
  281. MBeanServerNotification mbsn =
  282. (MBeanServerNotification) notif;
  283. ObjectName name = mbsn.getMBeanName();
  284. removeNotificationListener(name);
  285. }
  286. }
  287. myListenerID = mbeanRemovedNotifID;
  288. }
  289. if (missed > 0) {
  290. final String msg =
  291. "May have lost up to " + missed +
  292. " notification" + (missed == 1 ? "" : "s");
  293. lostNotifs(msg, missed);
  294. logger.trace("NotifFetcher.run", msg);
  295. }
  296. // forward
  297. for (int i = 0 ; i < len ; i++) {
  298. final TargetedNotification tn = notifs[i];
  299. dispatchNotification(tn,myListenerID,listeners);
  300. }
  301. }
  302. // tell that the thread is REALLY stopped
  303. setState(STOPPED);
  304. }
  305. void dispatchNotification(TargetedNotification tn,
  306. Integer myListenerID, Map listeners) {
  307. final Notification notif = tn.getNotification();
  308. final Integer listenerID = tn.getListenerID();
  309. if (listenerID.equals(myListenerID)) return;
  310. final ListenerInfo li = (ClientListenerInfo)
  311. listeners.get(listenerID);
  312. if (li == null) {
  313. logger.trace("NotifFetcher.dispatch",
  314. "Listener ID not in map");
  315. return;
  316. }
  317. NotificationListener l = li.getListener();
  318. Object h = li.getHandback();
  319. try {
  320. l.handleNotification(notif, h);
  321. } catch (RuntimeException e) {
  322. final String msg =
  323. "Failed to forward a notification " +
  324. "to a listener";
  325. logger.trace("NotifFetcher-run", msg, e);
  326. }
  327. }
  328. private NotificationResult fetchNotifs() {
  329. try {
  330. NotificationResult nr = ClientNotifForwarder.this.
  331. fetchNotifs(clientSequenceNumber,maxNotifications,
  332. timeout);
  333. if (logger.traceOn()) {
  334. logger.trace("NotifFetcher-run",
  335. "Got notifications from the server: "+nr);
  336. }
  337. return nr;
  338. } catch (ClassNotFoundException e) {
  339. logger.trace("NotifFetcher.fetchNotifs", e);
  340. return fetchOneNotif();
  341. } catch (NotSerializableException e) {
  342. logger.trace("NotifFetcher.fetchNotifs", e);
  343. return fetchOneNotif();
  344. } catch (IOException ioe) {
  345. if (!shouldStop()) {
  346. logger.error("NotifFetcher-run",
  347. "Failed to fetch notification, " +
  348. "stopping thread. Error is: " + ioe, ioe);
  349. logger.debug("NotifFetcher-run",ioe);
  350. }
  351. // no more fetching
  352. return null;
  353. }
  354. }
  355. /* Fetch one notification when we suspect that it might be a
  356. notification that we can't deserialize (because of a
  357. missing class). First we ask for 0 notifications with 0
  358. timeout. This allows us to skip sequence numbers for
  359. notifications that don't match our filters. Then we ask
  360. for one notification. If that produces a
  361. ClassNotFoundException or a NotSerializableException, we
  362. increase our sequence number and ask again. Eventually we
  363. will either get a successful notification, or a return with
  364. 0 notifications. In either case we can return a
  365. NotificationResult. This algorithm works (albeit less
  366. well) even if the server implementation doesn't optimize a
  367. request for 0 notifications to skip sequence numbers for
  368. notifications that don't match our filters.
  369. If we had at least one ClassNotFoundException, then we
  370. must emit a JMXConnectionNotification.LOST_NOTIFS.
  371. */
  372. private NotificationResult fetchOneNotif() {
  373. ClientNotifForwarder cnf = ClientNotifForwarder.this;
  374. long startSequenceNumber = clientSequenceNumber;
  375. int notFoundCount = 0;
  376. NotificationResult result = null;
  377. while (result == null && !shouldStop()) {
  378. NotificationResult nr;
  379. try {
  380. // 0 notifs to update startSequenceNumber
  381. nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
  382. } catch (ClassNotFoundException e) {
  383. logger.warning("NotifFetcher.fetchOneNotif",
  384. "Impossible exception: " + e);
  385. logger.debug("NotifFetcher.fetchOneNotif",e);
  386. return null;
  387. } catch (IOException e) {
  388. if (!shouldStop())
  389. logger.trace("NotifFetcher.fetchOneNotif", e);
  390. return null;
  391. }
  392. if (shouldStop())
  393. return null;
  394. startSequenceNumber = nr.getNextSequenceNumber();
  395. try {
  396. // 1 notif to skip possible missing class
  397. result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
  398. } catch (Exception e) {
  399. if (e instanceof ClassNotFoundException
  400. || e instanceof NotSerializableException) {
  401. logger.warning("NotifFetcher.fetchOneNotif",
  402. "Failed to deserialize a notification: "+e.toString());
  403. if (logger.traceOn()) {
  404. logger.trace("NotifFetcher.fetchOneNotif",
  405. "Failed to deserialize a notification.", e);
  406. }
  407. notFoundCount++;
  408. startSequenceNumber++;
  409. } else {
  410. if (!shouldStop())
  411. logger.trace("NotifFetcher.fetchOneNotif", e);
  412. return null;
  413. }
  414. }
  415. }
  416. if (notFoundCount > 0) {
  417. final String msg =
  418. "Dropped " + notFoundCount + " notification" +
  419. (notFoundCount == 1 ? "" : "s") +
  420. " because classes were missing locally";
  421. lostNotifs(msg, notFoundCount);
  422. }
  423. return result;
  424. }
  425. private boolean shouldStop() {
  426. synchronized (ClientNotifForwarder.this) {
  427. if (state != STARTED) {
  428. return true;
  429. } else if (infoList.size() == 0) {
  430. // no more listener, stop fetching
  431. setState(STOPPING);
  432. return true;
  433. }
  434. return false;
  435. }
  436. }
  437. // the thread executing fetch job
  438. private Thread fetchThread;
  439. }
  440. // -------------------------------------------------
  441. // private methods
  442. // -------------------------------------------------
  443. private synchronized void setState(int newState) {
  444. if (state == TERMINATED) {
  445. return;
  446. }
  447. state = newState;
  448. this.notifyAll();
  449. }
  450. /*
  451. * Called to decide whether need to start a thread for fetching notifs.
  452. * <P>The parameter reconnected will decide whether to initilize the clientSequenceNumber,
  453. * initilaizing the clientSequenceNumber means to ignore all notifications arrived before.
  454. * If it is reconnected, we will not initialize in order to get all notifications arrived
  455. * during the reconnection. It may cause the newly registered listeners to receive some
  456. * notifications arrived before its registray.
  457. */
  458. private synchronized void init(boolean reconnected) throws IOException {
  459. switch (state) {
  460. case STARTED:
  461. return;
  462. case STARTING:
  463. return;
  464. case TERMINATED:
  465. throw new IOException("The ClientNotifForwarder has been terminated.");
  466. case STOPPING:
  467. if (beingReconnected == true) {
  468. // wait for another thread to do, which is doing reconnection
  469. return;
  470. }
  471. while (state == STOPPING) { // make sure only one fetching thread.
  472. try {
  473. wait();
  474. } catch (InterruptedException ire) {
  475. IOException ioe = new IOException(ire.toString());
  476. EnvHelp.initCause(ioe, ire);
  477. throw ioe;
  478. }
  479. }
  480. // re-call this method to check the state again,
  481. // the state can be other value like TERMINATED.
  482. init(reconnected);
  483. return;
  484. case STOPPED:
  485. if (beingReconnected == true) {
  486. // wait for another thread to do, which is doing reconnection
  487. return;
  488. }
  489. if (logger.traceOn()) {
  490. logger.trace("init", "Initializing...");
  491. }
  492. // init the clientSequenceNumber if not reconnected.
  493. if (!reconnected) {
  494. try {
  495. NotificationResult nr = fetchNotifs(-1, 0, 0);
  496. clientSequenceNumber = nr.getNextSequenceNumber();
  497. } catch (ClassNotFoundException e) {
  498. // can't happen
  499. logger.warning("init", "Impossible exception: "+ e);
  500. logger.debug("init",e);
  501. }
  502. }
  503. // for cleaning
  504. try {
  505. mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
  506. } catch (Exception e) {
  507. final String msg =
  508. "Failed to register a listener to the mbean " +
  509. "server: the client will not do clean when an MBean " +
  510. "is unregistered";
  511. if (logger.traceOn()) {
  512. logger.trace("init", msg, e);
  513. }
  514. }
  515. setState(STARTING);
  516. // start fetching
  517. notifFetcher = new NotifFetcher();
  518. Thread t = new Thread(notifFetcher);
  519. t.setDaemon(true);
  520. t.start();
  521. return;
  522. default:
  523. // should not
  524. throw new IOException("Unknown state.");
  525. }
  526. }
  527. /**
  528. * Import: should not remove a listener dureing reconnection, the reconnection
  529. * needs to change the listener list and that will possibly make removal fail.
  530. */
  531. private synchronized void beforeRemove() throws IOException {
  532. while (beingReconnected) {
  533. if (state == TERMINATED) {
  534. throw new IOException("Terminated.");
  535. }
  536. try {
  537. wait();
  538. } catch (InterruptedException ire) {
  539. IOException ioe = new IOException(ire.toString());
  540. EnvHelp.initCause(ioe, ire);
  541. throw ioe;
  542. }
  543. }
  544. if (state == TERMINATED) {
  545. throw new IOException("Terminated.");
  546. }
  547. }
  548. // -------------------------------------------------
  549. // private variables
  550. // -------------------------------------------------
  551. private ClassLoader defaultClassLoader = null;
  552. private HashMap infoList = new HashMap();
  553. // Integer -> ClientListenerInfo
  554. // notif stuff
  555. private long clientSequenceNumber = -1;
  556. private final int maxNotifications;
  557. private final long timeout;
  558. private NotifFetcher notifFetcher;
  559. private Integer mbeanRemovedNotifID = null;
  560. // admin stuff
  561. private boolean inited = false;
  562. // state
  563. /**
  564. * This state means that a thread is being created for fetching and forwarding notifications.
  565. */
  566. private static final int STARTING = 0;
  567. /**
  568. * This state tells that a thread has been started for fetching and forwarding notifications.
  569. */
  570. private static final int STARTED = 1;
  571. /**
  572. * This state means that the fetching thread is informed to stop.
  573. */
  574. private static final int STOPPING = 2;
  575. /**
  576. * This state means that the fetching thread is already stopped.
  577. */
  578. private static final int STOPPED = 3;
  579. /**
  580. * This state means that this object is terminated and no more thread will be created
  581. * for fetching notifications.
  582. */
  583. private static final int TERMINATED = 4;
  584. private int state = STOPPED;
  585. /**
  586. * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
  587. * is doing reconnection.
  588. * This variable will be set to true by the method <code>preReconnection</code>, and set
  589. * fase by <code>postReconnection</code>.
  590. * When beingReconnected == true, no thread will be created for fetching notifications.
  591. */
  592. private boolean beingReconnected = false;
  593. private static final ClassLogger logger =
  594. new ClassLogger("javax.management.remote.misc",
  595. "ClientNotifForwarder");
  596. }