aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-01-26 20:30:13 -0800
committerYin Huai <yhuai@databricks.com>2016-01-26 20:30:13 -0800
commit58f5d8c1da6feeb598aa5f74ffe1593d4839d11d (patch)
tree635abe8a8aaad075cff00316b0a421c0259f84df /sql
parentce38a35b764397fcf561ac81de6da96579f5c13e (diff)
downloadspark-58f5d8c1da6feeb598aa5f74ffe1593d4839d11d.tar.gz
spark-58f5d8c1da6feeb598aa5f74ffe1593d4839d11d.tar.bz2
spark-58f5d8c1da6feeb598aa5f74ffe1593d4839d11d.zip
[SPARK-12728][SQL] Integrates SQL generation with native view
This PR is a follow-up of PR #10541. It integrates the newly introduced SQL generation feature with native view to make native view canonical. In this PR, a new SQL option `spark.sql.nativeView.canonical` is added. When this option and `spark.sql.nativeView` are both `true`, Spark SQL tries to handle `CREATE VIEW` DDL statements using SQL query strings generated from view definition logical plans. If we failed to map the plan to SQL, we fallback to the original native view approach. One important issue this PR fixes is that, now we can use CTE when defining a view. Originally, when native view is turned on, we wrap the view definition text with an extra `SELECT`. However, HiveQL parser doesn't allow CTE appearing as a subquery. Namely, something like this is disallowed: ```sql SELECT n FROM ( WITH w AS (SELECT 1 AS n) SELECT * FROM w ) v ``` This PR fixes this issue because the extra `SELECT` is no longer needed (also, CTE expressions are inlined as subqueries during analysis phase, thus there won't be CTE expressions in the generated SQL query string). Author: Cheng Lian <lian@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #10733 from liancheng/spark-12728.integrate-sql-gen-with-native-view.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala13
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala33
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala95
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala142
6 files changed, 200 insertions, 95 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2d664d3ee6..c9ba670099 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -367,6 +367,14 @@ private[spark] object SQLConf {
"possible, or you may get wrong result.",
isPublic = false)
+ val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
+ defaultValue = Some(true),
+ doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+ "CREATE VIEW statement using SQL query string generated from view definition logical " +
+ "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
+ "original native view implementation.",
+ isPublic = false)
+
val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
defaultValue = Some("_corrupt_record"),
doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
@@ -550,6 +558,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserCon
private[spark] def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
+ private[spark] def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
+
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
private[spark] def subexpressionEliminationEnabled: Boolean =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5f73d71d45..d48143762c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -155,8 +155,21 @@ private[sql] trait SQLTestUtils
}
/**
+ * Drops view `viewName` after calling `f`.
+ */
+ protected def withView(viewNames: String*)(f: => Unit): Unit = {
+ try f finally {
+ viewNames.foreach { name =>
+ sqlContext.sql(s"DROP VIEW IF EXISTS $name")
+ }
+ }
+ }
+
+ /**
* Creates a temporary database and switches current database to it before executing `f`. This
* database is dropped after `f` returns.
+ *
+ * Note that this method doesn't switch current database before executing `f`.
*/
protected def withTempDatabase(f: String => Unit): Unit = {
val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 80e45d5162..a9c0e9ab7c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -579,25 +579,24 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
- case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
- if (conf.nativeView) {
- if (allowExisting && replace) {
- throw new AnalysisException(
- "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
- }
+ case CreateViewAsSelect(table, child, allowExisting, replace, sql) if conf.nativeView =>
+ if (allowExisting && replace) {
+ throw new AnalysisException(
+ "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+ }
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
- execution.CreateViewAsSelect(
- table.copy(
- specifiedDatabase = Some(dbName),
- name = tblName),
- child.output,
- allowExisting,
- replace)
- } else {
- HiveNativeCommand(sql)
- }
+ execution.CreateViewAsSelect(
+ table.copy(
+ specifiedDatabase = Some(dbName),
+ name = tblName),
+ child,
+ allowExisting,
+ replace)
+
+ case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
+ HiveNativeCommand(sql)
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.nonEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 6e288afbb4..31bda56e8a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
/**
@@ -32,10 +33,12 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
// from Hive and may not work for some cases like create view on self join.
private[hive] case class CreateViewAsSelect(
tableDesc: HiveTable,
- childSchema: Seq[Attribute],
+ child: LogicalPlan,
allowExisting: Boolean,
orReplace: Boolean) extends RunnableCommand {
+ private val childSchema = child.output
+
assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)
@@ -44,55 +47,83 @@ private[hive] case class CreateViewAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- if (hiveContext.catalog.tableExists(tableIdentifier)) {
- if (allowExisting) {
- // view already exists, will do nothing, to keep consistent with Hive
- } else if (orReplace) {
- hiveContext.catalog.client.alertView(prepareTable())
- } else {
+ hiveContext.catalog.tableExists(tableIdentifier) match {
+ case true if allowExisting =>
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+
+ case true if orReplace =>
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ hiveContext.catalog.client.alertView(prepareTable(sqlContext))
+
+ case true =>
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
throw new AnalysisException(s"View $tableIdentifier already exists. " +
"If you want to update the view definition, please use ALTER VIEW AS or " +
"CREATE OR REPLACE VIEW AS")
- }
- } else {
- hiveContext.catalog.client.createView(prepareTable())
+
+ case false =>
+ hiveContext.catalog.client.createView(prepareTable(sqlContext))
}
Seq.empty[Row]
}
- private def prepareTable(): HiveTable = {
- // setup column types according to the schema of child.
- val schema = if (tableDesc.schema == Nil) {
- childSchema.map { attr =>
- HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
- }
+ private def prepareTable(sqlContext: SQLContext): HiveTable = {
+ val expandedText = if (sqlContext.conf.canonicalView) {
+ rebuildViewQueryString(sqlContext).getOrElse(wrapViewTextWithSelect)
} else {
- childSchema.zip(tableDesc.schema).map { case (attr, col) =>
- HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+ wrapViewTextWithSelect
+ }
+
+ val viewSchema = {
+ if (tableDesc.schema.isEmpty) {
+ childSchema.map { attr =>
+ HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ }
+ } else {
+ childSchema.zip(tableDesc.schema).map { case (attr, col) =>
+ HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+ }
}
}
- val columnNames = childSchema.map(f => verbose(f.name))
+ tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+ }
+ private def wrapViewTextWithSelect: String = {
// When user specified column names for view, we should create a project to do the renaming.
// When no column name specified, we still need to create a project to declare the columns
// we need, to make us more robust to top level `*`s.
- val projectList = if (tableDesc.schema == Nil) {
- columnNames.mkString(", ")
- } else {
- columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
- case (name, alias) => s"$name AS $alias"
- }.mkString(", ")
+ val viewOutput = {
+ val columnNames = childSchema.map(f => quote(f.name))
+ if (tableDesc.schema.isEmpty) {
+ columnNames.mkString(", ")
+ } else {
+ columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+ case (name, alias) => s"$name AS $alias"
+ }.mkString(", ")
+ }
}
- val viewName = verbose(tableDesc.name)
-
- val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
+ val viewText = tableDesc.viewText.get
+ val viewName = quote(tableDesc.name)
+ s"SELECT $viewOutput FROM ($viewText) $viewName"
+ }
- tableDesc.copy(schema = schema, viewText = Some(expandedText))
+ private def rebuildViewQueryString(sqlContext: SQLContext): Option[String] = {
+ val logicalPlan = if (tableDesc.schema.isEmpty) {
+ child
+ } else {
+ val projectList = childSchema.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
+ }
+ sqlContext.executePlan(Project(projectList, child)).analyzed
+ }
+ new SQLBuilder(logicalPlan, sqlContext).toSQL
}
// escape backtick with double-backtick in column name and wrap it with backtick.
- private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
+ private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 261a4746f4..1f731db26f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -147,7 +147,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
// TODO Enable this
// Query plans transformed by DistinctAggregationRewriter are not recognized yet
- ignore("distinct and non-distinct aggregation") {
+ ignore("multi-distinct columns") {
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 683008960a..9e53d8a81e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1319,67 +1319,119 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("correctly handle CREATE OR REPLACE VIEW") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ Seq(true, false).foreach { enabled =>
+ val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
+ test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+
+ val e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+ }
+ assert(e.message.contains("not allowed to define a view"))
+ }
+ }
+ }
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+ test(s"$prefix correctly handle ALTER VIEW") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt", "jt2") {
+ withView("testView") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("ALTER VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+ }
+ }
+ }
+ }
- sql("DROP VIEW testView")
+ test(s"$prefix create hive view for json table") {
+ // json table is not hive-compatible, make sure the new flag fix it.
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt") {
+ withView("testView") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ }
+ }
+ }
+ }
- val e = intercept[AnalysisException] {
- sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+ test(s"$prefix create hive view for partitioned parquet table") {
+ // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("parTable") {
+ withView("testView") {
+ val df = Seq(1 -> "a").toDF("i", "j")
+ df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+ sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+ checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+ }
}
- assert(e.message.contains("not allowed to define a view"))
}
}
}
- test("correctly handle ALTER VIEW") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("ALTER VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
- sql("DROP VIEW testView")
+ test("CTE within view") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withView("cte_view") {
+ sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+ checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
}
}
}
- test("create hive view for json table") {
- // json table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- sql("DROP VIEW testView")
+ test("Using view after switching current database") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM src")
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ // Should look up table `src` in database `default`.
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+ // The new `src` table shouldn't be scanned.
+ sql("CREATE TABLE src(key INT, value STRING)")
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+ }
+ }
}
}
}
- test("create hive view for partitioned parquet table") {
- // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("parTable") {
- val df = Seq(1 -> "a").toDF("i", "j")
- df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
- sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
- checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
- sql("DROP VIEW testView")
+ test("Using view after adding more columns") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withTable("add_col") {
+ sqlContext.range(10).write.saveAsTable("add_col")
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM add_col")
+ sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+ checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+ }
}
}
}