aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-03-13 19:29:46 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-03-13 19:29:46 -0700
commit4032beba4948fc931190e2f16816545c9d0a1930 (patch)
tree0362cb7f2079e721bc7f76aefc6d7bf6d06d88d4 /streaming
parent3c97276a4a24acaeb34cf5c8b22fd987945c15b3 (diff)
parente7f1a69c6b8acffffb41fcb66a89e2b624c83da7 (diff)
downloadspark-4032beba4948fc931190e2f16816545c9d0a1930.tar.gz
spark-4032beba4948fc931190e2f16816545c9d0a1930.tar.bz2
spark-4032beba4948fc931190e2f16816545c9d0a1930.zip
Merge pull request #521 from stephenh/earlyclose
Close the reader in HadoopRDD as soon as iteration end.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala44
1 files changed, 9 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 4af839ad7f..1408af0afa 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
import spark.storage.StorageLevel
+import spark.util.NextIterator
import java.io._
import java.net.Socket
@@ -59,45 +60,18 @@ object SocketReceiver {
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
-
- val iterator = new Iterator[String] {
- var gotNext = false
- var finished = false
- var nextValue: String = null
-
- private def getNext() {
- try {
- nextValue = dataInputStream.readLine()
- if (nextValue == null) {
- finished = true
- }
- }
- gotNext = true
- }
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (!gotNext) {
- getNext()
- if (finished) {
- dataInputStream.close()
- }
- }
+ new NextIterator[String] {
+ protected override def getNext() = {
+ val nextValue = dataInputStream.readLine()
+ if (nextValue == null) {
+ finished = true
}
- !finished
+ nextValue
}
- override def next(): String = {
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- if (!gotNext) {
- getNext()
- }
- gotNext = false
- nextValue
+ protected override def close() {
+ dataInputStream.close()
}
}
- iterator
}
}