about summary refs log tree commit diff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-03-12 15:19:17 -0700
committerXiangrui Meng <meng@databricks.com>2015-03-12 15:19:17 -0700
commit9ebd6f12e67cd5995896d5bedf4a205b602109a5 (patch)
tree39ef9ce80e1810dbeb2d3fe0823808149a0de0bd
parentc684e5f9ff193ac78d48f214d7f8b87df227c8a6 (diff)
downloadspark-9ebd6f12e67cd5995896d5bedf4a205b602109a5.tar.gz
spark-9ebd6f12e67cd5995896d5bedf4a205b602109a5.tar.bz2
spark-9ebd6f12e67cd5995896d5bedf4a205b602109a5.zip
[SPARK-6294] [PySpark] fix take of PythonRDD in JVM (branch 1.2)
Thread.interrupt() cannot terminate the thread in some cases, so we should not wait for the writerThread of PythonRDD. This PR also ignores some exceptions during cleanup. cc mengxr Author: Davies Liu <davies@databricks.com> Closes #5003 from davies/fix_take2 and squashes the following commits: 2f2f893 [Davies Liu] fix take of PythonRDD in JVM
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala9
-rw-r--r--python/pyspark/daemon.py5
-rw-r--r--python/pyspark/tests.py5
3 files changed, 15 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2715722b7d..d3077792b5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -77,7 +77,6 @@ private[spark] class PythonRDD(
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
- writerThread.join()
if (!reuse_worker || !released) {
try {
worker.close()
@@ -249,13 +248,17 @@ private[spark] class PythonRDD(
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
- Utils.tryLog(worker.shutdownOutput())
+ if (!worker.isClosed) {
+ Utils.tryLog(worker.shutdownOutput())
+ }
case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
- Utils.tryLog(worker.shutdownOutput())
+ if (!worker.isClosed) {
+ Utils.tryLog(worker.shutdownOutput())
+ }
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index f09587f211..93885985fe 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -61,7 +61,10 @@ def worker(sock):
except SystemExit as exc:
exit_code = compute_real_exit_code(exc.code)
finally:
- outfile.flush()
+ try:
+ outfile.flush()
+ except Exception:
+ pass
return exit_code
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 5007b6ebd7..2e490a0fc2 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -739,6 +739,11 @@ class RDDTests(ReusedPySparkTestCase):
converted_rdd = RDD(data_python_rdd, self.sc)
self.assertEqual(2, converted_rdd.count())
+ # Regression test for SPARK-6294
+ def test_take_on_jrdd(self):
+ rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+ rdd._jrdd.first()
+
class ProfilerTests(PySparkTestCase):