-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                           |  10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala                 |  13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala         |  33
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala |  95
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala        |   2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala      | 142
6 files changed, 200 insertions(+), 95 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2d664d3ee6..c9ba670099 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -367,6 +367,14 @@ private[spark] object SQLConf {
"possible, or you may get wrong result.",
isPublic = false)
+ val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
+ defaultValue = Some(true),
+ doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
+ "CREATE VIEW statement using SQL query string generated from view definition logical " +
+ "plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
+ "original native view implementation.",
+ isPublic = false)
+
val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
defaultValue = Some("_corrupt_record"),
doc = "The name of internal column for storing raw/un-parsed JSON records that fail to parse.")
@@ -550,6 +558,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserCon
private[spark] def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
+ private[spark] def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
+
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
private[spark] def subexpressionEliminationEnabled: Boolean =
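
A minimal sketch (not part of this patch) of how the two flags combine, assuming `sqlContext` is a HiveContext and a `src` table exists:

// Both flags must be set for the canonical path to kick in:
sqlContext.setConf("spark.sql.nativeView", "true")
sqlContext.setConf("spark.sql.nativeView.canonical", "true")

// The view text stored in the metastore is regenerated from the analyzed plan,
// so `src` stays bound to the database the view was created in. If the plan has
// no SQL representation, the original expanded-text fallback is used instead.
sqlContext.sql("CREATE VIEW v AS SELECT key, value FROM src")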
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5f73d71d45..d48143762c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -155,8 +155,21 @@ private[sql] trait SQLTestUtils
}
/**
+ * Drops views `viewNames` after calling `f`.
+ */
+ protected def withView(viewNames: String*)(f: => Unit): Unit = {
+ try f finally {
+ viewNames.foreach { name =>
+ sqlContext.sql(s"DROP VIEW IF EXISTS $name")
+ }
+ }
+ }
+
+ /**
* Creates a temporary database whose name is passed to `f`. This database is dropped after
* `f` returns.
+ *
+ * Note that this method doesn't switch the current database to the temporary one before
+ * executing `f`; tests that need that can use `activateDatabase`.
*/
protected def withTempDatabase(f: String => Unit): Unit = {
val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 80e45d5162..a9c0e9ab7c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -579,25 +579,24 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
- case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
- if (conf.nativeView) {
- if (allowExisting && replace) {
- throw new AnalysisException(
- "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
- }
+ case CreateViewAsSelect(table, child, allowExisting, replace, sql) if conf.nativeView =>
+ if (allowExisting && replace) {
+ throw new AnalysisException(
+ "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+ }
- val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+ val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
- execution.CreateViewAsSelect(
- table.copy(
- specifiedDatabase = Some(dbName),
- name = tblName),
- child.output,
- allowExisting,
- replace)
- } else {
- HiveNativeCommand(sql)
- }
+ execution.CreateViewAsSelect(
+ table.copy(
+ specifiedDatabase = Some(dbName),
+ name = tblName),
+ child,
+ allowExisting,
+ replace)
+
+ case CreateViewAsSelect(_, _, _, _, sql) =>
+ HiveNativeCommand(sql)
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.nonEmpty) {
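
One observable consequence of the guard in the first case above, sketched here as a hypothetical snippet (it matches the test added to SQLQuerySuite below):

// Combining IF NOT EXISTS with OR REPLACE is rejected during analysis:
val e = intercept[AnalysisException] {
  sqlContext.sql("CREATE OR REPLACE VIEW IF NOT EXISTS v AS SELECT id FROM jt")
}
assert(e.message.contains("not allowed to define a view"))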
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 6e288afbb4..31bda56e8a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, SQLBuilder}
import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
/**
@@ -32,10 +33,12 @@ import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
// from Hive and may not work for some cases like create view on self join.
private[hive] case class CreateViewAsSelect(
tableDesc: HiveTable,
- childSchema: Seq[Attribute],
+ child: LogicalPlan,
allowExisting: Boolean,
orReplace: Boolean) extends RunnableCommand {
+ private val childSchema = child.output
+
assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
assert(tableDesc.viewText.isDefined)
@@ -44,55 +47,83 @@ private[hive] case class CreateViewAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- if (hiveContext.catalog.tableExists(tableIdentifier)) {
- if (allowExisting) {
- // view already exists, will do nothing, to keep consistent with Hive
- } else if (orReplace) {
- hiveContext.catalog.client.alertView(prepareTable())
- } else {
+ hiveContext.catalog.tableExists(tableIdentifier) match {
+ case true if allowExisting =>
+ // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+ // already exists.
+
+ case true if orReplace =>
+ // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+ hiveContext.catalog.client.alertView(prepareTable(sqlContext))
+
+ case true =>
+ // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+ // exists.
throw new AnalysisException(s"View $tableIdentifier already exists. " +
"If you want to update the view definition, please use ALTER VIEW AS or " +
"CREATE OR REPLACE VIEW AS")
- }
- } else {
- hiveContext.catalog.client.createView(prepareTable())
+
+ case false =>
+ hiveContext.catalog.client.createView(prepareTable(sqlContext))
}
Seq.empty[Row]
}
- private def prepareTable(): HiveTable = {
- // setup column types according to the schema of child.
- val schema = if (tableDesc.schema == Nil) {
- childSchema.map { attr =>
- HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
- }
+ private def prepareTable(sqlContext: SQLContext): HiveTable = {
+ val expandedText = if (sqlContext.conf.canonicalView) {
+ rebuildViewQueryString(sqlContext).getOrElse(wrapViewTextWithSelect)
} else {
- childSchema.zip(tableDesc.schema).map { case (attr, col) =>
- HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+ wrapViewTextWithSelect
+ }
+
+ val viewSchema = {
+ if (tableDesc.schema.isEmpty) {
+ childSchema.map { attr =>
+ HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ }
+ } else {
+ childSchema.zip(tableDesc.schema).map { case (attr, col) =>
+ HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+ }
}
}
- val columnNames = childSchema.map(f => verbose(f.name))
+ tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
+ }
+ private def wrapViewTextWithSelect: String = {
// When the user specifies column names for the view, we create a projection to do the
// renaming. When no column names are given, we still create a projection to declare the
// columns we need, to stay robust against top-level `*`s.
- val projectList = if (tableDesc.schema == Nil) {
- columnNames.mkString(", ")
- } else {
- columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
- case (name, alias) => s"$name AS $alias"
- }.mkString(", ")
+ val viewOutput = {
+ val columnNames = childSchema.map(f => quote(f.name))
+ if (tableDesc.schema.isEmpty) {
+ columnNames.mkString(", ")
+ } else {
+ columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+ case (name, alias) => s"$name AS $alias"
+ }.mkString(", ")
+ }
}
- val viewName = verbose(tableDesc.name)
-
- val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
+ val viewText = tableDesc.viewText.get
+ val viewName = quote(tableDesc.name)
+ s"SELECT $viewOutput FROM ($viewText) $viewName"
+ }
- tableDesc.copy(schema = schema, viewText = Some(expandedText))
+ private def rebuildViewQueryString(sqlContext: SQLContext): Option[String] = {
+ val logicalPlan = if (tableDesc.schema.isEmpty) {
+ child
+ } else {
+ val projectList = childSchema.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
+ }
+ sqlContext.executePlan(Project(projectList, child)).analyzed
+ }
+ new SQLBuilder(logicalPlan, sqlContext).toSQL
}
// Escapes backticks in the name by doubling them, then wraps the whole name in backticks.
- private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
+ private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
}
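
To illustrate what the fallback path stores as the expanded view text (a sketch, not part of the patch), here is the quoting and wrapping logic in isolation:

// Backticks inside a name are doubled, then the whole name is wrapped in backticks:
def quote(name: String): String = s"`${name.replaceAll("`", "``")}`"
assert(quote("a`b") == "`a``b`")

// For `CREATE VIEW v(a) AS SELECT id FROM jt`, wrapViewTextWithSelect produces:
val viewText = "SELECT id FROM jt"
val expanded = s"SELECT ${quote("id")} AS ${quote("a")} FROM ($viewText) ${quote("v")}"
// expanded == "SELECT `id` AS `a` FROM (SELECT id FROM jt) `v`"

When spark.sql.nativeView.canonical is enabled and SQLBuilder succeeds, the stored text is instead a SQL string regenerated from the analyzed plan, with table references resolved against the database the view was created in; this is what makes the "Using view after switching current database" test below pass.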
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 261a4746f4..1f731db26f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -147,7 +147,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
// TODO Enable this
// Query plans transformed by DistinctAggregationRewriter are not recognized yet
- ignore("distinct and non-distinct aggregation") {
+ ignore("multi-distinct columns") {
checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 683008960a..9e53d8a81e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1319,67 +1319,119 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("correctly handle CREATE OR REPLACE VIEW") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ Seq(true, false).foreach { enabled =>
+ val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
+ test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+
+ val e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+ }
+ assert(e.message.contains("not allowed to define a view"))
+ }
+ }
+ }
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+ test(s"$prefix correctly handle ALTER VIEW") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt", "jt2") {
+ withView("testView") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("ALTER VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+ }
+ }
+ }
+ }
- sql("DROP VIEW testView")
+ test(s"$prefix create hive view for json table") {
+ // JSON tables are not Hive-compatible; make sure the new flag fixes this.
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("jt") {
+ withView("testView") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ }
+ }
+ }
+ }
- val e = intercept[AnalysisException] {
- sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+ test(s"$prefix create hive view for partitioned parquet table") {
+ // Partitioned Parquet tables are not Hive-compatible; make sure the new flag fixes this.
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
+ withTable("parTable") {
+ withView("testView") {
+ val df = Seq(1 -> "a").toDF("i", "j")
+ df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+ sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+ checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+ }
}
- assert(e.message.contains("not allowed to define a view"))
}
}
}
- test("correctly handle ALTER VIEW") {
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
-
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt2")
- sql("ALTER VIEW testView AS SELECT * FROM jt2")
- // make sure the view has been changed.
- checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
-
- sql("DROP VIEW testView")
+ test("CTE within view") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withView("cte_view") {
+ sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
+ checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
}
}
}
- test("create hive view for json table") {
- // json table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- sql("DROP VIEW testView")
+ test("Using view after switching current database") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM src")
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ // Should look up table `src` in database `default`.
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+
+ // The new `src` table shouldn't be scanned.
+ sql("CREATE TABLE src(key INT, value STRING)")
+ checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src"))
+ }
+ }
}
}
}
- test("create hive view for partitioned parquet table") {
- // partitioned parquet table is not hive-compatible, make sure the new flag fix it.
- withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("parTable") {
- val df = Seq(1 -> "a").toDF("i", "j")
- df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
- sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
- checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
- sql("DROP VIEW testView")
+ test("Using view after adding more columns") {
+ withSQLConf(
+ SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
+ withTable("add_col") {
+ sqlContext.range(10).write.saveAsTable("add_col")
+ withView("v") {
+ sql("CREATE VIEW v AS SELECT * FROM add_col")
+ sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col")
+ checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10))
+ }
}
}
}