1. /*
  2. * Copyright 2003-2004 The Apache Software Foundation
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.apache.commons.net.telnet;
  17. import java.io.BufferedInputStream;
  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.io.InterruptedIOException;
  21. /***
  22. *
  23. * <p>
  24. *
  25. * <p>
  26. * <p>
  27. * @author Daniel F. Savarese
  28. * @author Bruno D'Avanzo
  29. ***/
  30. final class TelnetInputStream extends BufferedInputStream implements Runnable
  31. {
  32. static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
  33. _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
  34. _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
  35. private boolean __hasReachedEOF, __isClosed;
  36. private boolean __readIsWaiting;
  37. private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
  38. private int[] __queue;
  39. private TelnetClient __client;
  40. private Thread __thread;
  41. private IOException __ioException;
  42. /* TERMINAL-TYPE option (start)*/
  43. private int __suboption[] = new int[256];
  44. private int __suboption_count = 0;
  45. /* TERMINAL-TYPE option (end)*/
  46. private boolean _ayt_flag = false;
  47. private boolean __threaded = false;
  48. TelnetInputStream(InputStream input, TelnetClient client)
  49. {
  50. super(input);
  51. __client = client;
  52. __receiveState = _STATE_DATA;
  53. __isClosed = true;
  54. __hasReachedEOF = false;
  55. // Make it 1025, because when full, one slot will go unused, and we
  56. // want a 1024 byte buffer just to have a round number (base 2 that is)
  57. //__queue = new int[1025];
  58. __queue = new int[2049];
  59. __queueHead = 0;
  60. __queueTail = 0;
  61. __bytesAvailable = 0;
  62. __ioException = null;
  63. __readIsWaiting = false;
  64. __thread = new Thread(this);
  65. }
  66. void _start()
  67. {
  68. int priority;
  69. __isClosed = false;
  70. // Need to set a higher priority in case JVM does not use pre-emptive
  71. // threads. This should prevent scheduler induced deadlock (rather than
  72. // deadlock caused by a bug in this code).
  73. priority = Thread.currentThread().getPriority() + 1;
  74. if (priority > Thread.MAX_PRIORITY)
  75. priority = Thread.MAX_PRIORITY;
  76. __thread.setPriority(priority);
  77. __thread.setDaemon(true);
  78. __thread.start();
  79. __threaded = true;
  80. }
  81. // synchronized(__client) critical sections are to protect against
  82. // TelnetOutputStream writing through the telnet client at same time
  83. // as a processDo/Will/etc. command invoked from TelnetInputStream
  84. // tries to write.
  85. private int __read() throws IOException
  86. {
  87. int ch;
  88. _loop:
  89. while (true)
  90. {
  91. // Exit only when we reach end of stream.
  92. if ((ch = super.read()) < 0)
  93. return -1;
  94. ch = (ch & 0xff);
  95. /* Code Section added for supporting AYT (start)*/
  96. synchronized (__client)
  97. {
  98. __client._processAYTResponse();
  99. }
  100. /* Code Section added for supporting AYT (end)*/
  101. /* Code Section added for supporting spystreams (start)*/
  102. __client._spyRead(ch);
  103. /* Code Section added for supporting spystreams (end)*/
  104. _mainSwitch:
  105. switch (__receiveState)
  106. {
  107. case _STATE_CR:
  108. if (ch == '\0')
  109. {
  110. // Strip null
  111. continue;
  112. }
  113. // How do we handle newline after cr?
  114. // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
  115. // Handle as normal data by falling through to _STATE_DATA case
  116. case _STATE_DATA:
  117. if (ch == TelnetCommand.IAC)
  118. {
  119. __receiveState = _STATE_IAC;
  120. continue;
  121. }
  122. if (ch == '\r')
  123. {
  124. synchronized (__client)
  125. {
  126. if (__client._requestedDont(TelnetOption.BINARY))
  127. __receiveState = _STATE_CR;
  128. else
  129. __receiveState = _STATE_DATA;
  130. }
  131. }
  132. else
  133. __receiveState = _STATE_DATA;
  134. break;
  135. case _STATE_IAC:
  136. switch (ch)
  137. {
  138. case TelnetCommand.WILL:
  139. __receiveState = _STATE_WILL;
  140. continue;
  141. case TelnetCommand.WONT:
  142. __receiveState = _STATE_WONT;
  143. continue;
  144. case TelnetCommand.DO:
  145. __receiveState = _STATE_DO;
  146. continue;
  147. case TelnetCommand.DONT:
  148. __receiveState = _STATE_DONT;
  149. continue;
  150. /* TERMINAL-TYPE option (start)*/
  151. case TelnetCommand.SB:
  152. __suboption_count = 0;
  153. __receiveState = _STATE_SB;
  154. continue;
  155. /* TERMINAL-TYPE option (end)*/
  156. case TelnetCommand.IAC:
  157. __receiveState = _STATE_DATA;
  158. break;
  159. default:
  160. break;
  161. }
  162. __receiveState = _STATE_DATA;
  163. continue;
  164. case _STATE_WILL:
  165. synchronized (__client)
  166. {
  167. __client._processWill(ch);
  168. __client._flushOutputStream();
  169. }
  170. __receiveState = _STATE_DATA;
  171. continue;
  172. case _STATE_WONT:
  173. synchronized (__client)
  174. {
  175. __client._processWont(ch);
  176. __client._flushOutputStream();
  177. }
  178. __receiveState = _STATE_DATA;
  179. continue;
  180. case _STATE_DO:
  181. synchronized (__client)
  182. {
  183. __client._processDo(ch);
  184. __client._flushOutputStream();
  185. }
  186. __receiveState = _STATE_DATA;
  187. continue;
  188. case _STATE_DONT:
  189. synchronized (__client)
  190. {
  191. __client._processDont(ch);
  192. __client._flushOutputStream();
  193. }
  194. __receiveState = _STATE_DATA;
  195. continue;
  196. /* TERMINAL-TYPE option (start)*/
  197. case _STATE_SB:
  198. switch (ch)
  199. {
  200. case TelnetCommand.IAC:
  201. __receiveState = _STATE_IAC_SB;
  202. continue;
  203. default:
  204. // store suboption char
  205. __suboption[__suboption_count++] = ch;
  206. break;
  207. }
  208. __receiveState = _STATE_SB;
  209. continue;
  210. case _STATE_IAC_SB:
  211. switch (ch)
  212. {
  213. case TelnetCommand.SE:
  214. synchronized (__client)
  215. {
  216. __client._processSuboption(__suboption, __suboption_count);
  217. __client._flushOutputStream();
  218. }
  219. __receiveState = _STATE_DATA;
  220. continue;
  221. default:
  222. __receiveState = _STATE_SB;
  223. break;
  224. }
  225. __receiveState = _STATE_DATA;
  226. continue;
  227. /* TERMINAL-TYPE option (end)*/
  228. }
  229. break;
  230. }
  231. return ch;
  232. }
  233. // synchronized(__client) critical sections are to protect against
  234. // TelnetOutputStream writing through the telnet client at same time
  235. // as a processDo/Will/etc. command invoked from TelnetInputStream
  236. // tries to write.
  237. private void __processChar(int ch) throws InterruptedException
  238. {
  239. // Critical section because we're altering __bytesAvailable,
  240. // __queueTail, and the contents of _queue.
  241. synchronized (__queue)
  242. {
  243. while (__bytesAvailable >= __queue.length - 1)
  244. {
  245. if(__threaded)
  246. {
  247. __queue.notify();
  248. try
  249. {
  250. __queue.wait();
  251. }
  252. catch (InterruptedException e)
  253. {
  254. throw e;
  255. }
  256. }
  257. }
  258. // Need to do this in case we're not full, but block on a read
  259. if (__readIsWaiting && __threaded)
  260. {
  261. __queue.notify();
  262. }
  263. __queue[__queueTail] = ch;
  264. ++__bytesAvailable;
  265. if (++__queueTail >= __queue.length)
  266. __queueTail = 0;
  267. }
  268. }
  269. public int read() throws IOException
  270. {
  271. // Critical section because we're altering __bytesAvailable,
  272. // __queueHead, and the contents of _queue in addition to
  273. // testing value of __hasReachedEOF.
  274. synchronized (__queue)
  275. {
  276. while (true)
  277. {
  278. if (__ioException != null)
  279. {
  280. IOException e;
  281. e = __ioException;
  282. __ioException = null;
  283. throw e;
  284. }
  285. if (__bytesAvailable == 0)
  286. {
  287. // Return -1 if at end of file
  288. if (__hasReachedEOF)
  289. return -1;
  290. // Otherwise, we have to wait for queue to get something
  291. if(__threaded)
  292. {
  293. __queue.notify();
  294. try
  295. {
  296. __readIsWaiting = true;
  297. __queue.wait();
  298. __readIsWaiting = false;
  299. }
  300. catch (InterruptedException e)
  301. {
  302. throw new IOException("Fatal thread interruption during read.");
  303. }
  304. }
  305. else
  306. {
  307. //__alreadyread = false;
  308. __readIsWaiting = true;
  309. int ch;
  310. do
  311. {
  312. try
  313. {
  314. if ((ch = __read()) < 0)
  315. if(ch != -2)
  316. return (ch);
  317. }
  318. catch (InterruptedIOException e)
  319. {
  320. synchronized (__queue)
  321. {
  322. __ioException = e;
  323. __queue.notifyAll();
  324. try
  325. {
  326. __queue.wait(100);
  327. }
  328. catch (InterruptedException interrupted)
  329. {
  330. }
  331. }
  332. return (-1);
  333. }
  334. try
  335. {
  336. if(ch != -2)
  337. {
  338. __processChar(ch);
  339. }
  340. }
  341. catch (InterruptedException e)
  342. {
  343. if (__isClosed)
  344. return (-1);
  345. }
  346. }
  347. while (super.available() > 0);
  348. __readIsWaiting = false;
  349. }
  350. continue;
  351. }
  352. else
  353. {
  354. int ch;
  355. ch = __queue[__queueHead];
  356. if (++__queueHead >= __queue.length)
  357. __queueHead = 0;
  358. --__bytesAvailable;
  359. return ch;
  360. }
  361. }
  362. }
  363. }
  364. /***
  365. * Reads the next number of bytes from the stream into an array and
  366. * returns the number of bytes read. Returns -1 if the end of the
  367. * stream has been reached.
  368. * <p>
  369. * @param buffer The byte array in which to store the data.
  370. * @return The number of bytes read. Returns -1 if the
  371. * end of the message has been reached.
  372. * @exception IOException If an error occurs in reading the underlying
  373. * stream.
  374. ***/
  375. public int read(byte buffer[]) throws IOException
  376. {
  377. return read(buffer, 0, buffer.length);
  378. }
  379. /***
  380. * Reads the next number of bytes from the stream into an array and returns
  381. * the number of bytes read. Returns -1 if the end of the
  382. * message has been reached. The characters are stored in the array
  383. * starting from the given offset and up to the length specified.
  384. * <p>
  385. * @param buffer The byte array in which to store the data.
  386. * @param offset The offset into the array at which to start storing data.
  387. * @param length The number of bytes to read.
  388. * @return The number of bytes read. Returns -1 if the
  389. * end of the stream has been reached.
  390. * @exception IOException If an error occurs while reading the underlying
  391. * stream.
  392. ***/
  393. public int read(byte buffer[], int offset, int length) throws IOException
  394. {
  395. int ch, off;
  396. if (length < 1)
  397. return 0;
  398. // Critical section because run() may change __bytesAvailable
  399. synchronized (__queue)
  400. {
  401. if (length > __bytesAvailable)
  402. length = __bytesAvailable;
  403. }
  404. if ((ch = read()) == -1)
  405. return -1;
  406. off = offset;
  407. do
  408. {
  409. buffer[offset++] = (byte)ch;
  410. }
  411. while (--length > 0 && (ch = read()) != -1);
  412. //__client._spyRead(buffer, off, offset - off);
  413. return (offset - off);
  414. }
  415. /*** Returns false. Mark is not supported. ***/
  416. public boolean markSupported()
  417. {
  418. return false;
  419. }
  420. public int available() throws IOException
  421. {
  422. // Critical section because run() may change __bytesAvailable
  423. synchronized (__queue)
  424. {
  425. return __bytesAvailable;
  426. }
  427. }
  428. // Cannot be synchronized. Will cause deadlock if run() is blocked
  429. // in read because BufferedInputStream read() is synchronized.
  430. public void close() throws IOException
  431. {
  432. // Completely disregard the fact thread may still be running.
  433. // We can't afford to block on this close by waiting for
  434. // thread to terminate because few if any JVM's will actually
  435. // interrupt a system read() from the interrupt() method.
  436. super.close();
  437. synchronized (__queue)
  438. {
  439. __hasReachedEOF = true;
  440. __isClosed = true;
  441. if (__thread.isAlive())
  442. {
  443. __thread.interrupt();
  444. }
  445. __queue.notifyAll();
  446. }
  447. __threaded = false;
  448. }
  449. public void run()
  450. {
  451. int ch;
  452. try
  453. {
  454. _outerLoop:
  455. while (!__isClosed)
  456. {
  457. try
  458. {
  459. if ((ch = __read()) < 0)
  460. break;
  461. }
  462. catch (InterruptedIOException e)
  463. {
  464. synchronized (__queue)
  465. {
  466. __ioException = e;
  467. __queue.notifyAll();
  468. try
  469. {
  470. __queue.wait(100);
  471. }
  472. catch (InterruptedException interrupted)
  473. {
  474. if (__isClosed)
  475. break _outerLoop;
  476. }
  477. continue;
  478. }
  479. } catch(RuntimeException re) {
  480. // We treat any runtime exceptions as though the
  481. // stream has been closed. We close the
  482. // underlying stream just to be sure.
  483. super.close();
  484. // Breaking the loop has the effect of setting
  485. // the state to closed at the end of the method.
  486. break _outerLoop;
  487. }
  488. try
  489. {
  490. __processChar(ch);
  491. }
  492. catch (InterruptedException e)
  493. {
  494. if (__isClosed)
  495. break _outerLoop;
  496. }
  497. }
  498. }
  499. catch (IOException ioe)
  500. {
  501. synchronized (__queue)
  502. {
  503. __ioException = ioe;
  504. }
  505. }
  506. synchronized (__queue)
  507. {
  508. __isClosed = true; // Possibly redundant
  509. __hasReachedEOF = true;
  510. __queue.notify();
  511. }
  512. __threaded = false;
  513. }
  514. }
  515. /* Emacs configuration
  516. * Local variables: **
  517. * mode: java **
  518. * c-basic-offset: 4 **
  519. * indent-tabs-mode: nil **
  520. * End: **
  521. */