author     ksonj <kson@siberie.de>                      2015-05-07 12:04:19 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-05-07 12:04:43 -0700
commit     dec8f53719597119034dffbe43b2a9e5fd963083 (patch)
tree       46c9e203b8bebf7e5da02e3e41f0f2fbb55e44d2
parent     5784c8d95561dce432a85401e1510776fdf723a8 (diff)
[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak
This patch simply removes a `cache()` on an intermediate RDD when evaluating Python UDFs.

Author: ksonj <kson@siberie.de>

Closes #5973 from ksonj/udf and squashes the following commits:

db5b564 [ksonj] removed TODO about cleaning up
fe70c54 [ksonj] Remove cache() causing memory leak
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 7
1 file changed, 3 insertions(+), 4 deletions(-)
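For context, a minimal, hypothetical sketch of the zip pattern this patch touches; `applyUdf` and the element types are placeholders standing in for the Python round trip done by BatchPythonEvaluation, not actual Spark internals. Before the fix, the intermediate RDD was cached before being zipped with the UDF output, and those cached blocks were never unpersisted, so storage memory grew without bound on long-running jobs.

// Hypothetical sketch only -- not the actual Spark source.
import org.apache.spark.rdd.RDD

def evaluateBatch(child: RDD[String], applyUdf: Iterator[String] => Iterator[Int]): RDD[(String, Int)] = {
  // Before this patch: child.map(identity).cache() -- the cached blocks were never
  // released, which is the memory leak described in SPARK-7116.
  val childResults = child.map(identity)              // no cache() after the patch
  val results = childResults.mapPartitions(applyUdf)  // evaluate the UDF one partition at a time
  childResults.zip(results)                           // pair each input row with its UDF result
}

zip requires both RDDs to have the same number of partitions and the same number of elements per partition; that holds here because `results` is derived from `childResults` partition by partition, which is why the `cache()` could be dropped without changing the zipped output.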
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 7a43bfd8bc..58cb1980f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -219,8 +219,8 @@ case class EvaluatePython(
 /**
  * :: DeveloperApi ::
- * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. The input
- * data is cached and zipped with the result of the udf evaluation.
+ * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time.
+ * The input data is zipped with the result of the udf evaluation.
  */
 @DeveloperApi
 case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan)
@@ -229,8 +229,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
   def children: Seq[SparkPlan] = child :: Nil
   def execute(): RDD[Row] = {
- // TODO: Clean up after ourselves?
- val childResults = child.execute().map(_.copy()).cache()
+ val childResults = child.execute().map(_.copy())
     val parent = childResults.mapPartitions { iter =>
       val pickle = new Pickler