Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  9
1 file changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8d4a53b4ca..4c71b69069 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(
 
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
-      writerThread.join()
       if (!reuse_worker || !released) {
         try {
           worker.close()
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
       } catch {
         case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
 
         case e: Exception =>
           // We must avoid throwing exceptions here, because the thread uncaught exception handler
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
       } finally {
         // Release memory used by this thread for shuffles
         env.shuffleMemoryManager.releaseMemoryForThisThread()
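
A likely reading of the two hunks together: the task-completion listener runs on the task thread, so calling writerThread.join() there could hang task cleanup if the writer thread was itself blocked writing to a Python worker that had already died; dropping the join and merely signalling the writer avoids that. Once the listener may close the worker socket while the writer is still running, shutdownOutput() can be invoked on an already-closed socket, which throws, hence the new !worker.isClosed guards. Below is a minimal, self-contained Scala sketch of both patterns. It is not the actual PythonRDD code: this WriterThread is a simplified stand-in, and a plain try/catch replaces Spark's internal Utils.tryLog helper.

import java.net.Socket

// Simplified stand-in for PythonRDD's WriterThread (illustration only; the
// real class streams a partition's serialized rows to the Python worker).
class WriterThread(worker: Socket) extends Thread("writer for Python worker") {

  @volatile private var _exception: Exception = null

  def exception: Option[Exception] = Option(_exception)

  // Called from the task-completion listener. It only signals this thread;
  // it deliberately does not join(), because the listener runs on the task
  // thread, and joining a writer that is blocked on a dead worker socket
  // would hang task cleanup.
  def shutdownOnTaskCompletion(): Unit = {
    this.interrupt()
  }

  override def run(): Unit = {
    try {
      // ... write the partition's data to worker.getOutputStream ...
    } catch {
      case e: Exception =>
        // Never rethrow: an uncaught exception in this thread would kill the
        // whole executor. Record it and half-close the socket instead.
        _exception = e
        // The listener may have closed the socket concurrently, and calling
        // shutdownOutput() on a closed socket throws a SocketException.
        if (!worker.isClosed) {
          try worker.shutdownOutput()
          catch { case t: Exception => () } // log-and-swallow, like Utils.tryLog
        }
    }
  }
}

Note that the isClosed check is best-effort rather than a complete fix for the race (the socket could still be closed between the check and shutdownOutput()), which is why the shutdown call remains wrapped in a swallow-and-log helper in the patched code.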