about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 11
1 file changed, 10 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 672c344a56..6140700708 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
- val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+ val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+ // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+ context.addOnCompleteCallback(() =>
+ try {
+ worker.close()
+ } catch {
+ case e: Exception => logWarning("Failed to close worker socket", e)
+ }
+ )
@volatile var readerException: Exception = null