aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-10-17 14:12:07 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-10-17 14:12:07 -0700
commitadcb7d3350032dda69a43de724c8bdff5fef2c67 (patch)
treef65f1f1fe89831ad2c92bbe82056cd70ca62a02c /sql/core/src
parent803e7f087797bae643754f8db88848a17282ca6e (diff)
downloadspark-adcb7d3350032dda69a43de724c8bdff5fef2c67.tar.gz
spark-adcb7d3350032dda69a43de724c8bdff5fef2c67.tar.bz2
spark-adcb7d3350032dda69a43de724c8bdff5fef2c67.zip
[SPARK-3855][SQL] Preserve the result attribute of python UDFs through transformations
In the current implementation it was possible for the reference to change after analysis. Author: Michael Armbrust <michael@databricks.com> Closes #2717 from marmbrus/pythonUdfResults and squashes the following commits: da14879 [Michael Armbrust] Fix test 6343bcb [Michael Armbrust] add test 9533286 [Michael Armbrust] Correctly preserve the result attribute of python UDFs through transformations
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala12
2 files changed, 11 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4f1af7234d..79e4ddb8c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -295,7 +295,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
- case e @ EvaluatePython(udf, child) =>
+ case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
case _ => Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 0977da3e85..be729e5d24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -105,13 +105,21 @@ private[spark] object ExtractPythonUdfs extends Rule[LogicalPlan] {
}
}
+object EvaluatePython {
+ def apply(udf: PythonUDF, child: LogicalPlan) =
+ new EvaluatePython(udf, child, AttributeReference("pythonUDF", udf.dataType)())
+}
+
/**
* :: DeveloperApi ::
* Evaluates a [[PythonUDF]], appending the result to the end of the input tuple.
*/
@DeveloperApi
-case class EvaluatePython(udf: PythonUDF, child: LogicalPlan) extends logical.UnaryNode {
- val resultAttribute = AttributeReference("pythonUDF", udf.dataType, nullable=true)()
+case class EvaluatePython(
+ udf: PythonUDF,
+ child: LogicalPlan,
+ resultAttribute: AttributeReference)
+ extends logical.UnaryNode {
def output = child.output :+ resultAttribute
}