1. /*
  2. * @(#)PipedInputStream.java 1.35 03/12/19
  3. *
  4. * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.io;
  8. /**
  9. * A piped input stream should be connected
  10. * to a piped output stream; the piped input
  11. * stream then provides whatever data bytes
  12. * are written to the piped output stream.
  13. * Typically, data is read from a <code>PipedInputStream</code>
  14. * object by one thread and data is written
  15. * to the corresponding <code>PipedOutputStream</code>
  16. * by some other thread. Attempting to use
  17. * both objects from a single thread is not
  18. * recommended, as it may deadlock the thread.
  19. * The piped input stream contains a buffer,
  20. * decoupling read operations from write operations,
  21. * within limits.
  22. *
  23. * @author James Gosling
  24. * @version 1.35, 12/19/03
  25. * @see java.io.PipedOutputStream
  26. * @since JDK1.0
  27. */
  28. public
  29. class PipedInputStream extends InputStream {
  30. boolean closedByWriter = false;
  31. volatile boolean closedByReader = false;
  32. boolean connected = false;
  33. /* REMIND: identification of the read and write sides needs to be
  34. more sophisticated. Either using thread groups (but what about
  35. pipes within a thread?) or using finalization (but it may be a
  36. long time until the next GC). */
  37. Thread readSide;
  38. Thread writeSide;
  39. /**
  40. * The size of the pipe's circular input buffer.
  41. * @since JDK1.1
  42. */
  43. protected static final int PIPE_SIZE = 1024;
  44. /**
  45. * The circular buffer into which incoming data is placed.
  46. * @since JDK1.1
  47. */
  48. protected byte buffer[] = new byte[PIPE_SIZE];
  49. /**
  50. * The index of the position in the circular buffer at which the
  51. * next byte of data will be stored when received from the connected
  52. * piped output stream. <code>in<0</code> implies the buffer is empty,
  53. * <code>in==out</code> implies the buffer is full
  54. * @since JDK1.1
  55. */
  56. protected int in = -1;
  57. /**
  58. * The index of the position in the circular buffer at which the next
  59. * byte of data will be read by this piped input stream.
  60. * @since JDK1.1
  61. */
  62. protected int out = 0;
  63. /**
  64. * Creates a <code>PipedInputStream</code> so
  65. * that it is connected to the piped output
  66. * stream <code>src</code>. Data bytes written
  67. * to <code>src</code> will then be available
  68. * as input from this stream.
  69. *
  70. * @param src the stream to connect to.
  71. * @exception IOException if an I/O error occurs.
  72. */
  73. public PipedInputStream(PipedOutputStream src) throws IOException {
  74. connect(src);
  75. }
  76. /**
  77. * Creates a <code>PipedInputStream</code> so
  78. * that it is not yet connected. It must be
  79. * connected to a <code>PipedOutputStream</code>
  80. * before being used.
  81. *
  82. * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream)
  83. * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream)
  84. */
  85. public PipedInputStream() {
  86. }
  87. /**
  88. * Causes this piped input stream to be connected
  89. * to the piped output stream <code>src</code>.
  90. * If this object is already connected to some
  91. * other piped output stream, an <code>IOException</code>
  92. * is thrown.
  93. * <p>
  94. * If <code>src</code> is an
  95. * unconnected piped output stream and <code>snk</code>
  96. * is an unconnected piped input stream, they
  97. * may be connected by either the call:
  98. * <p>
  99. * <pre><code>snk.connect(src)</code> </pre>
  100. * <p>
  101. * or the call:
  102. * <p>
  103. * <pre><code>src.connect(snk)</code> </pre>
  104. * <p>
  105. * The two
  106. * calls have the same effect.
  107. *
  108. * @param src The piped output stream to connect to.
  109. * @exception IOException if an I/O error occurs.
  110. */
  111. public void connect(PipedOutputStream src) throws IOException {
  112. src.connect(this);
  113. }
  114. /**
  115. * Receives a byte of data. This method will block if no input is
  116. * available.
  117. * @param b the byte being received
  118. * @exception IOException If the pipe is broken.
  119. * @since JDK1.1
  120. */
  121. protected synchronized void receive(int b) throws IOException {
  122. checkStateForReceive();
  123. writeSide = Thread.currentThread();
  124. if (in == out)
  125. awaitSpace();
  126. if (in < 0) {
  127. in = 0;
  128. out = 0;
  129. }
  130. buffer[in++] = (byte)(b & 0xFF);
  131. if (in >= buffer.length) {
  132. in = 0;
  133. }
  134. }
  135. /**
  136. * Receives data into an array of bytes. This method will
  137. * block until some input is available.
  138. * @param b the buffer into which the data is received
  139. * @param off the start offset of the data
  140. * @param len the maximum number of bytes received
  141. * @exception IOException If an I/O error has occurred.
  142. */
  143. synchronized void receive(byte b[], int off, int len) throws IOException {
  144. checkStateForReceive();
  145. writeSide = Thread.currentThread();
  146. int bytesToTransfer = len;
  147. while (bytesToTransfer > 0) {
  148. if (in == out)
  149. awaitSpace();
  150. int nextTransferAmount = 0;
  151. if (out < in) {
  152. nextTransferAmount = buffer.length - in;
  153. } else if (in < out) {
  154. if (in == -1) {
  155. in = out = 0;
  156. nextTransferAmount = buffer.length - in;
  157. } else {
  158. nextTransferAmount = out - in;
  159. }
  160. }
  161. if (nextTransferAmount > bytesToTransfer)
  162. nextTransferAmount = bytesToTransfer;
  163. assert(nextTransferAmount > 0);
  164. System.arraycopy(b, off, buffer, in, nextTransferAmount);
  165. bytesToTransfer -= nextTransferAmount;
  166. off += nextTransferAmount;
  167. in += nextTransferAmount;
  168. if (in >= buffer.length) {
  169. in = 0;
  170. }
  171. }
  172. }
  173. private void checkStateForReceive() throws IOException {
  174. if (!connected) {
  175. throw new IOException("Pipe not connected");
  176. } else if (closedByWriter || closedByReader) {
  177. throw new IOException("Pipe closed");
  178. } else if (readSide != null && !readSide.isAlive()) {
  179. throw new IOException("Read end dead");
  180. }
  181. }
  182. private void awaitSpace() throws IOException {
  183. while (in == out) {
  184. if ((readSide != null) && !readSide.isAlive()) {
  185. throw new IOException("Pipe broken");
  186. }
  187. /* full: kick any waiting readers */
  188. notifyAll();
  189. try {
  190. wait(1000);
  191. } catch (InterruptedException ex) {
  192. throw new java.io.InterruptedIOException();
  193. }
  194. }
  195. }
  196. /**
  197. * Notifies all waiting threads that the last byte of data has been
  198. * received.
  199. */
  200. synchronized void receivedLast() {
  201. closedByWriter = true;
  202. notifyAll();
  203. }
  204. /**
  205. * Reads the next byte of data from this piped input stream. The
  206. * value byte is returned as an <code>int</code> in the range
  207. * <code>0</code> to <code>255</code>. If no byte is available
  208. * because the end of the stream has been reached, the value
  209. * <code>-1</code> is returned. This method blocks until input data
  210. * is available, the end of the stream is detected, or an exception
  211. * is thrown.
  212. * If a thread was providing data bytes
  213. * to the connected piped output stream, but
  214. * the thread is no longer alive, then an
  215. * <code>IOException</code> is thrown.
  216. *
  217. * @return the next byte of data, or <code>-1</code> if the end of the
  218. * stream is reached.
  219. * @exception IOException if the pipe is broken.
  220. */
  221. public synchronized int read() throws IOException {
  222. if (!connected) {
  223. throw new IOException("Pipe not connected");
  224. } else if (closedByReader) {
  225. throw new IOException("Pipe closed");
  226. } else if (writeSide != null && !writeSide.isAlive()
  227. && !closedByWriter && (in < 0)) {
  228. throw new IOException("Write end dead");
  229. }
  230. readSide = Thread.currentThread();
  231. int trials = 2;
  232. while (in < 0) {
  233. if (closedByWriter) {
  234. /* closed by writer, return EOF */
  235. return -1;
  236. }
  237. if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  238. throw new IOException("Pipe broken");
  239. }
  240. /* might be a writer waiting */
  241. notifyAll();
  242. try {
  243. wait(1000);
  244. } catch (InterruptedException ex) {
  245. throw new java.io.InterruptedIOException();
  246. }
  247. }
  248. int ret = buffer[out++] & 0xFF;
  249. if (out >= buffer.length) {
  250. out = 0;
  251. }
  252. if (in == out) {
  253. /* now empty */
  254. in = -1;
  255. }
  256. return ret;
  257. }
  258. /**
  259. * Reads up to <code>len</code> bytes of data from this piped input
  260. * stream into an array of bytes. Less than <code>len</code> bytes
  261. * will be read if the end of the data stream is reached. This method
  262. * blocks until at least one byte of input is available.
  263. * If a thread was providing data bytes
  264. * to the connected piped output stream, but
  265. * the thread is no longer alive, then an
  266. * <code>IOException</code> is thrown.
  267. *
  268. * @param b the buffer into which the data is read.
  269. * @param off the start offset of the data.
  270. * @param len the maximum number of bytes read.
  271. * @return the total number of bytes read into the buffer, or
  272. * <code>-1</code> if there is no more data because the end of
  273. * the stream has been reached.
  274. * @exception IOException if an I/O error occurs.
  275. */
  276. public synchronized int read(byte b[], int off, int len) throws IOException {
  277. if (b == null) {
  278. throw new NullPointerException();
  279. } else if ((off < 0) || (off > b.length) || (len < 0) ||
  280. ((off + len) > b.length) || ((off + len) < 0)) {
  281. throw new IndexOutOfBoundsException();
  282. } else if (len == 0) {
  283. return 0;
  284. }
  285. /* possibly wait on the first character */
  286. int c = read();
  287. if (c < 0) {
  288. return -1;
  289. }
  290. b[off] = (byte) c;
  291. int rlen = 1;
  292. while ((in >= 0) && (--len > 0)) {
  293. b[off + rlen] = buffer[out++];
  294. rlen++;
  295. if (out >= buffer.length) {
  296. out = 0;
  297. }
  298. if (in == out) {
  299. /* now empty */
  300. in = -1;
  301. }
  302. }
  303. return rlen;
  304. }
  305. /**
  306. * Returns the number of bytes that can be read from this input
  307. * stream without blocking. This method overrides the <code>available</code>
  308. * method of the parent class.
  309. *
  310. * @return the number of bytes that can be read from this input stream
  311. * without blocking.
  312. * @exception IOException if an I/O error occurs.
  313. * @since JDK1.0.2
  314. */
  315. public synchronized int available() throws IOException {
  316. if(in < 0)
  317. return 0;
  318. else if(in == out)
  319. return buffer.length;
  320. else if (in > out)
  321. return in - out;
  322. else
  323. return in + buffer.length - out;
  324. }
  325. /**
  326. * Closes this piped input stream and releases any system resources
  327. * associated with the stream.
  328. *
  329. * @exception IOException if an I/O error occurs.
  330. */
  331. public void close() throws IOException {
  332. closedByReader = true;
  333. synchronized (this) {
  334. in = -1;
  335. }
  336. }
  337. }