author    Wenchen Fan <cloud0fan@163.com>    2015-10-08 12:42:10 -0700
committer Yin Huai <yhuai@databricks.com>    2015-10-08 12:42:10 -0700
commit    af2a5544875b23b3b62fb6d4f3bf432828720008 (patch)
tree      d8282c85ee2989d8e8d6e5f4216934d126eca20c /sql
parent    82d275f27c3e9211ce69c5c8685a0fe90c0be26f (diff)
[SPARK-10337] [SQL] fix hive views on non-hive-compatible tables.
Add a new config to deal with this special case.

Author: Wenchen Fan <cloud0fan@163.com>

Closes #8990 from cloud-fan/view-master.
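For a quick sense of the new behavior, here is a minimal usage sketch (hedged: it simply mirrors the tests added in this patch; table and view names are illustrative):

  // Enable the experimental flag, then create a Hive view over a table that
  // is not Hive-compatible (e.g. a JSON table written by Spark SQL).
  sqlContext.setConf("spark.sql.canonicalizeView", "true")
  sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
  sqlContext.sql("CREATE VIEW testView AS SELECT id FROM jt")
  sqlContext.sql("SELECT * FROM testView ORDER BY id").show()
  sqlContext.sql("DROP VIEW testView")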
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 164
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala | 31
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala | 97
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 117
7 files changed, 433 insertions, 27 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index e7bbc7d5db..8f0f8910b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -319,6 +319,15 @@ private[spark] object SQLConf {
doc = "When true, some predicates will be pushed down into the Hive metastore so that " +
"unmatching partitions can be eliminated earlier.")
+ val CANONICALIZE_VIEW = booleanConf("spark.sql.canonicalizeView",
+ defaultValue = Some(false),
+ doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
+ "Note that this function is experimental and should ony be used when you are using " +
+ "non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
+ "view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
+ "possible, or you may get wrong result.",
+ isPublic = false)
+
val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
defaultValue = Some("_corrupt_record"),
doc = "<TODO>")
@@ -362,7 +371,7 @@ private[spark] object SQLConf {
val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
defaultValue = Some(true),
- doc = "When true, automtically discover data partitions.")
+ doc = "When true, automatically discover data partitions.")
val PARTITION_COLUMN_TYPE_INFERENCE =
booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
@@ -372,7 +381,7 @@ private[spark] object SQLConf {
val PARTITION_MAX_FILES =
intConf("spark.sql.sources.maxConcurrentWrites",
defaultValue = Some(5),
- doc = "The maximum number of concurent files to open before falling back on sorting when " +
+ doc = "The maximum number of concurrent files to open before falling back on sorting when " +
"writing out files using dynamic partitioning.")
// The output committer class used by HadoopFsRelation. The specified class needs to be a
@@ -471,6 +480,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
+ private[spark] def canonicalizeView: Boolean = getConf(CANONICALIZE_VIEW)
+
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
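The warning in the doc string above is worth illustrating (a hedged sketch; `jt` and its columns are hypothetical). A view whose text uses a top-level `*` depends on this command's best-effort expansion, so spelling out qualified columns is the safer form:

  // Preferred: a fully qualified, explicit column list in the view SQL.
  sqlContext.sql("CREATE VIEW v1 AS SELECT `jt`.`i`, `jt`.`j` FROM jt")
  // Riskier: a bare `*` may yield wrong results in corner cases, per the doc above.
  sqlContext.sql("CREATE VIEW v2 AS SELECT * FROM jt")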
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ea1521a48c..cf59bc0d59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.execution.{FileRelation, datasources}
import org.apache.spark.sql.hive.client._
+import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
@@ -588,6 +589,28 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
+
+ case CreateViewAsSelect(table, child, allowExisting, replace, sql) =>
+ if (conf.canonicalizeView) {
+ if (allowExisting && replace) {
+ throw new AnalysisException(
+ "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE.")
+ }
+
+ val (dbName, tblName) = processDatabaseAndTableName(
+ table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+
+ execution.CreateViewAsSelect(
+ table.copy(
+ specifiedDatabase = Some(dbName),
+ name = tblName),
+ child.output,
+ allowExisting,
+ replace)
+ } else {
+ HiveNativeCommand(sql)
+ }
+
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.nonEmpty) {
table.schema
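The conflict check introduced above surfaces directly in SQL. A short example of the rejected combination, mirroring the test added in SQLQuerySuite below (assumes a table `jt` exists):

  // With spark.sql.canonicalizeView=true this throws an AnalysisException:
  // "It is not allowed to define a view with both IF NOT EXISTS and OR REPLACE."
  sqlContext.sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")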
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 256440a9a2..2bf22f5449 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -77,6 +77,16 @@ private[hive] case class CreateTableAsSelect(
childrenResolved
}
+private[hive] case class CreateViewAsSelect(
+ tableDesc: HiveTable,
+ child: LogicalPlan,
+ allowExisting: Boolean,
+ replace: Boolean,
+ sql: String) extends UnaryNode with Command {
+ override def output: Seq[Attribute] = Seq.empty[Attribute]
+ override lazy val resolved: Boolean = false
+}
+
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
private[hive] object HiveQl extends Logging {
protected val nativeCommands = Seq(
@@ -99,7 +109,6 @@ private[hive] object HiveQl extends Logging {
"TOK_ALTERTABLE_SKEWED",
"TOK_ALTERTABLE_TOUCH",
"TOK_ALTERTABLE_UNARCHIVE",
- "TOK_ALTERVIEW",
"TOK_ALTERVIEW_ADDPARTS",
"TOK_ALTERVIEW_AS",
"TOK_ALTERVIEW_DROPPARTS",
@@ -110,7 +119,6 @@ private[hive] object HiveQl extends Logging {
"TOK_CREATEFUNCTION",
"TOK_CREATEINDEX",
"TOK_CREATEROLE",
- "TOK_CREATEVIEW",
"TOK_DESCDATABASE",
"TOK_DESCFUNCTION",
@@ -254,12 +262,17 @@ private[hive] object HiveQl extends Logging {
Otherwise, there will be a NullPointerException
when retrieving properties from HiveConf.
*/
- val hContext = new Context(SessionState.get().getConf())
- val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
+ val hContext = createContext()
+ val node = getAst(sql, hContext)
hContext.clear()
node
}
+ private def createContext(): Context = new Context(SessionState.get().getConf())
+
+ private def getAst(sql: String, context: Context) =
+ ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, context))
+
/**
* Returns the HiveConf
*/
@@ -280,15 +293,18 @@ private[hive] object HiveQl extends Logging {
/** Creates LogicalPlan for a given HiveQL string. */
def createPlan(sql: String): LogicalPlan = {
try {
- val tree = getAst(sql)
- if (nativeCommands contains tree.getText) {
+ val context = createContext()
+ val tree = getAst(sql, context)
+ val plan = if (nativeCommands contains tree.getText) {
HiveNativeCommand(sql)
} else {
- nodeToPlan(tree) match {
+ nodeToPlan(tree, context) match {
case NativePlaceholder => HiveNativeCommand(sql)
case other => other
}
}
+ context.clear()
+ plan
} catch {
case pe: org.apache.hadoop.hive.ql.parse.ParseException =>
pe.getMessage match {
@@ -342,7 +358,9 @@ private[hive] object HiveQl extends Logging {
}
}
- protected def getClauses(clauseNames: Seq[String], nodeList: Seq[ASTNode]): Seq[Option[Node]] = {
+ protected def getClauses(
+ clauseNames: Seq[String],
+ nodeList: Seq[ASTNode]): Seq[Option[ASTNode]] = {
var remainingNodes = nodeList
val clauses = clauseNames.map { clauseName =>
val (matches, nonMatches) = remainingNodes.partition(_.getText.toUpperCase == clauseName)
@@ -489,7 +507,43 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
}
- protected def nodeToPlan(node: Node): LogicalPlan = node match {
+ private def createView(
+ view: ASTNode,
+ context: Context,
+ viewNameParts: ASTNode,
+ query: ASTNode,
+ schema: Seq[HiveColumn],
+ properties: Map[String, String],
+ allowExist: Boolean,
+ replace: Boolean): CreateViewAsSelect = {
+ val (db, viewName) = extractDbNameTableName(viewNameParts)
+
+ val originalText = context.getTokenRewriteStream
+ .toString(query.getTokenStartIndex, query.getTokenStopIndex)
+
+ val tableDesc = HiveTable(
+ specifiedDatabase = db,
+ name = viewName,
+ schema = schema,
+ partitionColumns = Seq.empty[HiveColumn],
+ properties = properties,
+ serdeProperties = Map[String, String](),
+ tableType = VirtualView,
+ location = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ viewText = Some(originalText))
+
+ // We need to keep the original SQL string so that if `spark.sql.canonicalizeView` is
+ // false, we can fall back to the Hive native command later.
+ // We can remove this once the parser is configurable (i.e. can access SQLConf).
+ val sql = context.getTokenRewriteStream
+ .toString(view.getTokenStartIndex, view.getTokenStopIndex)
+ CreateViewAsSelect(tableDesc, nodeToPlan(query, context), allowExist, replace, sql)
+ }
+
+ protected def nodeToPlan(node: ASTNode, context: Context): LogicalPlan = node match {
// Special drop table that also uncaches.
case Token("TOK_DROPTABLE",
Token("TOK_TABNAME", tableNameParts) ::
@@ -521,14 +575,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val Some(crtTbl) :: _ :: extended :: Nil =
getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(
- nodeToPlan(crtTbl),
+ nodeToPlan(crtTbl, context),
extended = extended.isDefined)
case Token("TOK_EXPLAIN", explainArgs) =>
// Ignore FORMATTED if present.
val Some(query) :: _ :: extended :: Nil =
getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(
- nodeToPlan(query),
+ nodeToPlan(query, context),
extended = extended.isDefined)
case Token("TOK_DESCTABLE", describeArgs) =>
@@ -563,6 +617,73 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
}
+ case view @ Token("TOK_ALTERVIEW", children) =>
+ val Some(viewNameParts) :: maybeQuery :: ignores =
+ getClauses(Seq(
+ "TOK_TABNAME",
+ "TOK_QUERY",
+ "TOK_ALTERVIEW_ADDPARTS",
+ "TOK_ALTERVIEW_DROPPARTS",
+ "TOK_ALTERVIEW_PROPERTIES",
+ "TOK_ALTERVIEW_RENAME"), children)
+
+ // If ALTER VIEW doesn't have a query part, let Hive handle it.
+ maybeQuery.map { query =>
+ createView(view, context, viewNameParts, query, Nil, Map(), false, true)
+ }.getOrElse(NativePlaceholder)
+
+ case view @ Token("TOK_CREATEVIEW", children)
+ if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
+ val Seq(
+ Some(viewNameParts),
+ Some(query),
+ maybeComment,
+ replace,
+ allowExisting,
+ maybeProperties,
+ maybeColumns,
+ maybePartCols
+ ) = getClauses(Seq(
+ "TOK_TABNAME",
+ "TOK_QUERY",
+ "TOK_TABLECOMMENT",
+ "TOK_ORREPLACE",
+ "TOK_IFNOTEXISTS",
+ "TOK_TABLEPROPERTIES",
+ "TOK_TABCOLNAME",
+ "TOK_VIEWPARTCOLS"), children)
+
+ // If the view is partitioned, we let Hive handle it.
+ if (maybePartCols.isDefined) {
+ NativePlaceholder
+ } else {
+ val schema = maybeColumns.map { cols =>
+ BaseSemanticAnalyzer.getColumns(cols, true).asScala.map { field =>
+ // We can't specify column types when creating a view, so fill them with null first,
+ // and update them later, after the schema has been resolved.
+ HiveColumn(field.getName, null, field.getComment)
+ }
+ }.getOrElse(Seq.empty[HiveColumn])
+
+ val properties = scala.collection.mutable.Map.empty[String, String]
+
+ maybeProperties.foreach {
+ case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+ properties ++= getProperties(list)
+ }
+
+ maybeComment.foreach {
+ case Token("TOK_TABLECOMMENT", child :: Nil) =>
+ val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+ if (comment ne null) {
+ properties += ("comment" -> comment)
+ }
+ }
+
+ createView(view, context, viewNameParts, query, schema, properties.toMap,
+ allowExisting.isDefined, replace.isDefined)
+ }
+
case Token("TOK_CREATETABLE", children)
if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -774,7 +895,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case _ => // Unsupported features
}
- CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)
+ CreateTableAsSelect(tableDesc, nodeToPlan(query, context), allowExisting != None)
// If it's not a "CTAS" like the above, take it as a native command
case Token("TOK_CREATETABLE", _) => NativePlaceholder
@@ -793,7 +914,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
insertClauses.last match {
case Token("TOK_CTE", cteClauses) =>
val cteRelations = cteClauses.map(node => {
- val relation = nodeToRelation(node).asInstanceOf[Subquery]
+ val relation = nodeToRelation(node, context).asInstanceOf[Subquery]
(relation.alias, relation)
}).toMap
(Some(args.head), insertClauses.init, Some(cteRelations))
@@ -847,7 +968,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
val relations = fromClause match {
- case Some(f) => nodeToRelation(f)
+ case Some(f) => nodeToRelation(f, context)
case None => OneRowRelation
}
@@ -1094,7 +1215,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
cteRelations.map(With(query, _)).getOrElse(query)
// HIVE-9039 renamed TOK_UNION => TOK_UNIONALL while adding TOK_UNIONDISTINCT
- case Token("TOK_UNIONALL", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right))
+ case Token("TOK_UNIONALL", left :: right :: Nil) =>
+ Union(nodeToPlan(left, context), nodeToPlan(right, context))
case a: ASTNode =>
throw new NotImplementedError(s"No parse rules for $node:\n ${dumpTree(a).toString} ")
@@ -1102,10 +1224,10 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val allJoinTokens = "(TOK_.*JOIN)".r
val laterViewToken = "TOK_LATERAL_VIEW(.*)".r
- def nodeToRelation(node: Node): LogicalPlan = node match {
+ def nodeToRelation(node: Node, context: Context): LogicalPlan = node match {
case Token("TOK_SUBQUERY",
query :: Token(alias, Nil) :: Nil) =>
- Subquery(cleanIdentifier(alias), nodeToPlan(query))
+ Subquery(cleanIdentifier(alias), nodeToPlan(query, context))
case Token(laterViewToken(isOuter), selectClause :: relationClause :: Nil) =>
val Token("TOK_SELECT",
@@ -1121,7 +1243,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
outer = isOuter.nonEmpty,
Some(alias.toLowerCase),
attributes.map(UnresolvedAttribute(_)),
- nodeToRelation(relationClause))
+ nodeToRelation(relationClause, context))
/* All relations, possibly with aliases or sampling clauses. */
case Token("TOK_TABREF", clauses) =>
@@ -1189,7 +1311,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}.map(_._2)
val isPreserved = tableOrdinals.map(i => (i - 1 < 0) || joinArgs(i - 1).getText == "PRESERVE")
- val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i)))
+ val tables = tableOrdinals.map(i => nodeToRelation(joinArgs(i), context))
val joinExpressions =
tableOrdinals.map(i => joinArgs(i + 1).getChildren.asScala.map(nodeToExpr))
@@ -1244,8 +1366,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case "TOK_FULLOUTERJOIN" => FullOuter
case "TOK_LEFTSEMIJOIN" => LeftSemi
}
- Join(nodeToRelation(relation1),
- nodeToRelation(relation2),
+ Join(nodeToRelation(relation1, context),
+ nodeToRelation(relation2, context),
joinType,
other.headOption.map(nodeToExpr))
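The bulk of this file's change threads the parser `Context` through `nodeToPlan`/`nodeToRelation` so that `createView` can read the user's original SQL back out of the token rewrite stream before the context is cleared. A condensed, hedged sketch of the resulting lifecycle (names are the ones from the diff; simplified):

  // Simplified lifecycle: the context is created once, passed down the recursion,
  // and cleared only after the plan (and any raw SQL text it needs) is built.
  val sql = "CREATE VIEW v AS SELECT 1"   // illustrative input
  val context = createContext()           // wraps new Context(SessionState.get().getConf())
  val tree = getAst(sql, context)
  val plan = nodeToPlan(tree, context)    // may slice context.getTokenRewriteStream
  context.clear()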
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 3811c152a7..915eae9d21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -19,13 +19,12 @@ package org.apache.spark.sql.hive.client
import java.io.PrintStream
import java.util.{Map => JMap}
+import javax.annotation.Nullable
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
-private[hive] case class HiveDatabase(
- name: String,
- location: String)
+private[hive] case class HiveDatabase(name: String, location: String)
private[hive] abstract class TableType { val name: String }
private[hive] case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
@@ -45,7 +44,7 @@ private[hive] case class HivePartition(
values: Seq[String],
storage: HiveStorageDescriptor)
-private[hive] case class HiveColumn(name: String, hiveType: String, comment: String)
+private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
private[hive] case class HiveTable(
specifiedDatabase: Option[String],
name: String,
@@ -126,6 +125,12 @@ private[hive] trait ClientInterface {
/** Returns the metadata for the specified table, or None if it doesn't exist. */
def getTableOption(dbName: String, tableName: String): Option[HiveTable]
+ /** Creates a view with the given metadata. */
+ def createView(view: HiveTable): Unit
+
+ /** Updates the given view with new metadata. */
+ def alterView(view: HiveTable): Unit
+
/** Creates a table with the given metadata. */
def createTable(table: HiveTable): Unit
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 4d1e3ed919..8f6d448b2a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -354,6 +354,37 @@ private[hive] class ClientWrapper(
qlTable
}
+ private def toViewTable(view: HiveTable): metadata.Table = {
+ // TODO: this duplicates `toQlTable` except for the table-type handling.
+ val tbl = new metadata.Table(view.database, view.name)
+ tbl.setTableType(HTableType.VIRTUAL_VIEW)
+ tbl.setSerializationLib(null)
+ tbl.clearSerDeInfo()
+
+ // TODO: we save the same SQL string as both the original and the expanded text,
+ // which differs from Hive.
+ tbl.setViewOriginalText(view.viewText.get)
+ tbl.setViewExpandedText(view.viewText.get)
+
+ tbl.setFields(view.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)).asJava)
+ view.properties.foreach { case (k, v) => tbl.setProperty(k, v) }
+
+ // set owner
+ tbl.setOwner(conf.getUser)
+ // set create time
+ tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+ tbl
+ }
+
+ override def createView(view: HiveTable): Unit = withHiveState {
+ client.createTable(toViewTable(view))
+ }
+
+ override def alterView(view: HiveTable): Unit = withHiveState {
+ client.alterTable(view.qualifiedName, toViewTable(view))
+ }
+
override def createTable(table: HiveTable): Unit = withHiveState {
val qlTable = toQlTable(table)
client.createTable(qlTable)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
new file mode 100644
index 0000000000..2b504ac974
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.hive.client.{HiveColumn, HiveTable}
+
+/**
+ * Creates a Hive view on non-Hive-compatible tables by specifying the schema ourselves
+ * instead of depending on the Hive metastore.
+ */
+// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
+// different from Hive and may not work for some cases, like creating a view on a self join.
+private[hive] case class CreateViewAsSelect(
+ tableDesc: HiveTable,
+ childSchema: Seq[Attribute],
+ allowExisting: Boolean,
+ orReplace: Boolean) extends RunnableCommand {
+
+ assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length)
+ assert(tableDesc.viewText.isDefined)
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val hiveContext = sqlContext.asInstanceOf[HiveContext]
+ val database = tableDesc.database
+ val viewName = tableDesc.name
+
+ if (hiveContext.catalog.tableExists(Seq(database, viewName))) {
+ if (allowExisting) {
+ // The view already exists; do nothing, to stay consistent with Hive.
+ } else if (orReplace) {
+ hiveContext.catalog.client.alterView(prepareTable())
+ } else {
+ throw new AnalysisException(s"View $database.$viewName already exists. " +
+ "If you want to update the view definition, please use ALTER VIEW AS or " +
+ "CREATE OR REPLACE VIEW AS")
+ }
+ } else {
+ hiveContext.catalog.client.createView(prepareTable())
+ }
+
+ Seq.empty[Row]
+ }
+
+ private def prepareTable(): HiveTable = {
+ // Set up column types according to the schema of the child plan.
+ val schema = if (tableDesc.schema == Nil) {
+ childSchema.map { attr =>
+ HiveColumn(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), null)
+ }
+ } else {
+ childSchema.zip(tableDesc.schema).map { case (attr, col) =>
+ HiveColumn(col.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), col.comment)
+ }
+ }
+
+ val columnNames = childSchema.map(f => verbose(f.name))
+
+ // When the user specified column names for the view, create a projection to do the
+ // renaming. When no column names are specified, we still create a projection to declare
+ // the columns we need, to be more robust against top-level `*`s.
+ val projectList = if (tableDesc.schema == Nil) {
+ columnNames.mkString(", ")
+ } else {
+ columnNames.zip(tableDesc.schema.map(f => verbose(f.name))).map {
+ case (name, alias) => s"$name AS $alias"
+ }.mkString(", ")
+ }
+
+ val viewName = verbose(tableDesc.name)
+
+ val expandedText = s"SELECT $projectList FROM (${tableDesc.viewText.get}) $viewName"
+
+ tableDesc.copy(schema = schema, viewText = Some(expandedText))
+ }
+
+ // Escape backticks with double backticks in the column name and wrap it in backticks.
+ private def verbose(name: String) = s"`${name.replaceAll("`", "``")}`"
+}
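To make prepareTable's expansion concrete, here is a small self-contained sketch (hedged: it re-implements the projection logic above for one of the test scenarios; `jt` is assumed to have columns i and j):

  // For: CREATE VIEW testView (c1, c2) AS SELECT * FROM jt
  def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
  val childCols = Seq("i", "j")    // output names of the resolved child plan
  val userCols = Seq("c1", "c2")   // column names the user gave the view
  val projectList = childCols.zip(userCols)
    .map { case (n, a) => s"${quote(n)} AS ${quote(a)}" }
    .mkString(", ")
  val expanded = s"SELECT $projectList FROM (SELECT * FROM jt) ${quote("testView")}"
  // expanded: SELECT `i` AS `c1`, `j` AS `c2` FROM (SELECT * FROM jt) `testView`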
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8c3f9ac202..ec5b83b98e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1248,4 +1248,121 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
""".stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
}
}
+
+ test("correctly parse CREATE VIEW statement") {
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("jt") {
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt")
+ sql(
+ """CREATE VIEW IF NOT EXISTS
+ |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+ |COMMENT 'blabla'
+ |TBLPROPERTIES ('a' = 'b')
+ |AS SELECT * FROM jt""".stripMargin)
+ checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ test("correctly handle CREATE VIEW IF NOT EXISTS") {
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2")
+
+ // make sure our view doesn't change.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ test("correctly handle CREATE OR REPLACE VIEW") {
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+
+ val e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
+ }
+ assert(e.message.contains("not allowed to define a view"))
+ }
+ }
+ }
+
+ test("correctly handle ALTER VIEW") {
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("jt", "jt2") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+
+ val df = (1 until 10).map(i => i -> i).toDF("i", "j")
+ df.write.format("json").saveAsTable("jt2")
+ sql("ALTER VIEW testView AS SELECT * FROM jt2")
+ // make sure the view has been changed.
+ checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ test("create hive view for json table") {
+ // A JSON table is not Hive-compatible; make sure the new flag fixes it.
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("jt") {
+ sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+ sql("CREATE VIEW testView AS SELECT id FROM jt")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ test("create hive view for partitioned parquet table") {
+ // A partitioned Parquet table is not Hive-compatible; make sure the new flag fixes it.
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("parTable") {
+ val df = Seq(1 -> "a").toDF("i", "j")
+ df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
+ sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
+ checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
+ sql("DROP VIEW testView")
+ }
+ }
+ }
+
+ test("create hive view for joined tables") {
+ // make sure the new flag can handle some complex cases like join and schema change.
+ withSQLConf(SQLConf.CANONICALIZE_VIEW.key -> "true") {
+ withTable("jt1", "jt2") {
+ sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1")
+ sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2")
+ sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+ val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol")
+ df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1")
+ checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i)))
+
+ sql("DROP VIEW testView")
+ }
+ }
+ }
}
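Finally, with the flag left at its default of false, the same statements take the old path: the analyzer falls back to HiveNativeCommand and hands the CREATE VIEW to Hive unchanged. A hedged sketch of that fallback (expected to fail for tables whose schema Hive cannot read, which is the bug this patch works around):

  // Default behavior: the view DDL is executed by Hive natively.
  sqlContext.setConf("spark.sql.canonicalizeView", "false")
  sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
  // Goes through HiveNativeCommand; fails for non-Hive-compatible tables like jt.
  sqlContext.sql("CREATE VIEW testView AS SELECT id FROM jt")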