path: root/sql/hive
author     jiangxingbo <jiangxb1987@gmail.com>            2017-01-11 13:44:07 -0800
committer  Herman van Hovell <hvanhovell@databricks.com>  2017-01-11 13:44:07 -0800
commit     30a07071f099c0ebcf04c4df61f8d414dcbad7b5 (patch)
tree       40311294f1c2be27b5e0bf101700bb4553aadc0b /sql/hive
parent     3bc2eff8880a3ba8d4318118715ea1a47048e3de (diff)
[SPARK-18801][SQL] Support resolve a nested view
## What changes were proposed in this pull request?

We should be able to resolve a nested view. The main advantage is that if you update an underlying view, the current view also gets updated.

The new approach should be compatible with older versions of Spark/Hive, which means:
1. It should be able to resolve views created by older versions of Spark/Hive;
2. It should be able to resolve the views that are currently supported by Spark SQL.

The new approach mainly brings in the following changes:
1. Add a new operator called `View` to keep track of the `CatalogTable` that describes the view, the output attributes, and the child plan of the view;
2. Update the `ResolveRelations` rule to resolve both relations and views, ensuring that a nested view is resolved correctly;
3. Add a `viewDefaultDatabase` variable to `CatalogTable` to keep track of the default database name used to resolve a view; if the `CatalogTable` is not a view, the variable is `None`;
4. Add an `AnalysisContext` so that views created with CTE/window queries remain supported;
5. Enable view support without enabling Hive support (i.e., `enableHiveSupport`);
6. Fix a surprising behavior: the result of a view query could have a different schema if the referenced table had been changed. After this PR, we try to cast the child output attributes to those of the view schema, and throw an `AnalysisException` if the cast is not allowed.

Note this is compatible with views defined by older versions of Spark (before 2.2), which have an empty `defaultDatabase` and relations in `viewText` that all have the database part defined.

## How was this patch tested?

1. Add new tests in `SessionCatalogSuite` to test the function `lookupRelation`;
2. Add a new test case in `SQLViewSuite` to test resolving a nested view.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #16233 from jiangxb1987/resolve-view.
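The diff below constructs the new operator as `View(desc = ..., output = ..., child = ...)`. As a minimal sketch (a simplification for illustration, not the exact Spark source), such a node could look like this:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Minimal sketch of the View operator (simplified, not the exact Spark
// source): it pins the view's output to the schema stored in the catalog,
// while the child holds the parsed viewText. Because the output is fixed
// here, a changed underlying table surfaces as a cast over the child's
// attributes (or an AnalysisException) rather than silently changing the
// view's schema -- which is exactly change 6 above.
case class View(
    desc: CatalogTable,        // catalog entry describing the view
    output: Seq[Attribute],    // attributes from the view's stored schema
    child: LogicalPlan)        // parsed plan of desc.viewText
  extends UnaryNode
```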
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala      4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala      12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala         3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala  155
5 files changed, 182 insertions(+), 12 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 474a2c868e..208c8c9d5d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -606,8 +606,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
/**
- * Restores table metadata from the table properties if it's a datasouce table. This method is
- * kind of a opposite version of [[createTable]].
+ * Restores table metadata from the table properties. This method is kind of an opposite
+ * version of [[createTable]].
*
* It reads table schema, provider, partition column names and bucket specification from table
* properties, and filter out these special entries from table properties.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ee4589f855..0c110d3500 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -111,6 +111,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
new Path(new Path(dbLocation), tblName).toString
}
+ /**
+ * Returns a [[LogicalPlan]] that represents the given table or view from Hive metastore.
+ *
+ * @param tableIdent The name of the table/view that we look up.
+ * @param alias The alias name of the table/view that we look up.
+ * @return a [[LogicalPlan]] that represents the given table or view from Hive metastore.
+ */
def lookupRelation(
tableIdent: TableIdentifier,
alias: Option[String]): LogicalPlan = {
@@ -125,11 +132,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Otherwise, wrap the table with a Subquery using the table name.
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
} else if (table.tableType == CatalogTableType.VIEW) {
+ val tableIdentifier = table.identifier
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
- SubqueryAlias(
- alias.getOrElse(table.identifier.table),
- sparkSession.sessionState.sqlParser.parsePlan(viewText),
- Option(table.identifier))
+ // The relation is a view, so we wrap the relation by:
+ // 1. Add a [[View]] operator over the relation to keep track of the view desc;
+ // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
+ val child = View(
+ desc = table,
+ output = table.schema.toAttributes,
+ child = sparkSession.sessionState.sqlParser.parsePlan(viewText))
+ SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, Option(tableIdentifier))
} else {
val qualifiedTable =
MetastoreRelation(
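Because a `View`'s child is simply the parsed `viewText`, resolving that child re-enters relation lookup for any views it references, so nesting falls out naturally. A hypothetical illustration (made-up names) of the resolved plan shape:

```scala
// Hypothetical plan shape after resolving "SELECT * FROM view2", where
// view2's text is "SELECT * FROM view1" -- each level of nesting adds
// another SubqueryAlias(View(...)) pair around the parsed view text:
//
//   SubqueryAlias("view2",
//     View(desc = view2Meta, output = view2Meta.schema.toAttributes,
//       child = SubqueryAlias("view1",
//         View(desc = view1Meta, output = view1Meta.schema.toAttributes,
//           child = <parsed plan of view1's viewText>))))
```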
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 462b3c2686..b3cbbedbe1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchTableExce
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
@@ -46,16 +47,18 @@ private[sql] class HiveSessionCatalog(
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: SQLConf,
- hadoopConf: Configuration)
+ hadoopConf: Configuration,
+ parser: ParserInterface)
extends SessionCatalog(
externalCatalog,
globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
- hadoopConf) {
+ hadoopConf,
+ parser) {
- override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
+ override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
synchronized {
val table = formatTableName(name.table)
val db = formatDatabaseName(name.database.getOrElse(currentDb))
@@ -65,8 +68,7 @@ private[sql] class HiveSessionCatalog(
SubqueryAlias(relationAlias, viewDef, Some(name))
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
- val database = name.database.map(formatDatabaseName)
- val newName = name.copy(database = database, table = table)
+ val newName = name.copy(database = Some(db), table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
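The `lookupRelation` change above makes the identifier handed to the metastore catalog always carry an explicit database, falling back to the current database when none is given. A hedged illustration of that qualification step (simplified; the `formatDatabaseName` normalization is omitted):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Simplified illustration of the qualification in the hunk above: an
// identifier without a database part is completed with the current
// database before the metastore lookup, so downstream code never sees
// an unqualified name.
def qualify(name: TableIdentifier, currentDb: String): TableIdentifier =
  name.copy(database = Some(name.database.getOrElse(currentDb)))

// qualify(TableIdentifier("jt"), "default")
//   == TableIdentifier("jt", Some("default"))
```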
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index aebee85f92..9b4b8b6fcd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -50,7 +50,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
functionResourceLoader,
functionRegistry,
conf,
- newHadoopConf())
+ newHadoopConf(),
+ sqlParser)
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index ba65db71ed..e06d0ae045 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
/**
* A suite for testing view related functionality.
@@ -543,4 +545,157 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}
+
+ test("correctly resolve a nested view") {
+ withTempDatabase { db =>
+ withView(s"$db.view1", s"$db.view2") {
+ val view1 = CatalogTable(
+ identifier = TableIdentifier("view1", Some(db)),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM jt"),
+ viewText = Some("SELECT * FROM jt"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ val view2 = CatalogTable(
+ identifier = TableIdentifier("view2", Some(db)),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM view1"),
+ viewText = Some("SELECT * FROM view1"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db))
+ activateDatabase(db) {
+ hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
+ hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
+ checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i)))
+ }
+ }
+ }
+ }
+
+ test("correctly resolve a view with CTE") {
+ withView("cte_view") {
+ val cte_view = CatalogTable(
+ identifier = TableIdentifier("cte_view"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("n", "int"),
+ viewOriginalText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
+ viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = false)
+ checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
+ }
+ }
+
+ test("correctly resolve a view in a self join") {
+ withView("join_view") {
+ val join_view = CatalogTable(
+ identifier = TableIdentifier("join_view"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM jt"),
+ viewText = Some("SELECT * FROM jt"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = false)
+ checkAnswer(
+ sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id ORDER BY t1.id"),
+ (1 to 9).map(i => Row(i, i, i, i)))
+ }
+ }
+
+ private def assertInvalidReference(query: String): Unit = {
+ val e = intercept[AnalysisException] {
+ sql(query)
+ }.getMessage
+ assert(e.contains("Table or view not found"))
+ }
+
+ test("error handling: fail if the referenced table or view is invalid") {
+ withView("view1", "view2", "view3") {
+ // Fail if the referenced table is defined in an invalid database.
+ val view1 = CatalogTable(
+ identifier = TableIdentifier("view1"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM invalid_db.jt"),
+ viewText = Some("SELECT * FROM invalid_db.jt"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
+ assertInvalidReference("SELECT * FROM view1")
+
+ // Fail if the referenced table is invalid.
+ val view2 = CatalogTable(
+ identifier = TableIdentifier("view2"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM invalid_table"),
+ viewText = Some("SELECT * FROM invalid_table"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
+ assertInvalidReference("SELECT * FROM view2")
+
+ // Fail if the referenced view is invalid.
+ val view3 = CatalogTable(
+ identifier = TableIdentifier("view3"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "int").add("id1", "int"),
+ viewOriginalText = Some("SELECT * FROM view2"),
+ viewText = Some("SELECT * FROM view2"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = false)
+ assertInvalidReference("SELECT * FROM view3")
+ }
+ }
+
+ test("make sure we can resolve view created by old version of Spark") {
+ withTable("hive_table") {
+ withView("old_view") {
+ spark.sql("CREATE TABLE hive_table AS SELECT 1 AS a, 2 AS b")
+ // Views defined by older versions of Spark (before 2.2) have an empty view default
+ // database name, and all the relations referenced in the viewText have the database
+ // part defined.
+ val view = CatalogTable(
+ identifier = TableIdentifier("old_view"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("a", "int").add("b", "int"),
+ viewOriginalText = Some(s"SELECT * FROM hive_table"),
+ viewText = Some("SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b` FROM (SELECT " +
+ "`gen_attr_0`, `gen_attr_1` FROM (SELECT `a` AS `gen_attr_0`, `b` AS " +
+ "`gen_attr_1` FROM hive_table) AS gen_subquery_0) AS hive_table")
+ )
+ hiveContext.sessionState.catalog.createTable(view, ignoreIfExists = false)
+ val df = sql("SELECT * FROM old_view")
+ // Check the output rows.
+ checkAnswer(df, Row(1, 2))
+ // Check the output schema.
+ assert(df.schema.sameType(view.schema))
+ }
+ }
+ }
+
+ test("correctly handle type casting between view output and child output") {
+ withTable("testTable") {
+ withView("testView") {
+ spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("testTable")
+ sql("CREATE VIEW testView AS SELECT * FROM testTable")
+
+ // Allow casting from IntegerType to LongType
+ val df = (1 until 10).map(i => i).toDF("id1")
+ df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i)))
+
+ // Can't cast from ArrayType to LongType, throw an AnalysisException.
+ val df2 = (1 until 10).map(i => Seq(i)).toDF("id1")
+ df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+ intercept[AnalysisException](sql("SELECT * FROM testView ORDER BY id1"))
+ }
+ }
+ }
}