diff options
author | Davies Liu <davies@databricks.com> | 2015-03-12 15:19:17 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-03-12 15:19:17 -0700 |
commit | 9ebd6f12e67cd5995896d5bedf4a205b602109a5 (patch) | |
tree | 39ef9ce80e1810dbeb2d3fe0823808149a0de0bd | |
parent | c684e5f9ff193ac78d48f214d7f8b87df227c8a6 (diff) | |
download | spark-9ebd6f12e67cd5995896d5bedf4a205b602109a5.tar.gz spark-9ebd6f12e67cd5995896d5bedf4a205b602109a5.tar.bz2 spark-9ebd6f12e67cd5995896d5bedf4a205b602109a5.zip |
[SPARK-6294] [PySpark] fix take of PythonRDD in JVM (branch 1.2)
The Thread.interrupt() can not terminate the thread in some cases, so we should not wait for the writerThread of PythonRDD.
This PR also ignore some exception during clean up.
cc mengxr
Author: Davies Liu <davies@databricks.com>
Closes #5003 from davies/fix_take2 and squashes the following commits:
2f2f893 [Davies Liu] fix take of PythonRDD in JVM
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 9 | ||||
-rw-r--r-- | python/pyspark/daemon.py | 5 | ||||
-rw-r--r-- | python/pyspark/tests.py | 5 |
3 files changed, 15 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 2715722b7d..d3077792b5 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -77,7 +77,6 @@ private[spark] class PythonRDD( context.addTaskCompletionListener { context => writerThread.shutdownOnTaskCompletion() - writerThread.join() if (!reuse_worker || !released) { try { worker.close() @@ -249,13 +248,17 @@ private[spark] class PythonRDD( } catch { case e: Exception if context.isCompleted || context.isInterrupted => logDebug("Exception thrown after task completion (likely due to cleanup)", e) - Utils.tryLog(worker.shutdownOutput()) + if (!worker.isClosed) { + Utils.tryLog(worker.shutdownOutput()) + } case e: Exception => // We must avoid throwing exceptions here, because the thread uncaught exception handler // will kill the whole executor (see org.apache.spark.executor.Executor). _exception = e - Utils.tryLog(worker.shutdownOutput()) + if (!worker.isClosed) { + Utils.tryLog(worker.shutdownOutput()) + } } finally { // Release memory used by this thread for shuffles env.shuffleMemoryManager.releaseMemoryForThisThread() diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index f09587f211..93885985fe 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -61,7 +61,10 @@ def worker(sock): except SystemExit as exc: exit_code = compute_real_exit_code(exc.code) finally: - outfile.flush() + try: + outfile.flush() + except Exception: + pass return exit_code diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 5007b6ebd7..2e490a0fc2 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -739,6 +739,11 @@ class RDDTests(ReusedPySparkTestCase): converted_rdd = RDD(data_python_rdd, self.sc) self.assertEqual(2, converted_rdd.count()) + # Regression test for SPARK-6294 + def test_take_on_jrdd(self): + rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x)) + rdd._jrdd.first() + class ProfilerTests(PySparkTestCase): |