aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-03-12 01:34:38 -0700
committerXiangrui Meng <meng@databricks.com>2015-03-12 01:34:38 -0700
commit712679a7b447346a365b38574d7a86d56a93f767 (patch)
treebbae09b6d40739e27adb7959f8150e60c33992c8 /core
parent25b71d8c15572f0f2b951c827c169f8c65f726ad (diff)
downloadspark-712679a7b447346a365b38574d7a86d56a93f767.tar.gz
spark-712679a7b447346a365b38574d7a86d56a93f767.tar.bz2
spark-712679a7b447346a365b38574d7a86d56a93f767.zip
[SPARK-6294] fix hang when call take() in JVM on PythonRDD
The Thread.interrupt() can not terminate the thread in some cases, so we should not wait for the writerThread of PythonRDD. This PR also ignore some exception during clean up. cc JoshRosen mengxr Author: Davies Liu <davies@databricks.com> Closes #4987 from davies/fix_take and squashes the following commits: 4488f1a [Davies Liu] fix hang when call take() in JVM on PythonRDD
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8d4a53b4ca..4c71b69069 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
- writerThread.join()
if (!reuse_worker || !released) {
try {
worker.close()
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
- Utils.tryLog(worker.shutdownOutput())
+ if (!worker.isClosed) {
+ Utils.tryLog(worker.shutdownOutput())
+ }
case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
- Utils.tryLog(worker.shutdownOutput())
+ if (!worker.isClosed) {
+ Utils.tryLog(worker.shutdownOutput())
+ }
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()