diff options
author | Davies Liu <davies@databricks.com> | 2015-03-12 01:34:38 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-03-12 01:34:38 -0700 |
commit | 712679a7b447346a365b38574d7a86d56a93f767 (patch) | |
tree | bbae09b6d40739e27adb7959f8150e60c33992c8 /python/pyspark | |
parent | 25b71d8c15572f0f2b951c827c169f8c65f726ad (diff) | |
download | spark-712679a7b447346a365b38574d7a86d56a93f767.tar.gz spark-712679a7b447346a365b38574d7a86d56a93f767.tar.bz2 spark-712679a7b447346a365b38574d7a86d56a93f767.zip |
[SPARK-6294] fix hang when call take() in JVM on PythonRDD
The Thread.interrupt() can not terminate the thread in some cases, so we should not wait for the writerThread of PythonRDD.
This PR also ignore some exception during clean up.
cc JoshRosen mengxr
Author: Davies Liu <davies@databricks.com>
Closes #4987 from davies/fix_take and squashes the following commits:
4488f1a [Davies Liu] fix hang when call take() in JVM on PythonRDD
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/daemon.py | 5 | ||||
-rw-r--r-- | python/pyspark/tests.py | 5 |
2 files changed, 9 insertions, 1 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index f09587f211..93885985fe 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -61,7 +61,10 @@ def worker(sock): except SystemExit as exc: exit_code = compute_real_exit_code(exc.code) finally: - outfile.flush() + try: + outfile.flush() + except Exception: + pass return exit_code diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 06ba2b461d..dd8d3b1c53 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -782,6 +782,11 @@ class RDDTests(ReusedPySparkTestCase): jobId = tracker.getJobIdsForGroup("test4")[0] self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds)) + # Regression test for SPARK-6294 + def test_take_on_jrdd(self): + rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x)) + rdd._jrdd.first() + class ProfilerTests(PySparkTestCase): |