aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-01-29 17:28:37 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-01-29 17:28:37 -0800
commit5c746eedda8cff2fc1692cf6dce376f4b0ca6fac (patch)
treed4474477ae270cb6059da20a4706b5c45e3ec787 /core
parentce9c43ba8ca1ba6507fd3bf3c647ab7396d33653 (diff)
downloadspark-5c746eedda8cff2fc1692cf6dce376f4b0ca6fac.tar.gz
spark-5c746eedda8cff2fc1692cf6dce376f4b0ca6fac.tar.bz2
spark-5c746eedda8cff2fc1692cf6dce376f4b0ca6fac.zip
[SPARK-5395] [PySpark] fix python process leak while coalesce()
Currently, the Python process is released into pool only after the task had finished, it cause many process forked if coalesce() is called. This PR will change it to release the process as soon as read all the data from it (finish the partition), then a process could be reused to process multiple partitions in a single task. Author: Davies Liu <davies@databricks.com> Closes #4238 from davies/py_leak and squashes the following commits: ec80a43 [Davies Liu] add @volatile 6da437a [Davies Liu] address comments 24ed322 [Davies Liu] fix python process leak while coalesce()
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala13
1 files changed, 8 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4ac666c54f..119e0459c5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -67,17 +67,16 @@ private[spark] class PythonRDD(
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+ // Whether is the worker released into idle pool
+ @volatile var released = false
// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
- var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
- if (reuse_worker && complete_cleanly) {
- env.releasePythonWorker(pythonExec, envVars.toMap, worker)
- } else {
+ if (!reuse_worker || !released) {
try {
worker.close()
} catch {
@@ -145,8 +144,12 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
+ // Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
- complete_cleanly = true
+ if (reuse_worker) {
+ env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+ released = true
+ }
}
null
}