1. /*
  2. * @(#)PipedReader.java 1.11 01/11/29
  3. *
  4. * Copyright 2002 Sun Microsystems, Inc. All rights reserved.
  5. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  6. */
  7. package java.io;
  8. /**
  9. * Piped character-input streams.
  10. *
  11. * @version 1.11, 01/11/29
  12. * @author Mark Reinhold
  13. * @since JDK1.1
  14. */
  15. public class PipedReader extends Reader {
  16. boolean closedByWriter = false;
  17. boolean closedByReader = false;
  18. boolean connected = false;
  19. /* REMIND: identification of the read and write sides needs to be
  20. more sophisticated. Either using thread groups (but what about
  21. pipes within a thread?) or using finalization (but it may be a
  22. long time until the next GC). */
  23. Thread readSide;
  24. Thread writeSide;
  25. /**
  26. * The size of the pipe's circular input buffer.
  27. */
  28. static final int PIPE_SIZE = 1024;
  29. /**
  30. * The circular buffer into which incoming data is placed.
  31. */
  32. char buffer[] = new char[PIPE_SIZE];
  33. /**
  34. * The index of the position in the circular buffer at which the
  35. * next character of data will be stored when received from the connected
  36. * piped writer. <code>in<0</code> implies the buffer is empty,
  37. * <code>in==out</code> implies the buffer is full
  38. */
  39. int in = -1;
  40. /**
  41. * The index of the position in the circular buffer at which the next
  42. * character of data will be read by this piped reader.
  43. */
  44. int out = 0;
  45. /**
  46. * Creates a <code>PipedReader</code> so
  47. * that it is connected to the piped writer
  48. * <code>src</code>. Data written to <code>src</code>
  49. * will then be available as input from this stream.
  50. *
  51. * @param src the stream to connect to.
  52. * @exception IOException if an I/O error occurs.
  53. */
  54. public PipedReader(PipedWriter src) throws IOException {
  55. connect(src);
  56. }
  57. /**
  58. * Creates a <code>PipedReader</code> so
  59. * that it is not yet connected. It must be
  60. * connected to a <code>PipedWriter</code>
  61. * before being used.
  62. *
  63. * @see java.io.PipedReader#connect(java.io.PipedWriter)
  64. * @see java.io.PipedWriter#connect(java.io.PipedReader)
  65. */
  66. public PipedReader() {
  67. }
  68. /**
  69. * Causes this piped reader to be connected
  70. * to the piped writer <code>src</code>.
  71. * If this object is already connected to some
  72. * other piped writer, an <code>IOException</code>
  73. * is thrown.
  74. * <p>
  75. * If <code>src</code> is an
  76. * unconnected piped writer and <code>snk</code>
  77. * is an unconnected piped reader, they
  78. * may be connected by either the call:
  79. * <p>
  80. * <pre><code>snk.connect(src)</code> </pre>
  81. * <p>
  82. * or the call:
  83. * <p>
  84. * <pre><code>src.connect(snk)</code> </pre>
  85. * <p>
  86. * The two
  87. * calls have the same effect.
  88. *
  89. * @param src The piped writer to connect to.
  90. * @exception IOException if an I/O error occurs.
  91. */
  92. public void connect(PipedWriter src) throws IOException {
  93. src.connect(this);
  94. }
  95. /**
  96. * Receives a char of data. This method will block if no input is
  97. * available.
  98. */
  99. synchronized void receive(int c) throws IOException {
  100. if (!connected) {
  101. throw new IOException("Pipe not connected");
  102. } else if (closedByWriter || closedByReader) {
  103. throw new IOException("Pipe closed");
  104. } else if (readSide != null && !readSide.isAlive()) {
  105. throw new IOException("Read end dead");
  106. }
  107. writeSide = Thread.currentThread();
  108. while (in == out) {
  109. if ((readSide != null) && !readSide.isAlive()) {
  110. throw new IOException("Pipe broken");
  111. }
  112. /* full: kick any waiting readers */
  113. notifyAll();
  114. try {
  115. wait(1000);
  116. } catch (InterruptedException ex) {
  117. throw new java.io.InterruptedIOException();
  118. }
  119. }
  120. if (in < 0) {
  121. in = 0;
  122. out = 0;
  123. }
  124. buffer[in++] = (char) c;
  125. if (in >= buffer.length) {
  126. in = 0;
  127. }
  128. }
  129. /**
  130. * Receives data into an array of characters. This method will
  131. * block until some input is available.
  132. */
  133. synchronized void receive(char c[], int off, int len) throws IOException {
  134. while (--len >= 0) {
  135. receive(c[off++]);
  136. }
  137. }
  138. /**
  139. * Notifies all waiting threads that the last character of data has been
  140. * received.
  141. */
  142. synchronized void receivedLast() {
  143. closedByWriter = true;
  144. notifyAll();
  145. }
  146. /**
  147. * Reads the next character of data from this piped stream.
  148. * If no character is available because the end of the stream
  149. * has been reached, the value <code>-1</code> is returned.
  150. * This method blocks until input data is available, the end of
  151. * the stream is detected, or an exception is thrown.
  152. *
  153. * If a thread was providing data characters
  154. * to the connected piped writer, but
  155. * the thread is no longer alive, then an
  156. * <code>IOException</code> is thrown.
  157. *
  158. * @return the next character of data, or <code>-1</code> if the end of the
  159. * stream is reached.
  160. * @exception IOException if the pipe is broken.
  161. */
  162. public synchronized int read() throws IOException {
  163. if (!connected) {
  164. throw new IOException("Pipe not connected");
  165. } else if (closedByReader) {
  166. throw new IOException("Pipe closed");
  167. } else if (writeSide != null && !writeSide.isAlive()
  168. && !closedByWriter && (in < 0)) {
  169. throw new IOException("Write end dead");
  170. }
  171. readSide = Thread.currentThread();
  172. int trials = 2;
  173. while (in < 0) {
  174. if (closedByWriter) {
  175. /* closed by writer, return EOF */
  176. return -1;
  177. }
  178. if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  179. throw new IOException("Pipe broken");
  180. }
  181. /* might be a writer waiting */
  182. notifyAll();
  183. try {
  184. wait(1000);
  185. } catch (InterruptedException ex) {
  186. throw new java.io.InterruptedIOException();
  187. }
  188. }
  189. int ret = buffer[out++];
  190. if (out >= buffer.length) {
  191. out = 0;
  192. }
  193. if (in == out) {
  194. /* now empty */
  195. in = -1;
  196. }
  197. return ret;
  198. }
  199. /**
  200. * Reads up to <code>len</code> characters of data from this piped
  201. * stream into an array of characters. Less than <code>len</code> characters
  202. * will be read if the end of the data stream is reached. This method
  203. * blocks until at least one character of input is available.
  204. * If a thread was providing data characters to the connected piped output,
  205. * but the thread is no longer alive, then an <code>IOException</code>
  206. * is thrown.
  207. *
  208. * @param cbuf the buffer into which the data is read.
  209. * @param off the start offset of the data.
  210. * @param len the maximum number of characters read.
  211. * @return the total number of characters read into the buffer, or
  212. * <code>-1</code> if there is no more data because the end of
  213. * the stream has been reached.
  214. * @exception IOException if an I/O error occurs.
  215. */
  216. public synchronized int read(char cbuf[], int off, int len) throws IOException {
  217. if (!connected) {
  218. throw new IOException("Pipe not connected");
  219. } else if (closedByReader) {
  220. throw new IOException("Pipe closed");
  221. } else if (writeSide != null && !writeSide.isAlive()
  222. && !closedByWriter && (in < 0)) {
  223. throw new IOException("Write end dead");
  224. }
  225. if ((off < 0) || (off > cbuf.length) || (len < 0) ||
  226. ((off + len) > cbuf.length) || ((off + len) < 0)) {
  227. throw new IndexOutOfBoundsException();
  228. } else if (len == 0) {
  229. return 0;
  230. }
  231. /* possibly wait on the first character */
  232. int c = read();
  233. if (c < 0) {
  234. return -1;
  235. }
  236. cbuf[off] = (char)c;
  237. int rlen = 1;
  238. while ((in >= 0) && (--len > 0)) {
  239. cbuf[off + rlen] = buffer[out++];
  240. rlen++;
  241. if (out >= buffer.length) {
  242. out = 0;
  243. }
  244. if (in == out) {
  245. /* now empty */
  246. in = -1;
  247. }
  248. }
  249. return rlen;
  250. }
  251. /**
  252. * Tell whether this stream is ready to be read. A piped character
  253. * stream is ready if the circular buffer is not empty.
  254. *
  255. * @exception IOException If an I/O error occurs
  256. */
  257. public synchronized boolean ready() throws IOException {
  258. if (!connected) {
  259. throw new IOException("Pipe not connected");
  260. } else if (closedByReader) {
  261. throw new IOException("Pipe closed");
  262. } else if (writeSide != null && !writeSide.isAlive()
  263. && !closedByWriter && (in < 0)) {
  264. throw new IOException("Write end dead");
  265. }
  266. if (in < 0) {
  267. return false;
  268. } else {
  269. return true;
  270. }
  271. }
  272. /**
  273. * Closes this piped stream and releases any system resources
  274. * associated with the stream.
  275. *
  276. * @exception IOException if an I/O error occurs.
  277. */
  278. public void close() throws IOException {
  279. in = -1;
  280. closedByReader = true;
  281. }
  282. }