1. /*
  2. * @(#)CorbaResponseWaitingRoomImpl.java 1.29 04/03/01
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.impl.transport;
  8. import java.util.Hashtable;
  9. import org.omg.CORBA.CompletionStatus;
  10. import org.omg.CORBA.SystemException;
  11. import com.sun.corba.se.pept.encoding.InputObject;
  12. import com.sun.corba.se.pept.encoding.OutputObject;
  13. import com.sun.corba.se.pept.protocol.MessageMediator;
  14. import com.sun.corba.se.spi.logging.CORBALogDomains;
  15. import com.sun.corba.se.spi.orb.ORB;
  16. import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
  17. import com.sun.corba.se.spi.transport.CorbaConnection;
  18. import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
  19. import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
  20. import com.sun.corba.se.impl.encoding.CDRInputObject;
  21. import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  22. import com.sun.corba.se.impl.orbutil.ORBUtility;
  23. import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
  24. import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
  25. /**
  26. * @author Harold Carr
  27. */
  28. public class CorbaResponseWaitingRoomImpl
  29. implements
  30. CorbaResponseWaitingRoom
  31. {
  32. final static class OutCallDesc
  33. {
  34. java.lang.Object done = new java.lang.Object();
  35. Thread thread;
  36. MessageMediator messageMediator;
  37. SystemException exception;
  38. InputObject inputObject;
  39. }
  40. private ORB orb;
  41. private ORBUtilSystemException wrapper ;
  42. private CorbaConnection connection;
  43. // Maps requestId to an OutCallDesc.
  44. private Hashtable out_calls = null; // REVISIT - use int hastable/map
  45. public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
  46. {
  47. this.orb = orb;
  48. wrapper = ORBUtilSystemException.get( orb,
  49. CORBALogDomains.RPC_TRANSPORT ) ;
  50. this.connection = connection;
  51. out_calls = new Hashtable();
  52. }
  53. ////////////////////////////////////////////////////
  54. //
  55. // pept.transport.ResponseWaitingRoom
  56. //
  57. public void registerWaiter(MessageMediator mediator)
  58. {
  59. CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
  60. if (orb.transportDebugFlag) {
  61. dprint(".registerWaiter: " + opAndId(messageMediator));
  62. }
  63. Integer requestId = messageMediator.getRequestIdInteger();
  64. OutCallDesc call = new OutCallDesc();
  65. call.thread = Thread.currentThread();
  66. call.messageMediator = messageMediator;
  67. out_calls.put(requestId, call);
  68. }
  69. public void unregisterWaiter(MessageMediator mediator)
  70. {
  71. CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
  72. if (orb.transportDebugFlag) {
  73. dprint(".unregisterWaiter: " + opAndId(messageMediator));
  74. }
  75. Integer requestId = messageMediator.getRequestIdInteger();
  76. out_calls.remove(requestId);
  77. }
  78. public InputObject waitForResponse(MessageMediator mediator)
  79. {
  80. CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
  81. try {
  82. InputObject returnStream = null;
  83. if (orb.transportDebugFlag) {
  84. dprint(".waitForResponse->: " + opAndId(messageMediator));
  85. }
  86. Integer requestId = messageMediator.getRequestIdInteger();
  87. if (messageMediator.isOneWay()) {
  88. // The waiter is removed in releaseReply in the same
  89. // way as a normal request.
  90. if (orb.transportDebugFlag) {
  91. dprint(".waitForResponse: one way - not waiting: "
  92. + opAndId(messageMediator));
  93. }
  94. return null;
  95. }
  96. OutCallDesc call = (OutCallDesc)out_calls.get(requestId);
  97. if (call == null) {
  98. throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
  99. }
  100. synchronized(call.done) {
  101. while (call.inputObject == null && call.exception == null) {
  102. // Wait for the reply from the server.
  103. // The ReaderThread reads in the reply IIOP message
  104. // and signals us.
  105. try {
  106. if (orb.transportDebugFlag) {
  107. dprint(".waitForResponse: waiting: "
  108. + opAndId(messageMediator));
  109. }
  110. call.done.wait();
  111. } catch (InterruptedException ie) {};
  112. }
  113. if (call.exception != null) {
  114. if (orb.transportDebugFlag) {
  115. dprint(".waitForResponse: exception: "
  116. + opAndId(messageMediator));
  117. }
  118. throw call.exception;
  119. }
  120. returnStream = call.inputObject;
  121. }
  122. // REVISIT -- exceptions from unmarshaling code will
  123. // go up through this client thread!
  124. if (returnStream != null) {
  125. // On fragmented streams the header MUST be unmarshaled here
  126. // (in the client thread) in case it blocks.
  127. // If the header was already unmarshaled, this won't
  128. // do anything
  129. // REVISIT: cast - need interface method.
  130. ((CDRInputObject)returnStream).unmarshalHeader();
  131. }
  132. return returnStream;
  133. } finally {
  134. if (orb.transportDebugFlag) {
  135. dprint(".waitForResponse<-: " + opAndId(messageMediator));
  136. }
  137. }
  138. }
  139. public void responseReceived(InputObject is)
  140. {
  141. CDRInputObject inputObject = (CDRInputObject) is;
  142. LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
  143. inputObject.getMessageHeader();
  144. Integer requestId = new Integer(header.getRequestId());
  145. OutCallDesc call = (OutCallDesc) out_calls.get(requestId);
  146. if (orb.transportDebugFlag) {
  147. dprint(".responseReceived: id/"
  148. + requestId + ": "
  149. + header);
  150. }
  151. // This is an interesting case. It could mean that someone sent us a
  152. // reply message, but we don't know what request it was for. That
  153. // would probably call for an error. However, there's another case
  154. // that's normal and we should think about --
  155. //
  156. // If the unmarshaling thread does all of its work inbetween the time
  157. // the ReaderThread gives it the last fragment and gets to the
  158. // out_calls.get line, then it will also be null, so just return;
  159. if (call == null) {
  160. if (orb.transportDebugFlag) {
  161. dprint(".responseReceived: id/"
  162. + requestId
  163. + ": no waiter: "
  164. + header);
  165. }
  166. return;
  167. }
  168. // Set the reply InputObject and signal the client thread
  169. // that the reply has been received.
  170. // The thread signalled will remove outcall descriptor if appropriate.
  171. // Otherwise, it'll be removed when last fragment for it has been put on
  172. // BufferManagerRead's queue.
  173. synchronized (call.done) {
  174. CorbaMessageMediator messageMediator = (CorbaMessageMediator)
  175. call.messageMediator;
  176. if (orb.transportDebugFlag) {
  177. dprint(".responseReceived: "
  178. + opAndId(messageMediator)
  179. + ": notifying waiters");
  180. }
  181. messageMediator.setReplyHeader(header);
  182. messageMediator.setInputObject(is);
  183. inputObject.setMessageMediator(messageMediator);
  184. call.inputObject = is;
  185. call.done.notify();
  186. }
  187. }
  188. public int numberRegistered()
  189. {
  190. // Note: Hashtable.size() is not synchronized
  191. return out_calls.size();
  192. }
  193. //////////////////////////////////////////////////
  194. //
  195. // CorbaResponseWaitingRoom
  196. //
  197. public void signalExceptionToAllWaiters(SystemException systemException)
  198. {
  199. if (orb.transportDebugFlag) {
  200. dprint(".signalExceptionToAllWaiters: " + systemException);
  201. }
  202. OutCallDesc call;
  203. java.util.Enumeration e = out_calls.elements();
  204. while(e.hasMoreElements()) {
  205. call = (OutCallDesc) e.nextElement();
  206. synchronized(call.done){
  207. // anything waiting for BufferManagerRead's fragment queue
  208. // needs to be cancelled
  209. CorbaMessageMediator corbaMsgMediator =
  210. (CorbaMessageMediator)call.messageMediator;
  211. CDRInputObject inputObject =
  212. (CDRInputObject)corbaMsgMediator.getInputObject();
  213. // IMPORTANT: If inputObject is null, then no need to tell
  214. // BufferManagerRead to cancel request processing.
  215. if (inputObject != null) {
  216. BufferManagerReadStream bufferManager =
  217. (BufferManagerReadStream)inputObject.getBufferManager();
  218. int requestId = corbaMsgMediator.getRequestId();
  219. bufferManager.cancelProcessing(requestId);
  220. }
  221. call.inputObject = null;
  222. call.exception = systemException;
  223. call.done.notify();
  224. }
  225. }
  226. }
  227. public MessageMediator getMessageMediator(int requestId)
  228. {
  229. Integer id = new Integer(requestId);
  230. OutCallDesc call = (OutCallDesc) out_calls.get(id);
  231. if (call == null) {
  232. // This can happen when getting early reply fragments for a
  233. // request which has completed (e.g., client marshaling error).
  234. return null;
  235. }
  236. return call.messageMediator;
  237. }
  238. ////////////////////////////////////////////////////
  239. //
  240. // Implementation.
  241. //
  242. protected void dprint(String msg)
  243. {
  244. ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
  245. }
  246. protected String opAndId(CorbaMessageMediator mediator)
  247. {
  248. return ORBUtility.operationNameAndRequestId(mediator);
  249. }
  250. }
  251. // End of file.