1. /*
  2. * @(#)CorbaMessageMediatorImpl.java 1.98 04/06/21
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.impl.protocol;
  8. import java.io.ByteArrayOutputStream;
  9. import java.io.IOException;
  10. import java.io.PrintWriter;
  11. import java.nio.ByteBuffer;
  12. import java.nio.channels.SelectionKey;
  13. import java.util.EmptyStackException;
  14. import java.util.Iterator;
  15. import org.omg.CORBA.Any;
  16. import org.omg.CORBA.CompletionStatus;
  17. import org.omg.CORBA.ExceptionList;
  18. import org.omg.CORBA.INTERNAL;
  19. import org.omg.CORBA.Principal;
  20. import org.omg.CORBA.SystemException;
  21. import org.omg.CORBA.TypeCode;
  22. import org.omg.CORBA.UnknownUserException;
  23. import org.omg.CORBA.UNKNOWN;
  24. import org.omg.CORBA.portable.ResponseHandler;
  25. import org.omg.CORBA.portable.UnknownException;
  26. import org.omg.CORBA_2_3.portable.InputStream;
  27. import org.omg.CORBA_2_3.portable.OutputStream;
  28. import org.omg.IOP.ExceptionDetailMessage;
  29. import org.omg.IOP.TAG_RMI_CUSTOM_MAX_STREAM_FORMAT;
  30. import com.sun.corba.se.pept.broker.Broker;
  31. import com.sun.corba.se.pept.encoding.InputObject;
  32. import com.sun.corba.se.pept.encoding.OutputObject;
  33. import com.sun.corba.se.pept.protocol.MessageMediator;
  34. import com.sun.corba.se.pept.protocol.ProtocolHandler;
  35. import com.sun.corba.se.pept.transport.ByteBufferPool;
  36. import com.sun.corba.se.pept.transport.Connection;
  37. import com.sun.corba.se.pept.transport.ContactInfo;
  38. import com.sun.corba.se.pept.transport.EventHandler;
  39. import com.sun.corba.se.spi.ior.IOR;
  40. import com.sun.corba.se.spi.ior.ObjectKey;
  41. import com.sun.corba.se.spi.ior.ObjectKeyTemplate;
  42. import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
  43. import com.sun.corba.se.spi.ior.iiop.IIOPProfileTemplate;
  44. import com.sun.corba.se.spi.ior.iiop.IIOPProfile;
  45. import com.sun.corba.se.spi.ior.iiop.MaxStreamFormatVersionComponent;
  46. import com.sun.corba.se.spi.oa.OAInvocationInfo;
  47. import com.sun.corba.se.spi.oa.ObjectAdapter;
  48. import com.sun.corba.se.spi.orb.ORB;
  49. import com.sun.corba.se.spi.orb.ORBVersionFactory;
  50. import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  51. import com.sun.corba.se.spi.protocol.CorbaProtocolHandler;
  52. import com.sun.corba.se.spi.protocol.CorbaServerRequestDispatcher;
  53. import com.sun.corba.se.spi.protocol.ForwardException;
  54. import com.sun.corba.se.spi.transport.CorbaConnection;
  55. import com.sun.corba.se.spi.transport.CorbaContactInfo;
  56. import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  57. import com.sun.corba.se.spi.logging.CORBALogDomains;
  58. import com.sun.corba.se.spi.servicecontext.ORBVersionServiceContext;
  59. import com.sun.corba.se.spi.servicecontext.ServiceContexts;
  60. import com.sun.corba.se.spi.servicecontext.UEInfoServiceContext;
  61. import com.sun.corba.se.spi.servicecontext.MaxStreamFormatVersionServiceContext;
  62. import com.sun.corba.se.spi.servicecontext.SendingContextServiceContext;
  63. import com.sun.corba.se.spi.servicecontext.UnknownServiceContext;
  64. import com.sun.corba.se.impl.corba.RequestImpl;
  65. import com.sun.corba.se.impl.encoding.BufferManagerFactory;
  66. import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
  67. import com.sun.corba.se.impl.encoding.CDRInputObject;
  68. import com.sun.corba.se.impl.encoding.CDROutputObject;
  69. import com.sun.corba.se.impl.encoding.EncapsOutputStream;
  70. import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  71. import com.sun.corba.se.impl.logging.InterceptorsSystemException;
  72. import com.sun.corba.se.impl.orbutil.ORBConstants;
  73. import com.sun.corba.se.impl.orbutil.ORBUtility;
  74. import com.sun.corba.se.impl.ior.iiop.JavaSerializationComponent;
  75. import com.sun.corba.se.impl.protocol.AddressingDispositionException;
  76. import com.sun.corba.se.impl.protocol.RequestCanceledException;
  77. import com.sun.corba.se.impl.protocol.giopmsgheaders.AddressingDispositionHelper;
  78. import com.sun.corba.se.impl.protocol.giopmsgheaders.CancelRequestMessage;
  79. import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_1;
  80. import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage_1_2;
  81. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage;
  82. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_0;
  83. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_1;
  84. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateRequestMessage_1_2;
  85. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
  86. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage;
  87. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_0;
  88. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_1;
  89. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyMessage_1_2;
  90. import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
  91. import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
  92. import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageHandler;
  93. import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
  94. import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_0;
  95. import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_1;
  96. import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage_1_2;
  97. import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage;
  98. import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_0 ;
  99. import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_1 ;
  100. import com.sun.corba.se.impl.protocol.giopmsgheaders.RequestMessage_1_2 ;
  101. // REVISIT: make sure no memory leaks in client/server request/reply maps.
  102. // REVISIT: normalize requestHeader, replyHeader, messageHeader.
  103. /**
  104. * @author Harold Carr
  105. */
  106. public class CorbaMessageMediatorImpl
  107. implements
  108. CorbaMessageMediator,
  109. CorbaProtocolHandler,
  110. MessageHandler
  111. {
  112. protected ORB orb;
  113. protected ORBUtilSystemException wrapper ;
  114. protected InterceptorsSystemException interceptorWrapper ;
  115. protected CorbaContactInfo contactInfo;
  116. protected CorbaConnection connection;
  117. protected short addrDisposition;
  118. protected CDROutputObject outputObject;
  119. protected CDRInputObject inputObject;
  120. protected Message messageHeader;
  121. protected RequestMessage requestHeader;
  122. protected LocateReplyOrReplyMessage replyHeader;
  123. protected String replyExceptionDetailMessage;
  124. protected IOR replyIOR;
  125. protected Integer requestIdInteger;
  126. protected Message dispatchHeader;
  127. protected ByteBuffer dispatchByteBuffer;
  128. protected byte streamFormatVersion;
  129. protected boolean streamFormatVersionSet = false;
  130. protected org.omg.CORBA.Request diiRequest;
  131. protected boolean cancelRequestAlreadySent = false;
  132. protected ProtocolHandler protocolHandler;
  133. protected boolean _executeReturnServantInResponseConstructor = false;
  134. protected boolean _executeRemoveThreadInfoInResponseConstructor = false;
  135. protected boolean _executePIInResponseConstructor = false;
  136. //
  137. // Client-side constructor.
  138. //
  139. public CorbaMessageMediatorImpl(ORB orb,
  140. ContactInfo contactInfo,
  141. Connection connection,
  142. GIOPVersion giopVersion,
  143. IOR ior,
  144. int requestId,
  145. short addrDisposition,
  146. String operationName,
  147. boolean isOneWay)
  148. {
  149. this( orb, connection ) ;
  150. this.contactInfo = (CorbaContactInfo) contactInfo;
  151. this.addrDisposition = addrDisposition;
  152. streamFormatVersion =
  153. getStreamFormatVersionForThisRequest(
  154. ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(),
  155. giopVersion);
  156. streamFormatVersionSet = true;
  157. requestHeader = (RequestMessage) MessageBase.createRequest(
  158. this.orb,
  159. giopVersion,
  160. ORBUtility.getEncodingVersion(orb, ior),
  161. requestId,
  162. !isOneWay,
  163. ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(),
  164. this.addrDisposition,
  165. operationName,
  166. new ServiceContexts(orb),
  167. null);
  168. }
  169. //
  170. // Acceptor constructor.
  171. //
  172. public CorbaMessageMediatorImpl(ORB orb,
  173. Connection connection)
  174. {
  175. this.orb = orb;
  176. this.connection = (CorbaConnection)connection;
  177. this.wrapper = ORBUtilSystemException.get( orb,
  178. CORBALogDomains.RPC_PROTOCOL ) ;
  179. this.interceptorWrapper = InterceptorsSystemException.get( orb,
  180. CORBALogDomains.RPC_PROTOCOL ) ;
  181. }
  182. //
  183. // Dispatcher constructor.
  184. //
  185. // Note: in some cases (e.g., a reply message) this message
  186. // mediator will only be used for dispatch. Then the original
  187. // request side mediator will take over.
  188. public CorbaMessageMediatorImpl(ORB orb,
  189. CorbaConnection connection,
  190. Message dispatchHeader,
  191. ByteBuffer byteBuffer)
  192. {
  193. this( orb, connection ) ;
  194. this.dispatchHeader = dispatchHeader;
  195. this.dispatchByteBuffer = byteBuffer;
  196. }
  197. ////////////////////////////////////////////////////
  198. //
  199. // MessageMediator
  200. //
  201. public Broker getBroker()
  202. {
  203. return orb;
  204. }
  205. public ContactInfo getContactInfo()
  206. {
  207. return contactInfo;
  208. }
  209. public Connection getConnection()
  210. {
  211. return connection;
  212. }
  213. public void initializeMessage()
  214. {
  215. getRequestHeader().write(outputObject);
  216. }
  217. public void finishSendingRequest()
  218. {
  219. // REVISIT: probably move logic in outputObject to here.
  220. outputObject.finishSendingMessage();
  221. }
  222. public InputObject waitForResponse()
  223. {
  224. if (getRequestHeader().isResponseExpected()) {
  225. return connection.waitForResponse(this);
  226. }
  227. return null;
  228. }
  229. public void setOutputObject(OutputObject outputObject)
  230. {
  231. this.outputObject = (CDROutputObject) outputObject;
  232. }
  233. public OutputObject getOutputObject()
  234. {
  235. return outputObject;
  236. }
  237. public void setInputObject(InputObject inputObject)
  238. {
  239. this.inputObject = (CDRInputObject) inputObject;
  240. }
  241. public InputObject getInputObject()
  242. {
  243. return inputObject;
  244. }
  245. ////////////////////////////////////////////////////
  246. //
  247. // CorbaMessageMediator
  248. //
  249. public void setReplyHeader(LocateReplyOrReplyMessage header)
  250. {
  251. this.replyHeader = header;
  252. this.replyIOR = header.getIOR(); // REVISIT - need separate field?
  253. }
  254. public LocateReplyMessage getLocateReplyHeader()
  255. {
  256. return (LocateReplyMessage) replyHeader;
  257. }
  258. public ReplyMessage getReplyHeader()
  259. {
  260. return (ReplyMessage) replyHeader;
  261. }
  262. public void setReplyExceptionDetailMessage(String message)
  263. {
  264. replyExceptionDetailMessage = message;
  265. }
  266. public RequestMessage getRequestHeader()
  267. {
  268. return requestHeader;
  269. }
  270. public GIOPVersion getGIOPVersion()
  271. {
  272. if (messageHeader != null) {
  273. return messageHeader.getGIOPVersion();
  274. }
  275. return getRequestHeader().getGIOPVersion();
  276. }
  277. public byte getEncodingVersion() {
  278. if (messageHeader != null) {
  279. return messageHeader.getEncodingVersion();
  280. }
  281. return getRequestHeader().getEncodingVersion();
  282. }
  283. public int getRequestId()
  284. {
  285. return getRequestHeader().getRequestId();
  286. }
  287. public Integer getRequestIdInteger()
  288. {
  289. if (requestIdInteger == null) {
  290. requestIdInteger = new Integer(getRequestHeader().getRequestId());
  291. }
  292. return requestIdInteger;
  293. }
  294. public boolean isOneWay()
  295. {
  296. return ! getRequestHeader().isResponseExpected();
  297. }
  298. public short getAddrDisposition()
  299. {
  300. return addrDisposition;
  301. }
  302. public String getOperationName()
  303. {
  304. return getRequestHeader().getOperation();
  305. }
  306. public ServiceContexts getRequestServiceContexts()
  307. {
  308. return getRequestHeader().getServiceContexts();
  309. }
  310. public ServiceContexts getReplyServiceContexts()
  311. {
  312. return getReplyHeader().getServiceContexts();
  313. }
  314. public void sendCancelRequestIfFinalFragmentNotSent()
  315. {
  316. if ((!sentFullMessage()) && sentFragment() &&
  317. (!cancelRequestAlreadySent))
  318. {
  319. try {
  320. if (orb.subcontractDebugFlag) {
  321. dprint(".sendCancelRequestIfFinalFragmentNotSent->: "
  322. + opAndId(this));
  323. }
  324. connection.sendCancelRequestWithLock(getGIOPVersion(),
  325. getRequestId());
  326. // Case: first a location forward, then a marshaling
  327. // exception (e.g., non-serializable object). Only
  328. // send cancel once.
  329. cancelRequestAlreadySent = true;
  330. } catch (IOException e) {
  331. if (orb.subcontractDebugFlag) {
  332. dprint(".sendCancelRequestIfFinalFragmentNotSent: !ERROR : " + opAndId(this),
  333. e);
  334. }
  335. // REVISIT: we could attempt to send a final incomplete
  336. // fragment in this case.
  337. throw interceptorWrapper.ioexceptionDuringCancelRequest(
  338. CompletionStatus.COMPLETED_MAYBE, e );
  339. } finally {
  340. if (orb.subcontractDebugFlag) {
  341. dprint(".sendCancelRequestIfFinalFragmentNotSent<-: "
  342. + opAndId(this));
  343. }
  344. }
  345. }
  346. }
  347. public boolean sentFullMessage()
  348. {
  349. return outputObject.getBufferManager().sentFullMessage();
  350. }
  351. public boolean sentFragment()
  352. {
  353. return outputObject.getBufferManager().sentFragment();
  354. }
  355. public void setDIIInfo(org.omg.CORBA.Request diiRequest)
  356. {
  357. this.diiRequest = diiRequest;
  358. }
  359. public boolean isDIIRequest()
  360. {
  361. return diiRequest != null;
  362. }
  363. public Exception unmarshalDIIUserException(String repoId, InputStream is)
  364. {
  365. if (! isDIIRequest()) {
  366. return null;
  367. }
  368. ExceptionList _exceptions = diiRequest.exceptions();
  369. try {
  370. // Find the typecode for the exception
  371. for (int i=0; i<_exceptions.count() ; i++) {
  372. TypeCode tc = _exceptions.item(i);
  373. if ( tc.id().equals(repoId) ) {
  374. // Since we dont have the actual user exception
  375. // class, the spec says we have to create an
  376. // UnknownUserException and put it in the
  377. // environment.
  378. Any eany = orb.create_any();
  379. eany.read_value(is, (TypeCode)tc);
  380. return new UnknownUserException(eany);
  381. }
  382. }
  383. } catch (Exception b) {
  384. throw wrapper.unexpectedDiiException(b);
  385. }
  386. // must be a truly unknown exception
  387. return wrapper.unknownCorbaExc( CompletionStatus.COMPLETED_MAYBE);
  388. }
  389. public void setDIIException(Exception exception)
  390. {
  391. diiRequest.env().exception(exception);
  392. }
  393. public void handleDIIReply(InputStream inputStream)
  394. {
  395. if (! isDIIRequest()) {
  396. return;
  397. }
  398. ((RequestImpl)diiRequest).unmarshalReply(inputStream);
  399. }
  400. public Message getDispatchHeader()
  401. {
  402. return dispatchHeader;
  403. }
  404. public void setDispatchHeader(Message msg)
  405. {
  406. dispatchHeader = msg;
  407. }
  408. public ByteBuffer getDispatchBuffer()
  409. {
  410. return dispatchByteBuffer;
  411. }
  412. public void setDispatchBuffer(ByteBuffer byteBuffer)
  413. {
  414. dispatchByteBuffer = byteBuffer;
  415. }
  416. public int getThreadPoolToUse() {
  417. int poolToUse = 0;
  418. Message msg = getDispatchHeader();
  419. // A null msg should never happen. But, we'll be
  420. // defensive just in case.
  421. if (msg != null) {
  422. poolToUse = msg.getThreadPoolToUse();
  423. }
  424. return poolToUse;
  425. }
  426. public byte getStreamFormatVersion()
  427. {
  428. // REVISIT: ContactInfo/Acceptor output object factories
  429. // just use this. Maybe need to distinguish:
  430. // createOutputObjectForRequest
  431. // createOutputObjectForReply
  432. // then do getStreamFormatVersionForRequest/ForReply here.
  433. if (streamFormatVersionSet) {
  434. return streamFormatVersion;
  435. }
  436. return getStreamFormatVersionForReply();
  437. }
  438. /**
  439. * If the RMI-IIOP maximum stream format version service context
  440. * is present, it indicates the maximum stream format version we
  441. * could use for the reply. If it isn't present, the default is
  442. * 2 for GIOP 1.3 or greater, 1 for lower.
  443. *
  444. * This is only sent on requests. Clients can find out the
  445. * server's maximum by looking for a tagged component in the IOR.
  446. */
  447. public byte getStreamFormatVersionForReply() {
  448. // NOTE: The request service contexts may indicate the max.
  449. ServiceContexts svc = getRequestServiceContexts();
  450. MaxStreamFormatVersionServiceContext msfvsc
  451. = (MaxStreamFormatVersionServiceContext)svc.get(
  452. MaxStreamFormatVersionServiceContext.SERVICE_CONTEXT_ID);
  453. if (msfvsc != null) {
  454. byte localMaxVersion = ORBUtility.getMaxStreamFormatVersion();
  455. byte remoteMaxVersion = msfvsc.getMaximumStreamFormatVersion();
  456. return (byte)Math.min(localMaxVersion, remoteMaxVersion);
  457. } else {
  458. // Defaults to 1 for GIOP 1.2 or less, 2 for
  459. // GIOP 1.3 or higher.
  460. if (getGIOPVersion().lessThan(GIOPVersion.V1_3))
  461. return ORBConstants.STREAM_FORMAT_VERSION_1;
  462. else
  463. return ORBConstants.STREAM_FORMAT_VERSION_2;
  464. }
  465. }
  466. public boolean isSystemExceptionReply()
  467. {
  468. return replyHeader.getReplyStatus() == ReplyMessage.SYSTEM_EXCEPTION;
  469. }
  470. public boolean isUserExceptionReply()
  471. {
  472. return replyHeader.getReplyStatus() == ReplyMessage.USER_EXCEPTION;
  473. }
  474. public boolean isLocationForwardReply()
  475. {
  476. return ( (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD) ||
  477. (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD_PERM) );
  478. //return replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD;
  479. }
  480. public boolean isDifferentAddrDispositionRequestedReply()
  481. {
  482. return replyHeader.getReplyStatus() == ReplyMessage.NEEDS_ADDRESSING_MODE;
  483. }
  484. public short getAddrDispositionReply()
  485. {
  486. return replyHeader.getAddrDisposition();
  487. }
  488. public IOR getForwardedIOR()
  489. {
  490. return replyHeader.getIOR();
  491. }
  492. public SystemException getSystemExceptionReply()
  493. {
  494. return replyHeader.getSystemException(replyExceptionDetailMessage);
  495. }
  496. ////////////////////////////////////////////////////
  497. //
  498. // Used by server side.
  499. //
  500. public ObjectKey getObjectKey()
  501. {
  502. return getRequestHeader().getObjectKey();
  503. }
  504. public void setProtocolHandler(CorbaProtocolHandler protocolHandler)
  505. {
  506. throw wrapper.methodShouldNotBeCalled() ;
  507. }
  508. public CorbaProtocolHandler getProtocolHandler()
  509. {
  510. // REVISIT: should look up in orb registry.
  511. return this;
  512. }
  513. ////////////////////////////////////////////////////
  514. //
  515. // ResponseHandler
  516. //
  517. public org.omg.CORBA.portable.OutputStream createReply()
  518. {
  519. // Note: relies on side-effect of setting mediator output field.
  520. // REVISIT - cast - need interface
  521. getProtocolHandler().createResponse(this, (ServiceContexts) null);
  522. return (OutputStream) getOutputObject();
  523. }
  524. public org.omg.CORBA.portable.OutputStream createExceptionReply()
  525. {
  526. // Note: relies on side-effect of setting mediator output field.
  527. // REVISIT - cast - need interface
  528. getProtocolHandler().createUserExceptionResponse(this, (ServiceContexts) null);
  529. return (OutputStream) getOutputObject();
  530. }
  531. public boolean executeReturnServantInResponseConstructor()
  532. {
  533. return _executeReturnServantInResponseConstructor;
  534. }
  535. public void setExecuteReturnServantInResponseConstructor(boolean b)
  536. {
  537. _executeReturnServantInResponseConstructor = b;
  538. }
  539. public boolean executeRemoveThreadInfoInResponseConstructor()
  540. {
  541. return _executeRemoveThreadInfoInResponseConstructor;
  542. }
  543. public void setExecuteRemoveThreadInfoInResponseConstructor(boolean b)
  544. {
  545. _executeRemoveThreadInfoInResponseConstructor = b;
  546. }
  547. public boolean executePIInResponseConstructor()
  548. {
  549. return _executePIInResponseConstructor;
  550. }
  551. public void setExecutePIInResponseConstructor( boolean b )
  552. {
  553. _executePIInResponseConstructor = b;
  554. }
  555. private byte getStreamFormatVersionForThisRequest(IOR ior,
  556. GIOPVersion giopVersion)
  557. {
  558. byte localMaxVersion
  559. = ORBUtility.getMaxStreamFormatVersion();
  560. IOR effectiveTargetIOR =
  561. ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR();
  562. IIOPProfileTemplate temp =
  563. (IIOPProfileTemplate)effectiveTargetIOR.getProfile().getTaggedProfileTemplate();
  564. Iterator iter = temp.iteratorById(TAG_RMI_CUSTOM_MAX_STREAM_FORMAT.value);
  565. if (!iter.hasNext()) {
  566. // Didn't have the max stream format version tagged
  567. // component.
  568. if (giopVersion.lessThan(GIOPVersion.V1_3))
  569. return ORBConstants.STREAM_FORMAT_VERSION_1;
  570. else
  571. return ORBConstants.STREAM_FORMAT_VERSION_2;
  572. }
  573. byte remoteMaxVersion
  574. = ((MaxStreamFormatVersionComponent)iter.next()).getMaxStreamFormatVersion();
  575. return (byte)Math.min(localMaxVersion, remoteMaxVersion);
  576. }
  577. ////////////////////////////////////////////////////////////////////////
  578. ////////////////////////////////////////////////////////////////////////
  579. ////////////////////////////////////////////////////////////////////////
  580. // REVISIT - This could be a separate implementation object looked
  581. // up in a registry. However it needs some state in the message
  582. // mediator so combine for now.
  583. protected boolean isThreadDone = false;
  584. ////////////////////////////////////////////////////
  585. //
  586. // pept.protocol.ProtocolHandler
  587. //
  588. public boolean handleRequest(MessageMediator messageMediator)
  589. {
  590. try {
  591. dispatchHeader.callback(this);
  592. } catch (IOException e) {
  593. // REVISIT - this should be handled internally.
  594. ;
  595. }
  596. return isThreadDone;
  597. }
  598. ////////////////////////////////////////////////////
  599. //
  600. // iiop.messages.MessageHandler
  601. //
  602. private void setWorkThenPoolOrResumeSelect(Message header)
  603. {
  604. if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) {
  605. resumeSelect(header);
  606. } else {
  607. // Leader/Follower when using reader thread.
  608. // When this thread is done working it will go back in pool.
  609. isThreadDone = true;
  610. // First unregister current registration.
  611. orb.getTransportManager().getSelector(0)
  612. .unregisterForEvent(getConnection().getEventHandler());
  613. // Have another thread become the reader.
  614. orb.getTransportManager().getSelector(0)
  615. .registerForEvent(getConnection().getEventHandler());
  616. }
  617. }
  618. private void setWorkThenReadOrResumeSelect(Message header)
  619. {
  620. if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) {
  621. resumeSelect(header);
  622. } else {
  623. // When using reader thread then wen this thread is
  624. // done working it will continue reading.
  625. isThreadDone = false;
  626. }
  627. }
  628. private void resumeSelect(Message header)
  629. {
  630. // NOTE: VERY IMPORTANT:
  631. // Only participate in select after getting to the point
  632. // that proper serialization of fragments is ensured.
  633. if (transportDebug()) {
  634. dprint(".resumeSelect:->");
  635. // REVISIT: not-OO:
  636. String requestId = "?";
  637. if (header instanceof RequestMessage) {
  638. requestId =
  639. new Integer(((RequestMessage)header)
  640. .getRequestId()).toString();
  641. } else if (header instanceof ReplyMessage) {
  642. requestId =
  643. new Integer(((ReplyMessage)header)
  644. .getRequestId()).toString();
  645. } else if (header instanceof FragmentMessage_1_2) {
  646. requestId =
  647. new Integer(((FragmentMessage_1_2)header)
  648. .getRequestId()).toString();
  649. }
  650. dprint(".resumeSelect: id/"
  651. + requestId
  652. + " " + getConnection()
  653. );
  654. }
  655. // IMPORTANT: To avoid bug (4953599), we force the Thread that does the NIO select
  656. // to also do the enable/disable of Ops using SelectionKey.interestOps(Ops of Interest).
  657. // Otherwise, the SelectionKey.interestOps(Ops of Interest) may block indefinitely in
  658. // this thread.
  659. EventHandler eventHandler = getConnection().getEventHandler();
  660. orb.getTransportManager().getSelector(0).registerInterestOps(eventHandler);
  661. if (transportDebug()) {
  662. dprint(".resumeSelect:<-");
  663. }
  664. }
  665. private void setInputObject()
  666. {
  667. // REVISIT: refactor createInputObject (and createMessageMediator)
  668. // into base PlugInFactory. Get via connection (either ContactInfo
  669. // or Acceptor).
  670. if (getConnection().getContactInfo() != null) {
  671. inputObject = (CDRInputObject)
  672. getConnection().getContactInfo()
  673. .createInputObject(orb, this);
  674. } else if (getConnection().getAcceptor() != null) {
  675. inputObject = (CDRInputObject)
  676. getConnection().getAcceptor()
  677. .createInputObject(orb, this);
  678. } else {
  679. throw new RuntimeException("CorbaMessageMediatorImpl.setInputObject");
  680. }
  681. inputObject.setMessageMediator(this);
  682. setInputObject(inputObject);
  683. }
  684. private void signalResponseReceived()
  685. {
  686. // This will end up using the MessageMediator associated with
  687. // the original request instead of the current mediator (which
  688. // need to be constructed to hold the dispatchBuffer and connection).
  689. connection.getResponseWaitingRoom()
  690. .responseReceived((InputObject)inputObject);
  691. }
  692. // This handles message types for which we don't create classes.
  693. public void handleInput(Message header) throws IOException
  694. {
  695. try {
  696. messageHeader = header;
  697. if (transportDebug())
  698. dprint(".handleInput->: "
  699. + MessageBase.typeToString(header.getType()));
  700. setWorkThenReadOrResumeSelect(header);
  701. switch(header.getType())
  702. {
  703. case Message.GIOPCloseConnection:
  704. if (transportDebug()) {
  705. dprint(".handleInput: CloseConnection: purging");
  706. }
  707. connection.purgeCalls(wrapper.connectionRebind(), true, false);
  708. break;
  709. case Message.GIOPMessageError:
  710. if (transportDebug()) {
  711. dprint(".handleInput: MessageError: purging");
  712. }
  713. connection.purgeCalls(wrapper.recvMsgError(), true, false);
  714. break;
  715. default:
  716. if (transportDebug()) {
  717. dprint(".handleInput: ERROR: "
  718. + MessageBase.typeToString(header.getType()));
  719. }
  720. throw wrapper.badGiopRequestType() ;
  721. }
  722. releaseByteBufferToPool();
  723. } finally {
  724. if (transportDebug()) {
  725. dprint(".handleInput<-: "
  726. + MessageBase.typeToString(header.getType()));
  727. }
  728. }
  729. }
  730. public void handleInput(RequestMessage_1_0 header) throws IOException
  731. {
  732. try {
  733. if (transportDebug()) dprint(".REQUEST 1.0->: " + header);
  734. try {
  735. messageHeader = requestHeader = (RequestMessage) header;
  736. setInputObject();
  737. } finally {
  738. setWorkThenPoolOrResumeSelect(header);
  739. }
  740. getProtocolHandler().handleRequest(header, this);
  741. } catch (Throwable t) {
  742. if (transportDebug())
  743. dprint(".REQUEST 1.0: !!ERROR!!: " + header, t);
  744. // Mask the exception from thread.;
  745. } finally {
  746. if (transportDebug()) dprint(".REQUEST 1.0<-: " + header);
  747. }
  748. }
  749. public void handleInput(RequestMessage_1_1 header) throws IOException
  750. {
  751. try {
  752. if (transportDebug()) dprint(".REQUEST 1.1->: " + header);
  753. try {
  754. messageHeader = requestHeader = (RequestMessage) header;
  755. setInputObject();
  756. connection.serverRequest_1_1_Put(this);
  757. } finally {
  758. setWorkThenPoolOrResumeSelect(header);
  759. }
  760. getProtocolHandler().handleRequest(header, this);
  761. } catch (Throwable t) {
  762. if (transportDebug())
  763. dprint(".REQUEST 1.1: !!ERROR!!: " + header, t);
  764. // Mask the exception from thread.;
  765. } finally {
  766. if (transportDebug()) dprint(".REQUEST 1.1<-: " + header);
  767. }
  768. }
  769. // REVISIT: this is identical to 1_0 except for fragment part.
  770. public void handleInput(RequestMessage_1_2 header) throws IOException
  771. {
  772. try {
  773. try {
  774. messageHeader = requestHeader = (RequestMessage) header;
  775. header.unmarshalRequestID(dispatchByteBuffer);
  776. setInputObject();
  777. if (transportDebug()) dprint(".REQUEST 1.2->: id/"
  778. + header.getRequestId()
  779. + ": "
  780. + header);
  781. // NOTE: in the old code this used to be done conditionally:
  782. // if (header.moreFragmentsToFollow()).
  783. // Now we always put it in. We take it out when
  784. // the response is done.
  785. // This must happen now so if a header is fragmented the stream
  786. // may be found.
  787. connection.serverRequestMapPut(header.getRequestId(), this);
  788. } finally {
  789. // Leader/Follower.
  790. // Note: This *MUST* come after putting stream in above map
  791. // since the header may be fragmented and you do not want to
  792. // start reading again until the map above is set.
  793. setWorkThenPoolOrResumeSelect(header);
  794. }
  795. //inputObject.unmarshalHeader(); // done in subcontract.
  796. getProtocolHandler().handleRequest(header, this);
  797. } catch (Throwable t) {
  798. if (transportDebug()) dprint(".REQUEST 1.2: id/"
  799. + header.getRequestId()
  800. + ": !!ERROR!!: "
  801. + header,
  802. t);
  803. // Mask the exception from thread.;
  804. } finally {
  805. connection.serverRequestMapRemove(header.getRequestId());
  806. if (transportDebug()) dprint(".REQUEST 1.2<-: id/"
  807. + header.getRequestId()
  808. + ": "
  809. + header);
  810. }
  811. }
  812. public void handleInput(ReplyMessage_1_0 header) throws IOException
  813. {
  814. try {
  815. try {
  816. if (transportDebug()) dprint(".REPLY 1.0->: " + header);
  817. messageHeader = replyHeader = (ReplyMessage) header;
  818. setInputObject();
  819. // REVISIT: this should be done by waiting thread.
  820. inputObject.unmarshalHeader();
  821. signalResponseReceived();
  822. } finally{
  823. setWorkThenReadOrResumeSelect(header);
  824. }
  825. } catch (Throwable t) {
  826. if (transportDebug())dprint(".REPLY 1.0: !!ERROR!!: " + header, t);
  827. // Mask the exception from thread.;
  828. } finally {
  829. if (transportDebug()) dprint(".REPLY 1.0<-: " + header);
  830. }
  831. }
  832. public void handleInput(ReplyMessage_1_1 header) throws IOException
  833. {
  834. try {
  835. if (transportDebug()) dprint(".REPLY 1.1->: " + header);
  836. messageHeader = replyHeader = (ReplyMessage) header;
  837. setInputObject();
  838. if (header.moreFragmentsToFollow()) {
  839. // More fragments are coming to complete this reply, so keep
  840. // a reference to the InputStream so we can add the fragments
  841. connection.clientReply_1_1_Put(this);
  842. // In 1.1, we can't assume that we have the request ID in the
  843. // first fragment. Thus, another thread is used
  844. // to be the reader while this thread unmarshals
  845. // the extended header and wakes up the client thread.
  846. setWorkThenPoolOrResumeSelect(header);
  847. // REVISIT - error handling.
  848. // This must be done now.
  849. inputObject.unmarshalHeader();
  850. signalResponseReceived();
  851. } else {
  852. // Not fragmented, therefore we know the request
  853. // ID is here. Thus, we can unmarshal the extended header
  854. // and wake up the client thread without using a third
  855. // thread as above.
  856. // REVISIT - error handling during unmarshal.
  857. // This must be done now to get the request id.
  858. inputObject.unmarshalHeader();
  859. signalResponseReceived();
  860. setWorkThenReadOrResumeSelect(header);
  861. }
  862. } catch (Throwable t) {
  863. if (transportDebug()) dprint(".REPLY 1.1: !!ERROR!!: " + header);
  864. // Mask the exception from thread.;
  865. } finally {
  866. if (transportDebug()) dprint(".REPLY 1.1<-: " + header);
  867. }
  868. }
  869. public void handleInput(ReplyMessage_1_2 header) throws IOException
  870. {
  871. try {
  872. try {
  873. messageHeader = replyHeader = (ReplyMessage) header;
  874. // We know that the request ID is in the first fragment
  875. header.unmarshalRequestID(dispatchByteBuffer);
  876. if (transportDebug()) {
  877. dprint(".REPLY 1.2->: id/"
  878. + + header.getRequestId()
  879. + ": more?: " + header.moreFragmentsToFollow()
  880. + ": " + header);
  881. }
  882. setInputObject();
  883. signalResponseReceived();
  884. } finally {
  885. setWorkThenReadOrResumeSelect(header);
  886. }
  887. } catch (Throwable t) {
  888. if (transportDebug()) dprint(".REPLY 1.2: id/"
  889. + header.getRequestId()
  890. + ": !!ERROR!!: "
  891. + header, t);
  892. // Mask the exception from thread.;
  893. } finally {
  894. if (transportDebug()) dprint(".REPLY 1.2<-: id/"
  895. + header.getRequestId()
  896. + ": "
  897. + header);
  898. }
  899. }
  900. public void handleInput(LocateRequestMessage_1_0 header) throws IOException
  901. {
  902. try {
  903. if (transportDebug())
  904. dprint(".LOCATE_REQUEST 1.0->: " + header);
  905. try {
  906. messageHeader = header;
  907. setInputObject();
  908. } finally {
  909. setWorkThenPoolOrResumeSelect(header);
  910. }
  911. getProtocolHandler().handleRequest(header, this);
  912. } catch (Throwable t) {
  913. if (transportDebug())
  914. dprint(".LOCATE_REQUEST 1.0: !!ERROR!!: " + header, t);
  915. // Mask the exception from thread.;
  916. } finally {
  917. if (transportDebug())
  918. dprint(".LOCATE_REQUEST 1.0<-: " + header);
  919. }
  920. }
  921. public void handleInput(LocateRequestMessage_1_1 header) throws IOException
  922. {
  923. try {
  924. if (transportDebug())
  925. dprint(".LOCATE_REQUEST 1.1->: " + header);
  926. try {
  927. messageHeader = header;
  928. setInputObject();
  929. } finally {
  930. setWorkThenPoolOrResumeSelect(header);
  931. }
  932. getProtocolHandler().handleRequest(header, this);
  933. } catch (Throwable t) {
  934. if (transportDebug())
  935. dprint(".LOCATE_REQUEST 1.1: !!ERROR!!: " + header, t);
  936. // Mask the exception from thread.;
  937. } finally {
  938. if (transportDebug())
  939. dprint(".LOCATE_REQUEST 1.1<-:" + header);
  940. }
  941. }
  942. public void handleInput(LocateRequestMessage_1_2 header) throws IOException
  943. {
  944. try {
  945. try {
  946. messageHeader = header;
  947. header.unmarshalRequestID(dispatchByteBuffer);
  948. setInputObject();
  949. if (transportDebug())
  950. dprint(".LOCATE_REQUEST 1.2->: id/"
  951. + header.getRequestId()
  952. + ": "
  953. + header);
  954. if (header.moreFragmentsToFollow()) {
  955. connection.serverRequestMapPut(header.getRequestId(),this);
  956. }
  957. } finally {
  958. setWorkThenPoolOrResumeSelect(header);
  959. }
  960. getProtocolHandler().handleRequest(header, this);
  961. } catch (Throwable t) {
  962. if (transportDebug())
  963. dprint(".LOCATE_REQUEST 1.2: id/"
  964. + header.getRequestId()
  965. + ": !!ERROR!!: "
  966. + header, t);
  967. // Mask the exception from thread.;
  968. } finally {
  969. if (transportDebug())
  970. dprint(".LOCATE_REQUEST 1.2<-: id/"
  971. + header.getRequestId()
  972. + ": "
  973. + header);
  974. }
  975. }
  976. public void handleInput(LocateReplyMessage_1_0 header) throws IOException
  977. {
  978. try {
  979. if (transportDebug())
  980. dprint(".LOCATE_REPLY 1.0->:" + header);
  981. try {
  982. messageHeader = header;
  983. setInputObject();
  984. inputObject.unmarshalHeader(); // REVISIT Put in subcontract.
  985. signalResponseReceived();
  986. } finally {
  987. setWorkThenReadOrResumeSelect(header);
  988. }
  989. } catch (Throwable t) {
  990. if (transportDebug())
  991. dprint(".LOCATE_REPLY 1.0: !!ERROR!!: " + header, t);
  992. // Mask the exception from thread.;
  993. } finally {
  994. if (transportDebug())
  995. dprint(".LOCATE_REPLY 1.0<-: " + header);
  996. }
  997. }
  998. public void handleInput(LocateReplyMessage_1_1 header) throws IOException
  999. {
  1000. try {
  1001. if (transportDebug()) dprint(".LOCATE_REPLY 1.1->: " + header);
  1002. try {
  1003. messageHeader = header;
  1004. setInputObject();
  1005. // Fragmented LocateReplies are not allowed in 1.1.
  1006. inputObject.unmarshalHeader();
  1007. signalResponseReceived();
  1008. } finally {
  1009. setWorkThenReadOrResumeSelect(header);
  1010. }
  1011. } catch (Throwable t) {
  1012. if (transportDebug())
  1013. dprint(".LOCATE_REPLY 1.1: !!ERROR!!: " + header, t);
  1014. // Mask the exception from thread.;
  1015. } finally {
  1016. if (transportDebug()) dprint(".LOCATE_REPLY 1.1<-: " + header);
  1017. }
  1018. }
  1019. public void handleInput(LocateReplyMessage_1_2 header) throws IOException
  1020. {
  1021. try {
  1022. try {
  1023. messageHeader = header;
  1024. // No need to put in client reply map - already there.
  1025. header.unmarshalRequestID(dispatchByteBuffer);
  1026. setInputObject();
  1027. if (transportDebug()) dprint(".LOCATE_REPLY 1.2->: id/"
  1028. + header.getRequestId()
  1029. + ": "
  1030. + header);
  1031. signalResponseReceived();
  1032. } finally {
  1033. setWorkThenPoolOrResumeSelect(header); // REVISIT
  1034. }
  1035. } catch (Throwable t) {
  1036. if (transportDebug())
  1037. dprint(".LOCATE_REPLY 1.2: id/"
  1038. + header.getRequestId()
  1039. + ": !!ERROR!!: "
  1040. + header, t);
  1041. // Mask the exception from thread.;
  1042. } finally {
  1043. if (transportDebug()) dprint(".LOCATE_REPLY 1.2<-: id/"
  1044. + header.getRequestId()
  1045. + ": "
  1046. + header);
  1047. }
  1048. }
  1049. public void handleInput(FragmentMessage_1_1 header) throws IOException
  1050. {
  1051. try {
  1052. if (transportDebug()) {
  1053. dprint(".FRAGMENT 1.1->: "
  1054. + "more?: " + header.moreFragmentsToFollow()
  1055. + ": " + header);
  1056. }
  1057. try {
  1058. messageHeader = header;
  1059. MessageMediator mediator = null;
  1060. CDRInputObject inputObject = null;
  1061. if (connection.isServer()) {
  1062. mediator = connection.serverRequest_1_1_Get();
  1063. } else {
  1064. mediator = connection.clientReply_1_1_Get();
  1065. }
  1066. if (mediator != null) {
  1067. inputObject = (CDRInputObject) mediator.getInputObject();
  1068. }
  1069. // If no input stream available, then discard the fragment.
  1070. // This can happen:
  1071. // 1. if a fragment message is received prior to receiving
  1072. // the original request/reply message. Very unlikely.
  1073. // 2. if a fragment message is received after the
  1074. // reply has been sent (early replies)
  1075. // Note: In the case of early replies, the fragments received
  1076. // during the request processing (which are never unmarshaled),
  1077. // will eventually be discarded by the GC.
  1078. if (inputObject == null) {
  1079. if (transportDebug())
  1080. dprint(".FRAGMENT 1.1: ++++DISCARDING++++: " + header);
  1081. // need to release dispatchByteBuffer to pool if
  1082. // we are discarding
  1083. releaseByteBufferToPool();
  1084. return;
  1085. }
  1086. inputObject.getBufferManager()
  1087. .processFragment(dispatchByteBuffer, header);
  1088. if (! header.moreFragmentsToFollow()) {
  1089. if (connection.isServer()) {
  1090. connection.serverRequest_1_1_Remove();
  1091. } else {
  1092. connection.clientReply_1_1_Remove();
  1093. }
  1094. }
  1095. } finally {
  1096. // NOTE: This *must* come after queing the fragment
  1097. // when using the selector to ensure fragments stay in order.
  1098. setWorkThenReadOrResumeSelect(header);
  1099. }
  1100. } catch (Throwable t) {
  1101. if (transportDebug())
  1102. dprint(".FRAGMENT 1.1: !!ERROR!!: " + header, t);
  1103. // Mask the exception from thread.;
  1104. } finally {
  1105. if (transportDebug()) dprint(".FRAGMENT 1.1<-: " + header);
  1106. }
  1107. }
  1108. public void handleInput(FragmentMessage_1_2 header) throws IOException
  1109. {
  1110. try {
  1111. try {
  1112. messageHeader = header;
  1113. // Note: We know it's a 1.2 fragment, we have the data, but
  1114. // we need the IIOPInputStream instance to unmarshal the
  1115. // request ID... but we need the request ID to get the
  1116. // IIOPInputStream instance. So we peek at the raw bytes.
  1117. header.unmarshalRequestID(dispatchByteBuffer);
  1118. if (transportDebug()) {
  1119. dprint(".FRAGMENT 1.2->: id/"
  1120. + header.getRequestId()
  1121. + ": more?: " + header.moreFragmentsToFollow()
  1122. + ": " + header);
  1123. }
  1124. MessageMediator mediator = null;
  1125. InputObject inputObject = null;
  1126. if (connection.isServer()) {
  1127. mediator =
  1128. connection.serverRequestMapGet(header.getRequestId());
  1129. } else {
  1130. mediator =
  1131. connection.clientRequestMapGet(header.getRequestId());
  1132. }
  1133. if (mediator != null) {
  1134. inputObject = mediator.getInputObject();
  1135. }
  1136. // See 1.1 comments.
  1137. if (inputObject == null) {
  1138. if (transportDebug()) {
  1139. dprint(".FRAGMENT 1.2: id/"
  1140. + header.getRequestId()
  1141. + ": ++++DISCARDING++++: "
  1142. + header);
  1143. }
  1144. // need to release dispatchByteBuffer to pool if
  1145. // we are discarding
  1146. releaseByteBufferToPool();
  1147. return;
  1148. }
  1149. ((CDRInputObject)inputObject)
  1150. .getBufferManager().processFragment(
  1151. dispatchByteBuffer, header);
  1152. // REVISIT: but if it is a server don't you have to remove the
  1153. // stream from the map?
  1154. if (! connection.isServer()) {
  1155. /* REVISIT
  1156. * No need to do anything.
  1157. * Should we mark that last was received?
  1158. if (! header.moreFragmentsToFollow()) {
  1159. // Last fragment.
  1160. }
  1161. */
  1162. }
  1163. } finally {
  1164. // NOTE: This *must* come after queing the fragment
  1165. // when using the selector to ensure fragments stay in order.
  1166. setWorkThenReadOrResumeSelect(header);
  1167. }
  1168. } catch (Throwable t) {
  1169. if (transportDebug())
  1170. dprint(".FRAGMENT 1.2: id/"
  1171. + header.getRequestId()
  1172. + ": !!ERROR!!: "
  1173. + header, t);
  1174. // Mask the exception from thread.;
  1175. } finally {
  1176. if (transportDebug()) dprint(".FRAGMENT 1.2<-: id/"
  1177. + header.getRequestId()
  1178. + ": "
  1179. + header);
  1180. }
  1181. }
  1182. public void handleInput(CancelRequestMessage header) throws IOException
  1183. {
  1184. try {
  1185. try {
  1186. messageHeader = header;
  1187. setInputObject();
  1188. // REVISIT: Move these two to subcontract.
  1189. inputObject.unmarshalHeader();
  1190. if (transportDebug()) dprint(".CANCEL->: id/"
  1191. + header.getRequestId() + ": "
  1192. + header.getGIOPVersion() + ": "
  1193. + header);
  1194. processCancelRequest(header.getRequestId());
  1195. releaseByteBufferToPool();
  1196. } finally {
  1197. setWorkThenReadOrResumeSelect(header);
  1198. }
  1199. } catch (Throwable t) {
  1200. if (transportDebug()) dprint(".CANCEL: id/"
  1201. + header.getRequestId()
  1202. + ": !!ERROR!!: "
  1203. + header, t);
  1204. // Mask the exception from thread.;
  1205. } finally {
  1206. if (transportDebug()) dprint(".CANCEL<-: id/"
  1207. + header.getRequestId() + ": "
  1208. + header.getGIOPVersion() + ": "
  1209. + header);
  1210. }
  1211. }
  1212. private void throwNotImplemented()
  1213. {
  1214. isThreadDone = false;
  1215. throwNotImplemented("");
  1216. }
  1217. private void throwNotImplemented(String msg)
  1218. {
  1219. throw new RuntimeException("CorbaMessageMediatorImpl: not implemented " + msg);
  1220. }
  1221. private void dprint(String msg, Throwable t)
  1222. {
  1223. dprint(msg);
  1224. t.printStackTrace(System.out);
  1225. }
  1226. private void dprint(String msg)
  1227. {
  1228. ORBUtility.dprint("CorbaMessageMediatorImpl", msg);
  1229. }
  1230. protected String opAndId(CorbaMessageMediator mediator)
  1231. {
  1232. return ORBUtility.operationNameAndRequestId(mediator);
  1233. }
  1234. private boolean transportDebug()
  1235. {
  1236. return orb.transportDebugFlag;
  1237. }
  1238. // REVISIT: move this to subcontract (but both client and server need it).
  1239. private final void processCancelRequest(int cancelReqId) {
  1240. // The GIOP version of CancelRequest does not matter, since
  1241. // CancelRequest_1_0 could be sent to cancel a request which
  1242. // has a different GIOP version.
  1243. /*
  1244. * CancelRequest processing logic :
  1245. *
  1246. * - find the request with matching requestId
  1247. *
  1248. * - call cancelProcessing() in BufferManagerRead [BMR]
  1249. *
  1250. * - the hope is that worker thread would call BMR.underflow()
  1251. * to wait for more fragments to come in. When BMR.underflow() is
  1252. * called, if a CancelRequest had already arrived,
  1253. * the worker thread would throw ThreadDeath,
  1254. * else the thread would wait to be notified of the
  1255. * arrival of a new fragment or CancelRequest. Upon notification,
  1256. * the woken up thread would check to see if a CancelRequest had
  1257. * arrived and if so throw a ThreadDeath or it will continue to
  1258. * process the received fragment.
  1259. *
  1260. * - if all the fragments had been received prior to CancelRequest
  1261. * then the worker thread would never block in BMR.underflow().
  1262. * So, setting the abort flag in BMR has no effect. The request
  1263. * processing will complete normally.
  1264. *
  1265. * - in the case where the server has received enough fragments to
  1266. * start processing the request and the server sends out
  1267. * an early reply. In such a case if the CancelRequest arrives
  1268. * after the reply has been sent, it has no effect.
  1269. */
  1270. if (!connection.isServer()) {
  1271. return; // we do not support bi-directional giop yet, ignore.
  1272. }
  1273. // Try to get hold of the InputStream buffer.
  1274. // In the case of 1.0 requests there is no way to get hold of
  1275. // InputStream. Try out the 1.1 and 1.2 cases.
  1276. // was the request 1.2 ?
  1277. MessageMediator mediator = connection.serverRequestMapGet(cancelReqId);
  1278. int requestId ;
  1279. if (mediator == null) {
  1280. // was the request 1.1 ?
  1281. mediator = connection.serverRequest_1_1_Get();
  1282. if (mediator == null) {
  1283. // XXX log this!
  1284. // either the request was 1.0
  1285. // or an early reply has already been sent
  1286. // or request processing is over
  1287. // or its a spurious CancelRequest
  1288. return; // do nothing.
  1289. }
  1290. requestId = ((CorbaMessageMediator) mediator).getRequestId();
  1291. if (requestId != cancelReqId) {
  1292. // A spurious 1.1 CancelRequest has been received.
  1293. // XXX log this!
  1294. return; // do nothing
  1295. }
  1296. if (requestId == 0) { // special case
  1297. // XXX log this
  1298. // this means that
  1299. // 1. the 1.1 requests' requestId has not been received
  1300. // i.e., a CancelRequest was received even before the
  1301. // 1.1 request was received. The spec disallows this.
  1302. // 2. or the 1.1 request has a requestId 0.
  1303. //
  1304. // It is a little tricky to distinguish these two. So, be
  1305. // conservative and do not cancel the request. Downside is that
  1306. // 1.1 requests with requestId of 0 will never be cancelled.
  1307. return; // do nothing
  1308. }
  1309. } else {
  1310. requestId = ((CorbaMessageMediator) mediator).getRequestId();
  1311. }
  1312. Message msg = ((CorbaMessageMediator)mediator).getRequestHeader();
  1313. if (msg.getType() != Message.GIOPRequest) {
  1314. // Any mediator obtained here should only ever be for a GIOP
  1315. // request.
  1316. wrapper.badMessageTypeForCancel() ;
  1317. }
  1318. // At this point we have a valid message mediator that contains
  1319. // a valid requestId.
  1320. // at this point we have chosen a request to be cancelled. But we
  1321. // do not know if the target object's method has been invoked or not.
  1322. // Request input stream being available simply means that the request
  1323. // processing is not over yet. simply set the abort flag in the
  1324. // BMRS and hope that the worker thread would notice it (this can
  1325. // happen only if the request stream is being unmarshalled and the
  1326. // target's method has not been invoked yet). This guarantees
  1327. // that the requests which have been dispatched to the
  1328. // target's method will never be cancelled.
  1329. BufferManagerReadStream bufferManager = (BufferManagerReadStream)
  1330. ((CDRInputObject)mediator.getInputObject()).getBufferManager();
  1331. bufferManager.cancelProcessing(cancelReqId);
  1332. }
  1333. ////////////////////////////////////////////////////
  1334. //
  1335. // spi.protocol.CorbaProtocolHandler
  1336. //
  1337. public void handleRequest(RequestMessage msg,
  1338. CorbaMessageMediator messageMediator)
  1339. {
  1340. try {
  1341. beginRequest(messageMediator);
  1342. try {
  1343. handleRequestRequest(messageMediator);
  1344. if (messageMediator.isOneWay()) {
  1345. return;
  1346. }
  1347. } catch (Throwable t) {
  1348. if (messageMediator.isOneWay()) {
  1349. return;
  1350. }
  1351. handleThrowableDuringServerDispatch(
  1352. messageMediator, t, CompletionStatus.COMPLETED_MAYBE);
  1353. }
  1354. sendResponse(messageMediator);
  1355. } catch (Throwable t) {
  1356. dispatchError(messageMediator, "RequestMessage", t);
  1357. } finally {
  1358. endRequest(messageMediator);
  1359. }
  1360. }
  1361. public void handleRequest(LocateRequestMessage msg,
  1362. CorbaMessageMediator messageMediator)
  1363. {
  1364. try {
  1365. beginRequest(messageMediator);
  1366. try {
  1367. handleLocateRequest(messageMediator);
  1368. } catch (Throwable t) {
  1369. handleThrowableDuringServerDispatch(
  1370. messageMediator, t, CompletionStatus.COMPLETED_MAYBE);
  1371. }
  1372. sendResponse(messageMediator);
  1373. } catch (Throwable t) {
  1374. dispatchError(messageMediator, "LocateRequestMessage", t);
  1375. } finally {
  1376. endRequest(messageMediator);
  1377. }
  1378. }
  1379. private void beginRequest(CorbaMessageMediator messageMediator)
  1380. {
  1381. ORB orb = (ORB) messageMediator.getBroker();
  1382. if (orb.subcontractDebugFlag) {
  1383. dprint(".handleRequest->:");
  1384. }
  1385. connection.serverRequestProcessingBegins();
  1386. }
  1387. private void dispatchError(CorbaMessageMediator messageMediator,
  1388. String msg, Throwable t)
  1389. {
  1390. if (orb.subcontractDebugFlag) {
  1391. dprint(".handleRequest: " + opAndId(messageMediator)
  1392. + ": !!ERROR!!: "
  1393. + msg,
  1394. t);
  1395. }
  1396. // REVISIT - this makes hcks sendTwoObjects fail
  1397. // messageMediator.getConnection().close();
  1398. }
  1399. private void sendResponse(CorbaMessageMediator messageMediator)
  1400. {
  1401. if (orb.subcontractDebugFlag) {
  1402. dprint(".handleRequest: " + opAndId(messageMediator)
  1403. + ": sending response");
  1404. }
  1405. // REVISIT - type and location
  1406. CDROutputObject outputObject = (CDROutputObject)
  1407. messageMediator.getOutputObject();
  1408. if (outputObject != null) {
  1409. // REVISIT - can be null for TRANSIENT below.
  1410. outputObject.finishSendingMessage();
  1411. }
  1412. }
  1413. private void endRequest(CorbaMessageMediator messageMediator)
  1414. {
  1415. ORB orb = (ORB) messageMediator.getBroker();
  1416. if (orb.subcontractDebugFlag) {
  1417. dprint(".handleRequest<-: " + opAndId(messageMediator));
  1418. }
  1419. // release NIO ByteBuffers to ByteBufferPool
  1420. try {
  1421. OutputObject outputObj = messageMediator.getOutputObject();
  1422. if (outputObj != null) {
  1423. outputObj.close();
  1424. }
  1425. InputObject inputObj = messageMediator.getInputObject();
  1426. if (inputObj != null) {
  1427. inputObj.close();
  1428. }
  1429. } catch (IOException ex) {
  1430. // Given what close() does, this catch shouldn't ever happen.
  1431. // See CDRInput/OutputObject.close() for more info.
  1432. // It also won't result in a Corba error if an IOException happens.
  1433. if (orb.subcontractDebugFlag) {
  1434. dprint(".endRequest: IOException:" + ex.getMessage(), ex);
  1435. }
  1436. } finally {
  1437. ((CorbaConnection)messageMediator.getConnection()).serverRequestProcessingEnds();
  1438. }
  1439. }
  1440. protected void handleRequestRequest(CorbaMessageMediator messageMediator)
  1441. {
  1442. // Does nothing if already unmarshaled.
  1443. ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();
  1444. ORB orb = (ORB)messageMediator.getBroker();
  1445. orb.checkShutdownState();
  1446. ObjectKey okey = messageMediator.getObjectKey();
  1447. if (orb.subcontractDebugFlag) {
  1448. ObjectKeyTemplate oktemp = okey.getTemplate() ;
  1449. dprint( ".handleRequest: " + opAndId(messageMediator)
  1450. + ": dispatching to scid: " + oktemp.getSubcontractId());
  1451. }
  1452. CorbaServerRequestDispatcher sc = okey.getServerRequestDispatcher(orb);
  1453. if (orb.subcontractDebugFlag) {
  1454. dprint(".handleRequest: " + opAndId(messageMediator)
  1455. + ": dispatching to sc: " + sc);
  1456. }
  1457. if (sc == null) {
  1458. throw wrapper.noServerScInDispatch() ;
  1459. }
  1460. // NOTE:
  1461. // This is necessary so mediator can act as ResponseHandler
  1462. // and pass necessary info to response constructors located
  1463. // in the subcontract.
  1464. // REVISIT - same class right now.
  1465. //messageMediator.setProtocolHandler(this);
  1466. try {
  1467. orb.startingDispatch();
  1468. sc.dispatch(messageMediator);
  1469. } finally {
  1470. orb.finishedDispatch();
  1471. }
  1472. }
  1473. protected void handleLocateRequest(CorbaMessageMediator messageMediator)
  1474. {
  1475. ORB orb = (ORB)messageMediator.getBroker();
  1476. LocateRequestMessage msg = (LocateRequestMessage)
  1477. messageMediator.getDispatchHeader();
  1478. IOR ior = null;
  1479. LocateReplyMessage reply = null;
  1480. short addrDisp = -1;
  1481. try {
  1482. ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();
  1483. CorbaServerRequestDispatcher sc =
  1484. msg.getObjectKey().getServerRequestDispatcher( orb ) ;
  1485. if (sc == null) {
  1486. return;
  1487. }
  1488. ior = sc.locate(msg.getObjectKey());
  1489. if ( ior == null ) {
  1490. reply = MessageBase.createLocateReply(
  1491. orb, msg.getGIOPVersion(),
  1492. msg.getEncodingVersion(),
  1493. msg.getRequestId(),
  1494. LocateReplyMessage.OBJECT_HERE, null);
  1495. } else {
  1496. reply = MessageBase.createLocateReply(
  1497. orb, msg.getGIOPVersion(),
  1498. msg.getEncodingVersion(),
  1499. msg.getRequestId(),
  1500. LocateReplyMessage.OBJECT_FORWARD, ior);
  1501. }
  1502. // REVISIT: Should we catch SystemExceptions?
  1503. } catch (AddressingDispositionException ex) {
  1504. // create a response containing the expected target
  1505. // addressing disposition.
  1506. reply = MessageBase.createLocateReply(
  1507. orb, msg.getGIOPVersion(),
  1508. msg.getEncodingVersion(),
  1509. msg.getRequestId(),
  1510. LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE, null);
  1511. addrDisp = ex.expectedAddrDisp();
  1512. } catch (RequestCanceledException ex) {
  1513. return; // no need to send reply
  1514. } catch ( Exception ex ) {
  1515. // REVISIT If exception is not OBJECT_NOT_EXIST, it should
  1516. // have a different reply
  1517. // This handles OBJECT_NOT_EXIST exceptions thrown in
  1518. // the subcontract or obj manager. Send back UNKNOWN_OBJECT.
  1519. reply = MessageBase.createLocateReply(
  1520. orb, msg.getGIOPVersion(),
  1521. msg.getEncodingVersion(),
  1522. msg.getRequestId(),
  1523. LocateReplyMessage.UNKNOWN_OBJECT, null);
  1524. }
  1525. CDROutputObject outputObject =
  1526. createAppropriateOutputObject(messageMediator,
  1527. msg, reply);
  1528. messageMediator.setOutputObject(outputObject);
  1529. outputObject.setMessageMediator(messageMediator);
  1530. reply.write(outputObject);
  1531. // outputObject.setMessage(reply); // REVISIT - not necessary
  1532. if (ior != null) {
  1533. ior.write(outputObject);
  1534. }
  1535. if (addrDisp != -1) {
  1536. AddressingDispositionHelper.write(outputObject, addrDisp);
  1537. }
  1538. }
  1539. private CDROutputObject createAppropriateOutputObject(
  1540. CorbaMessageMediator messageMediator,
  1541. Message msg, LocateReplyMessage reply)
  1542. {
  1543. CDROutputObject outputObject;
  1544. if (msg.getGIOPVersion().lessThan(GIOPVersion.V1_2)) {
  1545. // locate msgs 1.0 & 1.1 :=> grow,
  1546. // REVISIT - build from factory
  1547. outputObject = new CDROutputObject(
  1548. (ORB) messageMediator.getBroker(),
  1549. this,
  1550. GIOPVersion.V1_0,
  1551. (CorbaConnection) messageMediator.getConnection(),
  1552. reply,
  1553. ORBConstants.STREAM_FORMAT_VERSION_1);
  1554. } else {
  1555. // 1.2 :=> stream
  1556. // REVISIT - build from factory
  1557. outputObject = new CDROutputObject(
  1558. (ORB) messageMediator.getBroker(),
  1559. messageMediator,
  1560. reply,
  1561. ORBConstants.STREAM_FORMAT_VERSION_1);
  1562. }
  1563. return outputObject;
  1564. }
  1565. public void handleThrowableDuringServerDispatch(
  1566. CorbaMessageMediator messageMediator,
  1567. Throwable throwable,
  1568. CompletionStatus completionStatus)
  1569. {
  1570. if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
  1571. dprint(".handleThrowableDuringServerDispatch: "
  1572. + opAndId(messageMediator) + ": "
  1573. + throwable);
  1574. }
  1575. // If we haven't unmarshaled the header, we probably don't
  1576. // have enough information to even send back a reply.
  1577. // REVISIT
  1578. // Cannot do this check. When target addressing disposition does
  1579. // not match (during header unmarshaling) it throws an exception
  1580. // to be handled here.
  1581. /*
  1582. if (! ((CDRInputObject)messageMediator.getInputObject())
  1583. .unmarshaledHeader()) {
  1584. return;
  1585. }
  1586. */
  1587. handleThrowableDuringServerDispatch(messageMediator,
  1588. throwable,
  1589. completionStatus,
  1590. 1);
  1591. }
  1592. // REVISIT - catch and ignore RequestCanceledException.
  1593. protected void handleThrowableDuringServerDispatch(
  1594. CorbaMessageMediator messageMediator,
  1595. Throwable throwable,
  1596. CompletionStatus completionStatus,
  1597. int iteration)
  1598. {
  1599. if (iteration > 10) {
  1600. if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
  1601. dprint(".handleThrowableDuringServerDispatch: "
  1602. + opAndId(messageMediator)
  1603. + ": cannot handle: "
  1604. + throwable);
  1605. }
  1606. // REVISIT - should we close connection?
  1607. RuntimeException rte =
  1608. new RuntimeException("handleThrowableDuringServerDispatch: " +
  1609. "cannot create response.");
  1610. rte.initCause(throwable);
  1611. throw rte;
  1612. }
  1613. try {
  1614. if (throwable instanceof ForwardException) {
  1615. ForwardException fex = (ForwardException)throwable ;
  1616. createLocationForward( messageMediator, fex.getIOR(), null ) ;
  1617. return;
  1618. }
  1619. if (throwable instanceof AddressingDispositionException) {
  1620. handleAddressingDisposition(
  1621. messageMediator,
  1622. (AddressingDispositionException)throwable);
  1623. return;
  1624. }
  1625. // Else.
  1626. SystemException sex =
  1627. convertThrowableToSystemException(throwable, completionStatus);
  1628. createSystemExceptionResponse(messageMediator, sex, null);
  1629. return;
  1630. } catch (Throwable throwable2) {
  1631. // User code (e.g., postinvoke, interceptors) may change
  1632. // the exception, so we end up back here.
  1633. // Report the changed exception.
  1634. handleThrowableDuringServerDispatch(messageMediator,
  1635. throwable2,
  1636. completionStatus,
  1637. iteration + 1);
  1638. return;
  1639. }
  1640. }
  1641. protected SystemException convertThrowableToSystemException(
  1642. Throwable throwable,
  1643. CompletionStatus completionStatus)
  1644. {
  1645. if (throwable instanceof SystemException) {
  1646. return (SystemException)throwable;
  1647. }
  1648. if (throwable instanceof RequestCanceledException) {
  1649. // Reporting an exception response causes the
  1650. // poa current stack, the interceptor stacks, etc.
  1651. // to be balanced. It also notifies interceptors
  1652. // that the request was cancelled.
  1653. return wrapper.requestCanceled( throwable ) ;
  1654. }
  1655. // NOTE: We do not trap ThreadDeath above Throwable.
  1656. // There is no reason to stop the thread. It is
  1657. // just a worker thread. The ORB never throws
  1658. // ThreadDeath. Client code may (e.g., in ServantManagers,
  1659. // interceptors, or servants) but that should not
  1660. // effect the ORB threads. So it is just handled
  1661. // generically.
  1662. //
  1663. // Last resort.
  1664. // If user code throws a non-SystemException report it generically.
  1665. //
  1666. return wrapper.runtimeexception( CompletionStatus.COMPLETED_MAYBE, throwable ) ;
  1667. }
  1668. protected void handleAddressingDisposition(
  1669. CorbaMessageMediator messageMediator,
  1670. AddressingDispositionException ex)
  1671. {
  1672. short addrDisp = -1;
  1673. // from iiop.RequestProcessor.
  1674. // Respond with expected target addressing disposition.
  1675. switch (messageMediator.getRequestHeader().getType()) {
  1676. case Message.GIOPRequest :
  1677. ReplyMessage replyHeader = MessageBase.createReply(
  1678. (ORB)messageMediator.getBroker(),
  1679. messageMediator.getGIOPVersion(),
  1680. messageMediator.getEncodingVersion(),
  1681. messageMediator.getRequestId(),
  1682. ReplyMessage.NEEDS_ADDRESSING_MODE,
  1683. null, null);
  1684. // REVISIT: via acceptor factory.
  1685. CDROutputObject outputObject = new CDROutputObject(
  1686. (ORB)messageMediator.getBroker(),
  1687. this,
  1688. messageMediator.getGIOPVersion(),
  1689. (CorbaConnection)messageMediator.getConnection(),
  1690. replyHeader,
  1691. ORBConstants.STREAM_FORMAT_VERSION_1);
  1692. messageMediator.setOutputObject(outputObject);
  1693. outputObject.setMessageMediator(messageMediator);
  1694. replyHeader.write(outputObject);
  1695. AddressingDispositionHelper.write(outputObject,
  1696. ex.expectedAddrDisp());
  1697. return;
  1698. case Message.GIOPLocateRequest :
  1699. LocateReplyMessage locateReplyHeader = MessageBase.createLocateReply(
  1700. (ORB)messageMediator.getBroker(),
  1701. messageMediator.getGIOPVersion(),
  1702. messageMediator.getEncodingVersion(),
  1703. messageMediator.getRequestId(),
  1704. LocateReplyMessage.LOC_NEEDS_ADDRESSING_MODE,
  1705. null);
  1706. addrDisp = ex.expectedAddrDisp();
  1707. // REVISIT: via acceptor factory.
  1708. outputObject =
  1709. createAppropriateOutputObject(messageMediator,
  1710. messageMediator.getRequestHeader(),
  1711. locateReplyHeader);
  1712. messageMediator.setOutputObject(outputObject);
  1713. outputObject.setMessageMediator(messageMediator);
  1714. locateReplyHeader.write(outputObject);
  1715. IOR ior = null;
  1716. if (ior != null) {
  1717. ior.write(outputObject);
  1718. }
  1719. if (addrDisp != -1) {
  1720. AddressingDispositionHelper.write(outputObject, addrDisp);
  1721. }
  1722. return;
  1723. }
  1724. }
  1725. public CorbaMessageMediator createResponse(
  1726. CorbaMessageMediator messageMediator,
  1727. ServiceContexts svc)
  1728. {
  1729. // REVISIT: ignore service contexts during framework transition.
  1730. // They are set in SubcontractResponseHandler to the wrong connection.
  1731. // Then they would be set again here and a duplicate contexts
  1732. // exception occurs.
  1733. return createResponseHelper(
  1734. messageMediator,
  1735. getServiceContextsForReply(messageMediator, null));
  1736. }
  1737. public CorbaMessageMediator createUserExceptionResponse(
  1738. CorbaMessageMediator messageMediator, ServiceContexts svc)
  1739. {
  1740. // REVISIT - same as above
  1741. return createResponseHelper(
  1742. messageMediator,
  1743. getServiceContextsForReply(messageMediator, null),
  1744. true);
  1745. }
  1746. public CorbaMessageMediator createUnknownExceptionResponse(
  1747. CorbaMessageMediator messageMediator, UnknownException ex)
  1748. {
  1749. // NOTE: This service context container gets augmented in
  1750. // tail call.
  1751. ServiceContexts contexts = null;
  1752. SystemException sys = new UNKNOWN( 0,
  1753. CompletionStatus.COMPLETED_MAYBE);
  1754. contexts = new ServiceContexts( (ORB)messageMediator.getBroker() );
  1755. UEInfoServiceContext uei = new UEInfoServiceContext(sys);
  1756. contexts.put( uei ) ;
  1757. return createSystemExceptionResponse(messageMediator, sys, contexts);
  1758. }
  1759. public CorbaMessageMediator createSystemExceptionResponse(
  1760. CorbaMessageMediator messageMediator,
  1761. SystemException ex,
  1762. ServiceContexts svc)
  1763. {
  1764. if (messageMediator.getConnection() != null) {
  1765. // It is possible that fragments of response have already been
  1766. // sent. Then an error may occur (e.g. marshaling error like
  1767. // non serializable object). In that case it is too late
  1768. // to send the exception. We just return the existing fragmented
  1769. // stream here. This will cause an incomplete last fragment
  1770. // to be sent. Then the other side will get a marshaling error
  1771. // when attempting to unmarshal.
  1772. // REVISIT: Impl - make interface method to do the following.
  1773. CorbaMessageMediatorImpl mediator = (CorbaMessageMediatorImpl)
  1774. ((CorbaConnection)messageMediator.getConnection())
  1775. .serverRequestMapGet(messageMediator.getRequestId());
  1776. OutputObject existingOutputObject = null;
  1777. if (mediator != null) {
  1778. existingOutputObject = mediator.getOutputObject();
  1779. }
  1780. // REVISIT: need to think about messageMediator containing correct
  1781. // pointer to output object.
  1782. if (existingOutputObject != null &&
  1783. mediator.sentFragment() &&
  1784. ! mediator.sentFullMessage())
  1785. {
  1786. return mediator;
  1787. }
  1788. }
  1789. // Only do this if interceptors have been initialized on this request
  1790. // and have not completed their lifecycle (otherwise the info stack
  1791. // may be empty or have a different request's entry on top).
  1792. if (messageMediator.executePIInResponseConstructor()) {
  1793. // REVISIT: not necessary in framework now?
  1794. // Inform Portable Interceptors of the SystemException. This is
  1795. // required to be done here because the ending interception point
  1796. // is called in the when creating the response below
  1797. // but we do not currently write the SystemException into the
  1798. // response until after the ending point is called.
  1799. ((ORB)messageMediator.getBroker()).getPIHandler().setServerPIInfo( ex );
  1800. }
  1801. if (((ORB)messageMediator.getBroker()).subcontractDebugFlag &&
  1802. ex != null)
  1803. {
  1804. dprint(".createSystemExceptionResponse: "
  1805. + opAndId(messageMediator),
  1806. ex);
  1807. }
  1808. ServiceContexts serviceContexts =
  1809. getServiceContextsForReply(messageMediator, svc);
  1810. // NOTE: We MUST add the service context before creating
  1811. // the response since service contexts are written to the
  1812. // stream when the response object is created.
  1813. addExceptionDetailMessage(messageMediator, ex, serviceContexts);
  1814. CorbaMessageMediator response =
  1815. createResponseHelper(messageMediator, serviceContexts, false);
  1816. // NOTE: From here on, it is too late to add more service contexts.
  1817. // They have already been serialized to the stream (and maybe fragments
  1818. // sent).
  1819. ORBUtility.writeSystemException(
  1820. ex, (OutputStream)response.getOutputObject());
  1821. return response;
  1822. }
  1823. private void addExceptionDetailMessage(CorbaMessageMediator mediator,
  1824. SystemException ex,
  1825. ServiceContexts serviceContexts)
  1826. {
  1827. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  1828. PrintWriter pw = new PrintWriter(baos);
  1829. ex.printStackTrace(pw);
  1830. pw.flush(); // NOTE: you must flush or baos will be empty.
  1831. EncapsOutputStream encapsOutputStream =
  1832. new EncapsOutputStream((ORB)mediator.getBroker());
  1833. encapsOutputStream.putEndian();
  1834. encapsOutputStream.write_wstring(baos.toString());
  1835. UnknownServiceContext serviceContext =
  1836. new UnknownServiceContext(ExceptionDetailMessage.value,
  1837. encapsOutputStream.toByteArray());
  1838. serviceContexts.put(serviceContext);
  1839. }
  1840. public CorbaMessageMediator createLocationForward(
  1841. CorbaMessageMediator messageMediator, IOR ior, ServiceContexts svc)
  1842. {
  1843. ReplyMessage reply
  1844. = MessageBase.createReply(
  1845. (ORB)messageMediator.getBroker(),
  1846. messageMediator.getGIOPVersion(),
  1847. messageMediator.getEncodingVersion(),
  1848. messageMediator.getRequestId(),
  1849. ReplyMessage.LOCATION_FORWARD,
  1850. getServiceContextsForReply(messageMediator, svc),
  1851. ior);
  1852. return createResponseHelper(messageMediator, reply, ior);
  1853. }
  1854. protected CorbaMessageMediator createResponseHelper(
  1855. CorbaMessageMediator messageMediator, ServiceContexts svc)
  1856. {
  1857. ReplyMessage message =
  1858. MessageBase.createReply(
  1859. (ORB)messageMediator.getBroker(),
  1860. messageMediator.getGIOPVersion(),
  1861. messageMediator.getEncodingVersion(),
  1862. messageMediator.getRequestId(),
  1863. ReplyMessage.NO_EXCEPTION,
  1864. svc,
  1865. null);
  1866. return createResponseHelper(messageMediator, message, null);
  1867. }
  1868. protected CorbaMessageMediator createResponseHelper(
  1869. CorbaMessageMediator messageMediator, ServiceContexts svc,boolean user)
  1870. {
  1871. ReplyMessage message =
  1872. MessageBase.createReply(
  1873. (ORB)messageMediator.getBroker(),
  1874. messageMediator.getGIOPVersion(),
  1875. messageMediator.getEncodingVersion(),
  1876. messageMediator.getRequestId(),
  1877. user ? ReplyMessage.USER_EXCEPTION :
  1878. ReplyMessage.SYSTEM_EXCEPTION,
  1879. svc,
  1880. null);
  1881. return createResponseHelper(messageMediator, message, null);
  1882. }
  1883. // REVISIT - IOR arg is ignored.
  1884. protected CorbaMessageMediator createResponseHelper(
  1885. CorbaMessageMediator messageMediator, ReplyMessage reply, IOR ior)
  1886. {
  1887. // REVISIT - these should be invoked from subcontract.
  1888. runServantPostInvoke(messageMediator);
  1889. runInterceptors(messageMediator, reply);
  1890. runRemoveThreadInfo(messageMediator);
  1891. if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
  1892. dprint(".createResponseHelper: "
  1893. + opAndId(messageMediator) + ": "
  1894. + reply);
  1895. }
  1896. messageMediator.setReplyHeader(reply);
  1897. OutputObject replyOutputObject;
  1898. // REVISIT = do not use null.
  1899. //
  1900. if (messageMediator.getConnection() == null) {
  1901. // REVISIT - needs factory
  1902. replyOutputObject =
  1903. new CDROutputObject(orb, messageMediator,
  1904. messageMediator.getReplyHeader(),
  1905. messageMediator.getStreamFormatVersion(),
  1906. BufferManagerFactory.GROW);
  1907. } else {
  1908. replyOutputObject = messageMediator.getConnection().getAcceptor()
  1909. .createOutputObject(messageMediator.getBroker(), messageMediator);
  1910. }
  1911. messageMediator.setOutputObject(replyOutputObject);
  1912. messageMediator.getOutputObject().setMessageMediator(messageMediator);
  1913. reply.write((OutputStream) messageMediator.getOutputObject());
  1914. if (reply.getIOR() != null) {
  1915. reply.getIOR().write((OutputStream) messageMediator.getOutputObject());
  1916. }
  1917. // REVISIT - not necessary?
  1918. //messageMediator.this.replyIOR = reply.getIOR();
  1919. // NOTE: The mediator holds onto output object so return value
  1920. // not really necessary.
  1921. return messageMediator;
  1922. }
  1923. protected void runServantPostInvoke(CorbaMessageMediator messageMediator)
  1924. {
  1925. // Run ServantLocator::postinvoke. This may cause a SystemException
  1926. // which will throw out of the constructor and return later
  1927. // to construct a reply for that exception. The internal logic
  1928. // of returnServant makes sure that postinvoke is only called once.
  1929. // REVISIT: instead of instanceof, put method on all orbs.
  1930. ORB orb = null;
  1931. // This flag is to deal with BootstrapServer use of reply streams,
  1932. // with ServerRequestDispatcher's use of reply streams, etc.
  1933. if (messageMediator.executeReturnServantInResponseConstructor()) {
  1934. // It is possible to get marshaling errors in the skeleton after
  1935. // postinvoke has completed. We must set this to false so that
  1936. // when the error exception reply is constructed we don't try
  1937. // to incorrectly access poa current (which will be the wrong
  1938. // one or an empty stack.
  1939. messageMediator.setExecuteReturnServantInResponseConstructor(false);
  1940. messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(true);
  1941. try {
  1942. orb = (ORB)messageMediator.getBroker();
  1943. OAInvocationInfo info = orb.peekInvocationInfo() ;
  1944. ObjectAdapter oa = info.oa();
  1945. try {
  1946. oa.returnServant() ;
  1947. } catch (Throwable thr) {
  1948. wrapper.unexpectedException( thr ) ;
  1949. if (thr instanceof Error)
  1950. throw (Error)thr ;
  1951. else if (thr instanceof RuntimeException)
  1952. throw (RuntimeException)thr ;
  1953. } finally {
  1954. oa.exit();
  1955. }
  1956. } catch (EmptyStackException ese) {
  1957. throw wrapper.emptyStackRunServantPostInvoke( ese ) ;
  1958. }
  1959. }
  1960. }
  1961. protected void runInterceptors(CorbaMessageMediator messageMediator,
  1962. ReplyMessage reply)
  1963. {
  1964. if( messageMediator.executePIInResponseConstructor() ) {
  1965. // Invoke server request ending interception points (send_*):
  1966. // Note: this may end up with a SystemException or an internal
  1967. // Runtime ForwardRequest
  1968. ((ORB)messageMediator.getBroker()).getPIHandler().
  1969. invokeServerPIEndingPoint( reply );
  1970. // Note this will be executed even if a ForwardRequest or
  1971. // SystemException is thrown by a Portable Interceptors ending
  1972. // point since we end up in this constructor again anyway.
  1973. ((ORB)messageMediator.getBroker()).getPIHandler().
  1974. cleanupServerPIRequest();
  1975. // See createSystemExceptionResponse for why this is necesary.
  1976. messageMediator.setExecutePIInResponseConstructor(false);
  1977. }
  1978. }
  1979. protected void runRemoveThreadInfo(CorbaMessageMediator messageMediator)
  1980. {
  1981. // Once you get here then the final reply is available (i.e.,
  1982. // postinvoke and interceptors have completed.
  1983. if (messageMediator.executeRemoveThreadInfoInResponseConstructor()) {
  1984. messageMediator.setExecuteRemoveThreadInfoInResponseConstructor(false);
  1985. ((ORB)messageMediator.getBroker()).popInvocationInfo() ;
  1986. }
  1987. }
  1988. protected ServiceContexts getServiceContextsForReply(
  1989. CorbaMessageMediator messageMediator, ServiceContexts contexts)
  1990. {
  1991. CorbaConnection c = (CorbaConnection) messageMediator.getConnection();
  1992. if (((ORB)messageMediator.getBroker()).subcontractDebugFlag) {
  1993. dprint(".getServiceContextsForReply: "
  1994. + opAndId(messageMediator)
  1995. + ": " + c);
  1996. }
  1997. if (contexts == null) {
  1998. contexts = new ServiceContexts(((ORB)messageMediator.getBroker()));
  1999. }
  2000. // NOTE : We only want to send the runtime context the first time
  2001. if (c != null && !c.isPostInitialContexts()) {
  2002. c.setPostInitialContexts();
  2003. SendingContextServiceContext scsc =
  2004. new SendingContextServiceContext(
  2005. ((ORB)messageMediator.getBroker()).getFVDCodeBaseIOR()) ;
  2006. if (contexts.get( scsc.getId() ) != null)
  2007. throw wrapper.duplicateSendingContextServiceContext() ;
  2008. contexts.put( scsc ) ;
  2009. if ( ((ORB)messageMediator.getBroker()).subcontractDebugFlag)
  2010. dprint(".getServiceContextsForReply: "
  2011. + opAndId(messageMediator)
  2012. + ": added SendingContextServiceContext" ) ;
  2013. }
  2014. // send ORBVersion servicecontext as part of the Reply
  2015. ORBVersionServiceContext ovsc
  2016. = new ORBVersionServiceContext(ORBVersionFactory.getORBVersion());
  2017. if (contexts.get( ovsc.getId() ) != null)
  2018. throw wrapper.duplicateOrbVersionServiceContext() ;
  2019. contexts.put( ovsc ) ;
  2020. if ( ((ORB)messageMediator.getBroker()).subcontractDebugFlag)
  2021. dprint(".getServiceContextsForReply: "
  2022. + opAndId(messageMediator)
  2023. + ": added ORB version service context");
  2024. return contexts;
  2025. }
  2026. // REVISIT - this method should be migrated to orbutil.ORBUtility
  2027. // since all locations that release ByteBuffers use
  2028. // very similar logic and debug information.
  2029. private void releaseByteBufferToPool() {
  2030. if (dispatchByteBuffer != null) {
  2031. orb.getByteBufferPool().releaseByteBuffer(dispatchByteBuffer);
  2032. if (transportDebug()) {
  2033. int bbId = System.identityHashCode(dispatchByteBuffer);
  2034. StringBuffer sb = new StringBuffer();
  2035. sb.append(".handleInput: releasing ByteBuffer (" + bbId +
  2036. ") to ByteBufferPool");
  2037. dprint(sb.toString());
  2038. }
  2039. }
  2040. }
  2041. }
  2042. // End of file.