/*
 * @(#)PipedInputStream.java 1.32 03/01/23
 *
 * Copyright 2003 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */
  7. package java.io;
  8. /**
  9. * A piped input stream should be connected
  10. * to a piped output stream; the piped input
  11. * stream then provides whatever data bytes
  12. * are written to the piped output stream.
  13. * Typically, data is read from a <code>PipedInputStream</code>
  14. * object by one thread and data is written
  15. * to the corresponding <code>PipedOutputStream</code>
  16. * by some other thread. Attempting to use
  17. * both objects from a single thread is not
  18. * recommended, as it may deadlock the thread.
  19. * The piped input stream contains a buffer,
  20. * decoupling read operations from write operations,
  21. * within limits.
  22. *
  23. * @author James Gosling
  24. * @version 1.32, 01/23/03
  25. * @see java.io.PipedOutputStream
  26. * @since JDK1.0
  27. */
  28. public
  29. class PipedInputStream extends InputStream {
  30. boolean closedByWriter = false;
  31. boolean closedByReader = false;
  32. boolean connected = false;
  33. /* REMIND: identification of the read and write sides needs to be
  34. more sophisticated. Either using thread groups (but what about
  35. pipes within a thread?) or using finalization (but it may be a
  36. long time until the next GC). */
  37. Thread readSide;
  38. Thread writeSide;
  39. /**
  40. * The size of the pipe's circular input buffer.
  41. * @since JDK1.1
  42. */
  43. protected static final int PIPE_SIZE = 1024;
  44. /**
  45. * The circular buffer into which incoming data is placed.
  46. * @since JDK1.1
  47. */
  48. protected byte buffer[] = new byte[PIPE_SIZE];
  49. /**
  50. * The index of the position in the circular buffer at which the
  51. * next byte of data will be stored when received from the connected
  52. * piped output stream. <code>in<0</code> implies the buffer is empty,
  53. * <code>in==out</code> implies the buffer is full
  54. * @since JDK1.1
  55. */
  56. protected int in = -1;
  57. /**
  58. * The index of the position in the circular buffer at which the next
  59. * byte of data will be read by this piped input stream.
  60. * @since JDK1.1
  61. */
  62. protected int out = 0;
  63. /**
  64. * Creates a <code>PipedInputStream</code> so
  65. * that it is connected to the piped output
  66. * stream <code>src</code>. Data bytes written
  67. * to <code>src</code> will then be available
  68. * as input from this stream.
  69. *
  70. * @param src the stream to connect to.
  71. * @exception IOException if an I/O error occurs.
  72. */
  73. public PipedInputStream(PipedOutputStream src) throws IOException {
  74. connect(src);
  75. }
  76. /**
  77. * Creates a <code>PipedInputStream</code> so
  78. * that it is not yet connected. It must be
  79. * connected to a <code>PipedOutputStream</code>
  80. * before being used.
  81. *
  82. * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
  83. * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
  84. */
  85. public PipedInputStream() {
  86. }
  87. /**
  88. * Causes this piped input stream to be connected
  89. * to the piped output stream <code>src</code>.
  90. * If this object is already connected to some
  91. * other piped output stream, an <code>IOException</code>
  92. * is thrown.
  93. * <p>
  94. * If <code>src</code> is an
  95. * unconnected piped output stream and <code>snk</code>
  96. * is an unconnected piped input stream, they
  97. * may be connected by either the call:
  98. * <p>
  99. * <pre><code>snk.connect(src)</code> </pre>
  100. * <p>
  101. * or the call:
  102. * <p>
  103. * <pre><code>src.connect(snk)</code> </pre>
  104. * <p>
  105. * The two
  106. * calls have the same effect.
  107. *
  108. * @param src The piped output stream to connect to.
  109. * @exception IOException if an I/O error occurs.
  110. */
  111. public void connect(PipedOutputStream src) throws IOException {
  112. src.connect(this);
  113. }
  114. /**
  115. * Receives a byte of data. This method will block if no input is
  116. * available.
  117. * @param b the byte being received
  118. * @exception IOException If the pipe is broken.
  119. * @since JDK1.1
  120. */
  121. protected synchronized void receive(int b) throws IOException {
  122. if (!connected) {
  123. throw new IOException("Pipe not connected");
  124. } else if (closedByWriter || closedByReader) {
  125. throw new IOException("Pipe closed");
  126. } else if (readSide != null && !readSide.isAlive()) {
  127. throw new IOException("Read end dead");
  128. }
  129. writeSide = Thread.currentThread();
  130. while (in == out) {
  131. if ((readSide != null) && !readSide.isAlive()) {
  132. throw new IOException("Pipe broken");
  133. }
  134. /* full: kick any waiting readers */
  135. notifyAll();
  136. try {
  137. wait(1000);
  138. } catch (InterruptedException ex) {
  139. throw new java.io.InterruptedIOException();
  140. }
  141. }
  142. if (in < 0) {
  143. in = 0;
  144. out = 0;
  145. }
  146. buffer[in++] = (byte)(b & 0xFF);
  147. if (in >= buffer.length) {
  148. in = 0;
  149. }
  150. }
  151. /**
  152. * Receives data into an array of bytes. This method will
  153. * block until some input is available.
  154. * @param b the buffer into which the data is received
  155. * @param off the start offset of the data
  156. * @param len the maximum number of bytes received
  157. * @return the actual number of bytes received, -1 is
  158. * returned when the end of the stream is reached.
  159. * @exception IOException If an I/O error has occurred.
  160. */
  161. synchronized void receive(byte b[], int off, int len) throws IOException {
  162. while (--len >= 0) {
  163. receive(b[off++]);
  164. }
  165. }
  166. /**
  167. * Notifies all waiting threads that the last byte of data has been
  168. * received.
  169. */
  170. synchronized void receivedLast() {
  171. closedByWriter = true;
  172. notifyAll();
  173. }
  174. /**
  175. * Reads the next byte of data from this piped input stream. The
  176. * value byte is returned as an <code>int</code> in the range
  177. * <code>0</code> to <code>255</code>. If no byte is available
  178. * because the end of the stream has been reached, the value
  179. * <code>-1</code> is returned. This method blocks until input data
  180. * is available, the end of the stream is detected, or an exception
  181. * is thrown.
  182. * If a thread was providing data bytes
  183. * to the connected piped output stream, but
  184. * the thread is no longer alive, then an
  185. * <code>IOException</code> is thrown.
  186. *
  187. * @return the next byte of data, or <code>-1</code> if the end of the
  188. * stream is reached.
  189. * @exception IOException if the pipe is broken.
  190. */
  191. public synchronized int read() throws IOException {
  192. if (!connected) {
  193. throw new IOException("Pipe not connected");
  194. } else if (closedByReader) {
  195. throw new IOException("Pipe closed");
  196. } else if (writeSide != null && !writeSide.isAlive()
  197. && !closedByWriter && (in < 0)) {
  198. throw new IOException("Write end dead");
  199. }
  200. readSide = Thread.currentThread();
  201. int trials = 2;
  202. while (in < 0) {
  203. if (closedByWriter) {
  204. /* closed by writer, return EOF */
  205. return -1;
  206. }
  207. if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  208. throw new IOException("Pipe broken");
  209. }
  210. /* might be a writer waiting */
  211. notifyAll();
  212. try {
  213. wait(1000);
  214. } catch (InterruptedException ex) {
  215. throw new java.io.InterruptedIOException();
  216. }
  217. }
  218. int ret = buffer[out++] & 0xFF;
  219. if (out >= buffer.length) {
  220. out = 0;
  221. }
  222. if (in == out) {
  223. /* now empty */
  224. in = -1;
  225. }
  226. return ret;
  227. }
  228. /**
  229. * Reads up to <code>len</code> bytes of data from this piped input
  230. * stream into an array of bytes. Less than <code>len</code> bytes
  231. * will be read if the end of the data stream is reached. This method
  232. * blocks until at least one byte of input is available.
  233. * If a thread was providing data bytes
  234. * to the connected piped output stream, but
  235. * the thread is no longer alive, then an
  236. * <code>IOException</code> is thrown.
  237. *
  238. * @param b the buffer into which the data is read.
  239. * @param off the start offset of the data.
  240. * @param len the maximum number of bytes read.
  241. * @return the total number of bytes read into the buffer, or
  242. * <code>-1</code> if there is no more data because the end of
  243. * the stream has been reached.
  244. * @exception IOException if an I/O error occurs.
  245. */
  246. public synchronized int read(byte b[], int off, int len) throws IOException {
  247. if (b == null) {
  248. throw new NullPointerException();
  249. } else if ((off < 0) || (off > b.length) || (len < 0) ||
  250. ((off + len) > b.length) || ((off + len) < 0)) {
  251. throw new IndexOutOfBoundsException();
  252. } else if (len == 0) {
  253. return 0;
  254. }
  255. /* possibly wait on the first character */
  256. int c = read();
  257. if (c < 0) {
  258. return -1;
  259. }
  260. b[off] = (byte) c;
  261. int rlen = 1;
  262. while ((in >= 0) && (--len > 0)) {
  263. b[off + rlen] = buffer[out++];
  264. rlen++;
  265. if (out >= buffer.length) {
  266. out = 0;
  267. }
  268. if (in == out) {
  269. /* now empty */
  270. in = -1;
  271. }
  272. }
  273. return rlen;
  274. }
  275. /**
  276. * Returns the number of bytes that can be read from this input
  277. * stream without blocking. This method overrides the <code>available</code>
  278. * method of the parent class.
  279. *
  280. * @return the number of bytes that can be read from this input stream
  281. * without blocking.
  282. * @exception IOException if an I/O error occurs.
  283. * @since JDK1.0.2
  284. */
  285. public synchronized int available() throws IOException {
  286. if(in < 0)
  287. return 0;
  288. else if(in == out)
  289. return buffer.length;
  290. else if (in > out)
  291. return in - out;
  292. else
  293. return in + buffer.length - out;
  294. }
  295. /**
  296. * Closes this piped input stream and releases any system resources
  297. * associated with the stream.
  298. *
  299. * @exception IOException if an I/O error occurs.
  300. */
  301. public void close() throws IOException {
  302. in = -1;
  303. closedByReader = true;
  304. }
  305. }