aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-10-27 20:42:05 -0700
committerMichael Armbrust <michael@databricks.com>2014-10-27 20:42:05 -0700
commit418ad83fe113f2f90552eb7247670279b55aed28 (patch)
treeca5a99abff09519e18c17e299938841242b76685 /sql
parent7e3a1ada86e6adf1ddd4d8a321824daf5f3b2c75 (diff)
downloadspark-418ad83fe113f2f90552eb7247670279b55aed28.tar.gz
spark-418ad83fe113f2f90552eb7247670279b55aed28.tar.bz2
spark-418ad83fe113f2f90552eb7247670279b55aed28.zip
[SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in constant folding
``` explain extended select cos(null) from src limit 1; ``` outputs: ``` Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5] MetastoreRelation default, src, None == Optimized Logical Plan == Limit 1 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5] MetastoreRelation default, src, None == Physical Plan == Limit 1 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5] HiveTableScan [], (MetastoreRelation default, src, None), None ``` After patching this PR it outputs ``` == Parsed Logical Plan == Limit 1 Project ['cos(null) AS c_0#0] UnresolvedRelation None, src, None == Analyzed Logical Plan == Limit 1 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#0] MetastoreRelation default, src, None == Optimized Logical Plan == Limit 1 Project [null AS c_0#0] MetastoreRelation default, src, None == Physical Plan == Limit 1 Project [null AS c_0#0] HiveTableScan [], (MetastoreRelation default, src, None), None ``` Author: Cheng Hao <hao.cheng@intel.com> Closes #2771 from chenghao-intel/hive_udf_constant_folding and squashes the following commits: 1379c73 [Cheng Hao] duplicate the PlanTest with catalyst/plans/PlanTest 1e52dda [Cheng Hao] add unit test for hive simple udf constant folding 01609ff [Cheng Hao] support constant folding for HiveSimpleUdf
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala3
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala10
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala57
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala32
5 files changed, 104 insertions, 2 deletions
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 7e9f47ef21..c4a1f899d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -33,7 +33,8 @@ class PlanTest extends FunSuite {
* we must normalize them to check if two different queries are identical.
*/
protected def normalizeExprIds(plan: LogicalPlan) = {
- val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min
+ val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
+ val minId = if (list.isEmpty) 0 else list.min
plan transformAllExpressions {
case a: AttributeReference =>
AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 68f93f247d..683c820dec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -99,6 +99,16 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
@transient
protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
+ @transient
+ protected lazy val isUDFDeterministic = {
+ val udfType = function.getClass().getAnnotation(classOf[HiveUDFType])
+ udfType != null && udfType.deterministic()
+ }
+
+ override def foldable = {
+ isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
+ }
+
// Create parameter converters
@transient
protected lazy val conversionHelper = new ConversionHelper(method, arguments)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 95921c3d7a..6b06410520 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
* It is hard to have maven allow one subproject depend on another subprojects test code.
* So, we duplicate this code here.
*/
-class QueryTest extends FunSuite {
+class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
new file mode 100644
index 0000000000..081d94b6fc
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.scalatest.FunSuite
+
+/**
+ * *** DUPLICATED FROM sql/catalyst/plans. ***
+ *
+ * It is hard to have maven allow one subproject depend on another subprojects test code.
+ * So, we duplicate this code here.
+ */
+class PlanTest extends FunSuite {
+
+ /**
+ * Since attribute references are given globally unique ids during analysis,
+ * we must normalize them to check if two different queries are identical.
+ */
+ protected def normalizeExprIds(plan: LogicalPlan) = {
+ val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
+ val minId = if (list.isEmpty) 0 else list.min
+ plan transformAllExpressions {
+ case a: AttributeReference =>
+ AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+ }
+ }
+
+ /** Fails the test if the two plans do not match */
+ protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
+ val normalized1 = normalizeExprIds(plan1)
+ val normalized2 = normalizeExprIds(plan2)
+ if (normalized1 != normalized2)
+ fail(
+ s"""
+ |== FAIL: Plans do not match ===
+ |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+ """.stripMargin)
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
new file mode 100644
index 0000000000..c939e6e99d
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHive
+
+class HivePlanTest extends QueryTest {
+ import TestHive._
+
+ test("udf constant folding") {
+ val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
+ val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+
+ comparePlans(optimized, correctAnswer)
+ }
+}