diff options
author | ksonj <kson@siberie.de> | 2015-05-07 12:04:19 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-05-07 12:04:43 -0700 |
commit | dec8f53719597119034dffbe43b2a9e5fd963083 (patch) | |
tree | 46c9e203b8bebf7e5da02e3e41f0f2fbb55e44d2 | |
parent | 5784c8d95561dce432a85401e1510776fdf723a8 (diff) | |
download | spark-dec8f53719597119034dffbe43b2a9e5fd963083.tar.gz spark-dec8f53719597119034dffbe43b2a9e5fd963083.tar.bz2 spark-dec8f53719597119034dffbe43b2a9e5fd963083.zip |
[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak
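This patch removes the `cache()` call on the intermediate RDD that `BatchPythonEvaluation` builds when evaluating Python UDFs. The cached input was never cleaned up afterwards (the code only carried a TODO about it), so it leaked memory.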
Author: ksonj <kson@siberie.de>
Closes #5973 from ksonj/udf and squashes the following commits:
db5b564 [ksonj] removed TODO about cleaning up
fe70c54 [ksonj] Remove cache() causing memory leak
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 7 |
1 file changed, 3 insertions, 4 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 7a43bfd8bc..58cb1980f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -219,8 +219,8 @@ case class EvaluatePython(
 
 /**
  * :: DeveloperApi ::
- * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. The input
- * data is cached and zipped with the result of the udf evaluation.
+ * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time.
+ * The input data is zipped with the result of the udf evaluation.
  */
 @DeveloperApi
 case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan)
@@ -229,8 +229,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
   def children: Seq[SparkPlan] = child :: Nil
 
   def execute(): RDD[Row] = {
-    // TODO: Clean up after ourselves?
-    val childResults = child.execute().map(_.copy()).cache()
+    val childResults = child.execute().map(_.copy())
 
     val parent = childResults.mapPartitions { iter =>
       val pickle = new Pickler
```