1. /*
  2. * @(#)MessageMediator.java 1.13 03/01/23
  3. *
  4. * Copyright 2003 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.internal.iiop;
  8. import java.io.IOException;
  9. // to force 1.3 compiler to pickup the newer location
  10. // - remove once we move to JDK1.4
  11. import com.sun.corba.se.internal.iiop.messages.Message;
  12. import com.sun.corba.se.internal.iiop.messages.*;
  13. import com.sun.corba.se.internal.orbutil.MinorCodes;
  14. import com.sun.corba.se.internal.orbutil.ORBUtility;
  15. import com.sun.corba.se.internal.iiop.IIOPConnection.OutCallDesc;
  16. import org.omg.CORBA.INTERNAL;
  17. import org.omg.CORBA.CompletionStatus;
  18. import com.sun.corba.se.internal.orbutil.ThreadPool;
  19. import com.sun.corba.se.internal.orbutil.Work;
  20. /**
  21. * Provides processing for messages without if/switch overhead for versions
  22. * by using polymorphism and callbacks (double dispatch pattern). Separates
  23. * IIOPConnection from the workings of the iiop/message system and vice
  24. * versa. Makes it easy to see the behavior for a given {msg type, version}
  25. * combination.
  26. *
  27. * A single thread (the ReaderThread) accesses a MessageMediator instance.
  28. */
  29. public final class MessageMediator
  30. {
  31. /**
  32. * With GIOP 1.1 replies, we have no guarantee that the request ID
  33. * was in the first fragment, so we can't signal the client thread
  34. * to wake up and unmarshal the extended header. Thus, we must
  35. * use another thread to do it.
  36. */
  37. private static class ReplyProcessor_1_1 implements Work
  38. {
  39. private static final String name = "ReplyProcessor 1.1";
  40. private IIOPConnection conn;
  41. private IIOPInputStream reply;
  42. ReplyProcessor_1_1(IIOPConnection conn, IIOPInputStream reply) {
  43. this.conn = conn;
  44. this.reply = reply;
  45. }
  46. public final String getName() {
  47. return name;
  48. }
  49. public void process()
  50. {
  51. // Needs error handling
  52. reply.unmarshalHeader();
  53. ReplyMessage msg = (ReplyMessage)reply.getMessage();
  54. conn.signalReplyReceived(msg.getRequestId(),
  55. reply);
  56. }
  57. }
  58. public MessageMediator(IIOPConnection conn)
  59. {
  60. this.conn = conn;
  61. }
  62. // This could be a singleton, but that would lead to more
  63. // parameter passing. As is, there will be one instance per
  64. // IIOPConnection.
  65. private IIOPConnection conn;
  66. private byte[] buf;
  67. // If this weren't static, there wouldn't be any synchronization overhead
  68. // when threads on different connections fight for access. The downside
  69. // would be that cached threads would only be available per connection.
  70. // Since there isn't a scheme for expiring cached threads, yet, I've
  71. // made it static. -eea1 REVISIT
  72. //
  73. // You can argue this should be in IIOPConnection;
  74. private final static ThreadPool threadPool = new ThreadPool();
  75. private void dprint( String msg ) {
  76. ORBUtility.dprint( this, msg ) ;
  77. }
  78. /**
  79. * Create the appropriate message type, allocate a byte buffer of the
  80. * appropriate size, read in the message, and use the callback on the
  81. * message object to do the processing.
  82. */
  83. public final void processRequest()
  84. throws IOException
  85. {
  86. if (conn.getORB().transportDebugFlag)
  87. dprint("Creating message from stream");
  88. // Read in the message header and create the appropriate type
  89. // of IIOP message.
  90. MessageBase msg = (MessageBase)MessageBase.createFromStream(conn.getORB(),
  91. conn.getInputStream());
  92. // Create a buffer of the correct size. We don't even have
  93. // to copy the GIOP header into it since we'll never look
  94. // at the bytes again.
  95. this.buf = new byte[msg.getSize()];
  96. if (conn.getORB().transportDebugFlag)
  97. dprint("Reading the message fully, size =" + msg.getSize());
  98. // Read all the data into the buffer
  99. MessageBase.readFully(conn.getInputStream(),
  100. buf,
  101. MessageBase.GIOPMessageHeaderLength,
  102. msg.getSize() - MessageBase.GIOPMessageHeaderLength);
  103. if (conn.getORB().giopDebugFlag) {
  104. // For debugging purposes, copy the 12 bytes of the
  105. // GIOP header in to the main buffer
  106. System.arraycopy(msg.giopHeader, 0,
  107. this.buf, 0,
  108. 12);
  109. dprint("Received message:");
  110. ByteBufferWithInfo bbwi = new ByteBufferWithInfo(this.buf, 0);
  111. bbwi.buflen = msg.getSize();
  112. CDRInputStream_1_0.printBuffer(bbwi);
  113. }
  114. // Ask the message to call back to the mediator to handle
  115. // the request. The mediator does the appropriate thing
  116. // based on the message.
  117. msg.callback(this);
  118. }
  119. // (Currently this handles message types that we don't create classes for)
  120. public final void handleInput(MessageBase header)
  121. {
  122. if (conn.getORB().transportDebugFlag)
  123. dprint("Handling other GIOP message: " + header.getType());
  124. switch(header.getType())
  125. {
  126. case Message.GIOPCloseConnection:
  127. if (conn.getORB().transportDebugFlag)
  128. dprint("Connection.processInput: got CloseConn, purging");
  129. conn.purge_calls(Connection.CONN_REBIND, true, false);
  130. break;
  131. case Message.GIOPMessageError:
  132. if (conn.getORB().transportDebugFlag)
  133. dprint("Received MessageError, purging");
  134. conn.purge_calls(MinorCodes.RECV_MSG_ERROR, true, false);
  135. break;
  136. default:
  137. if (conn.getORB().transportDebugFlag)
  138. dprint("Connection: bad message type" + header.getType());
  139. throw new INTERNAL(MinorCodes.BAD_GIOP_REQUEST_TYPE,
  140. CompletionStatus.COMPLETED_NO);
  141. }
  142. }
  143. // Request messages -----------------------------
  144. public final void handleInput(RequestMessage_1_0 header)
  145. throws IOException
  146. {
  147. if (conn.getORB().transportDebugFlag)
  148. dprint("Handling GIOP 1.0 request");
  149. IIOPInputStream is = new ServerRequestImpl(conn, buf, header);
  150. threadPool.addWork(new RequestProcessor(conn.getServerGIOP().getRequestHandler(),
  151. conn,
  152. is));
  153. }
  154. public final void handleInput(RequestMessage_1_1 header)
  155. throws IOException
  156. {
  157. if (conn.getORB().transportDebugFlag)
  158. dprint("Handling GIOP 1.1 request");
  159. IIOPInputStream is = new ServerRequestImpl(conn, buf, header);
  160. // More fragments are coming to complete this request message
  161. // add stream to the serverRequestMap
  162. if (header.moreFragmentsToFollow())
  163. conn.theOnly1_1ServerRequestImpl = (ServerRequestImpl)is;
  164. threadPool.addWork(new RequestProcessor(conn.getServerGIOP().getRequestHandler(),
  165. conn,
  166. is));
  167. }
  168. public final void handleInput(RequestMessage_1_2 header)
  169. throws IOException
  170. {
  171. if (conn.getORB().transportDebugFlag)
  172. dprint("Handling GIOP 1.2 request");
  173. IIOPInputStream is = new ServerRequestImpl(conn, buf, header);
  174. header.unmarshalRequestID(buf);
  175. // More fragments are coming to complete this request message
  176. // add stream to the serverRequestMap
  177. if (header.moreFragmentsToFollow())
  178. conn.serverRequestMap.put(new Integer(header.getRequestId()), is);
  179. threadPool.addWork(new RequestProcessor(conn.getServerGIOP().getRequestHandler(),
  180. conn,
  181. is));
  182. }
  183. // Reply messages ---------------------------------
  184. public final void handleInput(ReplyMessage_1_0 header)
  185. throws IOException
  186. {
  187. if (conn.getORB().transportDebugFlag)
  188. dprint("Handling GIOP 1.0 reply");
  189. IIOPInputStream is = new ClientResponseImpl(conn, buf, header);
  190. is.unmarshalHeader();
  191. conn.signalReplyReceived(header.getRequestId(), is);
  192. }
  193. public final void handleInput(ReplyMessage_1_1 header)
  194. throws IOException
  195. {
  196. if (conn.getORB().transportDebugFlag)
  197. dprint("Handling GIOP 1.1 reply");
  198. IIOPInputStream is = new ClientResponseImpl(conn, buf, header);
  199. // More fragments are coming to complete this reply, so keep
  200. // a reference to the InputStream so we can add the fragments
  201. if (header.moreFragmentsToFollow()) {
  202. conn.theOnly1_1ClientResponseImpl = (ClientResponseImpl)is;
  203. // In 1.1, we can't assume that we have the request ID in the
  204. // first fragment. Thus, another thread is used to unmarshal
  205. // the extended header and wake up the client thread.
  206. threadPool.addWork(new ReplyProcessor_1_1(conn, is));
  207. } else {
  208. // If this is the only fragment, then we know the request
  209. // ID is here. Thus, we can unmarshal the extended header
  210. // and wake up the client thread without using a third
  211. // thread as above.
  212. is.unmarshalHeader();
  213. conn.signalReplyReceived(header.getRequestId(), is);
  214. }
  215. }
  216. public final void handleInput(ReplyMessage_1_2 header)
  217. throws IOException
  218. {
  219. if (conn.getORB().transportDebugFlag)
  220. dprint("Handling GIOP 1.2 reply");
  221. IIOPInputStream is = new ClientResponseImpl(conn, buf, header);
  222. // We know that the request ID is in the first fragment
  223. header.unmarshalRequestID(buf);
  224. // More fragments are coming to complete this reply, so keep
  225. // a reference to the InputStream so we can add the fragments
  226. if (header.moreFragmentsToFollow())
  227. conn.clientReplyMap.put(new Integer(header.getRequestId()), is);
  228. conn.signalReplyReceived(header.getRequestId(), is);
  229. }
  230. // Locate request messages ------------------------
  231. // Versions 1.0 and 1.1 cannot be fragmented, so the implementation can be
  232. // the same here.
  233. public final void handleInput(LocateRequestMessage_1_0 header)
  234. throws IOException
  235. {
  236. if (conn.getORB().transportDebugFlag)
  237. dprint("Handling GIOP 1.0 LocateRequest");
  238. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  239. threadPool.addWork(new RequestProcessor(conn.getServerGIOP().getRequestHandler(),
  240. conn,
  241. is));
  242. }
  243. public final void handleInput(LocateRequestMessage_1_1 header)
  244. throws IOException
  245. {
  246. if (conn.getORB().transportDebugFlag)
  247. dprint("Handling GIOP 1.1 LocateRequest");
  248. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  249. threadPool.addWork(new RequestProcessor(conn.getServerGIOP().getRequestHandler(),
  250. conn,
  251. is));
  252. }
  253. public final void handleInput(LocateRequestMessage_1_2 header)
  254. throws IOException
  255. {
  256. if (conn.getORB().transportDebugFlag)
  257. dprint("Handling GIOP 1.2 LocateRequest");
  258. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  259. header.unmarshalRequestID(buf);
  260. // More fragments are coming to complete this request message
  261. // add stream to the serverRequestMap
  262. if (header.moreFragmentsToFollow())
  263. conn.serverRequestMap.put(new Integer(header.getRequestId()), is);
  264. threadPool.addWork(new RequestProcessor(conn.getServerGIOP().getRequestHandler(),
  265. conn,
  266. is));
  267. }
  268. // Locate reply messages ------------------------
  269. public final void handleInput(LocateReplyMessage_1_0 header)
  270. throws IOException
  271. {
  272. if (conn.getORB().transportDebugFlag)
  273. dprint("Handling GIOP 1.0 LocateReply");
  274. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  275. is.unmarshalHeader();
  276. conn.signalReplyReceived(header.getRequestId(), is);
  277. }
  278. public final void handleInput(LocateReplyMessage_1_1 header)
  279. throws IOException
  280. {
  281. if (conn.getORB().transportDebugFlag)
  282. dprint("Handling GIOP 1.1 LocateReply");
  283. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  284. is.unmarshalHeader();
  285. // Fragmented LocateReplies are not allowed in 1.1
  286. conn.signalReplyReceived(header.getRequestId(), is);
  287. }
  288. public final void handleInput(LocateReplyMessage_1_2 header)
  289. throws IOException
  290. {
  291. if (conn.getORB().transportDebugFlag)
  292. dprint("Handling GIOP 1.2 LocateReply");
  293. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  294. header.unmarshalRequestID(buf);
  295. // More fragments are coming to complete this reply, so keep
  296. // a reference to the InputStream so we can add the fragments
  297. if (header.moreFragmentsToFollow())
  298. conn.clientReplyMap.put(new Integer(header.getRequestId()), is);
  299. conn.signalReplyReceived(header.getRequestId(), is);
  300. }
  301. // Fragment messages ----------------------------
  302. public final void handleInput(FragmentMessage_1_1 header)
  303. throws IOException
  304. {
  305. if (conn.getORB().transportDebugFlag)
  306. dprint("Handling GIOP 1.1 Fragment. Last? " + header.moreFragmentsToFollow());
  307. IIOPInputStream is;
  308. if (conn.isServer())
  309. is = conn.theOnly1_1ServerRequestImpl;
  310. else
  311. is = conn.theOnly1_1ClientResponseImpl;
  312. // if there is no inputstream available, then discard the message
  313. // fragment. This can happen
  314. // 1. if a fragment message is received prior
  315. // to receiving the original request/reply message. Very unlikely.
  316. // 2. if a fragment message is received after the reply has been sent
  317. // (early replies)
  318. // Note: In the case of early replies, the fragments received during
  319. // the request processing (which are never unmarshaled), will eventually
  320. // be discarded by the GC.
  321. if (is == null) {
  322. return;
  323. }
  324. is.getBufferManager().processFragment(buf, header);
  325. if (!conn.isServer()) {
  326. // Is it a last fragment of a reply ?
  327. if (!header.moreFragmentsToFollow()) {
  328. // It is not the responsibility of this thread to remove
  329. // the OutCallDesc from out_calls -- only the client thread
  330. // should do that.
  331. conn.theOnly1_1ClientResponseImpl = null;
  332. }
  333. }
  334. }
  335. public final void handleInput(FragmentMessage_1_2 header)
  336. throws IOException
  337. {
  338. if (conn.getORB().transportDebugFlag)
  339. dprint("Handling GIOP 1.2 Fragment. Last? " + header.moreFragmentsToFollow());
  340. // Unusual paradox: We know it's a 1.2 fragment, we have the
  341. // data, but we need the IIOPInputStream instance to unmarshal the
  342. // request ID... but we need the request ID to get the IIOPInputStream
  343. // instance.
  344. header.unmarshalRequestID(buf);
  345. Integer requestId = new Integer(header.getRequestId());
  346. IIOPInputStream is;
  347. if (conn.isServer())
  348. is = (IIOPInputStream)conn.serverRequestMap.get(requestId);
  349. else
  350. is = (IIOPInputStream)conn.clientReplyMap.get(requestId);
  351. if (is == null) {
  352. return;
  353. }
  354. is.getBufferManager().processFragment(buf, header);
  355. if (!conn.isServer()) {
  356. // Is it a last fragment of a reply?
  357. if (!header.moreFragmentsToFollow()) {
  358. // It is not the responsibility of this thread to remove
  359. // the OutCallDesc from out_calls -- only the client thread
  360. // should do that.
  361. conn.clientReplyMap.remove(requestId);
  362. }
  363. }
  364. }
  365. // Cancel request messages -----------------------
  366. private final void processCancelRequest(int cancelReqId) {
  367. // The GIOP version of CancelRequest does not matter, since
  368. // CancelRequest_1_0 could be sent to cancel a request which
  369. // has a different GIOP version.
  370. /*
  371. * CancelRequest processing logic :
  372. *
  373. * - find the request with matching requestId
  374. *
  375. * - call cancelProcessing() in BufferManagerRead [BMR]
  376. *
  377. * - the hope is that worker thread would call BMR.underflow()
  378. * to wait for more fragments to come in. When BMR.underflow() is
  379. * called, if a CancelRequest had already arrived,
  380. * the worker thread would throw ThreadDeath,
  381. * else the thread would wait to be notified of the
  382. * arrival of a new fragment or CancelRequest. Upon notification,
  383. * the woken up thread would check to see if a CancelRequest had
  384. * arrived and if so throw a ThreadDeath or it will continue to
  385. * process the received fragment.
  386. *
  387. * - if all the fragments had been received prior to CancelRequest
  388. * then the worker thread would never block in BMR.underflow().
  389. * So, setting the abort flag in BMR has no effect. The request
  390. * processing will complete normally.
  391. *
  392. * - in the case where the server has received enough fragments to
  393. * start processing the request and the server sends out
  394. * an early reply. In such a case if the CancelRequest arrives
  395. * after the reply has been sent, it has no effect.
  396. */
  397. if (!conn.isServer()) {
  398. return; // we do not support bi-directional giop yet, ignore.
  399. }
  400. // Try to get hold of the InputStream buffer.
  401. // In the case of 1.0 requests there is no way to get hold of
  402. // InputStream. Try out the 1.1 and 1.2 cases.
  403. // was the request 1.2 ?
  404. IIOPInputStream is = (IIOPInputStream) conn.serverRequestMap.get(
  405. new Integer(cancelReqId));
  406. if (is == null) { // was the request 1.1 ?
  407. is = conn.theOnly1_1ServerRequestImpl;
  408. if (is == null) {
  409. // either the request was 1.0
  410. // or an early reply has already been sent
  411. // or request processing is over
  412. // or its a spurious CancelRequest
  413. return; // do nothing.
  414. }
  415. Message msg = is.getMessage();
  416. if (msg.getType() != Message.GIOPRequest) {
  417. // this should not be true. Fragmented 1.1 messages can
  418. // only be request messages.
  419. return; // do nothing
  420. }
  421. int requestId = ((RequestMessage) msg).getRequestId();
  422. if (requestId == 0) { // special case
  423. // this means that
  424. // 1. the 1.1 requests' requestId has not been received
  425. // i.e., a CancelRequest was received even before the
  426. // 1.1 request was received. The spec disallows this.
  427. // 2. or the 1.1 request has a requestId 0.
  428. //
  429. // It is a little tricky to distinguish these two. So, be
  430. // conservative and do not cancel the request. Downside is that
  431. // 1.1 requests with requestId of 0 will never be cancelled.
  432. return; // do nothing
  433. }
  434. // at this point we do have a valid requestId for the 1.1 request
  435. if (requestId != cancelReqId) {
  436. // A spurious CancelRequest has been received.
  437. return; // do nothing
  438. }
  439. }
  440. // at this point we have chosen a request to be cancelled. But we
  441. // do not know if the target object's method has been invoked or not.
  442. // Request input stream being available simply means that the request
  443. // processing is not over yet. simply set the abort flag in the
  444. // BMRS and hope that the worker thread would notice it (this can
  445. // happen only if the request stream is being unmarshalled and the
  446. // target's method has not been invoked yet). This guarantees
  447. // that the requests which have been dispatched to the
  448. // target's method will never be cancelled.
  449. BufferManagerReadStream bufferManager = (BufferManagerReadStream)
  450. is.getBufferManager();
  451. bufferManager.cancelProcessing(cancelReqId);
  452. }
  453. // Currently the same for all versions, but separate methods since
  454. // CancelRequestMessage is an interface.
  455. public final void handleInput(CancelRequestMessage_1_0 header)
  456. throws IOException
  457. {
  458. if (conn.getORB().transportDebugFlag) {
  459. dprint("Handling GIOP 1.0 CancelRequest");
  460. }
  461. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  462. header.read(is);
  463. this.processCancelRequest(header.getRequestId());
  464. }
  465. public final void handleInput(CancelRequestMessage_1_1 header)
  466. throws IOException
  467. {
  468. if (conn.getORB().transportDebugFlag) {
  469. dprint("Handling GIOP 1.1 CancelRequest");
  470. }
  471. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  472. header.read(is);
  473. this.processCancelRequest(header.getRequestId());
  474. }
  475. public final void handleInput(CancelRequestMessage_1_2 header)
  476. throws IOException
  477. {
  478. if (conn.getORB().transportDebugFlag) {
  479. dprint("Handling GIOP 1.2 CancelRequest");
  480. }
  481. IIOPInputStream is = new IIOPInputStream(conn, buf, header);
  482. header.read(is);
  483. this.processCancelRequest(header.getRequestId());
  484. }
  485. }