- /*
- * @(#)CorbaResponseWaitingRoomImpl.java 1.29 04/03/01
- *
- * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
- * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
- */
-
- package com.sun.corba.se.impl.transport;
-
- import java.util.Hashtable;
-
- import org.omg.CORBA.CompletionStatus;
- import org.omg.CORBA.SystemException;
-
- import com.sun.corba.se.pept.encoding.InputObject;
- import com.sun.corba.se.pept.encoding.OutputObject;
- import com.sun.corba.se.pept.protocol.MessageMediator;
-
- import com.sun.corba.se.spi.logging.CORBALogDomains;
- import com.sun.corba.se.spi.orb.ORB;
- import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
- import com.sun.corba.se.spi.transport.CorbaConnection;
- import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
-
- import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
- import com.sun.corba.se.impl.encoding.CDRInputObject;
- import com.sun.corba.se.impl.logging.ORBUtilSystemException;
- import com.sun.corba.se.impl.orbutil.ORBUtility;
- import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
- import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
-
- /**
- * @author Harold Carr
- */
- public class CorbaResponseWaitingRoomImpl
- implements
- CorbaResponseWaitingRoom
- {
- final static class OutCallDesc
- {
- java.lang.Object done = new java.lang.Object();
- Thread thread;
- MessageMediator messageMediator;
- SystemException exception;
- InputObject inputObject;
- }
-
- private ORB orb;
- private ORBUtilSystemException wrapper ;
-
- private CorbaConnection connection;
- // Maps requestId to an OutCallDesc.
- private Hashtable out_calls = null; // REVISIT - use int hastable/map
-
- public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
- {
- this.orb = orb;
- wrapper = ORBUtilSystemException.get( orb,
- CORBALogDomains.RPC_TRANSPORT ) ;
- this.connection = connection;
- out_calls = new Hashtable();
- }
-
- ////////////////////////////////////////////////////
- //
- // pept.transport.ResponseWaitingRoom
- //
-
- public void registerWaiter(MessageMediator mediator)
- {
- CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
-
- if (orb.transportDebugFlag) {
- dprint(".registerWaiter: " + opAndId(messageMediator));
- }
-
- Integer requestId = messageMediator.getRequestIdInteger();
-
- OutCallDesc call = new OutCallDesc();
- call.thread = Thread.currentThread();
- call.messageMediator = messageMediator;
- out_calls.put(requestId, call);
- }
-
- public void unregisterWaiter(MessageMediator mediator)
- {
- CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
-
- if (orb.transportDebugFlag) {
- dprint(".unregisterWaiter: " + opAndId(messageMediator));
- }
-
- Integer requestId = messageMediator.getRequestIdInteger();
-
- out_calls.remove(requestId);
- }
-
- public InputObject waitForResponse(MessageMediator mediator)
- {
- CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
-
- try {
-
- InputObject returnStream = null;
-
- if (orb.transportDebugFlag) {
- dprint(".waitForResponse->: " + opAndId(messageMediator));
- }
-
- Integer requestId = messageMediator.getRequestIdInteger();
-
- if (messageMediator.isOneWay()) {
- // The waiter is removed in releaseReply in the same
- // way as a normal request.
-
- if (orb.transportDebugFlag) {
- dprint(".waitForResponse: one way - not waiting: "
- + opAndId(messageMediator));
- }
-
- return null;
- }
-
- OutCallDesc call = (OutCallDesc)out_calls.get(requestId);
- if (call == null) {
- throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
- }
-
- synchronized(call.done) {
-
- while (call.inputObject == null && call.exception == null) {
- // Wait for the reply from the server.
- // The ReaderThread reads in the reply IIOP message
- // and signals us.
- try {
- if (orb.transportDebugFlag) {
- dprint(".waitForResponse: waiting: "
- + opAndId(messageMediator));
- }
- call.done.wait();
- } catch (InterruptedException ie) {};
- }
-
- if (call.exception != null) {
- if (orb.transportDebugFlag) {
- dprint(".waitForResponse: exception: "
- + opAndId(messageMediator));
- }
- throw call.exception;
- }
-
- returnStream = call.inputObject;
- }
-
- // REVISIT -- exceptions from unmarshaling code will
- // go up through this client thread!
-
- if (returnStream != null) {
- // On fragmented streams the header MUST be unmarshaled here
- // (in the client thread) in case it blocks.
- // If the header was already unmarshaled, this won't
- // do anything
- // REVISIT: cast - need interface method.
- ((CDRInputObject)returnStream).unmarshalHeader();
- }
-
- return returnStream;
-
- } finally {
- if (orb.transportDebugFlag) {
- dprint(".waitForResponse<-: " + opAndId(messageMediator));
- }
- }
- }
-
- public void responseReceived(InputObject is)
- {
- CDRInputObject inputObject = (CDRInputObject) is;
- LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
- inputObject.getMessageHeader();
- Integer requestId = new Integer(header.getRequestId());
- OutCallDesc call = (OutCallDesc) out_calls.get(requestId);
-
- if (orb.transportDebugFlag) {
- dprint(".responseReceived: id/"
- + requestId + ": "
- + header);
- }
-
- // This is an interesting case. It could mean that someone sent us a
- // reply message, but we don't know what request it was for. That
- // would probably call for an error. However, there's another case
- // that's normal and we should think about --
- //
- // If the unmarshaling thread does all of its work inbetween the time
- // the ReaderThread gives it the last fragment and gets to the
- // out_calls.get line, then it will also be null, so just return;
- if (call == null) {
- if (orb.transportDebugFlag) {
- dprint(".responseReceived: id/"
- + requestId
- + ": no waiter: "
- + header);
- }
- return;
- }
-
- // Set the reply InputObject and signal the client thread
- // that the reply has been received.
- // The thread signalled will remove outcall descriptor if appropriate.
- // Otherwise, it'll be removed when last fragment for it has been put on
- // BufferManagerRead's queue.
- synchronized (call.done) {
- CorbaMessageMediator messageMediator = (CorbaMessageMediator)
- call.messageMediator;
-
- if (orb.transportDebugFlag) {
- dprint(".responseReceived: "
- + opAndId(messageMediator)
- + ": notifying waiters");
- }
-
- messageMediator.setReplyHeader(header);
- messageMediator.setInputObject(is);
- inputObject.setMessageMediator(messageMediator);
- call.inputObject = is;
- call.done.notify();
- }
- }
-
- public int numberRegistered()
- {
- // Note: Hashtable.size() is not synchronized
- return out_calls.size();
- }
-
- //////////////////////////////////////////////////
- //
- // CorbaResponseWaitingRoom
- //
-
- public void signalExceptionToAllWaiters(SystemException systemException)
- {
-
- if (orb.transportDebugFlag) {
- dprint(".signalExceptionToAllWaiters: " + systemException);
- }
-
- OutCallDesc call;
- java.util.Enumeration e = out_calls.elements();
- while(e.hasMoreElements()) {
- call = (OutCallDesc) e.nextElement();
-
- synchronized(call.done){
- // anything waiting for BufferManagerRead's fragment queue
- // needs to be cancelled
- CorbaMessageMediator corbaMsgMediator =
- (CorbaMessageMediator)call.messageMediator;
- CDRInputObject inputObject =
- (CDRInputObject)corbaMsgMediator.getInputObject();
- // IMPORTANT: If inputObject is null, then no need to tell
- // BufferManagerRead to cancel request processing.
- if (inputObject != null) {
- BufferManagerReadStream bufferManager =
- (BufferManagerReadStream)inputObject.getBufferManager();
- int requestId = corbaMsgMediator.getRequestId();
- bufferManager.cancelProcessing(requestId);
- }
- call.inputObject = null;
- call.exception = systemException;
- call.done.notify();
- }
- }
- }
-
- public MessageMediator getMessageMediator(int requestId)
- {
- Integer id = new Integer(requestId);
- OutCallDesc call = (OutCallDesc) out_calls.get(id);
- if (call == null) {
- // This can happen when getting early reply fragments for a
- // request which has completed (e.g., client marshaling error).
- return null;
- }
- return call.messageMediator;
- }
-
- ////////////////////////////////////////////////////
- //
- // Implementation.
- //
-
- protected void dprint(String msg)
- {
- ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
- }
-
- protected String opAndId(CorbaMessageMediator mediator)
- {
- return ORBUtility.operationNameAndRequestId(mediator);
- }
- }
-
- // End of file.