1. /*
  2. * @(#)PipedInputStream.java 1.30 00/02/02
  3. *
  4. * Copyright 1995-2000 Sun Microsystems, Inc. All Rights Reserved.
  5. *
  6. * This software is the proprietary information of Sun Microsystems, Inc.
  7. * Use is subject to license terms.
  8. *
  9. */
  10. package java.io;
  11. /**
  12. * A piped input stream should be connected
  13. * to a piped output stream; the piped input
  14. * stream then provides whatever data bytes
  15. * are written to the piped output stream.
  16. * Typically, data is read from a <code>PipedInputStream</code>
  17. * object by one thread and data is written
  18. * to the corresponding <code>PipedOutputStream</code>
  19. * by some other thread. Attempting to use
  20. * both objects from a single thread is not
  21. * recommended, as it may deadlock the thread.
  22. * The piped input stream contains a buffer,
  23. * decoupling read operations from write operations,
  24. * within limits.
  25. *
  26. * @author James Gosling
  27. * @version 1.30, 02/02/00
  28. * @see java.io.PipedOutputStream
  29. * @since JDK1.0
  30. */
  31. public
  32. class PipedInputStream extends InputStream {
  33. boolean closedByWriter = false;
  34. boolean closedByReader = false;
  35. boolean connected = false;
  36. /* REMIND: identification of the read and write sides needs to be
  37. more sophisticated. Either using thread groups (but what about
  38. pipes within a thread?) or using finalization (but it may be a
  39. long time until the next GC). */
  40. Thread readSide;
  41. Thread writeSide;
  42. /**
  43. * The size of the pipe's circular input buffer.
  44. * @since JDK1.1
  45. */
  46. protected static final int PIPE_SIZE = 1024;
  47. /**
  48. * The circular buffer into which incoming data is placed.
  49. * @since JDK1.1
  50. */
  51. protected byte buffer[] = new byte[PIPE_SIZE];
  52. /**
  53. * The index of the position in the circular buffer at which the
  54. * next byte of data will be stored when received from the connected
  55. * piped output stream. <code>in<0</code> implies the buffer is empty,
  56. * <code>in==out</code> implies the buffer is full
  57. * @since JDK1.1
  58. */
  59. protected int in = -1;
  60. /**
  61. * The index of the position in the circular buffer at which the next
  62. * byte of data will be read by this piped input stream.
  63. * @since JDK1.1
  64. */
  65. protected int out = 0;
  66. /**
  67. * Creates a <code>PipedInputStream</code> so
  68. * that it is connected to the piped output
  69. * stream <code>src</code>. Data bytes written
  70. * to <code>src</code> will then be available
  71. * as input from this stream.
  72. *
  73. * @param src the stream to connect to.
  74. * @exception IOException if an I/O error occurs.
  75. */
  76. public PipedInputStream(PipedOutputStream src) throws IOException {
  77. connect(src);
  78. }
  79. /**
  80. * Creates a <code>PipedInputStream</code> so
  81. * that it is not yet connected. It must be
  82. * connected to a <code>PipedOutputStream</code>
  83. * before being used.
  84. *
  85. * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
  86. * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
  87. */
  88. public PipedInputStream() {
  89. }
  90. /**
  91. * Causes this piped input stream to be connected
  92. * to the piped output stream <code>src</code>.
  93. * If this object is already connected to some
  94. * other piped output stream, an <code>IOException</code>
  95. * is thrown.
  96. * <p>
  97. * If <code>src</code> is an
  98. * unconnected piped output stream and <code>snk</code>
  99. * is an unconnected piped input stream, they
  100. * may be connected by either the call:
  101. * <p>
  102. * <pre><code>snk.connect(src)</code> </pre>
  103. * <p>
  104. * or the call:
  105. * <p>
  106. * <pre><code>src.connect(snk)</code> </pre>
  107. * <p>
  108. * The two
  109. * calls have the same effect.
  110. *
  111. * @param src The piped output stream to connect to.
  112. * @exception IOException if an I/O error occurs.
  113. */
  114. public void connect(PipedOutputStream src) throws IOException {
  115. src.connect(this);
  116. }
  117. /**
  118. * Receives a byte of data. This method will block if no input is
  119. * available.
  120. * @param b the byte being received
  121. * @exception IOException If the pipe is broken.
  122. * @since JDK1.1
  123. */
  124. protected synchronized void receive(int b) throws IOException {
  125. if (!connected) {
  126. throw new IOException("Pipe not connected");
  127. } else if (closedByWriter || closedByReader) {
  128. throw new IOException("Pipe closed");
  129. } else if (readSide != null && !readSide.isAlive()) {
  130. throw new IOException("Read end dead");
  131. }
  132. writeSide = Thread.currentThread();
  133. while (in == out) {
  134. if ((readSide != null) && !readSide.isAlive()) {
  135. throw new IOException("Pipe broken");
  136. }
  137. /* full: kick any waiting readers */
  138. notifyAll();
  139. try {
  140. wait(1000);
  141. } catch (InterruptedException ex) {
  142. throw new java.io.InterruptedIOException();
  143. }
  144. }
  145. if (in < 0) {
  146. in = 0;
  147. out = 0;
  148. }
  149. buffer[in++] = (byte)(b & 0xFF);
  150. if (in >= buffer.length) {
  151. in = 0;
  152. }
  153. }
  154. /**
  155. * Receives data into an array of bytes. This method will
  156. * block until some input is available.
  157. * @param b the buffer into which the data is received
  158. * @param off the start offset of the data
  159. * @param len the maximum number of bytes received
  160. * @return the actual number of bytes received, -1 is
  161. * returned when the end of the stream is reached.
  162. * @exception IOException If an I/O error has occurred.
  163. */
  164. synchronized void receive(byte b[], int off, int len) throws IOException {
  165. while (--len >= 0) {
  166. receive(b[off++]);
  167. }
  168. }
  169. /**
  170. * Notifies all waiting threads that the last byte of data has been
  171. * received.
  172. */
  173. synchronized void receivedLast() {
  174. closedByWriter = true;
  175. notifyAll();
  176. }
  177. /**
  178. * Reads the next byte of data from this piped input stream. The
  179. * value byte is returned as an <code>int</code> in the range
  180. * <code>0</code> to <code>255</code>. If no byte is available
  181. * because the end of the stream has been reached, the value
  182. * <code>-1</code> is returned. This method blocks until input data
  183. * is available, the end of the stream is detected, or an exception
  184. * is thrown.
  185. * If a thread was providing data bytes
  186. * to the connected piped output stream, but
  187. * the thread is no longer alive, then an
  188. * <code>IOException</code> is thrown.
  189. *
  190. * @return the next byte of data, or <code>-1</code> if the end of the
  191. * stream is reached.
  192. * @exception IOException if the pipe is broken.
  193. */
  194. public synchronized int read() throws IOException {
  195. if (!connected) {
  196. throw new IOException("Pipe not connected");
  197. } else if (closedByReader) {
  198. throw new IOException("Pipe closed");
  199. } else if (writeSide != null && !writeSide.isAlive()
  200. && !closedByWriter && (in < 0)) {
  201. throw new IOException("Write end dead");
  202. }
  203. readSide = Thread.currentThread();
  204. int trials = 2;
  205. while (in < 0) {
  206. if (closedByWriter) {
  207. /* closed by writer, return EOF */
  208. return -1;
  209. }
  210. if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  211. throw new IOException("Pipe broken");
  212. }
  213. /* might be a writer waiting */
  214. notifyAll();
  215. try {
  216. wait(1000);
  217. } catch (InterruptedException ex) {
  218. throw new java.io.InterruptedIOException();
  219. }
  220. }
  221. int ret = buffer[out++] & 0xFF;
  222. if (out >= buffer.length) {
  223. out = 0;
  224. }
  225. if (in == out) {
  226. /* now empty */
  227. in = -1;
  228. }
  229. return ret;
  230. }
  231. /**
  232. * Reads up to <code>len</code> bytes of data from this piped input
  233. * stream into an array of bytes. Less than <code>len</code> bytes
  234. * will be read if the end of the data stream is reached. This method
  235. * blocks until at least one byte of input is available.
  236. * If a thread was providing data bytes
  237. * to the connected piped output stream, but
  238. * the thread is no longer alive, then an
  239. * <code>IOException</code> is thrown.
  240. *
  241. * @param b the buffer into which the data is read.
  242. * @param off the start offset of the data.
  243. * @param len the maximum number of bytes read.
  244. * @return the total number of bytes read into the buffer, or
  245. * <code>-1</code> if there is no more data because the end of
  246. * the stream has been reached.
  247. * @exception IOException if an I/O error occurs.
  248. */
  249. public synchronized int read(byte b[], int off, int len) throws IOException {
  250. if (b == null) {
  251. throw new NullPointerException();
  252. } else if ((off < 0) || (off > b.length) || (len < 0) ||
  253. ((off + len) > b.length) || ((off + len) < 0)) {
  254. throw new IndexOutOfBoundsException();
  255. } else if (len == 0) {
  256. return 0;
  257. }
  258. /* possibly wait on the first character */
  259. int c = read();
  260. if (c < 0) {
  261. return -1;
  262. }
  263. b[off] = (byte) c;
  264. int rlen = 1;
  265. while ((in >= 0) && (--len > 0)) {
  266. b[off + rlen] = buffer[out++];
  267. rlen++;
  268. if (out >= buffer.length) {
  269. out = 0;
  270. }
  271. if (in == out) {
  272. /* now empty */
  273. in = -1;
  274. }
  275. }
  276. return rlen;
  277. }
  278. /**
  279. * Returns the number of bytes that can be read from this input
  280. * stream without blocking. This method overrides the <code>available</code>
  281. * method of the parent class.
  282. *
  283. * @return the number of bytes that can be read from this input stream
  284. * without blocking.
  285. * @exception IOException if an I/O error occurs.
  286. * @since JDK1.0.2
  287. */
  288. public synchronized int available() throws IOException {
  289. if(in < 0)
  290. return 0;
  291. else if(in == out)
  292. return buffer.length;
  293. else if (in > out)
  294. return in - out;
  295. else
  296. return in + buffer.length - out;
  297. }
  298. /**
  299. * Closes this piped input stream and releases any system resources
  300. * associated with the stream.
  301. *
  302. * @exception IOException if an I/O error occurs.
  303. */
  304. public void close() throws IOException {
  305. in = -1;
  306. closedByReader = true;
  307. }
  308. }