core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 9 ++++++---
python/pyspark/daemon.py                                        | 5 ++++-
python/pyspark/tests.py                                         | 5 +++++
3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8d4a53b4ca..4c71b69069 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
-      writerThread.join()
       if (!reuse_worker || !released) {
         try {
           worker.close()
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
       } catch {
         case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
         case e: Exception =>
           // We must avoid throwing exceptions here, because the thread uncaught exception handler
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
       } finally {
         // Release memory used by this thread for shuffles
         env.shuffleMemoryManager.releaseMemoryForThisThread()
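
Note: the change above stops the task-completion listener from joining the writer thread and skips shutdownOutput() when the worker socket is already closed, presumably because the listener can close the socket while the writer thread is still cleaning up. A rough Python sketch of the guarded-shutdown idea; the helper name and the fileno() check are illustrative, not Spark's API:

    import socket

    def shutdown_output_if_open(sock):
        # Skip the half-close entirely if another cleanup path already closed
        # the socket; shutting down a closed socket raises.
        if sock.fileno() == -1:
            return
        try:
            sock.shutdown(socket.SHUT_WR)   # half-close the write side
        except Exception:
            pass                            # mirror Utils.tryLog: cleanup must never throw

    if __name__ == "__main__":
        a, b = socket.socketpair()
        a.close()                           # e.g. the other cleanup path got there first
        shutdown_output_if_open(a)          # no-op instead of raising
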
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index f09587f211..93885985fe 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -61,7 +61,10 @@ def worker(sock):
     except SystemExit as exc:
         exit_code = compute_real_exit_code(exc.code)
     finally:
-        outfile.flush()
+        try:
+            outfile.flush()
+        except Exception:
+            pass
     return exit_code
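
Note: the bare outfile.flush() in the finally block could raise once the socket under it had already been torn down, turning an otherwise clean worker exit into a crash; wrapping it keeps cleanup best-effort. A standalone sketch of that failure mode, using a plain socketpair rather than Spark's daemon plumbing (all names here are illustrative):

    import socket

    def finish(outfile):
        exit_code = 0
        try:
            outfile.write(b"last batch")    # buffered; nothing hits the socket yet
        finally:
            try:
                outfile.flush()             # may raise once the socket is shut down
            except Exception:
                pass                        # best effort: never fail during cleanup
        return exit_code

    if __name__ == "__main__":
        a, b = socket.socketpair()
        outfile = a.makefile("wb", 65536)
        a.shutdown(socket.SHUT_WR)          # tear the write side down before cleanup runs
        print(finish(outfile))              # prints 0 instead of dying in the finally block
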
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 06ba2b461d..dd8d3b1c53 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -782,6 +782,11 @@ class RDDTests(ReusedPySparkTestCase):
         jobId = tracker.getJobIdsForGroup("test4")[0]
         self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
+    # Regression test for SPARK-6294
+    def test_take_on_jrdd(self):
+        rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+        rdd._jrdd.first()
+
 class ProfilerTests(PySparkTestCase):
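
Note: calling first() on rdd._jrdd drives the JVM-side take(1) path, which stops reading from the Python worker after a single element; that early termination is the scenario the changes above guard against, hence the regression test. Roughly the same trigger at user level, assuming a local SparkContext as in the test suite:

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "spark-6294-check")
    rdd = sc.parallelize(range(1 << 20)).map(str)
    rdd._jrdd.first()    # JVM-side take(1); the Python worker is stopped early
    sc.stop()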