diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-03-11 23:59:17 -0500 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-03-11 23:59:17 -0500 |
commit | 9e68f4862556995ff4a02251eac3583542c11ad8 (patch) | |
tree | 7f6a9abec254502dc7d88f9f8cac8dba81c7f0e8 /streaming | |
parent | cbf8f0d4dda41ffd45855eab8401fda9b64168cd (diff) | |
download | spark-9e68f4862556995ff4a02251eac3583542c11ad8.tar.gz spark-9e68f4862556995ff4a02251eac3583542c11ad8.tar.bz2 spark-9e68f4862556995ff4a02251eac3583542c11ad8.zip |
More quickly call close in HadoopRDD.
This also refactors out the common "gotNext" iterator pattern into
a shared utility class.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala | 44 |
1 files changed, 9 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index 4af839ad7f..38239b054a 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -2,6 +2,7 @@ package spark.streaming.dstream import spark.streaming.StreamingContext import spark.storage.StorageLevel +import spark.util.NextIterator import java.io._ import java.net.Socket @@ -59,45 +60,18 @@ object SocketReceiver { */ def bytesToLines(inputStream: InputStream): Iterator[String] = { val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) - - val iterator = new Iterator[String] { - var gotNext = false - var finished = false - var nextValue: String = null - - private def getNext() { - try { - nextValue = dataInputStream.readLine() - if (nextValue == null) { - finished = true - } - } - gotNext = true - } - - override def hasNext: Boolean = { - if (!finished) { - if (!gotNext) { - getNext() - if (finished) { - dataInputStream.close() - } - } + new NextIterator[String] { + protected override def getNext() { + val nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true } - !finished + nextValue } - override def next(): String = { - if (finished) { - throw new NoSuchElementException("End of stream") - } - if (!gotNext) { - getNext() - } - gotNext = false - nextValue + protected override def close() { + dataInputStream.close() } } - iterator } } |