aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjiangxingbo <jiangxb1987@gmail.com>2017-01-16 19:11:21 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-16 19:11:21 +0800
commite635cbb6e61dee450db0ef45f3d82ac282a85d3c (patch)
tree6743801e406e256dab532c2a8346bb764396d5cd
parent61e48f52d1d8c7431707bd3511b6fe9f0ae996c0 (diff)
downloadspark-e635cbb6e61dee450db0ef45f3d82ac282a85d3c.tar.gz
spark-e635cbb6e61dee450db0ef45f3d82ac282a85d3c.tar.bz2
spark-e635cbb6e61dee450db0ef45f3d82ac282a85d3c.zip
[SPARK-18801][SQL][FOLLOWUP] Alias the view with its child
## What changes were proposed in this pull request? This PR is a follow-up to address the comments https://github.com/apache/spark/pull/16233/files#r95669988 and https://github.com/apache/spark/pull/16233/files#r95662299. We try to wrap the child by: 1. Generate the `queryOutput` by: 1.1. If the query column names are defined, map the column names to attributes in the child output by name; 1.2. Else set the child output attributes to `queryOutput`. 2. Map the `queryQutput` to view output by index, if the corresponding attributes don't match, try to up cast and alias the attribute in `queryOutput` to the attribute in the view output. 3. Add a Project over the child, with the new output generated by the previous steps. If the view output doesn't have the same number of columns neither with the child output, nor with the query column names, throw an AnalysisException. ## How was this patch tested? Add new test cases in `SQLViewSuite`. Author: jiangxingbo <jiangxb1987@gmail.com> Closes #16561 from jiangxb1987/alias-view.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala24
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala66
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala49
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala21
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala111
5 files changed, 214 insertions, 57 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1957df89e6..bd9037ec43 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2281,12 +2281,6 @@ class Analyzer(
"type of the field in the target object")
}
- private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = {
- val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
- val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
- toPrecedence > 0 && fromPrecedence > toPrecedence
- }
-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case p if !p.childrenResolved => p
case p if p.resolved => p
@@ -2294,19 +2288,11 @@ class Analyzer(
case p => p transformExpressions {
case u @ UpCast(child, _, _) if !child.resolved => u
- case UpCast(child, dataType, walkedTypePath) => (child.dataType, dataType) match {
- case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) =>
- fail(child, to, walkedTypePath)
- case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) =>
- fail(child, to, walkedTypePath)
- case (from, to) if illegalNumericPrecedence(from, to) =>
- fail(child, to, walkedTypePath)
- case (TimestampType, DateType) =>
- fail(child, DateType, walkedTypePath)
- case (StringType, to: NumericType) =>
- fail(child, to, walkedTypePath)
- case _ => Cast(child, dataType.asNullable)
- }
+ case UpCast(child, dataType, walkedTypePath)
+ if Cast.mayTruncate(child.dataType, dataType) =>
+ fail(child, dataType, walkedTypePath)
+
+ case UpCast(child, dataType, walkedTypePath) => Cast(child, dataType.asNullable)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 737f846ef4..a5640a6c96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -28,22 +28,60 @@ import org.apache.spark.sql.catalyst.rules.Rule
*/
/**
- * Make sure that a view's child plan produces the view's output attributes. We wrap the child
- * with a Project and add an alias for each output attribute. The attributes are resolved by
- * name. This should be only done after the batch of Resolution, because the view attributes are
- * not completely resolved during the batch of Resolution.
+ * Make sure that a view's child plan produces the view's output attributes. We try to wrap the
+ * child by:
+ * 1. Generate the `queryOutput` by:
+ * 1.1. If the query column names are defined, map the column names to attributes in the child
+ * output by name(This is mostly for handling view queries like SELECT * FROM ..., the
+ * schema of the referenced table/view may change after the view has been created, so we
+ * have to save the output of the query to `viewQueryColumnNames`, and restore them during
+ * view resolution, in this way, we are able to get the correct view column ordering and
+ * omit the extra columns that we don't require);
+ * 1.2. Else set the child output attributes to `queryOutput`.
+ * 2. Map the `queryQutput` to view output by index, if the corresponding attributes don't match,
+ * try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
+ * 3. Add a Project over the child, with the new output generated by the previous steps.
+ * If the view output doesn't have the same number of columns neither with the child output, nor
+ * with the query column names, throw an AnalysisException.
+ *
+ * This should be only done after the batch of Resolution, because the view attributes are not
+ * completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: CatalystConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case v @ View(_, output, child) if child.resolved =>
+ case v @ View(desc, output, child) if child.resolved && output != child.output =>
val resolver = conf.resolver
- val newOutput = output.map { attr =>
- val originAttr = findAttributeByName(attr.name, child.output, resolver)
- // The dataType of the output attributes may be not the same with that of the view output,
- // so we should cast the attribute to the dataType of the view output attribute. If the
- // cast can't perform, will throw an AnalysisException.
- Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
- qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
+ val queryColumnNames = desc.viewQueryColumnNames
+ val queryOutput = if (queryColumnNames.nonEmpty) {
+ // If the view output doesn't have the same number of columns with the query column names,
+ // throw an AnalysisException.
+ if (output.length != queryColumnNames.length) {
+ throw new AnalysisException(
+ s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
+ s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
+ }
+ desc.viewQueryColumnNames.map { colName =>
+ findAttributeByName(colName, child.output, resolver)
+ }
+ } else {
+ // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
+ // output is the same with the view output.
+ child.output
+ }
+ // Map the attributes in the query output to the attributes in the view output by index.
+ val newOutput = output.zip(queryOutput).map {
+ case (attr, originAttr) if attr != originAttr =>
+ // The dataType of the output attributes may be not the same with that of the view
+ // output, so we should cast the attribute to the dataType of the view output attribute.
+ // Will throw an AnalysisException if the cast can't perform or might truncate.
+ if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) {
+ throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
+ s"${originAttr.dataType.simpleString} to ${attr.simpleString} as it may truncate\n")
+ } else {
+ Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
+ qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
+ }
+ case (_, originAttr) => originAttr
}
v.copy(child = Project(newOutput, child))
}
@@ -74,7 +112,9 @@ object EliminateView extends Rule[LogicalPlan] {
// The child should have the same output attributes with the View operator, so we simply
// remove the View operator.
case View(_, output, child) =>
- assert(output == child.output, "The output of the child is different from the view output")
+ assert(output == child.output,
+ s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
+ s"view output ${output.mkString("[", ",", "]")}")
child
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a9de10717e..2adccdd7bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.catalyst.catalog
import java.util.Date
+import scala.collection.mutable
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
+
/**
@@ -178,6 +181,8 @@ case class CatalogTable(
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false) {
+ import CatalogTable._
+
/** schema of this table's partition columns */
def partitionSchema: StructType = StructType(schema.filter {
c => partitionColumnNames.contains(c.name)
@@ -198,9 +203,44 @@ case class CatalogTable(
/**
* Return the default database name we use to resolve a view, should be None if the CatalogTable
- * is not a View.
+ * is not a View or created by older versions of Spark(before 2.2.0).
+ */
+ def viewDefaultDatabase: Option[String] = properties.get(VIEW_DEFAULT_DATABASE)
+
+ /**
+ * Return the output column names of the query that creates a view, the column names are used to
+ * resolve a view, should be empty if the CatalogTable is not a View or created by older versions
+ * of Spark(before 2.2.0).
+ */
+ def viewQueryColumnNames: Seq[String] = {
+ for {
+ numCols <- properties.get(VIEW_QUERY_OUTPUT_NUM_COLUMNS).toSeq
+ index <- 0 until numCols.toInt
+ } yield properties.getOrElse(
+ s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index",
+ throw new AnalysisException("Corrupted view query output column names in catalog: " +
+ s"$numCols parts expected, but part $index is missing.")
+ )
+ }
+
+ /**
+ * Insert/Update the view query output column names in `properties`.
*/
- def viewDefaultDatabase: Option[String] = properties.get(CatalogTable.VIEW_DEFAULT_DATABASE)
+ def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
+ val props = new mutable.HashMap[String, String]
+ if (columns.nonEmpty) {
+ props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+ columns.zipWithIndex.foreach { case (colName, index) =>
+ props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+ }
+ }
+
+ // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
+ // while `CatalogTable` should be serializable.
+ copy(properties = properties.filterNot { case (key, _) =>
+ key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+ } ++ props)
+ }
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
@@ -254,6 +294,9 @@ case class CatalogTable(
object CatalogTable {
val VIEW_DEFAULT_DATABASE = "view.default.database"
+ val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out."
+ val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
+ val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 14e275bf88..ad59271e5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,7 +21,7 @@ import java.math.{BigDecimal => JavaBigDecimal}
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._
@@ -89,6 +89,25 @@ object Cast {
case _ => false
}
+ /**
+ * Return true iff we may truncate during casting `from` type to `to` type. e.g. long -> int,
+ * timestamp -> date.
+ */
+ def mayTruncate(from: DataType, to: DataType): Boolean = (from, to) match {
+ case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) => true
+ case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) => true
+ case (from, to) if illegalNumericPrecedence(from, to) => true
+ case (TimestampType, DateType) => true
+ case (StringType, to: NumericType) => true
+ case _ => false
+ }
+
+ private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = {
+ val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
+ val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
+ toPrecedence > 0 && fromPrecedence > toPrecedence
+ }
+
def forceNullable(from: DataType, to: DataType): Boolean = (from, to) match {
case (NullType, _) => true
case (_, _) if from == to => false
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index e06d0ae045..9bc078dbb0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -553,18 +553,24 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
identifier = TableIdentifier("view1", Some(db)),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = new StructType().add("id", "int").add("id1", "int"),
+ schema = new StructType().add("x", "long").add("y", "long"),
viewOriginalText = Some("SELECT * FROM jt"),
viewText = Some("SELECT * FROM jt"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
val view2 = CatalogTable(
identifier = TableIdentifier("view2", Some(db)),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = new StructType().add("id", "int").add("id1", "int"),
+ schema = new StructType().add("id", "long").add("id1", "long"),
viewOriginalText = Some("SELECT * FROM view1"),
viewText = Some("SELECT * FROM view1"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db,
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "x",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "y"))
activateDatabase(db) {
hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
@@ -583,7 +589,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
schema = new StructType().add("n", "int"),
viewOriginalText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "1",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "n"))
hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = false)
checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
}
@@ -595,10 +603,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
identifier = TableIdentifier("join_view"),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = new StructType().add("id", "int").add("id1", "int"),
+ schema = new StructType().add("id", "long").add("id1", "long"),
viewOriginalText = Some("SELECT * FROM jt"),
viewText = Some("SELECT * FROM jt"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = false)
checkAnswer(
sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id ORDER BY t1.id"),
@@ -620,10 +631,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
identifier = TableIdentifier("view1"),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = new StructType().add("id", "int").add("id1", "int"),
+ schema = new StructType().add("id", "long").add("id1", "long"),
viewOriginalText = Some("SELECT * FROM invalid_db.jt"),
viewText = Some("SELECT * FROM invalid_db.jt"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false)
assertInvalidReference("SELECT * FROM view1")
@@ -632,10 +646,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
identifier = TableIdentifier("view2"),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = new StructType().add("id", "int").add("id1", "int"),
+ schema = new StructType().add("id", "long").add("id1", "long"),
viewOriginalText = Some("SELECT * FROM invalid_table"),
viewText = Some("SELECT * FROM invalid_table"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false)
assertInvalidReference("SELECT * FROM view2")
@@ -644,10 +661,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
identifier = TableIdentifier("view3"),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
- schema = new StructType().add("id", "int").add("id1", "int"),
+ schema = new StructType().add("id", "long").add("id1", "long"),
viewOriginalText = Some("SELECT * FROM view2"),
viewText = Some("SELECT * FROM view2"),
- properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = false)
assertInvalidReference("SELECT * FROM view3")
}
@@ -680,21 +700,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
- test("correctly handle type casting between view output and child output") {
+ test("resolve a view with custom column names") {
withTable("testTable") {
+ spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("testTable")
withView("testView") {
- spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("testTable")
- sql("CREATE VIEW testView AS SELECT * FROM testTable")
+ val testView = CatalogTable(
+ identifier = TableIdentifier("testView"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("x", "long").add("y", "long"),
+ viewOriginalText = Some("SELECT * FROM testTable"),
+ viewText = Some("SELECT * FROM testTable"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
+ hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists = false)
+
+ // Correctly resolve a view with custom column names.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1)))
+
+ // Correctly resolve a view when the referenced table schema changes.
+ spark.range(1, 10).selectExpr("id", "id + id dummy", "id + 1 id1")
+ .write.mode(SaveMode.Overwrite).saveAsTable("testTable")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1)))
+
+ // Throw an AnalysisException if the column name is not found.
+ spark.range(1, 10).selectExpr("id", "id + 1 dummy")
+ .write.mode(SaveMode.Overwrite).saveAsTable("testTable")
+ intercept[AnalysisException](sql("SELECT * FROM testView"))
+ }
+ }
+ }
+
+ test("resolve a view when the dataTypes of referenced table columns changed") {
+ withTable("testTable") {
+ spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("testTable")
+ withView("testView") {
+ val testView = CatalogTable(
+ identifier = TableIdentifier("testView"),
+ tableType = CatalogTableType.VIEW,
+ storage = CatalogStorageFormat.empty,
+ schema = new StructType().add("id", "long").add("id1", "long"),
+ viewOriginalText = Some("SELECT * FROM testTable"),
+ viewText = Some("SELECT * FROM testTable"),
+ properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+ CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+ s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
+ hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists = false)
// Allow casting from IntegerType to LongType
- val df = (1 until 10).map(i => i).toDF("id1")
+ val df = (1 until 10).map(i => (i, i + 1)).toDF("id", "id1")
df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
- checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i)))
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i + 1)))
- // Can't cast from ArrayType to LongType, throw an AnalysisException.
- val df2 = (1 until 10).map(i => Seq(i)).toDF("id1")
+ // Casting from DoubleType to LongType might truncate, throw an AnalysisException.
+ val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", "id1")
df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
- intercept[AnalysisException](sql("SELECT * FROM testView ORDER BY id1"))
+ intercept[AnalysisException](sql("SELECT * FROM testView"))
+
+ // Can't cast from ArrayType to LongType, throw an AnalysisException.
+ val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1")
+ df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+ intercept[AnalysisException](sql("SELECT * FROM testView"))
}
}
}