diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-03-12 23:16:59 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-12 23:16:59 -0700 |
commit | 4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d (patch) | |
tree | 2e492f74af69536b303b8515251b6579c43d0227 | |
parent | 6bd2eaa4a5bcf811c5b85be27c5e50058b5d0c12 (diff) | |
download | spark-4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d.tar.gz spark-4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d.tar.bz2 spark-4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d.zip |
SPARK-1019: pyspark RDD take() throws an NPE
Author: Patrick Wendell <pwendell@gmail.com>
Closes #112 from pwendell/pyspark-take and squashes the following commits:
daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
-rw-r--r-- | core/src/main/scala/org/apache/spark/TaskContext.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 8 |
2 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983ed4c..be53ca2968 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e4d0285710..b67286a4e3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -100,6 +100,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {