1. /*
  2. * @(#)BufferManagerReadStream.java 1.11 03/01/23
  3. *
  4. * Copyright 2003 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package com.sun.corba.se.internal.iiop;
  8. import org.omg.CORBA.MARSHAL;
  9. import org.omg.CORBA.CompletionStatus;
  10. import com.sun.corba.se.internal.orbutil.MinorCodes;
  11. import com.sun.corba.se.internal.iiop.messages.FragmentMessage;
  12. import com.sun.corba.se.internal.iiop.messages.Message;
  13. import java.util.*;
  14. public class BufferManagerReadStream
  15. implements BufferManagerRead, MarkAndResetHandler
  16. {
  17. private boolean receivedCancel = false;
  18. private int cancelReqId = 0;
  19. // We should convert endOfStream to a final static dummy end node
  20. private boolean endOfStream = true;
  21. private BufferQueue fragmentQueue = new BufferQueue();
  22. public void cancelProcessing(int requestId) {
  23. synchronized(fragmentQueue) {
  24. receivedCancel = true;
  25. cancelReqId = requestId;
  26. fragmentQueue.notify();
  27. }
  28. }
  29. public void processFragment (byte[] buf, FragmentMessage msg)
  30. {
  31. ByteBufferWithInfo bbwi = new ByteBufferWithInfo(buf, msg.getHeaderLength());
  32. synchronized (fragmentQueue) {
  33. fragmentQueue.enqueue(bbwi);
  34. endOfStream = !msg.moreFragmentsToFollow();
  35. fragmentQueue.notify();
  36. }
  37. }
  38. public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
  39. {
  40. synchronized (fragmentQueue) {
  41. if (receivedCancel) {
  42. throw new RequestCanceledException(cancelReqId);
  43. }
  44. while (fragmentQueue.size() == 0) {
  45. if (endOfStream) {
  46. throw new MARSHAL("Unmarshaller requested more data after end of stream",
  47. MinorCodes.END_OF_STREAM,
  48. CompletionStatus.COMPLETED_NO);
  49. }
  50. try {
  51. fragmentQueue.wait();
  52. } catch (InterruptedException e) {}
  53. if (receivedCancel) {
  54. throw new RequestCanceledException(cancelReqId);
  55. }
  56. }
  57. ByteBufferWithInfo result = fragmentQueue.dequeue();
  58. result.fragmented = true;
  59. return result;
  60. }
  61. }
  62. public void init(Message msg) {
  63. if (msg != null)
  64. endOfStream = !msg.moreFragmentsToFollow();
  65. }
  66. // Mark and reset handler ----------------------------------------
  67. private boolean markEngaged = false;
  68. // List of fragment ByteBufferWithInfos received since
  69. // the mark was engaged.
  70. private LinkedList fragmentStack = null;
  71. private RestorableInputStream inputStream = null;
  72. // Original state of the stream
  73. private Object streamMemento = null;
  74. public void mark(RestorableInputStream inputStream)
  75. {
  76. this.inputStream = inputStream;
  77. markEngaged = true;
  78. // Get the magic Object that the stream will use to
  79. // reconstruct it's state when reset is called
  80. streamMemento = inputStream.createStreamMemento();
  81. if (fragmentStack != null) {
  82. fragmentStack.clear();
  83. }
  84. }
  85. // Collects fragments received since the mark was engaged.
  86. public void fragmentationOccured(ByteBufferWithInfo newFragment)
  87. {
  88. if (!markEngaged)
  89. return;
  90. if (fragmentStack == null)
  91. fragmentStack = new LinkedList();
  92. fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
  93. }
  94. public void reset()
  95. {
  96. if (!markEngaged) {
  97. // REVISIT - call to reset without call to mark
  98. return;
  99. }
  100. markEngaged = false;
  101. // If we actually did peek across fragments, we need
  102. // to push those fragments onto the front of the
  103. // buffer queue.
  104. if (fragmentStack != null && fragmentStack.size() != 0) {
  105. ListIterator iter = fragmentStack.listIterator();
  106. synchronized(fragmentQueue) {
  107. while (iter.hasNext()) {
  108. fragmentQueue.push((ByteBufferWithInfo)iter.next());
  109. }
  110. }
  111. fragmentStack.clear();
  112. }
  113. // Give the stream the magic Object to restore
  114. // it's state.
  115. inputStream.restoreInternalState(streamMemento);
  116. }
  117. public MarkAndResetHandler getMarkAndResetHandler() {
  118. return this;
  119. }
  120. }