aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala103
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala65
3 files changed, 100 insertions, 71 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index d65b3cb0e3..7f26a7e411 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
* supported by this builder (yet).
*/
class SQLBuilder(logicalPlan: LogicalPlan) extends Logging {
- require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
+ require(logicalPlan.resolved,
+ "SQLBuilder only supports resolved logical query plans. Current plan:\n" + logicalPlan)
def this(df: Dataset[_]) = this(df.queryExecution.analyzed)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 082f944f99..7542f9d6c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -63,9 +63,12 @@ case class CreateViewCommand(
}
override def run(sqlContext: SQLContext): Seq[Row] = {
- val analzyedPlan = sqlContext.executePlan(child).analyzed
+ // If the plan cannot be analyzed, throw an exception and don't proceed.
+ val qe = sqlContext.executePlan(child)
+ qe.assertAnalyzed()
+ val analyzedPlan = qe.analyzed
- require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+ require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
val sessionState = sqlContext.sessionState
if (sessionState.catalog.tableExists(tableIdentifier)) {
@@ -74,7 +77,7 @@ case class CreateViewCommand(
// already exists.
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+ sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
@@ -85,68 +88,74 @@ case class CreateViewCommand(
} else {
// Create the view if it doesn't exist.
sessionState.catalog.createTable(
- prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+ prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
}
Seq.empty[Row]
}
- private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
- val expandedText = if (sqlContext.conf.canonicalView) {
- try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
- case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
+ /**
+ * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize
+ * SQL based on the analyzed plan, and also creates the proper schema for the view.
+ */
+ private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+ val viewSQL: String =
+ if (sqlContext.conf.canonicalView) {
+ val logicalPlan =
+ if (tableDesc.schema.isEmpty) {
+ analyzedPlan
+ } else {
+ val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+ case (attr, col) => Alias(attr, col.name)()
+ }
+ sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+ }
+ new SQLBuilder(logicalPlan).toSQL
+ } else {
+ // When user specified column names for view, we should create a project to do the renaming.
+ // When no column name specified, we still need to create a project to declare the columns
+ // we need, to make us more robust to top level `*`s.
+ val viewOutput = {
+ val columnNames = analyzedPlan.output.map(f => quote(f.name))
+ if (tableDesc.schema.isEmpty) {
+ columnNames.mkString(", ")
+ } else {
+ columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+ case (name, alias) => s"$name AS $alias"
+ }.mkString(", ")
+ }
+ }
+
+ val viewText = tableDesc.viewText.get
+ val viewName = quote(tableDesc.identifier.table)
+ s"SELECT $viewOutput FROM ($viewText) $viewName"
}
- } else {
- wrapViewTextWithSelect(analzyedPlan)
+
+ // Validate the view SQL - make sure we can parse it and analyze it.
+ // If we cannot analyze the generated query, there is probably a bug in SQL generation.
+ try {
+ sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
+ } catch {
+ case NonFatal(e) =>
+ throw new RuntimeException(
+ "Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
}
- val viewSchema = {
+ val viewSchema: Seq[CatalogColumn] = {
if (tableDesc.schema.isEmpty) {
- analzyedPlan.output.map { a =>
+ analyzedPlan.output.map { a =>
CatalogColumn(a.name, a.dataType.simpleString)
}
} else {
- analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+ analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
}
}
}
- tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
- }
-
- private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
- // When user specified column names for view, we should create a project to do the renaming.
- // When no column name specified, we still need to create a project to declare the columns
- // we need, to make us more robust to top level `*`s.
- val viewOutput = {
- val columnNames = analzyedPlan.output.map(f => quote(f.name))
- if (tableDesc.schema.isEmpty) {
- columnNames.mkString(", ")
- } else {
- columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
- case (name, alias) => s"$name AS $alias"
- }.mkString(", ")
- }
- }
-
- val viewText = tableDesc.viewText.get
- val viewName = quote(tableDesc.identifier.table)
- s"SELECT $viewOutput FROM ($viewText) $viewName"
- }
-
- private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
- val logicalPlan = if (tableDesc.schema.isEmpty) {
- analzyedPlan
- } else {
- val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
- case (attr, col) => Alias(attr, col.name)()
- }
- sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
- }
- new SQLBuilder(logicalPlan).toSQL
+ tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL))
}
- // escape backtick with double-backtick in column name and wrap it with backtick.
+ /** Escape backtick with double-backtick in column name and wrap it with backtick. */
private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index cdd5cb31d9..0d88b3b87f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils
class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import hiveContext.implicits._
+ override def beforeAll(): Unit = {
+ // Create a simple table with two columns: id and id1
+ sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt")
+ }
+
+ override def afterAll(): Unit = {
+ sqlContext.sql(s"DROP TABLE IF EXISTS jt")
+ }
+
+ test("nested views") {
+ withView("jtv1", "jtv2") {
+ sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
+ sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
+ checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+ }
+ }
+
+ test("error handling: fail if the view sql itself is invalid") {
+ // A table that does not exist
+ intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect()
+ }
+
+ // A column that does not exist
+ intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect()
+ }
+ }
+
test("correctly parse CREATE VIEW statement") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt") {
- val df = (1 until 10).map(i => i -> i).toDF("i", "j")
- df.write.format("json").saveAsTable("jt")
- sql(
- """CREATE VIEW IF NOT EXISTS
- |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
- |TBLPROPERTIES ('a' = 'b')
- |AS SELECT * FROM jt""".stripMargin)
- checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
- sql("DROP VIEW testView")
- }
+ sql(
+ """CREATE VIEW IF NOT EXISTS
+ |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+ |TBLPROPERTIES ('a' = 'b')
+ |AS SELECT * FROM jt""".stripMargin)
+ checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+ sql("DROP VIEW testView")
}
}
test("correctly handle CREATE VIEW IF NOT EXISTS") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ withTable("jt2") {
sql("CREATE VIEW testView AS SELECT id FROM jt")
val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -66,8 +90,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt", "jt2") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ withTable("jt2") {
sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
@@ -90,9 +113,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test(s"$prefix correctly handle ALTER VIEW") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt", "jt2") {
+ withTable("jt2") {
withView("testView") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt")
val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -109,12 +131,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// json table is not hive-compatible, make sure the new flag fix it.
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
- withTable("jt") {
- withView("testView") {
- sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
- sql("CREATE VIEW testView AS SELECT id FROM jt")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
- }
+ withView("testView") {
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
}
}
}