Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala      12
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                                                3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala                                          244
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala                                             48
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionSQLBuilderSuite.scala                            75
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala                               146
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala                                       74
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala                         70
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala                              1
10 files changed, 647 insertions(+), 27 deletions(-)
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index bd1a52e5f3..afd2f61158 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -41,9 +41,12 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
private val originalColumnBatchSize = TestHive.conf.columnBatchSize
private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
- def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
+ def testCases: Seq[(String, File)] = {
+ hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
+ }
override def beforeAll() {
+ super.beforeAll()
TestHive.cacheTables = true
// Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -68,10 +71,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// For debugging dump some statistics about how much time was spent in various optimizer rules.
logWarning(RuleExecutor.dumpTimeSpent())
+ super.afterAll()
}
/** A list of tests deemed out of scope currently and thus completely disregarded. */
- override def blackList = Seq(
+ override def blackList: Seq[String] = Seq(
// These tests use hooks that are not on the classpath and thus break all subsequent execution.
"hook_order",
"hook_context_cs",
@@ -106,7 +110,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"alter_merge",
"alter_concatenate_indexed_table",
"protectmode2",
- //"describe_table",
+ // "describe_table",
"describe_comment_nonascii",
"create_merge_compressed",
@@ -323,7 +327,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
* The set of tests that are believed to be working in catalyst. Tests not on whiteList or
* blacklist are implicitly marked as ignored.
*/
- override def whiteList = Seq(
+ override def whiteList: Seq[String] = Seq(
"add_part_exist",
"add_part_multiple",
"add_partition_no_whitelist",
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
index 98bbdf0653..bad3ca6da2 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
@@ -104,6 +104,7 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
TimeZone.setDefault(originalTimeZone)
Locale.setDefault(originalLocale)
TestHive.reset()
+ super.afterAll()
}
/////////////////////////////////////////////////////////////////////////////
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index bf3fe12d5c..5b13dbe473 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -668,7 +668,8 @@ private[hive] object HiveQl extends SparkQl with Logging {
Option(FunctionRegistry.getFunctionInfo(functionName.toLowerCase)).getOrElse(
sys.error(s"Couldn't find function $functionName"))
val functionClassName = functionInfo.getFunctionClass.getName
- HiveGenericUDTF(new HiveFunctionWrapper(functionClassName), children.map(nodeToExpr))
+ HiveGenericUDTF(
+ functionName, new HiveFunctionWrapper(functionClassName), children.map(nodeToExpr))
case other => super.nodeToGenerator(node)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
new file mode 100644
index 0000000000..1c910051fa
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+ * A builder class used to convert a resolved logical plan into a SQL query string. Note that not
+ * all resolved logical plans are convertible. They either don't have corresponding SQL
+ * representations (e.g. logical plans that operate on local Scala collections), or are simply not
+ * supported by this builder (yet).
+ */
+class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {
+ def this(df: DataFrame) = this(df.queryExecution.analyzed, df.sqlContext)
+
+ def toSQL: Option[String] = {
+ val canonicalizedPlan = Canonicalizer.execute(logicalPlan)
+ val maybeSQL = try {
+ toSQL(canonicalizedPlan)
+ } catch { case cause: UnsupportedOperationException =>
+ logInfo(s"Failed to build SQL query string because: ${cause.getMessage}")
+ None
+ }
+
+ if (maybeSQL.isDefined) {
+ logDebug(
+ s"""Built SQL query string successfully from given logical plan:
+ |
+ |# Original logical plan:
+ |${logicalPlan.treeString}
+ |# Canonicalized logical plan:
+ |${canonicalizedPlan.treeString}
+ |# Built SQL query string:
+ |${maybeSQL.get}
+ """.stripMargin)
+ } else {
+ logDebug(
+ s"""Failed to build SQL query string from given logical plan:
+ |
+ |# Original logical plan:
+ |${logicalPlan.treeString}
+ |# Canonicalized logical plan:
+ |${canonicalizedPlan.treeString}
+ """.stripMargin)
+ }
+
+ maybeSQL
+ }
+
+ private def projectToSQL(
+ projectList: Seq[NamedExpression],
+ child: LogicalPlan,
+ isDistinct: Boolean): Option[String] = {
+ for {
+ childSQL <- toSQL(child)
+ listSQL = projectList.map(_.sql).mkString(", ")
+ maybeFrom = child match {
+ case OneRowRelation => " "
+ case _ => " FROM "
+ }
+ distinct = if (isDistinct) " DISTINCT " else " "
+ } yield s"SELECT$distinct$listSQL$maybeFrom$childSQL"
+ }
+
+ private def aggregateToSQL(
+ groupingExprs: Seq[Expression],
+ aggExprs: Seq[Expression],
+ child: LogicalPlan): Option[String] = {
+ val aggSQL = aggExprs.map(_.sql).mkString(", ")
+ val groupingSQL = groupingExprs.map(_.sql).mkString(", ")
+ val maybeGroupBy = if (groupingSQL.isEmpty) "" else " GROUP BY "
+ val maybeFrom = child match {
+ case OneRowRelation => " "
+ case _ => " FROM "
+ }
+
+ toSQL(child).map { childSQL =>
+ s"SELECT $aggSQL$maybeFrom$childSQL$maybeGroupBy$groupingSQL"
+ }
+ }
+
+ private def toSQL(node: LogicalPlan): Option[String] = node match {
+ case Distinct(Project(list, child)) =>
+ projectToSQL(list, child, isDistinct = true)
+
+ case Project(list, child) =>
+ projectToSQL(list, child, isDistinct = false)
+
+ case Aggregate(groupingExprs, aggExprs, child) =>
+ aggregateToSQL(groupingExprs, aggExprs, child)
+
+ case Limit(limit, child) =>
+ for {
+ childSQL <- toSQL(child)
+ limitSQL = limit.sql
+ } yield s"$childSQL LIMIT $limitSQL"
+
+ case Filter(condition, child) =>
+ for {
+ childSQL <- toSQL(child)
+ whereOrHaving = child match {
+ case _: Aggregate => "HAVING"
+ case _ => "WHERE"
+ }
+ conditionSQL = condition.sql
+ } yield s"$childSQL $whereOrHaving $conditionSQL"
+
+ case Union(left, right) =>
+ for {
+ leftSQL <- toSQL(left)
+ rightSQL <- toSQL(right)
+ } yield s"$leftSQL UNION ALL $rightSQL"
+
+ // ParquetRelation converted from Hive metastore table
+ case Subquery(alias, LogicalRelation(r: ParquetRelation, _)) =>
+ // There seems to be a bug related to the `ParquetConversions` analysis rule. The problem is
+ // that the metastore database name and table name are not always propagated to converted
+ // `ParquetRelation` instances via data source options. Here we use subquery alias as a
+ // workaround.
+ Some(s"`$alias`")
+
+ case Subquery(alias, child) =>
+ toSQL(child).map(childSQL => s"($childSQL) AS $alias")
+
+ case Join(left, right, joinType, condition) =>
+ for {
+ leftSQL <- toSQL(left)
+ rightSQL <- toSQL(right)
+ joinTypeSQL = joinType.sql
+ conditionSQL = condition.map(" ON " + _.sql).getOrElse("")
+ } yield s"$leftSQL $joinTypeSQL JOIN $rightSQL$conditionSQL"
+
+ case MetastoreRelation(database, table, alias) =>
+ val aliasSQL = alias.map(a => s" AS `$a`").getOrElse("")
+ Some(s"`$database`.`$table`$aliasSQL")
+
+ case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _))
+ if orders.map(_.child) == partitionExprs =>
+ for {
+ childSQL <- toSQL(child)
+ partitionExprsSQL = partitionExprs.map(_.sql).mkString(", ")
+ } yield s"$childSQL CLUSTER BY $partitionExprsSQL"
+
+ case Sort(orders, global, child) =>
+ for {
+ childSQL <- toSQL(child)
+ ordersSQL = orders.map { case SortOrder(e, dir) => s"${e.sql} ${dir.sql}" }.mkString(", ")
+ orderOrSort = if (global) "ORDER" else "SORT"
+ } yield s"$childSQL $orderOrSort BY $ordersSQL"
+
+ case RepartitionByExpression(partitionExprs, child, _) =>
+ for {
+ childSQL <- toSQL(child)
+ partitionExprsSQL = partitionExprs.map(_.sql).mkString(", ")
+ } yield s"$childSQL DISTRIBUTE BY $partitionExprsSQL"
+
+ case OneRowRelation =>
+ Some("")
+
+ case _ => None
+ }
+
+ object Canonicalizer extends RuleExecutor[LogicalPlan] {
+ override protected def batches: Seq[Batch] = Seq(
+ Batch("Canonicalizer", FixedPoint(100),
+ // The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
+ // `Aggregate`s to perform type casting. This rule merges these `Project`s into
+ // `Aggregate`s.
+ ProjectCollapsing,
+
+ // Used to handle other auxiliary `Project`s added by the analyzer (e.g. the
+ // `ResolveAggregateFunctions` rule)
+ RecoverScopingInfo
+ )
+ )
+
+ object RecoverScopingInfo extends Rule[LogicalPlan] {
+ override def apply(tree: LogicalPlan): LogicalPlan = tree transform {
+ // This branch handles aggregate functions within HAVING clauses. For example:
+ //
+ // SELECT key FROM src GROUP BY key HAVING max(value) > "val_255"
+ //
+ // This kind of query results in query plans of the following form because of analysis rule
+ // `ResolveAggregateFunctions`:
+ //
+ // Project ...
+ // +- Filter ...
+ // +- Aggregate ...
+ // +- MetastoreRelation default, src, None
+ case plan @ Project(_, Filter(_, _: Aggregate)) =>
+ wrapChildWithSubquery(plan)
+
+ case plan @ Project(_,
+ _: Subquery | _: Filter | _: Join | _: MetastoreRelation | OneRowRelation | _: Limit
+ ) => plan
+
+ case plan: Project =>
+ wrapChildWithSubquery(plan)
+ }
+
+ def wrapChildWithSubquery(project: Project): Project = project match {
+ case Project(projectList, child) =>
+ val alias = SQLBuilder.newSubqueryName
+ val childAttributes = child.outputSet
+ val aliasedProjectList = projectList.map(_.transform {
+ case a: Attribute if childAttributes.contains(a) =>
+ a.withQualifiers(alias :: Nil)
+ }.asInstanceOf[NamedExpression])
+
+ Project(aliasedProjectList, Subquery(alias, child))
+ }
+ }
+ }
+}
+
+object SQLBuilder {
+ private val nextSubqueryId = new AtomicLong(0)
+
+ private def newSubqueryName: String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index b1a6d0ab7d..e76c18fa52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -17,30 +17,26 @@
package org.apache.spark.sql.hive
-import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import org.apache.hadoop.hive.ql.exec._
-import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
-import org.apache.hadoop.hive.ql.udf.generic._
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
-import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspector, ObjectInspectorFactory}
+import org.apache.hadoop.hive.ql.udf.generic._
+import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions
+import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspector, ObjectInspectorFactory}
import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.util.sequenceOption
+import org.apache.spark.sql.catalyst.{InternalRow, analysis}
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.hive.client.ClientWrapper
import org.apache.spark.sql.types._
@@ -75,19 +71,19 @@ private[hive] class HiveFunctionRegistry(
try {
if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveGenericUDF(
- new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
+ name, new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
} else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- HiveSimpleUDF(new HiveFunctionWrapper(functionClassName), children)
+ HiveSimpleUDF(name, new HiveFunctionWrapper(functionClassName), children)
} else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
- HiveGenericUDF(new HiveFunctionWrapper(functionClassName), children)
+ HiveGenericUDF(name, new HiveFunctionWrapper(functionClassName), children)
} else if (
classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
- HiveUDAFFunction(new HiveFunctionWrapper(functionClassName), children)
+ HiveUDAFFunction(name, new HiveFunctionWrapper(functionClassName), children)
} else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveUDAFFunction(
- new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
+ name, new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
} else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
- val udtf = HiveGenericUDTF(new HiveFunctionWrapper(functionClassName), children)
+ val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(functionClassName), children)
udtf.elementTypes // Force it to check input data types.
udtf
} else {
@@ -137,7 +133,8 @@ private[hive] class HiveFunctionRegistry(
}
}
-private[hive] case class HiveSimpleUDF(funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
+private[hive] case class HiveSimpleUDF(
+ name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
override def deterministic: Boolean = isUDFDeterministic
@@ -191,6 +188,8 @@ private[hive] case class HiveSimpleUDF(funcWrapper: HiveFunctionWrapper, childre
override def toString: String = {
s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
+
+ override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})"
}
// Adapter from Catalyst ExpressionResult to Hive DeferredObject
@@ -205,7 +204,8 @@ private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataTyp
override def get(): AnyRef = wrap(func(), oi, dataType)
}
-private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
+private[hive] case class HiveGenericUDF(
+ name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
override def nullable: Boolean = true
@@ -257,6 +257,8 @@ private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, childr
override def toString: String = {
s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
+
+ override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})"
}
/**
@@ -271,6 +273,7 @@ private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, childr
* user defined aggregations, which have clean semantics even in a partitioned execution.
*/
private[hive] case class HiveGenericUDTF(
+ name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression])
extends Generator with HiveInspectors with CodegenFallback {
@@ -336,6 +339,8 @@ private[hive] case class HiveGenericUDTF(
override def toString: String = {
s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
}
+
+ override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})"
}
/**
@@ -343,6 +348,7 @@ private[hive] case class HiveGenericUDTF(
* performance a lot.
*/
private[hive] case class HiveUDAFFunction(
+ name: String,
funcWrapper: HiveFunctionWrapper,
children: Seq[Expression],
isUDAFBridgeRequired: Boolean = false,
@@ -427,5 +433,9 @@ private[hive] case class HiveUDAFFunction(
override def supportsPartial: Boolean = false
override val dataType: DataType = inspectorToDataType(returnInspector)
-}
+ override def sql(isDistinct: Boolean): String = {
+ val distinct = if (isDistinct) "DISTINCT " else " "
+ s"$name($distinct${children.map(_.sql).mkString(", ")})"
+ }
+}
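
Editor's note: the mechanical effect of threading `name` through the UDF case classes is that generated SQL uses the registry name rather than the implementing class name reported by `toString`. A simplified, self-contained analogue of that pattern (not the Spark classes themselves; `NamedUdfCall` is hypothetical):

    // Carry the user-facing function name alongside the implementation so that
    // SQL generation can print name(args) instead of the wrapper's class name.
    case class NamedUdfCall(name: String, argsSQL: Seq[String]) {
      def sql: String = s"$name(${argsSQL.mkString(", ")})"
      def sql(isDistinct: Boolean): String = {
        val distinct = if (isDistinct) "DISTINCT " else ""
        s"$name($distinct${argsSQL.mkString(", ")})"
      }
    }

    // NamedUdfCall("count", Seq("`id`")).sql(isDistinct = true) == "count(DISTINCT `id`)"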
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionSQLBuilderSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionSQLBuilderSuite.scala
new file mode 100644
index 0000000000..3a6eb57add
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ExpressionSQLBuilderSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.{If, Literal}
+
+class ExpressionSQLBuilderSuite extends SQLBuilderTest {
+ test("literal") {
+ checkSQL(Literal("foo"), "\"foo\"")
+ checkSQL(Literal("\"foo\""), "\"\\\"foo\\\"\"")
+ checkSQL(Literal(1: Byte), "CAST(1 AS TINYINT)")
+ checkSQL(Literal(2: Short), "CAST(2 AS SMALLINT)")
+ checkSQL(Literal(4: Int), "4")
+ checkSQL(Literal(8: Long), "CAST(8 AS BIGINT)")
+ checkSQL(Literal(1.5F), "CAST(1.5 AS FLOAT)")
+ checkSQL(Literal(2.5D), "2.5")
+ checkSQL(
+ Literal(Timestamp.valueOf("2016-01-01 00:00:00")),
+ "TIMESTAMP('2016-01-01 00:00:00.0')")
+ // TODO tests for decimals
+ }
+
+ test("binary comparisons") {
+ checkSQL('a.int === 'b.int, "(`a` = `b`)")
+ checkSQL('a.int <=> 'b.int, "(`a` <=> `b`)")
+ checkSQL('a.int !== 'b.int, "(NOT (`a` = `b`))")
+
+ checkSQL('a.int < 'b.int, "(`a` < `b`)")
+ checkSQL('a.int <= 'b.int, "(`a` <= `b`)")
+ checkSQL('a.int > 'b.int, "(`a` > `b`)")
+ checkSQL('a.int >= 'b.int, "(`a` >= `b`)")
+
+ checkSQL('a.int in ('b.int, 'c.int), "(`a` IN (`b`, `c`))")
+ checkSQL('a.int in (1, 2), "(`a` IN (1, 2))")
+
+ checkSQL('a.int.isNull, "(`a` IS NULL)")
+ checkSQL('a.int.isNotNull, "(`a` IS NOT NULL)")
+ }
+
+ test("logical operators") {
+ checkSQL('a.boolean && 'b.boolean, "(`a` AND `b`)")
+ checkSQL('a.boolean || 'b.boolean, "(`a` OR `b`)")
+ checkSQL(!'a.boolean, "(NOT `a`)")
+ checkSQL(If('a.boolean, 'b.int, 'c.int), "(IF(`a`, `b`, `c`))")
+ }
+
+ test("arithmetic expressions") {
+ checkSQL('a.int + 'b.int, "(`a` + `b`)")
+ checkSQL('a.int - 'b.int, "(`a` - `b`)")
+ checkSQL('a.int * 'b.int, "(`a` * `b`)")
+ checkSQL('a.int / 'b.int, "(`a` / `b`)")
+ checkSQL('a.int % 'b.int, "(`a` % `b`)")
+
+ checkSQL(-'a.int, "(-`a`)")
+ checkSQL(-('a.int + 'b.int), "(-(`a` + `b`))")
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
new file mode 100644
index 0000000000..0e81acf532
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.functions._
+
+class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
+ import testImplicits._
+
+ protected override def beforeAll(): Unit = {
+ sqlContext.range(10).write.saveAsTable("t0")
+
+ sqlContext
+ .range(10)
+ .select('id as 'key, concat(lit("val_"), 'id) as 'value)
+ .write
+ .saveAsTable("t1")
+
+ sqlContext.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write.saveAsTable("t2")
+ }
+
+ override protected def afterAll(): Unit = {
+ sql("DROP TABLE IF EXISTS t0")
+ sql("DROP TABLE IF EXISTS t1")
+ sql("DROP TABLE IF EXISTS t2")
+ }
+
+ private def checkHiveQl(hiveQl: String): Unit = {
+ val df = sql(hiveQl)
+ val convertedSQL = new SQLBuilder(df).toSQL
+
+ if (convertedSQL.isEmpty) {
+ fail(
+ s"""Cannot convert the following HiveQL query plan back to SQL query string:
+ |
+ |# Original HiveQL query string:
+ |$hiveQl
+ |
+ |# Resolved query plan:
+ |${df.queryExecution.analyzed.treeString}
+ """.stripMargin)
+ }
+
+ val sqlString = convertedSQL.get
+ try {
+ checkAnswer(sql(sqlString), df)
+ } catch { case cause: Throwable =>
+ fail(
+ s"""Failed to execute converted SQL string or got wrong answer:
+ |
+ |# Converted SQL query string:
+ |$sqlString
+ |
+ |# Original HiveQL query string:
+ |$hiveQl
+ |
+ |# Resolved query plan:
+ |${df.queryExecution.analyzed.treeString}
+ """.stripMargin,
+ cause)
+ }
+ }
+
+ test("in") {
+ checkHiveQl("SELECT id FROM t0 WHERE id IN (1, 2, 3)")
+ }
+
+ test("aggregate function in having clause") {
+ checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key HAVING MAX(key) > 0")
+ }
+
+ test("aggregate function in order by clause") {
+ checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY MAX(key)")
+ }
+
+ // TODO Fix name collision introduced by the ResolveAggregateFunctions analysis rule
+ // When there are multiple aggregate functions in the ORDER BY clause, all of them are extracted into
+ // the Aggregate operator and aliased to the same name "aggOrder". This is fine for normal query
+ // execution since these aliases have different expression IDs. But this introduces a name collision
+ // when converting resolved plans back to SQL query strings as expression IDs are stripped.
+ ignore("aggregate function in order by clause with multiple order keys") {
+ checkHiveQl("SELECT COUNT(value) FROM t1 GROUP BY key ORDER BY key, MAX(key)")
+ }
+
+ test("type widening in union") {
+ checkHiveQl("SELECT id FROM t0 UNION ALL SELECT CAST(id AS INT) AS id FROM t0")
+ }
+
+ test("case") {
+ checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 WHEN id % 2 = 0 THEN 1 END FROM t0")
+ }
+
+ test("case with else") {
+ checkHiveQl("SELECT CASE WHEN id % 2 > 0 THEN 0 ELSE 1 END FROM t0")
+ }
+
+ test("case with key") {
+ checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' END FROM t0")
+ }
+
+ test("case with key and else") {
+ checkHiveQl("SELECT CASE id WHEN 0 THEN 'foo' WHEN 1 THEN 'bar' ELSE 'baz' END FROM t0")
+ }
+
+ test("select distinct without aggregate functions") {
+ checkHiveQl("SELECT DISTINCT id FROM t0")
+ }
+
+ test("cluster by") {
+ checkHiveQl("SELECT id FROM t0 CLUSTER BY id")
+ }
+
+ test("distribute by") {
+ checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id")
+ }
+
+ test("distribute by with sort by") {
+ checkHiveQl("SELECT id FROM t0 DISTRIBUTE BY id SORT BY id")
+ }
+
+ test("distinct aggregation") {
+ checkHiveQl("SELECT COUNT(DISTINCT id) FROM t0")
+ }
+
+ // TODO Enable this
+ // Query plans transformed by DistinctAggregationRewriter are not recognized yet
+ ignore("distinct and non-distinct aggregation") {
+ checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
+ }
+}
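
Editor's note: the round trip performed by checkHiveQl reduces to the sketch below: run the HiveQL, rebuild SQL from the analyzed plan, re-execute it, and compare answers. It assumes the `t0` table created in `beforeAll` above and is meant to run inside a suite like this one; the order-insensitive set comparison stands in for the real `checkAnswer` helper.

    import org.apache.spark.sql.hive.SQLBuilder

    val original = sqlContext.sql("SELECT DISTINCT id FROM t0")

    new SQLBuilder(original).toSQL match {
      case Some(generated) =>
        // Re-run the generated SQL and compare against the original answer.
        assert(sqlContext.sql(generated).collect().toSet === original.collect().toSet)
      case None =>
        fail("analyzed plan could not be converted back to SQL")
    }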
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
new file mode 100644
index 0000000000..cf4a3fdd88
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SQLBuilderTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.{DataFrame, QueryTest}
+
+abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
+ protected def checkSQL(e: Expression, expectedSQL: String): Unit = {
+ val actualSQL = e.sql
+ try {
+ assert(actualSQL === expectedSQL)
+ } catch {
+ case cause: Throwable =>
+ fail(
+ s"""Wrong SQL generated for the following expression:
+ |
+ |${e.prettyName}
+ |
+ |$cause
+ """.stripMargin)
+ }
+ }
+
+ protected def checkSQL(plan: LogicalPlan, expectedSQL: String): Unit = {
+ val maybeSQL = new SQLBuilder(plan, hiveContext).toSQL
+
+ if (maybeSQL.isEmpty) {
+ fail(
+ s"""Cannot convert the following logical query plan to SQL:
+ |
+ |${plan.treeString}
+ """.stripMargin)
+ }
+
+ val actualSQL = maybeSQL.get
+
+ try {
+ assert(actualSQL === expectedSQL)
+ } catch {
+ case cause: Throwable =>
+ fail(
+ s"""Wrong SQL generated for the following logical query plan:
+ |
+ |${plan.treeString}
+ |
+ |$cause
+ """.stripMargin)
+ }
+
+ checkAnswer(sqlContext.sql(actualSQL), new DataFrame(sqlContext, plan))
+ }
+
+ protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
+ checkSQL(df.queryExecution.analyzed, expectedSQL)
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index d7e8ebc8d3..57358a0784 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -27,9 +27,10 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{ExplainCommand, SetCommand}
import org.apache.spark.sql.execution.datasources.DescribeCommand
+import org.apache.spark.sql.execution.{ExplainCommand, SetCommand}
import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable, SQLBuilder}
/**
* Allows the creations of tests that execute the same query against both hive
@@ -130,6 +131,28 @@ abstract class HiveComparisonTest
new java.math.BigInteger(1, digest.digest).toString(16)
}
+ /** Used for testing [[SQLBuilder]] */
+ private var numConvertibleQueries: Int = 0
+ private var numTotalQueries: Int = 0
+
+ override protected def afterAll(): Unit = {
+ logInfo({
+ val percentage = if (numTotalQueries > 0) {
+ numConvertibleQueries.toDouble / numTotalQueries * 100
+ } else {
+ 0D
+ }
+
+ s"""SQLBuiler statistics:
+ |- Total query number: $numTotalQueries
+ |- Number of convertible queries: $numConvertibleQueries
+ |- Percentage of convertible queries: $percentage%
+ """.stripMargin
+ })
+
+ super.afterAll()
+ }
+
protected def prepareAnswer(
hiveQuery: TestHive.type#QueryExecution,
answer: Seq[String]): Seq[String] = {
@@ -372,8 +395,49 @@ abstract class HiveComparisonTest
// Run w/ catalyst
val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
- val query = new TestHive.QueryExecution(queryString)
- try { (query, prepareAnswer(query, query.stringResult())) } catch {
+ var query: TestHive.QueryExecution = null
+ try {
+ query = {
+ val originalQuery = new TestHive.QueryExecution(queryString)
+ val containsCommands = originalQuery.analyzed.collectFirst {
+ case _: Command => ()
+ case _: LogicalInsertIntoHiveTable => ()
+ }.nonEmpty
+
+ if (containsCommands) {
+ originalQuery
+ } else {
+ numTotalQueries += 1
+ new SQLBuilder(originalQuery.analyzed, TestHive).toSQL.map { sql =>
+ numConvertibleQueries += 1
+ logInfo(
+ s"""
+ |### Running SQL generation round-trip test {{{
+ |${originalQuery.analyzed.treeString}
+ |Original SQL:
+ |$queryString
+ |
+ |Generated SQL:
+ |$sql
+ |}}}
+ """.stripMargin.trim)
+ new TestHive.QueryExecution(sql)
+ }.getOrElse {
+ logInfo(
+ s"""
+ |### Cannot convert the following logical plan back to SQL {{{
+ |${originalQuery.analyzed.treeString}
+ |Original SQL:
+ |$queryString
+ |}}}
+ """.stripMargin.trim)
+ originalQuery
+ }
+ }
+ }
+
+ (query, prepareAnswer(query, query.stringResult()))
+ } catch {
case e: Throwable =>
val errorMessage =
s"""
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index fa99289b41..4659d745fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -60,6 +60,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
TimeZone.setDefault(originalTimeZone)
Locale.setDefault(originalLocale)
sql("DROP TEMPORARY FUNCTION udtf_count2")
+ super.afterAll()
}
test("SPARK-4908: concurrent hive native commands") {