- /*
- * @(#)SelectorImpl.java 1.17 04/04/07
- *
- * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
- * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
- */
-
- package com.sun.corba.se.impl.transport;
-
- import java.io.IOException;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.SelectableChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
-
- import com.sun.corba.se.pept.broker.Broker;
- import com.sun.corba.se.pept.transport.Acceptor;
- import com.sun.corba.se.pept.transport.Connection;
- import com.sun.corba.se.pept.transport.EventHandler;
- import com.sun.corba.se.pept.transport.ListenerThread;
- import com.sun.corba.se.pept.transport.ReaderThread;
-
- import com.sun.corba.se.spi.logging.CORBALogDomains;
- import com.sun.corba.se.spi.orb.ORB;
- import com.sun.corba.se.spi.orbutil.threadpool.Work;
- import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
- import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
-
- import com.sun.corba.se.impl.logging.ORBUtilSystemException;
- import com.sun.corba.se.impl.orbutil.ORBUtility;
-
- /**
- * @author Harold Carr
- */
- public class SelectorImpl
- extends
- Thread
- implements
- com.sun.corba.se.pept.transport.Selector
- {
- private ORB orb;
- private Selector selector;
- private long timeout;
- private List deferredRegistrations;
- private List interestOpsList;
- private HashMap listenerThreads;
- private HashMap readerThreads;
- private boolean selectorStarted;
- private boolean closed;
- private ORBUtilSystemException wrapper ;
-
-
- public SelectorImpl(ORB orb)
- {
- this.orb = orb;
- selector = null;
- selectorStarted = false;
- timeout = 60000;
- deferredRegistrations = new ArrayList();
- interestOpsList = new ArrayList();
- listenerThreads = new HashMap();
- readerThreads = new HashMap();
- closed = false;
- wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
- }
-
- public void setTimeout(long timeout)
- {
- this.timeout = timeout;
- }
-
- public long getTimeout()
- {
- return timeout;
- }
-
- public void registerInterestOps(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".registerInterestOps:-> " + eventHandler);
- }
-
- SelectionKey selectionKey = eventHandler.getSelectionKey();
- if (selectionKey.isValid()) {
- int ehOps = eventHandler.getInterestOps();
- SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
- synchronized(interestOpsList) {
- interestOpsList.add(keyAndOp);
- }
- // tell Selector Thread there's an update to a SelectorKey's Ops
- selector.wakeup();
- }
- else {
- wrapper.selectionKeyInvalid(eventHandler.toString());
- if (orb.transportDebugFlag) {
- dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
- }
- }
-
- if (orb.transportDebugFlag) {
- dprint(".registerInterestOps:<- ");
- }
- }
-
- public void registerForEvent(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".registerForEvent: " + eventHandler);
- }
-
- if (isClosed()) {
- if (orb.transportDebugFlag) {
- dprint(".registerForEvent: closed: " + eventHandler);
- }
- return;
- }
-
- if (eventHandler.shouldUseSelectThreadToWait()) {
- synchronized (deferredRegistrations) {
- deferredRegistrations.add(eventHandler);
- }
- if (! selectorStarted) {
- startSelector();
- }
- selector.wakeup();
- return;
- }
-
- switch (eventHandler.getInterestOps()) {
- case SelectionKey.OP_ACCEPT :
- createListenerThread(eventHandler);
- break;
- case SelectionKey.OP_READ :
- createReaderThread(eventHandler);
- break;
- default:
- if (orb.transportDebugFlag) {
- dprint(".registerForEvent: default: " + eventHandler);
- }
- throw new RuntimeException(
- "SelectorImpl.registerForEvent: unknown interest ops");
- }
- }
-
- public void unregisterForEvent(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".unregisterForEvent: " + eventHandler);
- }
-
- if (isClosed()) {
- if (orb.transportDebugFlag) {
- dprint(".unregisterForEvent: closed: " + eventHandler);
- }
- return;
- }
-
- if (eventHandler.shouldUseSelectThreadToWait()) {
- SelectionKey selectionKey = eventHandler.getSelectionKey();
- selectionKey.cancel();
- selector.wakeup();
- return;
- }
-
- switch (eventHandler.getInterestOps()) {
- case SelectionKey.OP_ACCEPT :
- destroyListenerThread(eventHandler);
- break;
- case SelectionKey.OP_READ :
- destroyReaderThread(eventHandler);
- break;
- default:
- if (orb.transportDebugFlag) {
- dprint(".unregisterForEvent: default: " + eventHandler);
- }
- throw new RuntimeException(
- "SelectorImpl.uregisterForEvent: unknown interest ops");
- }
- }
-
- public void close()
- {
- if (orb.transportDebugFlag) {
- dprint(".close");
- }
-
- if (isClosed()) {
- if (orb.transportDebugFlag) {
- dprint(".close: already closed");
- }
- return;
- }
-
- setClosed(true);
-
- Iterator i;
-
- // Kill listeners.
-
- i = listenerThreads.values().iterator();
- while (i.hasNext()) {
- ListenerThread listenerThread = (ListenerThread) i.next();
- listenerThread.close();
- }
-
- // Kill readers.
-
- i = readerThreads.values().iterator();
- while (i.hasNext()) {
- ReaderThread readerThread = (ReaderThread) i.next();
- readerThread.close();
- }
-
- // Selector
-
- try {
- if (selector != null) {
- // wakeup Selector thread to process close request
- selector.wakeup();
- }
- } catch (Throwable t) {
- if (orb.transportDebugFlag) {
- dprint(".close: selector.close: " + t);
- }
- }
- }
-
- ///////////////////////////////////////////////////
- //
- // Thread methods.
- //
-
- public void run()
- {
- setName("SelectorThread");
- while (!closed) {
- try {
- int n = 0;
- if (timeout == 0 && orb.transportDebugFlag) {
- dprint(".run: Beginning of selection cycle");
- }
- handleDeferredRegistrations();
- enableInterestOps();
- try {
- n = selector.select(timeout);
- } catch (IOException e) {
- if (orb.transportDebugFlag) {
- dprint(".run: selector.select: " + e);
- }
- }
- if (closed) {
- selector.close();
- if (orb.transportDebugFlag) {
- dprint(".run: closed - .run return");
- }
- return;
- }
- /*
- if (timeout == 0 && orb.transportDebugFlag) {
- dprint(".run: selector.select() returned: " + n);
- }
- if (n == 0) {
- continue;
- }
- */
- Iterator iterator = selector.selectedKeys().iterator();
- if (orb.transportDebugFlag) {
- if (iterator.hasNext()) {
- dprint(".run: n = " + n);
- }
- }
- while (iterator.hasNext()) {
- SelectionKey selectionKey = (SelectionKey) iterator.next();
- iterator.remove();
- EventHandler eventHandler = (EventHandler)
- selectionKey.attachment();
- try {
- eventHandler.handleEvent();
- } catch (Throwable t) {
- if (orb.transportDebugFlag) {
- dprint(".run: eventHandler.handleEvent", t);
- }
- }
- }
- if (timeout == 0 && orb.transportDebugFlag) {
- dprint(".run: End of selection cycle");
- }
- } catch (Throwable t) {
- // IMPORTANT: ignore all errors so the select thread keeps running.
- // Otherwise a guaranteed hang.
- if (orb.transportDebugFlag) {
- dprint(".run: ignoring", t);
- }
- }
- }
- }
-
- /////////////////////////////////////////////////////
- //
- // Implementation.
- //
-
- private synchronized boolean isClosed ()
- {
- return closed;
- }
-
- private synchronized void setClosed(boolean closed)
- {
- this.closed = closed;
- }
-
- private void startSelector()
- {
- try {
- selector = Selector.open();
- } catch (IOException e) {
- if (orb.transportDebugFlag) {
- dprint(".startSelector: Selector.open: IOException: " + e);
- }
- // REVISIT - better handling/reporting
- RuntimeException rte =
- new RuntimeException(".startSelector: Selector.open exception");
- rte.initCause(e);
- throw rte;
- }
- setDaemon(true);
- start();
- selectorStarted = true;
- if (orb.transportDebugFlag) {
- dprint(".startSelector: selector.start completed.");
- }
- }
-
- private void handleDeferredRegistrations()
- {
- synchronized (deferredRegistrations) {
- int deferredListSize = deferredRegistrations.size();
- for (int i = 0; i < deferredListSize; i++) {
- EventHandler eventHandler =
- (EventHandler)deferredRegistrations.get(i);
- if (orb.transportDebugFlag) {
- dprint(".handleDeferredRegistrations: " + eventHandler);
- }
- SelectableChannel channel = eventHandler.getChannel();
- SelectionKey selectionKey = null;
- try {
- selectionKey =
- channel.register(selector,
- eventHandler.getInterestOps(),
- (Object)eventHandler);
- } catch (ClosedChannelException e) {
- if (orb.transportDebugFlag) {
- dprint(".handleDeferredRegistrations: " + e);
- }
- }
- eventHandler.setSelectionKey(selectionKey);
- }
- deferredRegistrations.clear();
- }
- }
-
- private void enableInterestOps()
- {
- synchronized (interestOpsList) {
- int listSize = interestOpsList.size();
- if (listSize > 0) {
- if (orb.transportDebugFlag) {
- dprint(".enableInterestOps:->");
- }
- SelectionKey selectionKey = null;
- SelectionKeyAndOp keyAndOp = null;
- int keyOp, selectionKeyOps = 0;
- for (int i = 0; i < listSize; i++) {
- keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
- selectionKey = keyAndOp.selectionKey;
-
- // Need to check if the SelectionKey is valid because a
- // connection's SelectionKey could be put on the list to
- // have its OP enabled and before it's enabled be reclaimed.
- // Otherwise, the enabling of the OP will throw an exception
- // here and exit this method an potentially not enable all
- // registered ops.
- //
- // So, we ignore SelectionKeys that are invalid. They will get
- // cleaned up on the next Selector.select() call.
-
- if (selectionKey.isValid()) {
- if (orb.transportDebugFlag) {
- dprint(".enableInterestOps: " + keyAndOp);
- }
- keyOp = keyAndOp.keyOp;
- selectionKeyOps = selectionKey.interestOps();
- selectionKey.interestOps(selectionKeyOps | keyOp);
- }
- }
- interestOpsList.clear();
- if (orb.transportDebugFlag) {
- dprint(".enableInterestOps:<-");
- }
- }
- }
- }
-
- private void createListenerThread(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".createListenerThread: " + eventHandler);
- }
- Acceptor acceptor = eventHandler.getAcceptor();
- ListenerThread listenerThread =
- new ListenerThreadImpl(orb, acceptor, this);
- listenerThreads.put(eventHandler, listenerThread);
- Throwable throwable = null;
- try {
- orb.getThreadPoolManager().getThreadPool(0)
- .getWorkQueue(0).addWork((Work)listenerThread);
- } catch (NoSuchThreadPoolException e) {
- throwable = e;
- } catch (NoSuchWorkQueueException e) {
- throwable = e;
- }
- if (throwable != null) {
- RuntimeException rte = new RuntimeException(throwable.toString());
- rte.initCause(throwable);
- throw rte;
- }
- }
-
- private void destroyListenerThread(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".destroyListenerThread: " + eventHandler);
- }
- ListenerThread listenerThread = (ListenerThread)
- listenerThreads.get(eventHandler);
- if (listenerThread == null) {
- if (orb.transportDebugFlag) {
- dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
- }
- return;
- }
- listenerThreads.remove(eventHandler);
- listenerThread.close();
- }
-
- private void createReaderThread(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".createReaderThread: " + eventHandler);
- }
- Connection connection = eventHandler.getConnection();
- ReaderThread readerThread =
- new ReaderThreadImpl(orb, connection, this);
- readerThreads.put(eventHandler, readerThread);
- Throwable throwable = null;
- try {
- orb.getThreadPoolManager().getThreadPool(0)
- .getWorkQueue(0).addWork((Work)readerThread);
- } catch (NoSuchThreadPoolException e) {
- throwable = e;
- } catch (NoSuchWorkQueueException e) {
- throwable = e;
- }
- if (throwable != null) {
- RuntimeException rte = new RuntimeException(throwable.toString());
- rte.initCause(throwable);
- throw rte;
- }
- }
-
- private void destroyReaderThread(EventHandler eventHandler)
- {
- if (orb.transportDebugFlag) {
- dprint(".destroyReaderThread: " + eventHandler);
- }
- ReaderThread readerThread = (ReaderThread)
- readerThreads.get(eventHandler);
- if (readerThread == null) {
- if (orb.transportDebugFlag) {
- dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
- }
- return;
- }
- readerThreads.remove(eventHandler);
- readerThread.close();
- }
-
- private void dprint(String msg)
- {
- ORBUtility.dprint("SelectorImpl", msg);
- }
-
- protected void dprint(String msg, Throwable t)
- {
- dprint(msg);
- t.printStackTrace(System.out);
- }
-
- // Private class to contain a SelectionKey and a SelectionKey op.
- // Used only by SelectorImpl to register and enable SelectionKey
- // Op.
- // REVISIT - Could do away with this class and use the EventHanlder
- // directly.
- private class SelectionKeyAndOp
- {
- // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
- public int keyOp;
- public SelectionKey selectionKey;
-
- // constructor
- public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
- this.selectionKey = selectionKey;
- this.keyOp = keyOp;
- }
- }
-
- // End of file.
- }
-