aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorXiao Li <gatorsmile@gmail.com>2017-04-16 12:09:34 +0800
committerWenchen Fan <wenchen@databricks.com>2017-04-16 12:09:34 +0800
commite090f3c0ceebdf341536a1c0c70c80afddf2ee2a (patch)
tree4f09e117f81240fb3cbc7b27ca502cb1e34e10b2 /sql
parent35e5ae4f81176af52569c465520a703529893b50 (diff)
downloadspark-e090f3c0ceebdf341536a1c0c70c80afddf2ee2a.tar.gz
spark-e090f3c0ceebdf341536a1c0c70c80afddf2ee2a.tar.bz2
spark-e090f3c0ceebdf341536a1c0c70c80afddf2ee2a.zip
[SPARK-20335][SQL] Children expressions of Hive UDF impact the determinism of Hive UDF
### What changes were proposed in this pull request? ```JAVA /** * Certain optimizations should not be applied if UDF is not deterministic. * Deterministic UDF returns same result each time it is invoked with a * particular input. This determinism just needs to hold within the context of * a query. * * return true if the UDF is deterministic */ boolean deterministic() default true; ``` Based on the definition of [UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50), when a Hive UDF's children are non-deterministic, the Hive UDF is also non-deterministic. ### How was this patch tested? Added test cases. Author: Xiao Li <gatorsmile@gmail.com> Closes #17635 from gatorsmile/udfDeterministic.
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala18
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala15
4 files changed, 47 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 51c814cf32..a83ad61b20 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -44,7 +44,7 @@ private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
- override def deterministic: Boolean = isUDFDeterministic
+ override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)
override def nullable: Boolean = true
@@ -123,7 +123,7 @@ private[hive] case class HiveGenericUDF(
override def nullable: Boolean = true
- override def deterministic: Boolean = isUDFDeterministic
+ override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)
override def foldable: Boolean =
isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 4a8086d7e5..84f915977b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -509,6 +509,19 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
Row(null, null, 110.0, null, null, 10.0) :: Nil)
}
+ test("non-deterministic children expressions of UDAF") {
+ val e = intercept[AnalysisException] {
+ spark.sql(
+ """
+ |SELECT mydoublesum(value + 1.5 * key + rand())
+ |FROM agg1
+ |GROUP BY key
+ """.stripMargin)
+ }.getMessage
+ assert(Seq("nondeterministic expression",
+ "should not appear in the arguments of an aggregate function").forall(e.contains))
+ }
+
test("interpreted aggregate function") {
checkAnswer(
spark.sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
index c9ef72ee11..479ca1e8de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._
+import org.apache.hadoop.hive.ql.udf.UDAFPercentile
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, GenericUDAFMax}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
import org.apache.hadoop.hive.ql.util.JavaDataModel
@@ -26,7 +27,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -84,6 +85,21 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
Row(1, Row(1, 1))
))
}
+
+ test("non-deterministic children expressions of UDAF") {
+ withTempView("view1") {
+ spark.range(1).selectExpr("id as x", "id as y").createTempView("view1")
+ withUserDefinedFunction("testUDAFPercentile" -> true) {
+ // non-deterministic children of Hive UDAF
+ sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
+ val e1 = intercept[AnalysisException] {
+ sql("SELECT testUDAFPercentile(x, rand()) from view1 group by y")
+ }.getMessage
+ assert(Seq("nondeterministic expression",
+ "should not appear in the arguments of an aggregate function").forall(e1.contains))
+ }
+ }
+ }
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index ef6883839d..4bbf925919 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.apache.hadoop.io.{LongWritable, Writable}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -387,6 +388,20 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
hiveContext.reset()
}
+ test("non-deterministic children of UDF") {
+ withUserDefinedFunction("testStringStringUDF" -> true, "testGenericUDFHash" -> true) {
+ // HiveSimpleUDF
+ sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'")
+ val df1 = sql("SELECT testStringStringUDF(rand(), \"hello\")")
+ assert(!df1.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
+
+ // HiveGenericUDF
+ sql(s"CREATE TEMPORARY FUNCTION testGenericUDFHash AS '${classOf[GenericUDFHash].getName}'")
+ val df2 = sql("SELECT testGenericUDFHash(rand())")
+ assert(!df2.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
+ }
+ }
+
test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")