1. /*
  2. * @(#)BufferManagerReadStream.java 1.13 03/12/19
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.impl.encoding;
  8. import java.nio.ByteBuffer;
  9. import com.sun.corba.se.pept.transport.ByteBufferPool;
  10. import com.sun.corba.se.spi.logging.CORBALogDomains;
  11. import com.sun.corba.se.spi.orb.ORB;
  12. import com.sun.corba.se.impl.logging.ORBUtilSystemException;
  13. import com.sun.corba.se.impl.orbutil.ORBUtility;
  14. import com.sun.corba.se.impl.protocol.RequestCanceledException;
  15. import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
  16. import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
  17. import java.util.*;
  18. public class BufferManagerReadStream
  19. implements BufferManagerRead, MarkAndResetHandler
  20. {
  21. private boolean receivedCancel = false;
  22. private int cancelReqId = 0;
  23. // We should convert endOfStream to a final static dummy end node
  24. private boolean endOfStream = true;
  25. private BufferQueue fragmentQueue = new BufferQueue();
  26. // REVISIT - This should go in BufferManagerRead. But, since
  27. // BufferManagerRead is an interface. BufferManagerRead
  28. // might ought to be an abstract class instead of an
  29. // interface.
  30. private ORB orb ;
  31. private ORBUtilSystemException wrapper ;
  32. private boolean debug = false;
  33. BufferManagerReadStream( ORB orb )
  34. {
  35. this.orb = orb ;
  36. this.wrapper = ORBUtilSystemException.get( orb,
  37. CORBALogDomains.RPC_ENCODING ) ;
  38. debug = orb.transportDebugFlag;
  39. }
  40. public void cancelProcessing(int requestId) {
  41. synchronized(fragmentQueue) {
  42. receivedCancel = true;
  43. cancelReqId = requestId;
  44. fragmentQueue.notify();
  45. }
  46. }
  47. public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
  48. {
  49. ByteBufferWithInfo bbwi =
  50. new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
  51. synchronized (fragmentQueue) {
  52. if (debug)
  53. {
  54. // print address of ByteBuffer being queued
  55. int bbAddress = System.identityHashCode(byteBuffer);
  56. StringBuffer sb = new StringBuffer(80);
  57. sb.append("processFragment() - queueing ByteBuffer id (");
  58. sb.append(bbAddress).append(") to fragment queue.");
  59. String strMsg = sb.toString();
  60. dprint(strMsg);
  61. }
  62. fragmentQueue.enqueue(bbwi);
  63. endOfStream = !msg.moreFragmentsToFollow();
  64. fragmentQueue.notify();
  65. }
  66. }
  67. public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
  68. {
  69. ByteBufferWithInfo result = null;
  70. try {
  71. //System.out.println("ENTER underflow");
  72. synchronized (fragmentQueue) {
  73. if (receivedCancel) {
  74. throw new RequestCanceledException(cancelReqId);
  75. }
  76. while (fragmentQueue.size() == 0) {
  77. if (endOfStream) {
  78. throw wrapper.endOfStream() ;
  79. }
  80. try {
  81. fragmentQueue.wait();
  82. } catch (InterruptedException e) {}
  83. if (receivedCancel) {
  84. throw new RequestCanceledException(cancelReqId);
  85. }
  86. }
  87. result = fragmentQueue.dequeue();
  88. result.fragmented = true;
  89. if (debug)
  90. {
  91. // print address of ByteBuffer being dequeued
  92. int bbAddr = System.identityHashCode(result.byteBuffer);
  93. StringBuffer sb1 = new StringBuffer(80);
  94. sb1.append("underflow() - dequeued ByteBuffer id (");
  95. sb1.append(bbAddr).append(") from fragment queue.");
  96. String msg1 = sb1.toString();
  97. dprint(msg1);
  98. }
  99. // VERY IMPORTANT
  100. // Release bbwi.byteBuffer to the ByteBufferPool only if
  101. // this BufferManagerStream is not marked for potential restore.
  102. if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
  103. {
  104. ByteBufferPool byteBufferPool = getByteBufferPool();
  105. if (debug)
  106. {
  107. // print address of ByteBuffer being released
  108. int bbAddress = System.identityHashCode(bbwi.byteBuffer);
  109. StringBuffer sb = new StringBuffer(80);
  110. sb.append("underflow() - releasing ByteBuffer id (");
  111. sb.append(bbAddress).append(") to ByteBufferPool.");
  112. String msg = sb.toString();
  113. dprint(msg);
  114. }
  115. byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
  116. bbwi.byteBuffer = null;
  117. bbwi = null;
  118. }
  119. }
  120. return result;
  121. } finally {
  122. //System.out.println("EXIT underflow");
  123. }
  124. }
  125. public void init(Message msg) {
  126. if (msg != null)
  127. endOfStream = !msg.moreFragmentsToFollow();
  128. }
  129. // Release any queued ByteBufferWithInfo's byteBuffers to the
  130. // ByteBufferPoool
  131. public void close(ByteBufferWithInfo bbwi)
  132. {
  133. int inputBbAddress = 0;
  134. // release ByteBuffers on fragmentQueue
  135. if (fragmentQueue != null)
  136. {
  137. synchronized (fragmentQueue)
  138. {
  139. // IMPORTANT: The fragment queue may have one ByteBuffer
  140. // on it that's also on the CDRInputStream if
  141. // this method is called when the stream is 'marked'.
  142. // Thus, we'll compare the ByteBuffer passed
  143. // in (from a CDRInputStream) with all ByteBuffers
  144. // on the stack. If one is found to equal, it will
  145. // not be released to the ByteBufferPool.
  146. if (bbwi != null)
  147. {
  148. inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
  149. }
  150. ByteBufferWithInfo abbwi = null;
  151. ByteBufferPool byteBufferPool = getByteBufferPool();
  152. while (fragmentQueue.size() != 0)
  153. {
  154. abbwi = fragmentQueue.dequeue();
  155. if (abbwi != null && abbwi.byteBuffer != null)
  156. {
  157. int bbAddress = System.identityHashCode(abbwi.byteBuffer);
  158. if (inputBbAddress != bbAddress)
  159. {
  160. if (debug)
  161. {
  162. // print address of ByteBuffer released
  163. StringBuffer sb = new StringBuffer(80);
  164. sb.append("close() - fragmentQueue is ")
  165. .append("releasing ByteBuffer id (")
  166. .append(bbAddress).append(") to ")
  167. .append("ByteBufferPool.");
  168. String msg = sb.toString();
  169. dprint(msg);
  170. }
  171. }
  172. byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
  173. }
  174. }
  175. }
  176. fragmentQueue = null;
  177. }
  178. // release ByteBuffers on fragmentStack
  179. if (fragmentStack != null && fragmentStack.size() != 0)
  180. {
  181. // IMPORTANT: The fragment stack may have one ByteBuffer
  182. // on it that's also on the CDRInputStream if
  183. // this method is called when the stream is 'marked'.
  184. // Thus, we'll compare the ByteBuffer passed
  185. // in (from a CDRInputStream) with all ByteBuffers
  186. // on the stack. If one is found to equal, it will
  187. // not be released to the ByteBufferPool.
  188. if (bbwi != null)
  189. {
  190. inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
  191. }
  192. ByteBufferWithInfo abbwi = null;
  193. ByteBufferPool byteBufferPool = getByteBufferPool();
  194. ListIterator itr = fragmentStack.listIterator();
  195. while (itr.hasNext())
  196. {
  197. abbwi = (ByteBufferWithInfo)itr.next();
  198. if (abbwi != null && abbwi.byteBuffer != null)
  199. {
  200. int bbAddress = System.identityHashCode(abbwi.byteBuffer);
  201. if (inputBbAddress != bbAddress)
  202. {
  203. if (debug)
  204. {
  205. // print address of ByteBuffer being released
  206. StringBuffer sb = new StringBuffer(80);
  207. sb.append("close() - fragmentStack - releasing ")
  208. .append("ByteBuffer id (" + bbAddress + ") to ")
  209. .append("ByteBufferPool.");
  210. String msg = sb.toString();
  211. dprint(msg);
  212. }
  213. byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
  214. }
  215. }
  216. }
  217. fragmentStack = null;
  218. }
  219. }
  220. protected ByteBufferPool getByteBufferPool()
  221. {
  222. return orb.getByteBufferPool();
  223. }
  224. private void dprint(String msg)
  225. {
  226. ORBUtility.dprint("BufferManagerReadStream", msg);
  227. }
  228. // Mark and reset handler ----------------------------------------
  229. private boolean markEngaged = false;
  230. // List of fragment ByteBufferWithInfos received since
  231. // the mark was engaged.
  232. private LinkedList fragmentStack = null;
  233. private RestorableInputStream inputStream = null;
  234. // Original state of the stream
  235. private Object streamMemento = null;
  236. public void mark(RestorableInputStream inputStream)
  237. {
  238. this.inputStream = inputStream;
  239. markEngaged = true;
  240. // Get the magic Object that the stream will use to
  241. // reconstruct it's state when reset is called
  242. streamMemento = inputStream.createStreamMemento();
  243. if (fragmentStack != null) {
  244. fragmentStack.clear();
  245. }
  246. }
  247. // Collects fragments received since the mark was engaged.
  248. public void fragmentationOccured(ByteBufferWithInfo newFragment)
  249. {
  250. if (!markEngaged)
  251. return;
  252. if (fragmentStack == null)
  253. fragmentStack = new LinkedList();
  254. fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
  255. }
  256. public void reset()
  257. {
  258. if (!markEngaged) {
  259. // REVISIT - call to reset without call to mark
  260. return;
  261. }
  262. markEngaged = false;
  263. // If we actually did peek across fragments, we need
  264. // to push those fragments onto the front of the
  265. // buffer queue.
  266. if (fragmentStack != null && fragmentStack.size() != 0) {
  267. ListIterator iter = fragmentStack.listIterator();
  268. synchronized(fragmentQueue) {
  269. while (iter.hasNext()) {
  270. fragmentQueue.push((ByteBufferWithInfo)iter.next());
  271. }
  272. }
  273. fragmentStack.clear();
  274. }
  275. // Give the stream the magic Object to restore
  276. // it's state.
  277. inputStream.restoreInternalState(streamMemento);
  278. }
  279. public MarkAndResetHandler getMarkAndResetHandler() {
  280. return this;
  281. }
  282. }