1. /*
  2. * @(#)SelectorImpl.java 1.17 04/04/07
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.impl.transport;
  8. import java.io.IOException;
  9. import java.nio.channels.ClosedChannelException;
  10. import java.nio.channels.SelectableChannel;
  11. import java.nio.channels.SelectionKey;
  12. import java.nio.channels.Selector;
  13. import java.util.ArrayList;
  14. import java.util.HashMap;
  15. import java.util.Iterator;
  16. import java.util.List;
  17. import com.sun.corba.se.pept.broker.Broker;
  18. import com.sun.corba.se.pept.transport.Acceptor;
  19. import com.sun.corba.se.pept.transport.Connection;
  20. import com.sun.corba.se.pept.transport.EventHandler;
  21. import com.sun.corba.se.pept.transport.ListenerThread;
  22. import com.sun.corba.se.pept.transport.ReaderThread;
  23. import com.sun.corba.se.spi.logging.CORBALogDomains;
  24. import com.sun.corba.se.spi.orb.ORB;
  25. import com.sun.corba.se.spi.orbutil.threadpool.Work;
  26. import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  27. import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  28. import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  29. import com.sun.corba.se.impl.orbutil.ORBUtility;
  30. /**
  31. * @author Harold Carr
  32. */
  33. public class SelectorImpl
  34. extends
  35. Thread
  36. implements
  37. com.sun.corba.se.pept.transport.Selector
  38. {
  39. private ORB orb;
  40. private Selector selector;
  41. private long timeout;
  42. private List deferredRegistrations;
  43. private List interestOpsList;
  44. private HashMap listenerThreads;
  45. private HashMap readerThreads;
  46. private boolean selectorStarted;
  47. private boolean closed;
  48. private ORBUtilSystemException wrapper ;
  49. public SelectorImpl(ORB orb)
  50. {
  51. this.orb = orb;
  52. selector = null;
  53. selectorStarted = false;
  54. timeout = 60000;
  55. deferredRegistrations = new ArrayList();
  56. interestOpsList = new ArrayList();
  57. listenerThreads = new HashMap();
  58. readerThreads = new HashMap();
  59. closed = false;
  60. wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
  61. }
  62. public void setTimeout(long timeout)
  63. {
  64. this.timeout = timeout;
  65. }
  66. public long getTimeout()
  67. {
  68. return timeout;
  69. }
  70. public void registerInterestOps(EventHandler eventHandler)
  71. {
  72. if (orb.transportDebugFlag) {
  73. dprint(".registerInterestOps:-> " + eventHandler);
  74. }
  75. SelectionKey selectionKey = eventHandler.getSelectionKey();
  76. if (selectionKey.isValid()) {
  77. int ehOps = eventHandler.getInterestOps();
  78. SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
  79. synchronized(interestOpsList) {
  80. interestOpsList.add(keyAndOp);
  81. }
  82. // tell Selector Thread there's an update to a SelectorKey's Ops
  83. selector.wakeup();
  84. }
  85. else {
  86. wrapper.selectionKeyInvalid(eventHandler.toString());
  87. if (orb.transportDebugFlag) {
  88. dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
  89. }
  90. }
  91. if (orb.transportDebugFlag) {
  92. dprint(".registerInterestOps:<- ");
  93. }
  94. }
  95. public void registerForEvent(EventHandler eventHandler)
  96. {
  97. if (orb.transportDebugFlag) {
  98. dprint(".registerForEvent: " + eventHandler);
  99. }
  100. if (isClosed()) {
  101. if (orb.transportDebugFlag) {
  102. dprint(".registerForEvent: closed: " + eventHandler);
  103. }
  104. return;
  105. }
  106. if (eventHandler.shouldUseSelectThreadToWait()) {
  107. synchronized (deferredRegistrations) {
  108. deferredRegistrations.add(eventHandler);
  109. }
  110. if (! selectorStarted) {
  111. startSelector();
  112. }
  113. selector.wakeup();
  114. return;
  115. }
  116. switch (eventHandler.getInterestOps()) {
  117. case SelectionKey.OP_ACCEPT :
  118. createListenerThread(eventHandler);
  119. break;
  120. case SelectionKey.OP_READ :
  121. createReaderThread(eventHandler);
  122. break;
  123. default:
  124. if (orb.transportDebugFlag) {
  125. dprint(".registerForEvent: default: " + eventHandler);
  126. }
  127. throw new RuntimeException(
  128. "SelectorImpl.registerForEvent: unknown interest ops");
  129. }
  130. }
  131. public void unregisterForEvent(EventHandler eventHandler)
  132. {
  133. if (orb.transportDebugFlag) {
  134. dprint(".unregisterForEvent: " + eventHandler);
  135. }
  136. if (isClosed()) {
  137. if (orb.transportDebugFlag) {
  138. dprint(".unregisterForEvent: closed: " + eventHandler);
  139. }
  140. return;
  141. }
  142. if (eventHandler.shouldUseSelectThreadToWait()) {
  143. SelectionKey selectionKey = eventHandler.getSelectionKey();
  144. selectionKey.cancel();
  145. selector.wakeup();
  146. return;
  147. }
  148. switch (eventHandler.getInterestOps()) {
  149. case SelectionKey.OP_ACCEPT :
  150. destroyListenerThread(eventHandler);
  151. break;
  152. case SelectionKey.OP_READ :
  153. destroyReaderThread(eventHandler);
  154. break;
  155. default:
  156. if (orb.transportDebugFlag) {
  157. dprint(".unregisterForEvent: default: " + eventHandler);
  158. }
  159. throw new RuntimeException(
  160. "SelectorImpl.uregisterForEvent: unknown interest ops");
  161. }
  162. }
  163. public void close()
  164. {
  165. if (orb.transportDebugFlag) {
  166. dprint(".close");
  167. }
  168. if (isClosed()) {
  169. if (orb.transportDebugFlag) {
  170. dprint(".close: already closed");
  171. }
  172. return;
  173. }
  174. setClosed(true);
  175. Iterator i;
  176. // Kill listeners.
  177. i = listenerThreads.values().iterator();
  178. while (i.hasNext()) {
  179. ListenerThread listenerThread = (ListenerThread) i.next();
  180. listenerThread.close();
  181. }
  182. // Kill readers.
  183. i = readerThreads.values().iterator();
  184. while (i.hasNext()) {
  185. ReaderThread readerThread = (ReaderThread) i.next();
  186. readerThread.close();
  187. }
  188. // Selector
  189. try {
  190. if (selector != null) {
  191. // wakeup Selector thread to process close request
  192. selector.wakeup();
  193. }
  194. } catch (Throwable t) {
  195. if (orb.transportDebugFlag) {
  196. dprint(".close: selector.close: " + t);
  197. }
  198. }
  199. }
  200. ///////////////////////////////////////////////////
  201. //
  202. // Thread methods.
  203. //
  204. public void run()
  205. {
  206. setName("SelectorThread");
  207. while (!closed) {
  208. try {
  209. int n = 0;
  210. if (timeout == 0 && orb.transportDebugFlag) {
  211. dprint(".run: Beginning of selection cycle");
  212. }
  213. handleDeferredRegistrations();
  214. enableInterestOps();
  215. try {
  216. n = selector.select(timeout);
  217. } catch (IOException e) {
  218. if (orb.transportDebugFlag) {
  219. dprint(".run: selector.select: " + e);
  220. }
  221. }
  222. if (closed) {
  223. selector.close();
  224. if (orb.transportDebugFlag) {
  225. dprint(".run: closed - .run return");
  226. }
  227. return;
  228. }
  229. /*
  230. if (timeout == 0 && orb.transportDebugFlag) {
  231. dprint(".run: selector.select() returned: " + n);
  232. }
  233. if (n == 0) {
  234. continue;
  235. }
  236. */
  237. Iterator iterator = selector.selectedKeys().iterator();
  238. if (orb.transportDebugFlag) {
  239. if (iterator.hasNext()) {
  240. dprint(".run: n = " + n);
  241. }
  242. }
  243. while (iterator.hasNext()) {
  244. SelectionKey selectionKey = (SelectionKey) iterator.next();
  245. iterator.remove();
  246. EventHandler eventHandler = (EventHandler)
  247. selectionKey.attachment();
  248. try {
  249. eventHandler.handleEvent();
  250. } catch (Throwable t) {
  251. if (orb.transportDebugFlag) {
  252. dprint(".run: eventHandler.handleEvent", t);
  253. }
  254. }
  255. }
  256. if (timeout == 0 && orb.transportDebugFlag) {
  257. dprint(".run: End of selection cycle");
  258. }
  259. } catch (Throwable t) {
  260. // IMPORTANT: ignore all errors so the select thread keeps running.
  261. // Otherwise a guaranteed hang.
  262. if (orb.transportDebugFlag) {
  263. dprint(".run: ignoring", t);
  264. }
  265. }
  266. }
  267. }
  268. /////////////////////////////////////////////////////
  269. //
  270. // Implementation.
  271. //
  272. private synchronized boolean isClosed ()
  273. {
  274. return closed;
  275. }
  276. private synchronized void setClosed(boolean closed)
  277. {
  278. this.closed = closed;
  279. }
  280. private void startSelector()
  281. {
  282. try {
  283. selector = Selector.open();
  284. } catch (IOException e) {
  285. if (orb.transportDebugFlag) {
  286. dprint(".startSelector: Selector.open: IOException: " + e);
  287. }
  288. // REVISIT - better handling/reporting
  289. RuntimeException rte =
  290. new RuntimeException(".startSelector: Selector.open exception");
  291. rte.initCause(e);
  292. throw rte;
  293. }
  294. setDaemon(true);
  295. start();
  296. selectorStarted = true;
  297. if (orb.transportDebugFlag) {
  298. dprint(".startSelector: selector.start completed.");
  299. }
  300. }
  301. private void handleDeferredRegistrations()
  302. {
  303. synchronized (deferredRegistrations) {
  304. int deferredListSize = deferredRegistrations.size();
  305. for (int i = 0; i < deferredListSize; i++) {
  306. EventHandler eventHandler =
  307. (EventHandler)deferredRegistrations.get(i);
  308. if (orb.transportDebugFlag) {
  309. dprint(".handleDeferredRegistrations: " + eventHandler);
  310. }
  311. SelectableChannel channel = eventHandler.getChannel();
  312. SelectionKey selectionKey = null;
  313. try {
  314. selectionKey =
  315. channel.register(selector,
  316. eventHandler.getInterestOps(),
  317. (Object)eventHandler);
  318. } catch (ClosedChannelException e) {
  319. if (orb.transportDebugFlag) {
  320. dprint(".handleDeferredRegistrations: " + e);
  321. }
  322. }
  323. eventHandler.setSelectionKey(selectionKey);
  324. }
  325. deferredRegistrations.clear();
  326. }
  327. }
  328. private void enableInterestOps()
  329. {
  330. synchronized (interestOpsList) {
  331. int listSize = interestOpsList.size();
  332. if (listSize > 0) {
  333. if (orb.transportDebugFlag) {
  334. dprint(".enableInterestOps:->");
  335. }
  336. SelectionKey selectionKey = null;
  337. SelectionKeyAndOp keyAndOp = null;
  338. int keyOp, selectionKeyOps = 0;
  339. for (int i = 0; i < listSize; i++) {
  340. keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
  341. selectionKey = keyAndOp.selectionKey;
  342. // Need to check if the SelectionKey is valid because a
  343. // connection's SelectionKey could be put on the list to
  344. // have its OP enabled and before it's enabled be reclaimed.
  345. // Otherwise, the enabling of the OP will throw an exception
  346. // here and exit this method an potentially not enable all
  347. // registered ops.
  348. //
  349. // So, we ignore SelectionKeys that are invalid. They will get
  350. // cleaned up on the next Selector.select() call.
  351. if (selectionKey.isValid()) {
  352. if (orb.transportDebugFlag) {
  353. dprint(".enableInterestOps: " + keyAndOp);
  354. }
  355. keyOp = keyAndOp.keyOp;
  356. selectionKeyOps = selectionKey.interestOps();
  357. selectionKey.interestOps(selectionKeyOps | keyOp);
  358. }
  359. }
  360. interestOpsList.clear();
  361. if (orb.transportDebugFlag) {
  362. dprint(".enableInterestOps:<-");
  363. }
  364. }
  365. }
  366. }
  367. private void createListenerThread(EventHandler eventHandler)
  368. {
  369. if (orb.transportDebugFlag) {
  370. dprint(".createListenerThread: " + eventHandler);
  371. }
  372. Acceptor acceptor = eventHandler.getAcceptor();
  373. ListenerThread listenerThread =
  374. new ListenerThreadImpl(orb, acceptor, this);
  375. listenerThreads.put(eventHandler, listenerThread);
  376. Throwable throwable = null;
  377. try {
  378. orb.getThreadPoolManager().getThreadPool(0)
  379. .getWorkQueue(0).addWork((Work)listenerThread);
  380. } catch (NoSuchThreadPoolException e) {
  381. throwable = e;
  382. } catch (NoSuchWorkQueueException e) {
  383. throwable = e;
  384. }
  385. if (throwable != null) {
  386. RuntimeException rte = new RuntimeException(throwable.toString());
  387. rte.initCause(throwable);
  388. throw rte;
  389. }
  390. }
  391. private void destroyListenerThread(EventHandler eventHandler)
  392. {
  393. if (orb.transportDebugFlag) {
  394. dprint(".destroyListenerThread: " + eventHandler);
  395. }
  396. ListenerThread listenerThread = (ListenerThread)
  397. listenerThreads.get(eventHandler);
  398. if (listenerThread == null) {
  399. if (orb.transportDebugFlag) {
  400. dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
  401. }
  402. return;
  403. }
  404. listenerThreads.remove(eventHandler);
  405. listenerThread.close();
  406. }
  407. private void createReaderThread(EventHandler eventHandler)
  408. {
  409. if (orb.transportDebugFlag) {
  410. dprint(".createReaderThread: " + eventHandler);
  411. }
  412. Connection connection = eventHandler.getConnection();
  413. ReaderThread readerThread =
  414. new ReaderThreadImpl(orb, connection, this);
  415. readerThreads.put(eventHandler, readerThread);
  416. Throwable throwable = null;
  417. try {
  418. orb.getThreadPoolManager().getThreadPool(0)
  419. .getWorkQueue(0).addWork((Work)readerThread);
  420. } catch (NoSuchThreadPoolException e) {
  421. throwable = e;
  422. } catch (NoSuchWorkQueueException e) {
  423. throwable = e;
  424. }
  425. if (throwable != null) {
  426. RuntimeException rte = new RuntimeException(throwable.toString());
  427. rte.initCause(throwable);
  428. throw rte;
  429. }
  430. }
  431. private void destroyReaderThread(EventHandler eventHandler)
  432. {
  433. if (orb.transportDebugFlag) {
  434. dprint(".destroyReaderThread: " + eventHandler);
  435. }
  436. ReaderThread readerThread = (ReaderThread)
  437. readerThreads.get(eventHandler);
  438. if (readerThread == null) {
  439. if (orb.transportDebugFlag) {
  440. dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
  441. }
  442. return;
  443. }
  444. readerThreads.remove(eventHandler);
  445. readerThread.close();
  446. }
  447. private void dprint(String msg)
  448. {
  449. ORBUtility.dprint("SelectorImpl", msg);
  450. }
  451. protected void dprint(String msg, Throwable t)
  452. {
  453. dprint(msg);
  454. t.printStackTrace(System.out);
  455. }
  456. // Private class to contain a SelectionKey and a SelectionKey op.
  457. // Used only by SelectorImpl to register and enable SelectionKey
  458. // Op.
  459. // REVISIT - Could do away with this class and use the EventHanlder
  460. // directly.
  461. private class SelectionKeyAndOp
  462. {
  463. // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
  464. public int keyOp;
  465. public SelectionKey selectionKey;
  466. // constructor
  467. public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
  468. this.selectionKey = selectionKey;
  469. this.keyOp = keyOp;
  470. }
  471. }
  472. // End of file.
  473. }