diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-03-13 19:29:46 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-03-13 19:29:46 -0700 |
commit | 4032beba4948fc931190e2f16816545c9d0a1930 (patch) | |
tree | 0362cb7f2079e721bc7f76aefc6d7bf6d06d88d4 /streaming | |
parent | 3c97276a4a24acaeb34cf5c8b22fd987945c15b3 (diff) | |
parent | e7f1a69c6b8acffffb41fcb66a89e2b624c83da7 (diff) | |
download | spark-4032beba4948fc931190e2f16816545c9d0a1930.tar.gz spark-4032beba4948fc931190e2f16816545c9d0a1930.tar.bz2 spark-4032beba4948fc931190e2f16816545c9d0a1930.zip |
Merge pull request #521 from stephenh/earlyclose
Close the reader in HadoopRDD as soon as iteration end.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala | 44 |
1 files changed, 9 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index 4af839ad7f..1408af0afa 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -2,6 +2,7 @@ package spark.streaming.dstream import spark.streaming.StreamingContext import spark.storage.StorageLevel +import spark.util.NextIterator import java.io._ import java.net.Socket @@ -59,45 +60,18 @@ object SocketReceiver { */ def bytesToLines(inputStream: InputStream): Iterator[String] = { val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) - - val iterator = new Iterator[String] { - var gotNext = false - var finished = false - var nextValue: String = null - - private def getNext() { - try { - nextValue = dataInputStream.readLine() - if (nextValue == null) { - finished = true - } - } - gotNext = true - } - - override def hasNext: Boolean = { - if (!finished) { - if (!gotNext) { - getNext() - if (finished) { - dataInputStream.close() - } - } + new NextIterator[String] { + protected override def getNext() = { + val nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true } - !finished + nextValue } - override def next(): String = { - if (finished) { - throw new NoSuchElementException("End of stream") - } - if (!gotNext) { - getNext() - } - gotNext = false - nextValue + protected override def close() { + dataInputStream.close() } } - iterator } } |