aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-05-02 23:55:13 -0700
committerAaron Davidson <aaron@databricks.com>2014-05-02 23:55:13 -0700
commit0a14421765b672305e8f32ded4a9a1f6f7241d8d (patch)
tree8941d3dfa43c757aa5cf674ae8a8ca54d05ac14c
parent2b961d88079d7a3f9da63d5175d7b61f6dec762b (diff)
downloadspark-0a14421765b672305e8f32ded4a9a1f6f7241d8d.tar.gz
spark-0a14421765b672305e8f32ded4a9a1f6f7241d8d.tar.bz2
spark-0a14421765b672305e8f32ded4a9a1f6f7241d8d.zip
SPARK-1700: Close socket file descriptors on task completion
This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets. Tested in standalone mode. More file descriptors spawn than expected (around 1000ish rather than the expected 8ish) but they do not pile up between runs, or as high as before (where they went up to around 5k). Author: Aaron Davidson <aaron@databricks.com> Closes #623 from aarondav/pyspark2 and squashes the following commits: 0ca13bb [Aaron Davidson] SPARK-1700: Close socket file descriptors on task completion
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala11
1 files changed, 10 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 672c344a56..6140700708 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
- val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+ val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+ // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+ context.addOnCompleteCallback(() =>
+ try {
+ worker.close()
+ } catch {
+ case e: Exception => logWarning("Failed to close worker socket", e)
+ }
+ )
@volatile var readerException: Exception = null