aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-03-11 23:59:17 -0500
committerStephen Haberman <stephen@exigencecorp.com>2013-03-11 23:59:17 -0500
commit9e68f4862556995ff4a02251eac3583542c11ad8 (patch)
tree7f6a9abec254502dc7d88f9f8cac8dba81c7f0e8 /streaming
parentcbf8f0d4dda41ffd45855eab8401fda9b64168cd (diff)
downloadspark-9e68f4862556995ff4a02251eac3583542c11ad8.tar.gz
spark-9e68f4862556995ff4a02251eac3583542c11ad8.tar.bz2
spark-9e68f4862556995ff4a02251eac3583542c11ad8.zip
More quickly call close in HadoopRDD.
This also refactors out the common "gotNext" iterator pattern into a shared utility class.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala44
1 files changed, 9 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 4af839ad7f..38239b054a 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
import spark.storage.StorageLevel
+import spark.util.NextIterator
import java.io._
import java.net.Socket
@@ -59,45 +60,18 @@ object SocketReceiver {
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
-
- val iterator = new Iterator[String] {
- var gotNext = false
- var finished = false
- var nextValue: String = null
-
- private def getNext() {
- try {
- nextValue = dataInputStream.readLine()
- if (nextValue == null) {
- finished = true
- }
- }
- gotNext = true
- }
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (!gotNext) {
- getNext()
- if (finished) {
- dataInputStream.close()
- }
- }
+ new NextIterator[String] {
+ protected override def getNext() {
+ val nextValue = dataInputStream.readLine()
+ if (nextValue == null) {
+ finished = true
}
- !finished
+ nextValue
}
- override def next(): String = {
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- if (!gotNext) {
- getNext()
- }
- gotNext = false
- nextValue
+ protected override def close() {
+ dataInputStream.close()
}
}
- iterator
}
}