aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-03-13 19:29:46 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-03-13 19:29:46 -0700
commit4032beba4948fc931190e2f16816545c9d0a1930 (patch)
tree0362cb7f2079e721bc7f76aefc6d7bf6d06d88d4
parent3c97276a4a24acaeb34cf5c8b22fd987945c15b3 (diff)
parente7f1a69c6b8acffffb41fcb66a89e2b624c83da7 (diff)
downloadspark-4032beba4948fc931190e2f16816545c9d0a1930.tar.gz
spark-4032beba4948fc931190e2f16816545c9d0a1930.tar.bz2
spark-4032beba4948fc931190e2f16816545c9d0a1930.zip
Merge pull request #521 from stephenh/earlyclose
Close the reader in HadoopRDD as soon as iteration end.
-rw-r--r--core/src/main/scala/spark/rdd/HadoopRDD.scala33
-rw-r--r--core/src/main/scala/spark/serializer/Serializer.scala32
-rw-r--r--core/src/main/scala/spark/util/NextIterator.scala71
-rw-r--r--core/src/test/scala/spark/util/NextIteratorSuite.scala68
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala44
5 files changed, 162 insertions, 86 deletions
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 78097502bc..a6322dc58d 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import spark.util.NextIterator
/**
@@ -62,7 +63,7 @@ class HadoopRDD[K, V](
.asInstanceOf[InputFormat[K, V]]
}
- override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
+ override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
var reader: RecordReader[K, V] = null
@@ -71,38 +72,22 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback{ () => close() }
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
- var gotNext = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!gotNext) {
- try {
- finished = !reader.next(key, value)
- } catch {
- case eof: EOFException =>
- finished = true
- }
- gotNext = true
- }
- !finished
- }
- override def next: (K, V) = {
- if (!gotNext) {
+ override def getNext() = {
+ try {
finished = !reader.next(key, value)
+ } catch {
+ case eof: EOFException =>
+ finished = true
}
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- gotNext = false
(key, value)
}
- private def close() {
+ override def close() {
try {
reader.close()
} catch {
diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala
index 50b086125a..aca86ab6f0 100644
--- a/core/src/main/scala/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/spark/serializer/Serializer.scala
@@ -72,40 +72,18 @@ trait DeserializationStream {
* Read the elements of this stream through an iterator. This can only be called once, as
* reading each element will consume data from the input source.
*/
- def asIterator: Iterator[Any] = new Iterator[Any] {
- var gotNext = false
- var finished = false
- var nextValue: Any = null
-
- private def getNext() {
+ def asIterator: Iterator[Any] = new spark.util.NextIterator[Any] {
+ override protected def getNext() = {
try {
- nextValue = readObject[Any]()
+ readObject[Any]()
} catch {
case eof: EOFException =>
finished = true
}
- gotNext = true
}
- override def hasNext: Boolean = {
- if (!gotNext) {
- getNext()
- }
- if (finished) {
- close()
- }
- !finished
- }
-
- override def next(): Any = {
- if (!gotNext) {
- getNext()
- }
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- gotNext = false
- nextValue
+ override protected def close() {
+ DeserializationStream.this.close()
}
}
}
diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala
new file mode 100644
index 0000000000..48b5018ddd
--- /dev/null
+++ b/core/src/main/scala/spark/util/NextIterator.scala
@@ -0,0 +1,71 @@
+package spark.util
+
+/** Provides a basic/boilerplate Iterator implementation. */
+private[spark] abstract class NextIterator[U] extends Iterator[U] {
+
+ private var gotNext = false
+ private var nextValue: U = _
+ private var closed = false
+ protected var finished = false
+
+ /**
+ * Method for subclasses to implement to provide the next element.
+ *
+ * If no next element is available, the subclass should set `finished`
+ * to `true` and may return any value (it will be ignored).
+ *
+ * This convention is required because `null` may be a valid value,
+ * and using `Option` seems like it might create unnecessary Some/None
+ * instances, given some iterators might be called in a tight loop.
+ *
+ * @return U, or set 'finished' when done
+ */
+ protected def getNext(): U
+
+ /**
+ * Method for subclasses to implement when all elements have been successfully
+ * iterated, and the iteration is done.
+ *
+ * <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
+ * called because it has no control over what happens when an exception
+ * happens in the user code that is calling hasNext/next.
+ *
+ * Ideally you should have another try/catch, as in HadoopRDD, that
+ * ensures any resources are closed should iteration fail.
+ */
+ protected def close()
+
+ /**
+ * Calls the subclass-defined close method, but only once.
+ *
+ * Usually calling `close` multiple times should be fine, but historically
+ * there have been issues with some InputFormats throwing exceptions.
+ */
+ def closeIfNeeded() {
+ if (!closed) {
+ close()
+ closed = true
+ }
+ }
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ if (!gotNext) {
+ nextValue = getNext()
+ if (finished) {
+ closeIfNeeded()
+ }
+ gotNext = true
+ }
+ }
+ !finished
+ }
+
+ override def next(): U = {
+ if (!hasNext) {
+ throw new NoSuchElementException("End of stream")
+ }
+ gotNext = false
+ nextValue
+ }
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/util/NextIteratorSuite.scala b/core/src/test/scala/spark/util/NextIteratorSuite.scala
new file mode 100644
index 0000000000..ed5b36da73
--- /dev/null
+++ b/core/src/test/scala/spark/util/NextIteratorSuite.scala
@@ -0,0 +1,68 @@
+package spark.util
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable.Buffer
+import java.util.NoSuchElementException
+
+class NextIteratorSuite extends FunSuite with ShouldMatchers {
+ test("one iteration") {
+ val i = new StubIterator(Buffer(1))
+ i.hasNext should be === true
+ i.next should be === 1
+ i.hasNext should be === false
+ intercept[NoSuchElementException] { i.next() }
+ }
+
+ test("two iterations") {
+ val i = new StubIterator(Buffer(1, 2))
+ i.hasNext should be === true
+ i.next should be === 1
+ i.hasNext should be === true
+ i.next should be === 2
+ i.hasNext should be === false
+ intercept[NoSuchElementException] { i.next() }
+ }
+
+ test("empty iteration") {
+ val i = new StubIterator(Buffer())
+ i.hasNext should be === false
+ intercept[NoSuchElementException] { i.next() }
+ }
+
+ test("close is called once for empty iterations") {
+ val i = new StubIterator(Buffer())
+ i.hasNext should be === false
+ i.hasNext should be === false
+ i.closeCalled should be === 1
+ }
+
+ test("close is called once for non-empty iterations") {
+ val i = new StubIterator(Buffer(1, 2))
+ i.next should be === 1
+ i.next should be === 2
+ // close isn't called until we check for the next element
+ i.closeCalled should be === 0
+ i.hasNext should be === false
+ i.closeCalled should be === 1
+ i.hasNext should be === false
+ i.closeCalled should be === 1
+ }
+
+ class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
+ var closeCalled = 0
+
+ override def getNext() = {
+ if (ints.size == 0) {
+ finished = true
+ 0
+ } else {
+ ints.remove(0)
+ }
+ }
+
+ override def close() {
+ closeCalled += 1
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 4af839ad7f..1408af0afa 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
import spark.storage.StorageLevel
+import spark.util.NextIterator
import java.io._
import java.net.Socket
@@ -59,45 +60,18 @@ object SocketReceiver {
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
-
- val iterator = new Iterator[String] {
- var gotNext = false
- var finished = false
- var nextValue: String = null
-
- private def getNext() {
- try {
- nextValue = dataInputStream.readLine()
- if (nextValue == null) {
- finished = true
- }
- }
- gotNext = true
- }
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (!gotNext) {
- getNext()
- if (finished) {
- dataInputStream.close()
- }
- }
+ new NextIterator[String] {
+ protected override def getNext() = {
+ val nextValue = dataInputStream.readLine()
+ if (nextValue == null) {
+ finished = true
}
- !finished
+ nextValue
}
- override def next(): String = {
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- if (!gotNext) {
- getNext()
- }
- gotNext = false
- nextValue
+ protected override def close() {
+ dataInputStream.close()
}
}
- iterator
}
}