author     Patrick Wendell <pwendell@gmail.com>  2014-03-12 23:16:59 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-03-12 23:16:59 -0700
commit     4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d (patch)
tree       2e492f74af69536b303b8515251b6579c43d0227
parent     6bd2eaa4a5bcf811c5b85be27c5e50058b5d0c12 (diff)
SPARK-1019: pyspark RDD take() throws an NPE
Author: Patrick Wendell <pwendell@gmail.com>

Closes #112 from pwendell/pyspark-take and squashes the following commits:

daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala           3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  8
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983ed4c..be53ca2968 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }
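The change above makes completion callbacks run in reverse registration order, so the callback registered last runs first and cleanup that depends on earlier setup executes before that setup is torn down. A minimal standalone sketch of that LIFO ordering, using a simplified stand-in (MiniTaskContext and CallbackOrderDemo are illustrative names, not the real Spark TaskContext):

import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for TaskContext's callback bookkeeping (illustrative only).
class MiniTaskContext {
  private val onCompleteCallbacks = new ArrayBuffer[() => Unit]

  def addOnCompleteCallback(f: () => Unit): Unit = onCompleteCallbacks += f

  // Run callbacks in reverse order of registration, as in the patch above.
  def executeOnCompleteCallbacks(): Unit = onCompleteCallbacks.reverse.foreach { _() }
}

object CallbackOrderDemo extends App {
  val ctx = new MiniTaskContext
  ctx.addOnCompleteCallback(() => println("registered first, runs last"))
  ctx.addOnCompleteCallback(() => println("registered second, runs first"))
  ctx.executeOnCompleteCallbacks()
}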
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e4d0285710..b67286a4e3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
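As the new comment notes, the callback only flips context.interrupted; the stdout reader has to observe that flag between reads, and because nothing synchronizes the two, a read already blocked on an invalidated stream can still fail. A hedged sketch of this cooperative-interruption pattern, where MiniContext, InterruptDemo, and the reader loop are illustrative stand-ins rather than the actual PythonRDD iterator:

import java.io.{ByteArrayInputStream, DataInputStream}

// Illustrative stand-in: a completion callback flips a flag that the reading
// loop polls between reads (not the real Spark TaskContext or PythonRDD code).
class MiniContext {
  @volatile var interrupted = false
  private var callbacks = List.empty[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks ::= f
  def runOnCompleteCallbacks(): Unit = callbacks.foreach { _() }
}

object InterruptDemo extends App {
  val ctx = new MiniContext
  // Same shape as the patch: mark the context interrupted when the task completes.
  ctx.addOnCompleteCallback(() => ctx.interrupted = true)

  val stream = new DataInputStream(new ByteArrayInputStream(Array[Byte](1, 2, 3, 4)))

  // The flag is checked only between reads, so a read already in progress is not
  // cancelled; that is the residual race the commit comment acknowledges.
  while (!ctx.interrupted && stream.available() > 0) {
    println(s"read byte: ${stream.readByte()}")
    if (stream.available() == 2) ctx.runOnCompleteCallbacks() // simulate task completion mid-stream
  }
}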