diff options
author | Xiao Li <gatorsmile@gmail.com> | 2017-04-16 12:09:34 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-04-16 12:09:34 +0800 |
commit | e090f3c0ceebdf341536a1c0c70c80afddf2ee2a (patch) | |
tree | 4f09e117f81240fb3cbc7b27ca502cb1e34e10b2 /sql | |
parent | 35e5ae4f81176af52569c465520a703529893b50 (diff) | |
download | spark-e090f3c0ceebdf341536a1c0c70c80afddf2ee2a.tar.gz spark-e090f3c0ceebdf341536a1c0c70c80afddf2ee2a.tar.bz2 spark-e090f3c0ceebdf341536a1c0c70c80afddf2ee2a.zip |
[SPARK-20335][SQL] Children expressions of Hive UDF impacts the determinism of Hive UDF
### What changes were proposed in this pull request?
```JAVA
/**
* Certain optimizations should not be applied if UDF is not deterministic.
* Deterministic UDF returns same result each time it is invoked with a
* particular input. This determinism just needs to hold within the context of
* a query.
*
* return true if the UDF is deterministic
*/
boolean deterministic() default true;
```
Based on the definition of [UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50), when Hive UDF's children are non-deterministic, Hive UDF is also non-deterministic.
### How was this patch tested?
Added test cases.
Author: Xiao Li <gatorsmile@gmail.com>
Closes #17635 from gatorsmile/udfDeterministic.
Diffstat (limited to 'sql')
4 files changed, 47 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 51c814cf32..a83ad61b20 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -44,7 +44,7 @@ private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression with HiveInspectors with CodegenFallback with Logging { - override def deterministic: Boolean = isUDFDeterministic + override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def nullable: Boolean = true @@ -123,7 +123,7 @@ private[hive] case class HiveGenericUDF( override def nullable: Boolean = true - override def deterministic: Boolean = isUDFDeterministic + override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def foldable: Boolean = isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 4a8086d7e5..84f915977b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -509,6 +509,19 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, null, 110.0, null, null, 10.0) :: Nil) } + test("non-deterministic children expressions of UDAF") { + val e = intercept[AnalysisException] { + spark.sql( + """ + |SELECT mydoublesum(value + 1.5 * key + rand()) + |FROM agg1 + |GROUP BY key + """.stripMargin) + }.getMessage + assert(Seq("nondeterministic expression", + "should not appear in the arguments of an aggregate function").forall(e.contains)) + } + test("interpreted aggregate function") { checkAnswer( spark.sql( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala index c9ef72ee11..479ca1e8de 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ +import org.apache.hadoop.hive.ql.udf.UDAFPercentile import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, GenericUDAFMax} import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode} import org.apache.hadoop.hive.ql.util.JavaDataModel @@ -26,7 +27,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -84,6 +85,21 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { Row(1, Row(1, 1)) )) } + + test("non-deterministic children expressions of UDAF") { + withTempView("view1") { + spark.range(1).selectExpr("id as x", "id as y").createTempView("view1") + withUserDefinedFunction("testUDAFPercentile" -> true) { + // non-deterministic children of Hive UDAF + sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'") + val e1 = intercept[AnalysisException] { + sql("SELECT testUDAFPercentile(x, rand()) from view1 group by y") + }.getMessage + assert(Seq("nondeterministic expression", + "should not appear in the arguments of an aggregate function").forall(e1.contains)) + } + } + } } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index ef6883839d..4bbf925919 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.io.{LongWritable, Writable} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.functions.max import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -387,6 +388,20 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { hiveContext.reset() } + test("non-deterministic children of UDF") { + withUserDefinedFunction("testStringStringUDF" -> true, "testGenericUDFHash" -> true) { + // HiveSimpleUDF + sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'") + val df1 = sql("SELECT testStringStringUDF(rand(), \"hello\")") + assert(!df1.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic)) + + // HiveGenericUDF + sql(s"CREATE TEMPORARY FUNCTION testGenericUDFHash AS '${classOf[GenericUDFHash].getName}'") + val df2 = sql("SELECT testGenericUDFHash(rand())") + assert(!df2.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic)) + } + } + test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") { Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF") |