aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/test/scala/org
diff options
context:
space:
mode:
authorjiangxingbo <jiangxb1987@gmail.com>2017-01-11 13:44:07 -0800
committerHerman van Hovell <hvanhovell@databricks.com>2017-01-11 13:44:07 -0800
commit30a07071f099c0ebcf04c4df61f8d414dcbad7b5 (patch)
tree40311294f1c2be27b5e0bf101700bb4553aadc0b /sql/hive/src/test/scala/org
parent3bc2eff8880a3ba8d4318118715ea1a47048e3de (diff)
downloadspark-30a07071f099c0ebcf04c4df61f8d414dcbad7b5.tar.gz
spark-30a07071f099c0ebcf04c4df61f8d414dcbad7b5.tar.bz2
spark-30a07071f099c0ebcf04c4df61f8d414dcbad7b5.zip
[SPARK-18801][SQL] Support resolve a nested view
## What changes were proposed in this pull request? We should be able to resolve a nested view. The main advantage is that if you update an underlying view, the current view also gets updated. The new approach should be compatible with older versions of SPARK/HIVE, that means: 1. The new approach should be able to resolve the views that were created by older versions of SPARK/HIVE; 2. The new approach should be able to resolve the views that are currently supported by SPARK SQL. The new approach mainly brings in the following changes: 1. Add a new operator called `View` to keep track of the CatalogTable that describes the view, and the output attributes as well as the child of the view; 2. Update the `ResolveRelations` rule to resolve the relations and views, note that a nested view should be resolved correctly; 3. Add `viewDefaultDatabase` variable to `CatalogTable` to keep track of the default database name used to resolve a view, if the `CatalogTable` is not a view, then the variable should be `None`; 4. Add `AnalysisContext` to enable us to still support a view created with a CTE/window query; 5. Enable view support without enabling Hive support (i.e., enableHiveSupport); 6. Fix a weird behavior: the result of a view query may have a different schema if the referenced table has been changed. After this PR, we try to cast the child output attributes to those from the view schema, and throw an AnalysisException if the cast is not allowed. Note this is compatible with the views defined by older versions of Spark (before 2.2), which have an empty `defaultDatabase` and all the relations in `viewText` have the database part defined. ## How was this patch tested? 1. Add new tests in `SessionCatalogSuite` to test the function `lookupRelation`; 2. Add a new test case in `SQLViewSuite` to test resolving a nested view. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #16233 from jiangxb1987/resolve-view.
Diffstat (limited to 'sql/hive/src/test/scala/org')
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala155
1 files changed, 155 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index ba65db71ed..e06d0ae045 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
/**
* A suite for testing view related functionality.
@@ -543,4 +545,157 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}
+
+ test("correctly resolve a nested view") {
+ withTempDatabase { db =>
+ withView(s"$db.view1", s"$db.view2") {
+ val view1 = CatalogTable(
+ identifier = TableIdentifier("view1", Some(db)),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM jt"),
+ viewText = Some("SELECT * FROM jt"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ val view2 = CatalogTable(
+ identifier = TableIdentifier("view2", Some(db)),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM view1"),
+ viewText = Some("SELECT * FROM view1"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db))
+ activateDatabase(db) {
+ hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
+ hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
+ checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i)))
+ }
+ }
+ }
+ }
+
+ test("correctly resolve a view with CTE") {
+ withView("cte_view") {
+ val cte_view = CatalogTable(
+ identifier = TableIdentifier("cte_view"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("n", "int"),
+ viewOriginalText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
+ viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = false)
+ checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
+ }
+ }
+
+ test("correctly resolve a view in a self join") {
+ withView("join_view") {
+ val join_view = CatalogTable(
+ identifier = TableIdentifier("join_view"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM jt"),
+ viewText = Some("SELECT * FROM jt"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = false)
+ checkAnswer(
+ sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id ORDER BY t1.id"),
+ (1 to 9).map(i => Row(i, i, i, i)))
+ }
+ }
+
+ private def assertInvalidReference(query: String): Unit = {
+ val e = intercept[AnalysisException] {
+ sql(query)
+ }.getMessage
+ assert(e.contains("Table or view not found"))
+ }
+
+ test("error handling: fail if the referenced table or view is invalid") {
+ withView("view1", "view2", "view3") {
+ // Fail if the referenced table is defined in an invalid database.
+ val view1 = CatalogTable(
+ identifier = TableIdentifier("view1"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM invalid_db.jt"),
+ viewText = Some("SELECT * FROM invalid_db.jt"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
+ assertInvalidReference("SELECT * FROM view1")
+
+ // Fail if the referenced table is invalid.
+ val view2 = CatalogTable(
+ identifier = TableIdentifier("view2"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM invalid_table"),
+ viewText = Some("SELECT * FROM invalid_table"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
+ assertInvalidReference("SELECT * FROM view2")
+
+ // Fail if the referenced view is invalid.
+ val view3 = CatalogTable(
+ identifier = TableIdentifier("view3"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM view2"),
+ viewText = Some("SELECT * FROM view2"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = false)
+ assertInvalidReference("SELECT * FROM view3")
+ }
+ }
+
+ test("make sure we can resolve view created by old version of Spark") {
+ withTable("hive_table") {
+ withView("old_view") {
+ spark.sql("CREATE TABLE hive_table AS SELECT 1 AS a, 2 AS b")
+ // The views defined by older versions of Spark (before 2.2) will have an empty view default
+ // database name, and all the relations referenced in the viewText will have the database part
+ // defined.
+ val view = CatalogTable(
+ identifier = TableIdentifier("old_view"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("a", "int").add("b", "int"),
+ viewOriginalText = Some(s"SELECT * FROM hive_table"),
+ viewText = Some("SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b` FROM (SELECT " +
+ "`gen_attr_0`, `gen_attr_1` FROM (SELECT `a` AS `gen_attr_0`, `b` AS " +
+ "`gen_attr_1` FROM hive_table) AS gen_subquery_0) AS hive_table")
+ )
+ hiveContext.sessionState.catalog.createTable(view, ignoreIfExists = false)
+ val df = sql("SELECT * FROM old_view")
+ // Check the output rows.
+ checkAnswer(df, Row(1, 2))
+ // Check the output schema.
+ assert(df.schema.sameType(view.schema))
+ }
+ }
+ }
+
+ test("correctly handle type casting between view output and child output") {
+ withTable("testTable") {
+ withView("testView") {
+ spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("testTable")
+ sql("CREATE VIEW testView AS SELECT * FROM testTable")
+
+ // Allow casting from IntegerType to LongType
+ val df = (1 until 10).map(i => i).toDF("id1")
+ df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i)))
+
+ // Can't cast from ArrayType to LongType, throw an AnalysisException.
+ val df2 = (1 until 10).map(i => Seq(i)).toDF("id1")
+ df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+ intercept[AnalysisException](sql("SELECT * FROM testView ORDER BY id1"))
+ }
+ }
+ }
}