From 418ad83fe113f2f90552eb7247670279b55aed28 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 27 Oct 2014 20:42:05 -0700 Subject: [SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in constant folding ``` explain extended select cos(null) from src limit 1; ``` outputs: ``` Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5] MetastoreRelation default, src, None == Optimized Logical Plan == Limit 1 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5] MetastoreRelation default, src, None == Physical Plan == Limit 1 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5] HiveTableScan [], (MetastoreRelation default, src, None), None ``` After patching this PR it outputs ``` == Parsed Logical Plan == Limit 1 Project ['cos(null) AS c_0#0] UnresolvedRelation None, src, None == Analyzed Logical Plan == Limit 1 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#0] MetastoreRelation default, src, None == Optimized Logical Plan == Limit 1 Project [null AS c_0#0] MetastoreRelation default, src, None == Physical Plan == Limit 1 Project [null AS c_0#0] HiveTableScan [], (MetastoreRelation default, src, None), None ``` Author: Cheng Hao Closes #2771 from chenghao-intel/hive_udf_constant_folding and squashes the following commits: 1379c73 [Cheng Hao] duplicate the PlanTest with catalyst/plans/PlanTest 1e52dda [Cheng Hao] add unit test for hive simple udf constant folding 01609ff [Cheng Hao] support constant folding for HiveSimpleUdf --- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 10 ++++ .../scala/org/apache/spark/sql/QueryTest.scala | 4 +- .../apache/spark/sql/catalyst/plans/PlanTest.scala | 57 ++++++++++++++++++++++ .../spark/sql/hive/execution/HivePlanTest.scala | 32 ++++++++++++ 4 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 68f93f247d..683c820dec 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -99,6 +99,16 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[ @transient protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray + @transient + protected lazy val isUDFDeterministic = { + val udfType = function.getClass().getAnnotation(classOf[HiveUDFType]) + udfType != null && udfType.deterministic() + } + + override def foldable = { + isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable) + } + // Create parameter converters @transient protected lazy val conversionHelper = new ConversionHelper(method, arguments) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala index 95921c3d7a..6b06410520 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import org.scalatest.FunSuite +import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ @@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._ * It is hard to have maven allow one subproject depend on another subprojects test code. * So, we duplicate this code here. */ -class QueryTest extends FunSuite { +class QueryTest extends PlanTest { /** * Runs the plan and makes sure the answer matches the expected result. * @param rdd the [[SchemaRDD]] to be executed diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala new file mode 100644 index 0000000000..081d94b6fc --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans + +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util._ +import org.scalatest.FunSuite + +/** + * *** DUPLICATED FROM sql/catalyst/plans. *** + * + * It is hard to have maven allow one subproject depend on another subprojects test code. + * So, we duplicate this code here. + */ +class PlanTest extends FunSuite { + + /** + * Since attribute references are given globally unique ids during analysis, + * we must normalize them to check if two different queries are identical. + */ + protected def normalizeExprIds(plan: LogicalPlan) = { + val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)) + val minId = if (list.isEmpty) 0 else list.min + plan transformAllExpressions { + case a: AttributeReference => + AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId)) + } + } + + /** Fails the test if the two plans do not match */ + protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) { + val normalized1 = normalizeExprIds(plan1) + val normalized2 = normalizeExprIds(plan2) + if (normalized1 != normalized2) + fail( + s""" + |== FAIL: Plans do not match === + |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")} + """.stripMargin) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala new file mode 100644 index 0000000000..c939e6e99d --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHive + +class HivePlanTest extends QueryTest { + import TestHive._ + + test("udf constant folding") { + val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan + val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan + + comparePlans(optimized, correctAnswer) + } +} -- cgit v1.2.3