aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala13
1 files changed, 8 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4ac666c54f..119e0459c5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+ // Whether is the worker released into idle pool
+ @volatile var released = false
// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
- var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
- if (reuse_worker && complete_cleanly) {
- env.releasePythonWorker(pythonExec, envVars.toMap, worker)
- } else {
+ if (!reuse_worker || !released) {
try {
worker.close()
} catch {
@@ -145,8 +144,12 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
+ // Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
- complete_cleanly = true
+ if (reuse_worker) {
+ env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+ released = true
+ }
}
null
}