1. /*
  2. * @(#)SocketOrChannelAcceptorImpl.java 1.54 04/06/21
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.impl.transport;
  8. import java.io.IOException;
  9. import java.net.InetSocketAddress;
  10. import java.net.ServerSocket;
  11. import java.net.Socket;
  12. import java.nio.channels.SelectableChannel;
  13. import java.nio.channels.SelectionKey;
  14. import java.nio.channels.ServerSocketChannel;
  15. import java.nio.channels.SocketChannel;
  16. import java.security.AccessController;
  17. import java.security.PrivilegedAction;
  18. import java.util.Collection;
  19. import java.util.Iterator;
  20. import java.util.LinkedList;
  21. import org.omg.CORBA.CompletionStatus;
  22. import org.omg.CORBA.INTERNAL;
  23. import com.sun.corba.se.pept.broker.Broker;
  24. import com.sun.corba.se.pept.encoding.InputObject;
  25. import com.sun.corba.se.pept.encoding.OutputObject;
  26. import com.sun.corba.se.pept.protocol.MessageMediator;
  27. import com.sun.corba.se.pept.transport.Acceptor;
  28. import com.sun.corba.se.pept.transport.Connection;
  29. import com.sun.corba.se.pept.transport.ContactInfo;
  30. import com.sun.corba.se.pept.transport.EventHandler;
  31. import com.sun.corba.se.pept.transport.InboundConnectionCache;
  32. import com.sun.corba.se.pept.transport.Selector;
  33. import com.sun.corba.se.spi.extension.RequestPartitioningPolicy;
  34. import com.sun.corba.se.spi.ior.IORTemplate;
  35. import com.sun.corba.se.spi.ior.TaggedProfileTemplate;
  36. import com.sun.corba.se.spi.ior.iiop.IIOPAddress ;
  37. import com.sun.corba.se.spi.ior.iiop.IIOPFactories;
  38. import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate ;
  39. import com.sun.corba.se.spi.ior.iiop.GIOPVersion ;
  40. import com.sun.corba.se.spi.ior.iiop.AlternateIIOPAddressComponent;
  41. import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
  42. import com.sun.corba.se.spi.logging.CORBALogDomains;
  43. import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase;
  44. import com.sun.corba.se.spi.monitoring.MonitoringConstants;
  45. import com.sun.corba.se.spi.monitoring.MonitoringFactories;
  46. import com.sun.corba.se.spi.monitoring.MonitoredObject;
  47. import com.sun.corba.se.spi.orb.ORB;
  48. import com.sun.corba.se.spi.orbutil.threadpool.Work;
  49. import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  50. import com.sun.corba.se.spi.transport.CorbaAcceptor;
  51. import com.sun.corba.se.spi.transport.CorbaConnection;
  52. import com.sun.corba.se.spi.transport.CorbaContactInfo;
  53. import com.sun.corba.se.spi.transport.SocketInfo;
  54. import com.sun.corba.se.spi.transport.SocketOrChannelAcceptor;
  55. import com.sun.corba.se.impl.encoding.CDRInputObject;
  56. import com.sun.corba.se.impl.encoding.CDROutputObject;
  57. import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  58. import com.sun.corba.se.impl.oa.poa.Policies; // REVISIT impl/poa specific
  59. import com.sun.corba.se.impl.orbutil.ORBConstants;
  60. import com.sun.corba.se.impl.orbutil.ORBUtility;
  61. import com.sun.corba.se.impl.ior.iiop.JavaSerializationComponent;
  62. // BEGIN Legacy support.
  63. import com.sun.corba.se.spi.legacy.connection.LegacyServerSocketEndPointInfo;
  64. // END Legacy support.
  65. /**
  66. * @author Harold Carr
  67. */
  68. public class SocketOrChannelAcceptorImpl
  69. extends
  70. EventHandlerBase
  71. implements
  72. CorbaAcceptor,
  73. SocketOrChannelAcceptor,
  74. Work,
  75. // BEGIN Legacy
  76. SocketInfo,
  77. LegacyServerSocketEndPointInfo
  78. // END Legacy
  79. {
  80. protected ServerSocketChannel serverSocketChannel;
  81. protected ServerSocket serverSocket;
  82. protected int port;
  83. protected long enqueueTime;
  84. protected boolean initialized;
  85. protected ORBUtilSystemException wrapper ;
  86. protected InboundConnectionCache connectionCache;
  87. // BEGIN Legacy
  88. protected String type = "";
  89. protected String name = "";
  90. protected String hostname;
  91. protected int locatorPort;
  92. // END Legacy
  93. public SocketOrChannelAcceptorImpl(ORB orb)
  94. {
  95. this.orb = orb;
  96. wrapper = ORBUtilSystemException.get( orb,
  97. CORBALogDomains.RPC_TRANSPORT ) ;
  98. setWork(this);
  99. initialized = false;
  100. // BEGIN Legacy support.
  101. this.hostname = orb.getORBData().getORBServerHost();
  102. this.name = LegacyServerSocketEndPointInfo.NO_NAME;
  103. this.locatorPort = -1;
  104. // END Legacy support.
  105. }
  106. public SocketOrChannelAcceptorImpl(ORB orb, int port)
  107. {
  108. this(orb);
  109. this.port = port;
  110. }
  111. // BEGIN Legacy support.
  112. public SocketOrChannelAcceptorImpl(ORB orb, int port,
  113. String name, String type)
  114. {
  115. this(orb, port);
  116. this.name = name;
  117. this.type = type;
  118. }
  119. // END Legacy support.
  120. ////////////////////////////////////////////////////
  121. //
  122. // pept.transport.Acceptor
  123. //
  124. public boolean initialize()
  125. {
  126. if (initialized) {
  127. return false;
  128. }
  129. if (orb.transportDebugFlag) {
  130. dprint(".initialize: " + this);
  131. }
  132. InetSocketAddress inetSocketAddress = null;
  133. try {
  134. if (orb.getORBData().getListenOnAllInterfaces().equals(ORBConstants.LISTEN_ON_ALL_INTERFACES)) {
  135. inetSocketAddress = new InetSocketAddress(port);
  136. } else {
  137. String host = orb.getORBData().getORBServerHost();
  138. inetSocketAddress = new InetSocketAddress(host, port);
  139. }
  140. serverSocket = orb.getORBData().getSocketFactory()
  141. .createServerSocket(type, inetSocketAddress);
  142. internalInitialize();
  143. } catch (Throwable t) {
  144. throw wrapper.createListenerFailed( t, Integer.toString(port) ) ;
  145. }
  146. initialized = true;
  147. return true;
  148. }
  149. protected void internalInitialize()
  150. throws Exception
  151. {
  152. // Determine the listening port (for the IOR).
  153. // This is important when using emphemeral ports (i.e.,
  154. // when the port value to the constructor is 0).
  155. port = serverSocket.getLocalPort();
  156. // Register with transport (also sets up monitoring).
  157. orb.getCorbaTransportManager().getInboundConnectionCache(this);
  158. // Finish configuation.
  159. serverSocketChannel = serverSocket.getChannel();
  160. if (serverSocketChannel != null) {
  161. setUseSelectThreadToWait(
  162. orb.getORBData().acceptorSocketUseSelectThreadToWait());
  163. serverSocketChannel.configureBlocking(
  164. ! orb.getORBData().acceptorSocketUseSelectThreadToWait());
  165. } else {
  166. // Configure to use listener and reader threads.
  167. setUseSelectThreadToWait(false);
  168. }
  169. setUseWorkerThreadForEvent(
  170. orb.getORBData().acceptorSocketUseWorkerThreadForEvent());
  171. }
  172. public boolean initialized()
  173. {
  174. return initialized;
  175. }
  176. public String getConnectionCacheType()
  177. {
  178. return this.getClass().toString();
  179. }
  180. public void setConnectionCache(InboundConnectionCache connectionCache)
  181. {
  182. this.connectionCache = connectionCache;
  183. }
  184. public InboundConnectionCache getConnectionCache()
  185. {
  186. return connectionCache;
  187. }
  188. public boolean shouldRegisterAcceptEvent()
  189. {
  190. return true;
  191. }
  192. public void accept()
  193. {
  194. try {
  195. SocketChannel socketChannel = null;
  196. Socket socket = null;
  197. if (serverSocketChannel == null) {
  198. socket = serverSocket.accept();
  199. } else {
  200. socketChannel = serverSocketChannel.accept();
  201. socket = socketChannel.socket();
  202. }
  203. orb.getORBData().getSocketFactory()
  204. .setAcceptedSocketOptions(this, serverSocket, socket);
  205. if (orb.transportDebugFlag) {
  206. dprint(".accept: " +
  207. (serverSocketChannel == null
  208. ? serverSocket.toString()
  209. : serverSocketChannel.toString()));
  210. }
  211. CorbaConnection connection =
  212. new SocketOrChannelConnectionImpl(orb, this, socket);
  213. if (orb.transportDebugFlag) {
  214. dprint(".accept: new: " + connection);
  215. }
  216. // NOTE: The connection MUST be put in the cache BEFORE being
  217. // registered with the selector. Otherwise if the bytes
  218. // are read on the connection it will attempt a time stamp
  219. // but the cache will be null, resulting in NPE.
  220. getConnectionCache().put(this, connection);
  221. if (connection.shouldRegisterServerReadEvent()) {
  222. Selector selector = orb.getTransportManager().getSelector(0);
  223. selector.registerForEvent(connection.getEventHandler());
  224. }
  225. getConnectionCache().reclaim();
  226. } catch (IOException e) {
  227. if (orb.transportDebugFlag) {
  228. dprint(".accept:", e);
  229. }
  230. orb.getTransportManager().getSelector(0).unregisterForEvent(this);
  231. // REVISIT - need to close - recreate - then register new one.
  232. orb.getTransportManager().getSelector(0).registerForEvent(this);
  233. // NOTE: if register cycling we do not want to shut down ORB
  234. // since local beans will still work. Instead one will see
  235. // a growing log file to alert admin of problem.
  236. }
  237. }
  238. public void close ()
  239. {
  240. try {
  241. if (orb.transportDebugFlag) {
  242. dprint(".close->:");
  243. }
  244. Selector selector = orb.getTransportManager().getSelector(0);
  245. selector.unregisterForEvent(this);
  246. if (serverSocketChannel != null) {
  247. serverSocketChannel.close();
  248. }
  249. if (serverSocket != null) {
  250. serverSocket.close();
  251. }
  252. } catch (IOException e) {
  253. if (orb.transportDebugFlag) {
  254. dprint(".close:", e);
  255. }
  256. } finally {
  257. if (orb.transportDebugFlag) {
  258. dprint(".close<-:");
  259. }
  260. }
  261. }
  262. public EventHandler getEventHandler()
  263. {
  264. return this;
  265. }
  266. ////////////////////////////////////////////////////
  267. //
  268. // CorbaAcceptor
  269. //
  270. public String getObjectAdapterId()
  271. {
  272. return null;
  273. }
  274. public String getObjectAdapterManagerId()
  275. {
  276. return null;
  277. }
  278. public void addToIORTemplate(IORTemplate iorTemplate,
  279. Policies policies,
  280. String codebase)
  281. {
  282. Iterator iterator = iorTemplate.iteratorById(
  283. org.omg.IOP.TAG_INTERNET_IOP.value);
  284. String hostname = orb.getORBData().getORBServerHost();
  285. if (iterator.hasNext()) {
  286. // REVISIT - how does this play with legacy ORBD port exchange?
  287. IIOPAddress iiopAddress =
  288. IIOPFactories.makeIIOPAddress(orb, hostname, port);
  289. AlternateIIOPAddressComponent iiopAddressComponent =
  290. IIOPFactories.makeAlternateIIOPAddressComponent(iiopAddress);
  291. while (iterator.hasNext()) {
  292. TaggedProfileTemplate taggedProfileTemplate =
  293. (TaggedProfileTemplate) iterator.next();
  294. taggedProfileTemplate.add(iiopAddressComponent);
  295. }
  296. } else {
  297. GIOPVersion version = orb.getORBData().getGIOPVersion();
  298. int templatePort;
  299. if (policies.forceZeroPort()) {
  300. templatePort = 0;
  301. } else if (policies.isTransient()) {
  302. templatePort = port;
  303. } else {
  304. templatePort = orb.getLegacyServerSocketManager()
  305. .legacyGetPersistentServerPort(SocketInfo.IIOP_CLEAR_TEXT);
  306. }
  307. IIOPAddress addr =
  308. IIOPFactories.makeIIOPAddress(orb, hostname, templatePort);
  309. IIOPProfileTemplate iiopProfile =
  310. IIOPFactories.makeIIOPProfileTemplate(orb, version, addr);
  311. if (version.supportsIORIIOPProfileComponents()) {
  312. iiopProfile.add(IIOPFactories.makeCodeSetsComponent(orb));
  313. iiopProfile.add(IIOPFactories.makeMaxStreamFormatVersionComponent());
  314. RequestPartitioningPolicy rpPolicy = (RequestPartitioningPolicy)
  315. policies.get_effective_policy(
  316. ORBConstants.REQUEST_PARTITIONING_POLICY);
  317. if (rpPolicy != null) {
  318. iiopProfile.add(
  319. IIOPFactories.makeRequestPartitioningComponent(
  320. rpPolicy.getValue()));
  321. }
  322. if (codebase != null && codebase != "") {
  323. iiopProfile.add(IIOPFactories. makeJavaCodebaseComponent(codebase));
  324. }
  325. if (orb.getORBData().isJavaSerializationEnabled()) {
  326. iiopProfile.add(
  327. IIOPFactories.makeJavaSerializationComponent());
  328. }
  329. }
  330. iorTemplate.add(iiopProfile);
  331. }
  332. }
  333. public String getMonitoringName()
  334. {
  335. return "AcceptedConnections";
  336. }
  337. ////////////////////////////////////////////////////
  338. //
  339. // EventHandler methods
  340. //
  341. public SelectableChannel getChannel()
  342. {
  343. return serverSocketChannel;
  344. }
  345. public int getInterestOps()
  346. {
  347. return SelectionKey.OP_ACCEPT;
  348. }
  349. public Acceptor getAcceptor()
  350. {
  351. return this;
  352. }
  353. public Connection getConnection()
  354. {
  355. throw new RuntimeException("Should not happen.");
  356. }
  357. ////////////////////////////////////////////////////
  358. //
  359. // Work methods.
  360. //
  361. /* CONFLICT: with legacy below.
  362. public String getName()
  363. {
  364. return this.toString();
  365. }
  366. */
  367. public void doWork()
  368. {
  369. try {
  370. if (orb.transportDebugFlag) {
  371. dprint(".doWork->: " + this);
  372. }
  373. if (selectionKey.isAcceptable()) {
  374. AccessController.doPrivileged(new PrivilegedAction() {
  375. public java.lang.Object run() {
  376. accept();
  377. return null;
  378. }
  379. });
  380. } else {
  381. if (orb.transportDebugFlag) {
  382. dprint(".doWork: ! selectionKey.isAcceptable: " + this);
  383. }
  384. }
  385. } catch (SecurityException se) {
  386. if (orb.transportDebugFlag) {
  387. dprint(".doWork: ignoring SecurityException: "
  388. + se
  389. + " " + this);
  390. }
  391. String permissionStr = ORBUtility.getClassSecurityInfo(getClass());
  392. wrapper.securityExceptionInAccept(se, permissionStr);
  393. } catch (Exception ex) {
  394. if (orb.transportDebugFlag) {
  395. dprint(".doWork: ignoring Exception: "
  396. + ex
  397. + " " + this);
  398. }
  399. wrapper.exceptionInAccept(ex);
  400. } catch (Throwable t) {
  401. if (orb.transportDebugFlag) {
  402. dprint(".doWork: ignoring Throwable: "
  403. + t
  404. + " " + this);
  405. }
  406. } finally {
  407. // IMPORTANT: To avoid bug (4953599), we force the
  408. // Thread that does the NIO select to also do the
  409. // enable/disable of Ops using SelectionKey.interestOps().
  410. // Otherwise, the SelectionKey.interestOps() may block
  411. // indefinitely.
  412. // NOTE: If "acceptorSocketUseWorkerThreadForEvent" is
  413. // set to to false in ParserTable.java, then this method,
  414. // doWork(), will get executed by the same thread
  415. // (SelectorThread) that does the NIO select.
  416. // If "acceptorSocketUseWorkerThreadForEvent" is set
  417. // to true, a WorkerThread will execute this method,
  418. // doWork(). Hence, the registering of the enabling of
  419. // the SelectionKey's interestOps is done here instead
  420. // of calling SelectionKey.interestOps(<interest op>).
  421. Selector selector = orb.getTransportManager().getSelector(0);
  422. selector.registerInterestOps(this);
  423. if (orb.transportDebugFlag) {
  424. dprint(".doWork<-:" + this);
  425. }
  426. }
  427. }
  428. public void setEnqueueTime(long timeInMillis)
  429. {
  430. enqueueTime = timeInMillis;
  431. }
  432. public long getEnqueueTime()
  433. {
  434. return enqueueTime;
  435. }
  436. //
  437. // Factory methods.
  438. //
  439. // REVISIT: refactor into common base or delegate.
  440. public MessageMediator createMessageMediator(Broker broker,
  441. Connection connection)
  442. {
  443. // REVISIT - no factoring so cheat to avoid code dup right now.
  444. // REVISIT **** COUPLING !!!!
  445. ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
  446. return contactInfo.createMessageMediator(broker, connection);
  447. }
  448. // REVISIT: refactor into common base or delegate.
  449. public MessageMediator finishCreatingMessageMediator(Broker broker,
  450. Connection connection,
  451. MessageMediator messageMediator)
  452. {
  453. // REVISIT - no factoring so cheat to avoid code dup right now.
  454. // REVISIT **** COUPLING !!!!
  455. ContactInfo contactInfo = new SocketOrChannelContactInfoImpl();
  456. return contactInfo.finishCreatingMessageMediator(broker,
  457. connection, messageMediator);
  458. }
  459. public InputObject createInputObject(Broker broker,
  460. MessageMediator messageMediator)
  461. {
  462. CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
  463. messageMediator;
  464. return new CDRInputObject((ORB)broker,
  465. (CorbaConnection)messageMediator.getConnection(),
  466. corbaMessageMediator.getDispatchBuffer(),
  467. corbaMessageMediator.getDispatchHeader());
  468. }
  469. public OutputObject createOutputObject(Broker broker,
  470. MessageMediator messageMediator)
  471. {
  472. CorbaMessageMediator corbaMessageMediator = (CorbaMessageMediator)
  473. messageMediator;
  474. return new CDROutputObject((ORB) broker, corbaMessageMediator,
  475. corbaMessageMediator.getReplyHeader(),
  476. corbaMessageMediator.getStreamFormatVersion());
  477. }
  478. ////////////////////////////////////////////////////
  479. //
  480. // SocketOrChannelAcceptor
  481. //
  482. public ServerSocket getServerSocket()
  483. {
  484. return serverSocket;
  485. }
  486. ////////////////////////////////////////////////////
  487. //
  488. // Implementation.
  489. //
  490. public String toString()
  491. {
  492. String sock;
  493. if (serverSocketChannel == null) {
  494. if (serverSocket == null) {
  495. sock = "(not initialized)";
  496. } else {
  497. sock = serverSocket.toString();
  498. }
  499. } else {
  500. sock = serverSocketChannel.toString();
  501. }
  502. return
  503. toStringName() +
  504. "["
  505. + sock + " "
  506. + type + " "
  507. + shouldUseSelectThreadToWait() + " "
  508. + shouldUseWorkerThreadForEvent()
  509. + "]" ;
  510. }
  511. protected String toStringName()
  512. {
  513. return "SocketOrChannelAcceptorImpl";
  514. }
  515. protected void dprint(String msg)
  516. {
  517. ORBUtility.dprint(toStringName(), msg);
  518. }
  519. protected void dprint(String msg, Throwable t)
  520. {
  521. dprint(msg);
  522. t.printStackTrace(System.out);
  523. }
  524. // BEGIN Legacy support
  525. ////////////////////////////////////////////////////
  526. //
  527. // LegacyServerSocketEndPointInfo and EndPointInfo
  528. //
  529. public String getType()
  530. {
  531. return type;
  532. }
  533. public String getHostName()
  534. {
  535. return hostname;
  536. }
  537. public String getHost()
  538. {
  539. return hostname;
  540. }
  541. public int getPort()
  542. {
  543. return port;
  544. }
  545. public int getLocatorPort()
  546. {
  547. return locatorPort;
  548. }
  549. public void setLocatorPort (int port)
  550. {
  551. locatorPort = port;
  552. }
  553. public String getName()
  554. {
  555. // Kluge alert:
  556. // Work and Legacy both define getName.
  557. // Try to make this behave best for most cases.
  558. String result =
  559. name.equals(LegacyServerSocketEndPointInfo.NO_NAME) ?
  560. this.toString() : name;
  561. return result;
  562. }
  563. // END Legacy support
  564. }
  565. // End of file.