1. /*
  2. * @(#)SocketOrChannelConnectionImpl.java 1.91 04/06/21
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.impl.transport;
  8. import java.io.IOException;
  9. import java.net.InetSocketAddress;
  10. import java.net.Socket;
  11. import java.nio.ByteBuffer;
  12. import java.nio.channels.SelectableChannel;
  13. import java.nio.channels.SelectionKey;
  14. import java.nio.channels.SocketChannel;
  15. import java.security.AccessController;
  16. import java.security.PrivilegedAction;
  17. import java.util.Collections;
  18. import java.util.Hashtable;
  19. import java.util.HashMap;
  20. import java.util.Map;
  21. import org.omg.CORBA.COMM_FAILURE;
  22. import org.omg.CORBA.CompletionStatus;
  23. import org.omg.CORBA.DATA_CONVERSION;
  24. import org.omg.CORBA.INTERNAL;
  25. import org.omg.CORBA.MARSHAL;
  26. import org.omg.CORBA.OBJECT_NOT_EXIST;
  27. import org.omg.CORBA.SystemException;
  28. import com.sun.org.omg.SendingContext.CodeBase;
  29. import com.sun.corba.se.pept.broker.Broker;
  30. import com.sun.corba.se.pept.encoding.InputObject;
  31. import com.sun.corba.se.pept.encoding.OutputObject;
  32. import com.sun.corba.se.pept.protocol.MessageMediator;
  33. import com.sun.corba.se.pept.transport.Acceptor;
  34. import com.sun.corba.se.pept.transport.Connection;
  35. import com.sun.corba.se.pept.transport.ConnectionCache;
  36. import com.sun.corba.se.pept.transport.ContactInfo;
  37. import com.sun.corba.se.pept.transport.EventHandler;
  38. import com.sun.corba.se.pept.transport.InboundConnectionCache;
  39. import com.sun.corba.se.pept.transport.OutboundConnectionCache;
  40. import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
  41. import com.sun.corba.se.pept.transport.Selector;
  42. import com.sun.corba.se.spi.ior.IOR;
  43. import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
  44. import com.sun.corba.se.spi.logging.CORBALogDomains;
  45. import com.sun.corba.se.spi.orb.ORB ;
  46. import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
  47. import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
  48. import com.sun.corba.se.spi.orbutil.threadpool.Work;
  49. import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  50. import com.sun.corba.se.spi.transport.CorbaContactInfo;
  51. import com.sun.corba.se.spi.transport.CorbaConnection;
  52. import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  53. import com.sun.corba.se.spi.transport.ReadTimeouts;
  54. import com.sun.corba.se.impl.encoding.CachedCodeBase;
  55. import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
  56. import com.sun.corba.se.impl.encoding.CDROutputObject;
  57. import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
  58. import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
  59. import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
  60. import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  61. import com.sun.corba.se.impl.orbutil.ORBConstants;
  62. import com.sun.corba.se.impl.orbutil.ORBUtility;
  63. import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
  64. import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
  65. import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
  66. /**
  67. * @author Harold Carr
  68. */
  69. public class SocketOrChannelConnectionImpl
  70. extends
  71. EventHandlerBase
  72. implements
  73. CorbaConnection,
  74. Work
  75. {
  // When true (and the ORB's transportDebugFlag is also set), writeLock()
  // and writeUnlock() print entry/exit trace messages.
  public static boolean dprintWriteLocks = false;
  //
  // New transport.
  //
  protected long enqueueTime;
  // Channel obtained from the socket via Socket.getChannel(); null when
  // the socket is not channel-backed.
  protected SocketChannel socketChannel;
  82. public SocketChannel getSocketChannel()
  83. {
  84. return socketChannel;
  85. }
  // REVISIT:
  // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
  // Set by the client constructor; when non-null it is used to create
  // message mediators for incoming data.
  protected CorbaContactInfo contactInfo;
  // Set by the server constructor; used to create message mediators when
  // contactInfo is null.
  protected Acceptor acceptor;
  // Cache holding this connection; its stampTime() is invoked after every
  // completed read and write.
  protected ConnectionCache connectionCache;
  //
  // From iiop.Connection.java
  //
  protected Socket socket; // The socket used for this connection.
  // Value exposed through getTimeStamp()/setTimeStamp().
  protected long timeStamp = 0;
  // Set to true only by the server-side constructor.
  protected boolean isServer = false;
  // Start at some value other than zero since this is a magic
  // value in some protocols.
  protected int requestId = 5;
  // Holds mediators waiting for replies; consulted by isBusy().
  protected CorbaResponseWaitingRoom responseWaitingRoom;
  // Current connection state (OPENING, ESTABLISHED, CLOSE_SENT, ...).
  protected int state;
  // Monitor used to wait for / signal changes of 'state'.
  protected java.lang.Object stateEvent = new java.lang.Object();
  // Monitor guarding 'writeLocked'.
  protected java.lang.Object writeEvent = new java.lang.Object();
  // True while some caller holds the write lock (see writeLock()).
  protected boolean writeLocked;
  // A value greater than zero makes isBusy() report true.
  protected int serverRequestCount = 0;
  // Server request map: used on the server side of Connection
  // Maps request ID to IIOPInputStream.
  // Created (as a synchronized map) only by the server-side constructor.
  Map serverRequestMap = null;
  // This is a flag associated per connection telling us if the
  // initial set of sending contexts were sent to the receiver
  // already...
  protected boolean postInitialContexts = false;
  // Remote reference to CodeBase server (supplies
  // FullValueDescription, among other things)
  protected IOR codeBaseServerIOR;
  // CodeBase cache for this connection. This will cache remote operations,
  // handle connecting, and ensure we don't do any remote operations until
  // necessary.
  protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
  // Source of the system exceptions thrown by this class; obtained in the
  // base constructor for the RPC_TRANSPORT log domain.
  protected ORBUtilSystemException wrapper ;
  // transport read timeout values
  // (initial wait and backoff factor used by the readFully methods)
  protected ReadTimeouts readTimeouts;
  protected boolean shouldReadGiopHeaderOnly;
  // A message mediator used when shouldReadGiopHeaderOnly is
  // true to maintain request message state across execution in a
  // SelectorThread and WorkerThread.
  protected CorbaMessageMediator partialMessageMediator = null;
  /**
   * Base initialisation shared by all constructors.
   * Used in genericRPCMSGFramework test.
   *
   * Stores the ORB, obtains the RPC_TRANSPORT exception wrapper, registers
   * this connection as its own unit of work, creates the response waiting
   * room and installs the ORB's configured TCP read timeouts.
   *
   * @param orb the ORB this connection belongs to
   */
  protected SocketOrChannelConnectionImpl(ORB orb) {
      this.orb = orb;
      this.wrapper =
          ORBUtilSystemException.get(orb, CORBALogDomains.RPC_TRANSPORT);
      setWork(this);
      this.responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
      setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
  }
  /**
   * Common constructor for both client and server connections.
   *
   * @param orb                   the ORB this connection belongs to
   * @param useSelectThreadToWait passed to setUseSelectThreadToWait
   * @param useWorkerThread       passed to setUseWorkerThreadForEvent
   */
  protected SocketOrChannelConnectionImpl(ORB orb,
                                          boolean useSelectThreadToWait,
                                          boolean useWorkerThread) {
      this(orb);
      setUseSelectThreadToWait(useSelectThreadToWait);
      setUseWorkerThreadForEvent(useWorkerThread);
  }
  /**
   * Client constructor: opens a socket to the given host/port through the
   * ORB's socket factory and leaves the connection in the OPENING state.
   *
   * If the socket is channel-backed, the channel is put in non-blocking
   * mode exactly when a select thread is used to wait for input; otherwise
   * the select thread is disabled for this connection.
   *
   * Any failure is reported as the wrapper's connectFailure exception. If
   * the socket had already been created when the failure occurred, it is
   * closed first so that the underlying connection is not leaked.
   *
   * @param orb                   the ORB this connection belongs to
   * @param contactInfo           contact info used later to create mediators
   * @param useSelectThreadToWait whether a select thread waits for input
   * @param useWorkerThread       whether events are handled on a worker thread
   * @param socketType            socket type passed to the socket factory
   * @param hostname              remote host name
   * @param port                  remote port
   */
  public SocketOrChannelConnectionImpl(ORB orb,
                                       CorbaContactInfo contactInfo,
                                       boolean useSelectThreadToWait,
                                       boolean useWorkerThread,
                                       String socketType,
                                       String hostname,
                                       int port)
  {
      this(orb, useSelectThreadToWait, useWorkerThread);
      this.contactInfo = contactInfo;
      try {
          socket = orb.getORBData().getSocketFactory()
              .createSocket(socketType,
                            new InetSocketAddress(hostname, port));
          socketChannel = socket.getChannel();
          if (socketChannel != null) {
              boolean isBlocking = !useSelectThreadToWait;
              socketChannel.configureBlocking(isBlocking);
          } else {
              // IMPORTANT: non-channel-backed sockets must use
              // dedicated reader threads.
              setUseSelectThreadToWait(false);
          }
          if (orb.transportDebugFlag) {
              dprint(".initialize: connection created: " + socket);
          }
      } catch (Throwable t) {
          // The socket may already be connected (e.g. configureBlocking
          // failed); close it so the half-initialised connection does not
          // leak. A failure to close must not mask the original cause.
          if (socket != null) {
              try {
                  socket.close();
              } catch (Exception closeFailure) {
                  if (orb.transportDebugFlag) {
                      dprint(".initialize: close after failed connect: "
                             + closeFailure);
                  }
              }
          }
          throw wrapper.connectFailure(t, socketType, hostname,
                                       Integer.toString(port));
      }
      state = OPENING;
  }
  /**
   * Client-side convenience constructor: takes the select-thread and
   * worker-thread settings from the ORB's configuration data.
   *
   * @param orb         the ORB this connection belongs to
   * @param contactInfo contact info used later to create mediators
   * @param socketType  socket type passed to the socket factory
   * @param hostname    remote host name
   * @param port        remote port
   */
  public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo,
                                       String socketType, String hostname,
                                       int port) {
      this(orb,
           contactInfo,
           orb.getORBData().connectionSocketUseSelectThreadToWait(),
           orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
           socketType,
           hostname,
           port);
  }
  /**
   * Server-side constructor: wraps an already accepted socket and marks
   * the connection as an ESTABLISHED server connection.
   *
   * @param orb                   the ORB this connection belongs to
   * @param acceptor              acceptor used later to create mediators
   * @param socket                the accepted socket
   * @param useSelectThreadToWait whether a select thread waits for input;
   *                              a channel-backed socket is made
   *                              non-blocking exactly when this is true
   * @param useWorkerThread       whether events are handled on a worker thread
   * @throws RuntimeException wrapping the IOException if the channel's
   *                          blocking mode cannot be configured
   */
  public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor,
                                       Socket socket,
                                       boolean useSelectThreadToWait,
                                       boolean useWorkerThread) {
      this(orb, useSelectThreadToWait, useWorkerThread);
      this.socket = socket;
      this.socketChannel = socket.getChannel();
      if (this.socketChannel != null) {
          // REVISIT
          try {
              this.socketChannel.configureBlocking(!useSelectThreadToWait);
          } catch (IOException ioe) {
              RuntimeException wrapped = new RuntimeException();
              wrapped.initCause(ioe);
              throw wrapped;
          }
      }
      this.acceptor = acceptor;
      this.serverRequestMap = Collections.synchronizedMap(new HashMap());
      this.isServer = true;
      this.state = ESTABLISHED;
  }
  /**
   * Server-side convenience constructor. The select-thread and
   * worker-thread options are forced to false for sockets without a
   * channel; otherwise they come from the ORB's configuration data.
   *
   * @param orb      the ORB this connection belongs to
   * @param acceptor acceptor used later to create mediators
   * @param socket   the accepted socket
   */
  public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor,
                                       Socket socket) {
      this(orb, acceptor, socket,
           socket.getChannel() != null
               && orb.getORBData().connectionSocketUseSelectThreadToWait(),
           socket.getChannel() != null
               && orb.getORBData().connectionSocketUseWorkerThreadForEvent());
  }
  231. ////////////////////////////////////////////////////
  232. //
  233. // framework.transport.Connection
  234. //
  235. public boolean shouldRegisterReadEvent()
  236. {
  237. return true;
  238. }
  239. public boolean shouldRegisterServerReadEvent()
  240. {
  241. return true;
  242. }
  243. public boolean read()
  244. {
  245. try {
  246. if (orb.transportDebugFlag) {
  247. dprint(".read->: " + this);
  248. }
  249. CorbaMessageMediator messageMediator = readBits();
  250. if (messageMediator != null) {
  251. // Null can happen when client closes stream
  252. // causing purgecalls.
  253. return dispatch(messageMediator);
  254. }
  255. return true;
  256. } finally {
  257. if (orb.transportDebugFlag) {
  258. dprint(".read<-: " + this);
  259. }
  260. }
  261. }
  262. protected CorbaMessageMediator readBits()
  263. {
  264. try {
  265. if (orb.transportDebugFlag) {
  266. dprint(".readBits->: " + this);
  267. }
  268. MessageMediator messageMediator;
  269. // REVISIT - use common factory base class.
  270. if (contactInfo != null) {
  271. messageMediator =
  272. contactInfo.createMessageMediator(orb, this);
  273. } else if (acceptor != null) {
  274. messageMediator = acceptor.createMessageMediator(orb, this);
  275. } else {
  276. throw
  277. new RuntimeException("SocketOrChannelConnectionImpl.readBits");
  278. }
  279. return (CorbaMessageMediator) messageMediator;
  280. } catch (ThreadDeath td) {
  281. if (orb.transportDebugFlag) {
  282. dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
  283. }
  284. try {
  285. purgeCalls(wrapper.connectionAbort(td), false, false);
  286. } catch (Throwable t) {
  287. if (orb.transportDebugFlag) {
  288. dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
  289. }
  290. }
  291. throw td;
  292. } catch (Throwable ex) {
  293. if (orb.transportDebugFlag) {
  294. dprint(".readBits: " + this + ": Throwable: " + ex, ex);
  295. }
  296. try {
  297. if (ex instanceof INTERNAL) {
  298. sendMessageError(GIOPVersion.DEFAULT_VERSION);
  299. }
  300. } catch (IOException e) {
  301. if (orb.transportDebugFlag) {
  302. dprint(".readBits: " + this +
  303. ": sendMessageError: IOException: " + e, e);
  304. }
  305. }
  306. // REVISIT - make sure reader thread is killed.
  307. orb.getTransportManager().getSelector(0).unregisterForEvent(this);
  308. // Notify anyone waiting.
  309. purgeCalls(wrapper.connectionAbort(ex), true, false);
  310. // REVISIT
  311. //keepRunning = false;
  312. // REVISIT - if this is called after purgeCalls then
  313. // the state of the socket is ABORT so the writeLock
  314. // in close throws an exception. It is ignored but
  315. // causes IBM (screen scraping) tests to fail.
  316. //close();
  317. } finally {
  318. if (orb.transportDebugFlag) {
  319. dprint(".readBits<-: " + this);
  320. }
  321. }
  322. return null;
  323. }
  324. protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
  325. {
  326. try {
  327. if (orb.transportDebugFlag) {
  328. dprint(".finishReadingBits->: " + this);
  329. }
  330. // REVISIT - use common factory base class.
  331. if (contactInfo != null) {
  332. messageMediator =
  333. contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
  334. } else if (acceptor != null) {
  335. messageMediator =
  336. acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
  337. } else {
  338. throw
  339. new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
  340. }
  341. return (CorbaMessageMediator) messageMediator;
  342. } catch (ThreadDeath td) {
  343. if (orb.transportDebugFlag) {
  344. dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
  345. }
  346. try {
  347. purgeCalls(wrapper.connectionAbort(td), false, false);
  348. } catch (Throwable t) {
  349. if (orb.transportDebugFlag) {
  350. dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
  351. }
  352. }
  353. throw td;
  354. } catch (Throwable ex) {
  355. if (orb.transportDebugFlag) {
  356. dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
  357. }
  358. try {
  359. if (ex instanceof INTERNAL) {
  360. sendMessageError(GIOPVersion.DEFAULT_VERSION);
  361. }
  362. } catch (IOException e) {
  363. if (orb.transportDebugFlag) {
  364. dprint(".finishReadingBits: " + this +
  365. ": sendMessageError: IOException: " + e, e);
  366. }
  367. }
  368. // REVISIT - make sure reader thread is killed.
  369. orb.getTransportManager().getSelector(0).unregisterForEvent(this);
  370. // Notify anyone waiting.
  371. purgeCalls(wrapper.connectionAbort(ex), true, false);
  372. // REVISIT
  373. //keepRunning = false;
  374. // REVISIT - if this is called after purgeCalls then
  375. // the state of the socket is ABORT so the writeLock
  376. // in close throws an exception. It is ignored but
  377. // causes IBM (screen scraping) tests to fail.
  378. //close();
  379. } finally {
  380. if (orb.transportDebugFlag) {
  381. dprint(".finishReadingBits<-: " + this);
  382. }
  383. }
  384. return null;
  385. }
  386. protected boolean dispatch(CorbaMessageMediator messageMediator)
  387. {
  388. try {
  389. if (orb.transportDebugFlag) {
  390. dprint(".dispatch->: " + this);
  391. }
  392. //
  393. // NOTE:
  394. //
  395. // This call is the transition from the tranport block
  396. // to the protocol block.
  397. //
  398. boolean result =
  399. messageMediator.getProtocolHandler()
  400. .handleRequest(messageMediator);
  401. return result;
  402. } catch (ThreadDeath td) {
  403. if (orb.transportDebugFlag) {
  404. dprint(".dispatch: ThreadDeath", td );
  405. }
  406. try {
  407. purgeCalls(wrapper.connectionAbort(td), false, false);
  408. } catch (Throwable t) {
  409. if (orb.transportDebugFlag) {
  410. dprint(".dispatch: purgeCalls: Throwable", t);
  411. }
  412. }
  413. throw td;
  414. } catch (Throwable ex) {
  415. if (orb.transportDebugFlag) {
  416. dprint(".dispatch: Throwable", ex ) ;
  417. }
  418. try {
  419. if (ex instanceof INTERNAL) {
  420. sendMessageError(GIOPVersion.DEFAULT_VERSION);
  421. }
  422. } catch (IOException e) {
  423. if (orb.transportDebugFlag) {
  424. dprint(".dispatch: sendMessageError: IOException", e);
  425. }
  426. }
  427. purgeCalls(wrapper.connectionAbort(ex), false, false);
  428. // REVISIT
  429. //keepRunning = false;
  430. } finally {
  431. if (orb.transportDebugFlag) {
  432. dprint(".dispatch<-: " + this);
  433. }
  434. }
  435. return true;
  436. }
  437. public boolean shouldUseDirectByteBuffers()
  438. {
  439. return getSocketChannel() != null;
  440. }
  441. public ByteBuffer read(int size, int offset, int length, long max_wait_time)
  442. throws IOException
  443. {
  444. if (shouldUseDirectByteBuffers()) {
  445. ByteBuffer byteBuffer =
  446. orb.getByteBufferPool().getByteBuffer(size);
  447. if (orb.transportDebugFlag) {
  448. // print address of ByteBuffer gotten from pool
  449. int bbAddress = System.identityHashCode(byteBuffer);
  450. StringBuffer sb = new StringBuffer(80);
  451. sb.append(".read: got ByteBuffer id (");
  452. sb.append(bbAddress).append(") from ByteBufferPool.");
  453. String msgStr = sb.toString();
  454. dprint(msgStr);
  455. }
  456. byteBuffer.position(offset);
  457. byteBuffer.limit(size);
  458. readFully(byteBuffer, length, max_wait_time);
  459. return byteBuffer;
  460. }
  461. byte[] buf = new byte[size];
  462. readFully(getSocket().getInputStream(), buf,
  463. offset, length, max_wait_time);
  464. ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
  465. byteBuffer.limit(size);
  466. return byteBuffer;
  467. }
  468. public ByteBuffer read(ByteBuffer byteBuffer, int offset,
  469. int length, long max_wait_time)
  470. throws IOException
  471. {
  472. int size = offset + length;
  473. if (shouldUseDirectByteBuffers()) {
  474. if (! byteBuffer.isDirect()) {
  475. throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
  476. }
  477. if (size > byteBuffer.capacity()) {
  478. if (orb.transportDebugFlag) {
  479. // print address of ByteBuffer being released
  480. int bbAddress = System.identityHashCode(byteBuffer);
  481. StringBuffer bbsb = new StringBuffer(80);
  482. bbsb.append(".read: releasing ByteBuffer id (")
  483. .append(bbAddress).append(") to ByteBufferPool.");
  484. String bbmsg = bbsb.toString();
  485. dprint(bbmsg);
  486. }
  487. orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
  488. byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
  489. }
  490. byteBuffer.position(offset);
  491. byteBuffer.limit(size);
  492. readFully(byteBuffer, length, max_wait_time);
  493. byteBuffer.position(0);
  494. byteBuffer.limit(size);
  495. return byteBuffer;
  496. }
  497. if (byteBuffer.isDirect()) {
  498. throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
  499. }
  500. byte[] buf = new byte[size];
  501. readFully(getSocket().getInputStream(), buf,
  502. offset, length, max_wait_time);
  503. return ByteBuffer.wrap(buf);
  504. }
  505. public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
  506. throws IOException
  507. {
  508. int n = 0;
  509. int bytecount = 0;
  510. long time_to_wait = readTimeouts.get_initial_time_to_wait();
  511. long total_time_in_wait = 0;
  512. // The reading of data incorporates a strategy to detect a
  513. // rogue client. The strategy is implemented as follows. As
  514. // long as data is being read, at least 1 byte or more, we
  515. // assume we have a well behaved client. If no data is read,
  516. // then we sleep for a time to wait, re-calculate a new time to
  517. // wait which is lengthier than the previous time spent waiting.
  518. // Then, if the total time spent waiting does not exceed a
  519. // maximum time we are willing to wait, we attempt another
  520. // read. If the maximum amount of time we are willing to
  521. // spend waiting for more data is exceeded, we throw an
  522. // IOException.
  523. // NOTE: Reading of GIOP headers are treated with a smaller
  524. // maximum time to wait threshold. Based on extensive
  525. // performance testing, all GIOP headers are being
  526. // read in 1 read access.
  527. do {
  528. bytecount = getSocketChannel().read(byteBuffer);
  529. if (bytecount < 0) {
  530. throw new IOException("End-of-stream");
  531. }
  532. else if (bytecount == 0) {
  533. try {
  534. Thread.sleep(time_to_wait);
  535. total_time_in_wait += time_to_wait;
  536. time_to_wait =
  537. (long)(time_to_wait*readTimeouts.get_backoff_factor());
  538. }
  539. catch (InterruptedException ie) {
  540. // ignore exception
  541. if (orb.transportDebugFlag) {
  542. dprint("readFully(): unexpected exception "
  543. + ie.toString());
  544. }
  545. }
  546. }
  547. else {
  548. n += bytecount;
  549. }
  550. }
  551. while (n < size && total_time_in_wait < max_wait_time);
  552. if (n < size && total_time_in_wait >= max_wait_time)
  553. {
  554. // failed to read entire message
  555. throw wrapper.transportReadTimeoutExceeded(new Integer(size),
  556. new Integer(n), new Long(max_wait_time),
  557. new Long(total_time_in_wait));
  558. }
  559. getConnectionCache().stampTime(this);
  560. }
  561. // To support non-channel connections.
  562. public void readFully(java.io.InputStream is, byte[] buf,
  563. int offset, int size, long max_wait_time)
  564. throws IOException
  565. {
  566. int n = 0;
  567. int bytecount = 0;
  568. long time_to_wait = readTimeouts.get_initial_time_to_wait();
  569. long total_time_in_wait = 0;
  570. // The reading of data incorporates a strategy to detect a
  571. // rogue client. The strategy is implemented as follows. As
  572. // long as data is being read, at least 1 byte or more, we
  573. // assume we have a well behaved client. If no data is read,
  574. // then we sleep for a time to wait, re-calculate a new time to
  575. // wait which is lengthier than the previous time spent waiting.
  576. // Then, if the total time spent waiting does not exceed a
  577. // maximum time we are willing to wait, we attempt another
  578. // read. If the maximum amount of time we are willing to
  579. // spend waiting for more data is exceeded, we throw an
  580. // IOException.
  581. // NOTE: Reading of GIOP headers are treated with a smaller
  582. // maximum time to wait threshold. Based on extensive
  583. // performance testing, all GIOP headers are being
  584. // read in 1 read access.
  585. do {
  586. bytecount = is.read(buf, offset + n, size - n);
  587. if (bytecount < 0) {
  588. throw new IOException("End-of-stream");
  589. }
  590. else if (bytecount == 0) {
  591. try {
  592. Thread.sleep(time_to_wait);
  593. total_time_in_wait += time_to_wait;
  594. time_to_wait =
  595. (long)(time_to_wait*readTimeouts.get_backoff_factor());
  596. }
  597. catch (InterruptedException ie) {
  598. // ignore exception
  599. if (orb.transportDebugFlag) {
  600. dprint("readFully(): unexpected exception "
  601. + ie.toString());
  602. }
  603. }
  604. }
  605. else {
  606. n += bytecount;
  607. }
  608. }
  609. while (n < size && total_time_in_wait < max_wait_time);
  610. if (n < size && total_time_in_wait >= max_wait_time)
  611. {
  612. // failed to read entire message
  613. throw wrapper.transportReadTimeoutExceeded(new Integer(size),
  614. new Integer(n), new Long(max_wait_time),
  615. new Long(total_time_in_wait));
  616. }
  617. getConnectionCache().stampTime(this);
  618. }
  619. public void write(ByteBuffer byteBuffer)
  620. throws IOException
  621. {
  622. if (shouldUseDirectByteBuffers()) {
  623. /* NOTE: cannot perform this test. If one ask for a
  624. ByteBuffer from the pool which is bigger than the size
  625. of ByteBuffers managed by the pool, then the pool will
  626. return a HeapByteBuffer.
  627. if (byteBuffer.hasArray()) {
  628. throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
  629. }
  630. */
  631. // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
  632. // all bytes are written on first write attempt.
  633. do {
  634. getSocketChannel().write(byteBuffer);
  635. }
  636. while (byteBuffer.hasRemaining());
  637. } else {
  638. if (! byteBuffer.hasArray()) {
  639. throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
  640. }
  641. byte[] tmpBuf = byteBuffer.array();
  642. getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
  643. getSocket().getOutputStream().flush();
  644. }
  645. // TimeStamp connection to indicate it has been used
  646. // Note granularity of connection usage is assumed for
  647. // now to be that of a IIOP packet.
  648. getConnectionCache().stampTime(this);
  649. }
  650. /**
  651. * Note:it is possible for this to be called more than once
  652. */
  653. public synchronized void close()
  654. {
  655. try {
  656. if (orb.transportDebugFlag) {
  657. dprint(".close->: " + this);
  658. }
  659. writeLock();
  660. // REVISIT It will be good to have a read lock on the reader thread
  661. // before we proceed further, to avoid the reader thread (server side)
  662. // from processing requests. This avoids the risk that a new request
  663. // will be accepted by ReaderThread while the ListenerThread is
  664. // attempting to close this connection.
  665. if (isBusy()) { // we are busy!
  666. writeUnlock();
  667. if (orb.transportDebugFlag) {
  668. dprint(".close: isBusy so no close: " + this);
  669. }
  670. return;
  671. }
  672. try {
  673. try {
  674. sendCloseConnection(GIOPVersion.V1_0);
  675. } catch (Throwable t) {
  676. wrapper.exceptionWhenSendingCloseConnection(t);
  677. }
  678. synchronized ( stateEvent ){
  679. state = CLOSE_SENT;
  680. stateEvent.notifyAll();
  681. }
  682. // stop the reader without causing it to do purgeCalls
  683. //Exception ex = new Exception();
  684. //reader.stop(ex); // REVISIT
  685. // NOTE: !!!!!!
  686. // This does writeUnlock().
  687. purgeCalls(wrapper.connectionRebind(), false, true);
  688. } catch (Exception ex) {
  689. if (orb.transportDebugFlag) {
  690. dprint(".close: exception: " + this, ex);
  691. }
  692. }
  693. try {
  694. Selector selector = orb.getTransportManager().getSelector(0);
  695. selector.unregisterForEvent(this);
  696. if (socketChannel != null) {
  697. socketChannel.close();
  698. }
  699. socket.close();
  700. } catch (IOException e) {
  701. if (orb.transportDebugFlag) {
  702. dprint(".close: " + this, e);
  703. }
  704. }
  705. } finally {
  706. if (orb.transportDebugFlag) {
  707. dprint(".close<-: " + this);
  708. }
  709. }
  710. }
  711. public Acceptor getAcceptor()
  712. {
  713. return acceptor;
  714. }
  715. public ContactInfo getContactInfo()
  716. {
  717. return contactInfo;
  718. }
  719. public EventHandler getEventHandler()
  720. {
  721. return this;
  722. }
  723. public OutputObject createOutputObject(MessageMediator messageMediator)
  724. {
  725. // REVISIT - remove this method from Connection and all it subclasses.
  726. throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
  727. }
  728. // This is used by the GIOPOutputObject in order to
  729. // throw the correct error when handling code sets.
  730. // Can we determine if we are on the server side by
  731. // other means? XREVISIT
  732. public boolean isServer()
  733. {
  734. return isServer;
  735. }
  736. public boolean isBusy()
  737. {
  738. if (serverRequestCount > 0 ||
  739. getResponseWaitingRoom().numberRegistered() > 0)
  740. {
  741. return true;
  742. } else {
  743. return false;
  744. }
  745. }
  746. public long getTimeStamp()
  747. {
  748. return timeStamp;
  749. }
  750. public void setTimeStamp(long time)
  751. {
  752. timeStamp = time;
  753. }
  754. public void setState(String stateString)
  755. {
  756. synchronized (stateEvent) {
  757. if (stateString.equals("ESTABLISHED")) {
  758. state = ESTABLISHED;
  759. stateEvent.notifyAll();
  760. } else {
  761. // REVISIT: ASSERT
  762. }
  763. }
  764. }
  765. /**
  766. * Sets the writeLock for this connection.
  767. * If the writeLock is already set by someone else, block till the
  768. * writeLock is released and can set by us.
  769. * IMPORTANT: this connection's lock must be acquired before
  770. * setting the writeLock and must be unlocked after setting the writeLock.
  771. */
  772. public void writeLock()
  773. {
  774. try {
  775. if (dprintWriteLocks && orb.transportDebugFlag) {
  776. dprint(".writeLock->: " + this);
  777. }
  778. // Keep looping till we can set the writeLock.
  779. while ( true ) {
  780. int localState = state;
  781. switch ( localState ) {
  782. case OPENING:
  783. synchronized (stateEvent) {
  784. if (state != OPENING) {
  785. // somebody has changed 'state' so be careful
  786. break;
  787. }
  788. try {
  789. stateEvent.wait();
  790. } catch (InterruptedException ie) {
  791. if (orb.transportDebugFlag) {
  792. dprint(".writeLock: OPENING InterruptedException: " + this);
  793. }
  794. }
  795. }
  796. // Loop back
  797. break;
  798. case ESTABLISHED:
  799. synchronized (writeEvent) {
  800. if (!writeLocked) {
  801. writeLocked = true;
  802. return;
  803. }
  804. try {
  805. // do not stay here too long if state != ESTABLISHED
  806. // Bug 4752117
  807. while (state == ESTABLISHED && writeLocked) {
  808. writeEvent.wait(100);
  809. }
  810. } catch (InterruptedException ie) {
  811. if (orb.transportDebugFlag) {
  812. dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
  813. }
  814. }
  815. }
  816. // Loop back
  817. break;
  818. //
  819. // XXX
  820. // Need to distinguish between client and server roles
  821. // here probably.
  822. //
  823. case ABORT:
  824. synchronized ( stateEvent ){
  825. if (state != ABORT) {
  826. break;
  827. }
  828. throw wrapper.writeErrorSend() ;
  829. }
  830. case CLOSE_RECVD:
  831. // the connection has been closed or closing
  832. // ==> throw rebind exception
  833. synchronized ( stateEvent ){
  834. if (state != CLOSE_RECVD) {
  835. break;
  836. }
  837. throw wrapper.connectionCloseRebind() ;
  838. }
  839. default:
  840. if (orb.transportDebugFlag) {
  841. dprint(".writeLock: default: " + this);
  842. }
  843. // REVISIT
  844. throw new RuntimeException(".writeLock: bad state");
  845. }
  846. }
  847. } finally {
  848. if (dprintWriteLocks && orb.transportDebugFlag) {
  849. dprint(".writeLock<-: " + this);
  850. }
  851. }
  852. }
  853. public void writeUnlock()
  854. {
  855. try {
  856. if (dprintWriteLocks && orb.transportDebugFlag) {
  857. dprint(".writeUnlock->: " + this);
  858. }
  859. synchronized (writeEvent) {
  860. writeLocked = false;
  861. writeEvent.notify(); // wake up one guy waiting to write
  862. }
  863. } finally {
  864. if (dprintWriteLocks && orb.transportDebugFlag) {
  865. dprint(".writeUnlock<-: " + this);
  866. }
  867. }
  868. }
  869. // Assumes the caller handles writeLock and writeUnlock
  870. public void sendWithoutLock(OutputObject outputObject)
  871. {
  872. // Don't we need to check for CloseConnection
  873. // here? REVISIT
  874. // XREVISIT - Shouldn't the MessageMediator
  875. // be the one to handle writing the data here?
  876. try {
  877. // Write the fragment/message
  878. CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
  879. cdrOutputObject.writeTo(this);
  880. // REVISIT - no flush?
  881. //socket.getOutputStream().flush();
  882. } catch (IOException e1) {
  883. /*
  884. * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
  885. * sending a CancelRequest for regular requests / locate requests
  886. */
  887. // Since IIOPOutputStream's msgheader is set only once, and not
  888. // altered during sending multiple fragments, the original
  889. // msgheader will always have the requestId.
  890. // REVISIT This could be optimized to send a CancelRequest only
  891. // if any fragments had been sent already.
  892. /* REVISIT: MOVE TO SUBCONTRACT
  893. Message msg = os.getMessage();
  894. if (msg.getType() == Message.GIOPRequest ||
  895. msg.getType() == Message.GIOPLocateRequest) {
  896. GIOPVersion requestVersion = msg.getGIOPVersion();
  897. int requestId = MessageBase.getRequestId(msg);
  898. try {
  899. sendCancelRequest(requestVersion, requestId);
  900. } catch (IOException e2) {
  901. // most likely an abortive connection closure.
  902. // ignore, since nothing more can be done.
  903. if (orb.transportDebugFlag) {
  904. }
  905. }
  906. */
  907. // REVISIT When a send failure happens, purgeCalls() need to be
  908. // called to ensure that the connection is properly removed from
  909. // further usage (ie., cancelling pending requests with COMM_FAILURE
  910. // with an appropriate minor_code CompletionStatus.MAY_BE).
  911. // Relying on the IIOPOutputStream (as noted below) is not
  912. // sufficient as it handles COMM_FAILURE only for the final
  913. // fragment (during invoke processing). Note that COMM_FAILURE could
  914. // happen while sending the initial fragments.
  915. // Also the IIOPOutputStream does not properly close the connection.
  916. // It simply removes the connection from the table. An orderly
  917. // closure is needed (ie., cancel pending requests on the connection
  918. // COMM_FAILURE as well.
  919. // IIOPOutputStream will cleanup the connection info when it
  920. // sees this exception.
  921. throw wrapper.writeErrorSend(e1) ;
  922. }
  923. }
  924. public void registerWaiter(MessageMediator messageMediator)
  925. {
  926. responseWaitingRoom.registerWaiter(messageMediator);
  927. }
  928. public void unregisterWaiter(MessageMediator messageMediator)
  929. {
  930. responseWaitingRoom.unregisterWaiter(messageMediator);
  931. }
  932. public InputObject waitForResponse(MessageMediator messageMediator)
  933. {
  934. return responseWaitingRoom.waitForResponse(messageMediator);
  935. }
  936. public void setConnectionCache(ConnectionCache connectionCache)
  937. {
  938. this.connectionCache = connectionCache;
  939. }
  940. public ConnectionCache getConnectionCache()
  941. {
  942. return connectionCache;
  943. }
  944. ////////////////////////////////////////////////////
  945. //
  946. // EventHandler methods
  947. //
  948. public void setUseSelectThreadToWait(boolean x)
  949. {
  950. useSelectThreadToWait = x;
  951. // REVISIT - Reading of a GIOP header only is information
  952. // that should be passed into the constructor
  953. // from the SocketOrChannelConnection factory.
  954. setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
  955. }
  956. public void handleEvent()
  957. {
  958. if (orb.transportDebugFlag) {
  959. dprint(".handleEvent->: " + this);
  960. }
  961. getSelectionKey().interestOps(getSelectionKey().interestOps() &
  962. (~ getInterestOps()));
  963. if (shouldUseWorkerThreadForEvent()) {
  964. Throwable throwable = null;
  965. try {
  966. int poolToUse = 0;
  967. if (shouldReadGiopHeaderOnly()) {
  968. partialMessageMediator = readBits();
  969. poolToUse =
  970. partialMessageMediator.getThreadPoolToUse();
  971. }
  972. if (orb.transportDebugFlag) {
  973. dprint(".handleEvent: addWork to pool: " + poolToUse);
  974. }
  975. orb.getThreadPoolManager().getThreadPool(poolToUse)
  976. .getWorkQueue(0).addWork(getWork());
  977. } catch (NoSuchThreadPoolException e) {
  978. throwable = e;
  979. } catch (NoSuchWorkQueueException e) {
  980. throwable = e;
  981. }
  982. // REVISIT: need to close connection.
  983. if (throwable != null) {
  984. if (orb.transportDebugFlag) {
  985. dprint(".handleEvent: " + throwable);
  986. }
  987. INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
  988. i.initCause(throwable);
  989. throw i;
  990. }
  991. } else {
  992. if (orb.transportDebugFlag) {
  993. dprint(".handleEvent: doWork");
  994. }
  995. getWork().doWork();
  996. }
  997. if (orb.transportDebugFlag) {
  998. dprint(".handleEvent<-: " + this);
  999. }
  1000. }
  1001. public SelectableChannel getChannel()
  1002. {
  1003. return socketChannel;
  1004. }
  1005. public int getInterestOps()
  1006. {
  1007. return SelectionKey.OP_READ;
  1008. }
  1009. // public Acceptor getAcceptor() - already defined above.
  1010. public Connection getConnection()
  1011. {
  1012. return this;
  1013. }
  1014. ////////////////////////////////////////////////////
  1015. //
  1016. // Work methods.
  1017. //
  1018. public String getName()
  1019. {
  1020. return this.toString();
  1021. }
  1022. public void doWork()
  1023. {
  1024. try {
  1025. if (orb.transportDebugFlag) {
  1026. dprint(".doWork->: " + this);
  1027. }
  1028. // IMPORTANT: Sanity checks on SelectionKeys such as
  1029. // SelectorKey.isValid() should not be done
  1030. // here.
  1031. //
  1032. if (!shouldReadGiopHeaderOnly()) {
  1033. read();
  1034. }
  1035. else {
  1036. // get the partialMessageMediator
  1037. // created by SelectorThread
  1038. CorbaMessageMediator messageMediator =
  1039. this.getPartialMessageMediator();
  1040. // read remaining info needed in a MessageMediator
  1041. messageMediator = finishReadingBits(messageMediator);
  1042. if (messageMediator != null) {
  1043. // Null can happen when client closes stream
  1044. // causing purgecalls.
  1045. dispatch(messageMediator);
  1046. }
  1047. }
  1048. } catch (Throwable t) {
  1049. if (orb.transportDebugFlag) {
  1050. dprint(".doWork: ignoring Throwable: "
  1051. + t
  1052. + " " + this);
  1053. }
  1054. } finally {
  1055. if (orb.transportDebugFlag) {
  1056. dprint(".doWork<-: " + this);
  1057. }
  1058. }
  1059. }
  1060. public void setEnqueueTime(long timeInMillis)
  1061. {
  1062. enqueueTime = timeInMillis;
  1063. }
  1064. public long getEnqueueTime()
  1065. {
  1066. return enqueueTime;
  1067. }
  1068. ////////////////////////////////////////////////////
  1069. //
  1070. // spi.transport.CorbaConnection.
  1071. //
  1072. // IMPORTANT: Reader Threads must NOT read Giop header only.
  1073. public boolean shouldReadGiopHeaderOnly() {
  1074. return shouldReadGiopHeaderOnly;
  1075. }
  1076. protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
  1077. shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
  1078. }
  1079. public ResponseWaitingRoom getResponseWaitingRoom()
  1080. {
  1081. return responseWaitingRoom;
  1082. }
  // REVISIT - interface defines isServer but already defined in
  // higher interface.
  1085. public void serverRequestMapPut(int requestId,
  1086. CorbaMessageMediator messageMediator)
  1087. {
  1088. serverRequestMap.put(new Integer(requestId), messageMediator);
  1089. }
  1090. public CorbaMessageMediator serverRequestMapGet(int requestId)
  1091. {
  1092. return (CorbaMessageMediator)
  1093. serverRequestMap.get(new Integer(requestId));
  1094. }
  1095. public void serverRequestMapRemove(int requestId)
  1096. {
  1097. serverRequestMap.remove(new Integer(requestId));
  1098. }
  1099. // REVISIT: this is also defined in:
  1100. // com.sun.corba.se.spi.legacy.connection.Connection
  1101. public java.net.Socket getSocket()
  1102. {
  1103. return socket;
  1104. }
  1105. /** It is possible for a Close Connection to have been
  1106. ** sent here, but we will not check for this. A "lazy"
  1107. ** Exception will be thrown in the Worker thread after the
  1108. ** incoming request has been processed even though the connection
  1109. ** is closed before the request is processed. This is o.k because
  1110. ** it is a boundary condition. To prevent it we would have to add
  1111. ** more locks which would reduce performance in the normal case.
  1112. **/
  1113. public synchronized void serverRequestProcessingBegins()
  1114. {
  1115. serverRequestCount++;
  1116. }
  1117. public synchronized void serverRequestProcessingEnds()
  1118. {
  1119. serverRequestCount--;
  1120. }
  1121. //
  1122. //
  1123. //
  1124. public synchronized int getNextRequestId()
  1125. {
  1126. return requestId++;
  1127. }
  // Negotiated code sets for char and wchar data.
  // Starts out null; setCodeSetContext stores a value only while this is
  // still null, and getCodeSetContext returns it.
  protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
  1130. public ORB getBroker()
  1131. {
  1132. return orb;
  1133. }
  1134. public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
  1135. // Needs to be synchronized for the following case when the client
  1136. // doesn't send the code set context twice, and we have two threads
  1137. // in ServerRequestDispatcher processCodeSetContext.
  1138. //
  1139. // Thread A checks to see if there is a context, there is none, so
  1140. // it calls setCodeSetContext, getting the synch lock.
  1141. // Thread B checks to see if there is a context. If we didn't synch,
  1142. // it might decide to outlaw wchar/wstring.
  1143. if (codeSetContext == null) {
  1144. synchronized(this) {
  1145. return codeSetContext;
  1146. }
  1147. }
  1148. return codeSetContext;
  1149. }
  1150. public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
  1151. // Double check whether or not we need to do this
  1152. if (codeSetContext == null) {
  1153. if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
  1154. OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
  1155. // If the client says it's negotiated a code set that
  1156. // isn't a fallback and we never said we support, then
  1157. // it has a bug.
  1158. throw wrapper.badCodesetsFromClient() ;
  1159. }
  1160. codeSetContext = csc;
  1161. }
  1162. }
  1163. //
  1164. // from iiop.IIOPConnection.java
  1165. //
  1166. // Map request ID to an InputObject.
  1167. // This is so the client thread can start unmarshaling
  1168. // the reply and remove it from the out_calls map while the
  1169. // ReaderThread can still obtain the input stream to give
  1170. // new fragments. Only the ReaderThread touches the clientReplyMap,
  1171. // so it doesn't incur synchronization overhead.
  1172. public MessageMediator clientRequestMapGet(int requestId)
  1173. {
  1174. return responseWaitingRoom.getMessageMediator(requestId);
  1175. }
  1176. protected MessageMediator clientReply_1_1;
  1177. public void clientReply_1_1_Put(MessageMediator x)
  1178. {
  1179. clientReply_1_1 = x;
  1180. }
  1181. public MessageMediator clientReply_1_1_Get()
  1182. {
  1183. return clientReply_1_1;
  1184. }
  1185. public void clientReply_1_1_Remove()
  1186. {
  1187. clientReply_1_1 = null;
  1188. }
  1189. protected MessageMediator serverRequest_1_1;
  1190. public void serverRequest_1_1_Put(MessageMediator x)
  1191. {
  1192. serverRequest_1_1 = x;
  1193. }
  1194. public MessageMediator serverRequest_1_1_Get()
  1195. {
  1196. return serverRequest_1_1;
  1197. }
  1198. public void serverRequest_1_1_Remove()
  1199. {
  1200. serverRequest_1_1 = null;
  1201. }
  1202. protected String getStateString( int state )
  1203. {
  1204. synchronized ( stateEvent ){
  1205. switch (state) {
  1206. case OPENING : return "OPENING" ;
  1207. case ESTABLISHED : return "ESTABLISHED" ;
  1208. case CLOSE_SENT : return "CLOSE_SENT" ;
  1209. case CLOSE_RECVD : return "CLOSE_RECVD" ;
  1210. case ABORT : return "ABORT" ;
  1211. default : return "???" ;
  1212. }
  1213. }
  1214. }
  1215. public synchronized boolean isPostInitialContexts() {
  1216. return postInitialContexts;
  1217. }
  1218. // Can never be unset...
  1219. public synchronized void setPostInitialContexts(){
  1220. postInitialContexts = true;
  1221. }
  1222. /**
  1223. * Wake up the outstanding requests on the connection, and hand them
  1224. * COMM_FAILURE exception with a given minor code.
  1225. *
  1226. * Also, delete connection from connection table and
  1227. * stop the reader thread.
  1228. * Note that this should only ever be called by the Reader thread for
  1229. * this connection.
  1230. *
  1231. * @param minor_code The minor code for the COMM_FAILURE major code.
  1232. * @param die Kill the reader thread (this thread) before exiting.
  1233. */
  1234. public void purgeCalls(SystemException systemException,
  1235. boolean die, boolean lockHeld)
  1236. {
  1237. int minor_code = systemException.minor;
  1238. try{
  1239. if (orb.transportDebugFlag) {
  1240. dprint(".purgeCalls->: "
  1241. + minor_code + "/" + die + "/" + lockHeld
  1242. + " " + this);
  1243. }
  1244. // If this invocation is a result of ThreadDeath caused
  1245. // by a previous execution of this routine, just exit.
  1246. synchronized ( stateEvent ){
  1247. if ((state == ABORT) || (state == CLOSE_RECVD)) {
  1248. if (orb.transportDebugFlag) {
  1249. dprint(".purgeCalls: exiting since state is: "
  1250. + getStateString(state)
  1251. + " " + this);
  1252. }
  1253. return;
  1254. }
  1255. }
  1256. // Grab the writeLock (freeze the calls)
  1257. try {
  1258. if (!lockHeld) {
  1259. writeLock();
  1260. }
  1261. } catch (SystemException ex) {
  1262. if (orb.transportDebugFlag)
  1263. dprint(".purgeCalls: SystemException" + ex
  1264. + "; continuing " + this);
  1265. }
  1266. // Mark the state of the connection
  1267. // and determine the request status
  1268. org.omg.CORBA.CompletionStatus completion_status;
  1269. synchronized ( stateEvent ){
  1270. if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
  1271. state = CLOSE_RECVD;
  1272. systemException.completed = CompletionStatus.COMPLETED_NO;
  1273. } else {
  1274. state = ABORT;
  1275. systemException.completed = CompletionStatus.COMPLETED_MAYBE;
  1276. }
  1277. stateEvent.notifyAll();
  1278. }
  1279. try {
  1280. socket.getInputStream().close();
  1281. socket.getOutputStream().close();
  1282. socket.close();
  1283. } catch (Exception ex) {
  1284. if (orb.transportDebugFlag) {
  1285. dprint(".purgeCalls: Exception closing socket: " + ex
  1286. + " " + this);
  1287. }
  1288. }
  1289. // Signal all threads with outstanding requests on this
  1290. // connection and give them the SystemException;
  1291. responseWaitingRoom.signalExceptionToAllWaiters(systemException);
  1292. if (contactInfo != null) {
  1293. ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
  1294. } else if (acceptor != null) {
  1295. ((InboundConnectionCache)getConnectionCache()).remove(this);
  1296. }
  1297. //
  1298. // REVISIT: Stop the reader thread
  1299. //
  1300. // Signal all the waiters of the writeLock.
  1301. // There are 4 types of writeLock waiters:
  1302. // 1. Send waiters:
  1303. // 2. SendReply waiters:
  1304. // 3. cleanUp waiters:
  1305. // 4. purge_call waiters:
  1306. //
  1307. writeUnlock();
  1308. } finally {
  1309. if (orb.transportDebugFlag) {
  1310. dprint(".purgeCalls<-: "
  1311. + minor_code + "/" + die + "/" + lockHeld
  1312. + " " + this);
  1313. }
  1314. }
  1315. }
  1316. /*************************************************************************
  1317. * The following methods are for dealing with Connection cleaning for
  1318. * better scalability of servers in high network load conditions.
  1319. **************************************************************************/
  1320. public void sendCloseConnection(GIOPVersion giopVersion)
  1321. throws IOException
  1322. {
  1323. Message msg = MessageBase.createCloseConnection(giopVersion);
  1324. sendHelper(giopVersion, msg);
  1325. }
  1326. public void sendMessageError(GIOPVersion giopVersion)
  1327. throws IOException
  1328. {
  1329. Message msg = MessageBase.createMessageError(giopVersion);
  1330. sendHelper(giopVersion, msg);
  1331. }
  1332. /**
  1333. * Send a CancelRequest message. This does not lock the connection, so the
  1334. * caller needs to ensure this method is called appropriately.
  1335. * @exception IOException - could be due to abortive connection closure.
  1336. */
  1337. public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
  1338. throws IOException
  1339. {
  1340. Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
  1341. sendHelper(giopVersion, msg);
  1342. }
  1343. protected void sendHelper(GIOPVersion giopVersion, Message msg)
  1344. throws IOException
  1345. {
  1346. // REVISIT: See comments in CDROutputObject constructor.
  1347. CDROutputObject outputObject =
  1348. new CDROutputObject((ORB)orb, null, giopVersion, this, msg,
  1349. ORBConstants.STREAM_FORMAT_VERSION_1);
  1350. msg.write(outputObject);
  1351. outputObject.writeTo(this);
  1352. }
  1353. public void sendCancelRequestWithLock(GIOPVersion giopVersion,
  1354. int requestId)
  1355. throws IOException
  1356. {
  1357. writeLock();
  1358. try {
  1359. sendCancelRequest(giopVersion, requestId);
  1360. } finally {
  1361. writeUnlock();
  1362. }
  1363. }
  1364. // Begin Code Base methods ---------------------------------------
  1365. //
  1366. // Set this connection's code base IOR. The IOR comes from the
  1367. // SendingContext. This is an optional service context, but all
  1368. // JavaSoft ORBs send it.
  1369. //
  1370. // The set and get methods don't need to be synchronized since the
  1371. // first possible get would occur during reading a valuetype, and
  1372. // that would be after the set.
  1373. // Sets this connection's code base IOR. This is done after
  1374. // getting the IOR out of the SendingContext service context.
  1375. // Our ORBs always send this, but it's optional in CORBA.
  1376. public final void setCodeBaseIOR(IOR ior) {
  1377. codeBaseServerIOR = ior;
  1378. }
  1379. public final IOR getCodeBaseIOR() {
  1380. return codeBaseServerIOR;
  1381. }
  1382. // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase
  1383. // won't connect to the remote codebase unless it's necessary.
  1384. public final CodeBase getCodeBase() {
  1385. return cachedCodeBase;
  1386. }
  1387. // End Code Base methods -----------------------------------------
  1388. // set transport read thresholds
  1389. protected void setReadTimeouts(ReadTimeouts readTimeouts) {
  1390. this.readTimeouts = readTimeouts;
  1391. }
  1392. protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
  1393. partialMessageMediator = messageMediator;
  1394. }
  1395. protected CorbaMessageMediator getPartialMessageMediator() {
  1396. return partialMessageMediator;
  1397. }
  1398. public String toString()
  1399. {
  1400. synchronized ( stateEvent ){
  1401. return
  1402. "SocketOrChannelConnectionImpl[" + " "
  1403. + (socketChannel == null ?
  1404. socket.toString() : socketChannel.toString()) + " "
  1405. + getStateString( state ) + " "
  1406. + shouldUseSelectThreadToWait() + " "
  1407. + shouldUseWorkerThreadForEvent() + " "
  1408. + shouldReadGiopHeaderOnly()
  1409. + "]" ;
  1410. }
  1411. }
  1412. // Must be public - used in encoding.
  1413. public void dprint(String msg)
  1414. {
  1415. ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
  1416. }
  1417. protected void dprint(String msg, Throwable t)
  1418. {
  1419. dprint(msg);
  1420. t.printStackTrace(System.out);
  1421. }
  1422. }
  1423. // End of file.