 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 9 ++++++---
 python/pyspark/daemon.py | 5 ++++-
 python/pyspark/tests.py | 5 +++++
 3 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2715722b7d..d3077792b5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -77,7 +77,6 @@ private[spark] class PythonRDD(
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
-      writerThread.join()
       if (!reuse_worker || !released) {
         try {
           worker.close()
@@ -249,13 +248,17 @@ private[spark] class PythonRDD(
       } catch {
         case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }

         case e: Exception =>
           // We must avoid throwing exceptions here, because the thread uncaught exception handler
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
       } finally {
         // Release memory used by this thread for shuffles
         env.shuffleMemoryManager.releaseMemoryForThisThread()
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index f09587f211..93885985fe 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -61,7 +61,10 @@ def worker(sock):
     except SystemExit as exc:
         exit_code = compute_real_exit_code(exc.code)
     finally:
-        outfile.flush()
+        try:
+            outfile.flush()
+        except Exception:
+            pass
     return exit_code
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 5007b6ebd7..2e490a0fc2 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -739,6 +739,11 @@ class RDDTests(ReusedPySparkTestCase):
         converted_rdd = RDD(data_python_rdd, self.sc)
         self.assertEqual(2, converted_rdd.count())

+    # Regression test for SPARK-6294
+    def test_take_on_jrdd(self):
+        rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+        rdd._jrdd.first()
+

 class ProfilerTests(PySparkTestCase):
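
For context, a minimal standalone sketch of the scenario the new test_take_on_jrdd regression test exercises, assuming a local PySpark setup; the master URL and app name below are illustrative and not part of the patch:

# Hypothetical reproduction, not part of the patch: first() on the wrapped
# Java RDD (rdd._jrdd) runs take() on the JVM side, ending the task before
# all input has been written. Before this change the completion listener
# still did writerThread.join() and could call shutdownOutput() on a socket
# that was already closed, which could hang or crash the Python worker.
from pyspark import SparkContext

sc = SparkContext("local[2]", "spark-6294-sketch")  # illustrative master and app name
rdd = sc.parallelize(range(1 << 20)).map(lambda x: str(x))
rdd._jrdd.first()  # exercises the JVM take() path covered by test_take_on_jrdd
sc.stop()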