author     Wenchen Fan <wenchen@databricks.com>  2017-02-07 00:36:57 +0800
committer  Wenchen Fan <wenchen@databricks.com>  2017-02-07 00:36:57 +0800
commit     aff53021cf828cd7c139d8ec230d45593078b73a (patch)
tree       d5add8b21cf710eb30329b040bbcd3ee721fd00c
parent     0f16ff5b0ec8cd828774ba5ddb276d7b06dbe273 (diff)
[SPARK-19080][SQL] simplify data source analysis
## What changes were proposed in this pull request?

The current way of resolving `InsertIntoTable` and `CreateTable` is convoluted: sometimes we replace them with concrete implementation commands during analysis, sometimes during the planning phase. The error-checking logic is also scattered across extended analyzer rules, extended checking rules, and `CheckAnalysis`. This PR simplifies data source analysis:

1. `InsertIntoTable` and `CreateTable` are always unresolved and must be replaced by concrete implementation commands during analysis.
2. The error-checking logic lives mainly in two rules: `PreprocessTableCreation` and `PreprocessTableInsertion`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16269 from cloud-fan/ddl.
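To illustrate the first point, here is a minimal, self-contained sketch of the pattern the PR converges on (toy types only, not the actual Spark classes; every name prefixed `Toy` is a hypothetical stand-in): the DDL node is permanently unresolved, and an analyzer-time rule rewrites it into a runnable command, so nothing is left for a planner-side `DDLStrategy` and the final unresolved-operator check catches anything that was never converted.

```scala
// Toy model of "unresolved DDL node + analyzer rule that replaces it with a command".
// ToyPlan / ToyCreateTable / ToyCreateTableCommand / ToyDataSourceAnalysis are hypothetical
// stand-ins for LogicalPlan, CreateTable, CreateDataSourceTableCommand and DataSourceAnalysis.

trait ToyPlan { def resolved: Boolean = true }

// The logical "intent" node: like the new CreateTable/InsertIntoTable, it reports
// resolved = false, so a CheckAnalysis-style check rejects any plan in which it
// survives analysis.
case class ToyCreateTable(table: String, query: Option[ToyPlan]) extends ToyPlan {
  override def resolved: Boolean = false
}

// The concrete implementation command the analyzer rewrites the intent into.
case class ToyCreateTableCommand(table: String, ignoreIfExists: Boolean) extends ToyPlan

// An analyzer-style rule: pattern-match the unresolved node and replace it during
// analysis, instead of deferring the conversion to the planner.
object ToyDataSourceAnalysis {
  def apply(plan: ToyPlan): ToyPlan = plan match {
    case ToyCreateTable(table, None) => ToyCreateTableCommand(table, ignoreIfExists = false)
    case other                       => other
  }
}

object Demo extends App {
  val analyzed = ToyDataSourceAnalysis(ToyCreateTable("t", None))
  println(analyzed)           // ToyCreateTableCommand(t,false)
  println(analyzed.resolved)  // true: the plan now passes the unresolved-operator check
}
```

In the actual patch, `DataSourceAnalysis` and `HiveAnalysis` play the role of this rewrite rule, while `PreprocessTableCreation` and `PreprocessTableInsertion` carry the normalization and error checks (the second point above).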
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 26
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 3
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 16
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala | 1
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 28
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala | 6
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 18
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 18
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 130
 sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 5
 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 4
 sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 2
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 24
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 2
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 9
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 10
 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 2
 sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 3
 sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 16
 20 files changed, 126 insertions(+), 199 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index f13a1f6d5d..2a3d3a173c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -376,28 +376,6 @@ trait CheckAnalysis extends PredicateHelper {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
- case InsertIntoTable(t, _, _, _, _)
- if !t.isInstanceOf[LeafNode] ||
- t.isInstanceOf[Range] ||
- t == OneRowRelation ||
- t.isInstanceOf[LocalRelation] =>
- failAnalysis(s"Inserting into an RDD-based table is not allowed.")
-
- case i @ InsertIntoTable(table, partitions, query, _, _) =>
- val numStaticPartitions = partitions.values.count(_.isDefined)
- if (table.output.size != (query.output.size + numStaticPartitions)) {
- failAnalysis(
- s"$table requires that the data to be inserted have the same number of " +
- s"columns as the target table: target table has ${table.output.size} " +
- s"column(s) but the inserted data has " +
- s"${query.output.size + numStaticPartitions} column(s), including " +
- s"$numStaticPartitions partition column(s) having constant value(s).")
- }
-
- case o if !o.resolved =>
- failAnalysis(
- s"unresolved operator ${operator.simpleString}")
-
case o if o.expressions.exists(!_.deterministic) &&
!o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
@@ -413,6 +391,10 @@ trait CheckAnalysis extends PredicateHelper {
}
}
extendedCheckRules.foreach(_(plan))
+ plan.foreachUp {
+ case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
+ case _ =>
+ }
plan.foreach(_.setAnalyzed())
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f4d016cb96..e4fd737b35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -111,9 +111,6 @@ object UnsupportedOperationChecker {
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
- case _: InsertIntoTable =>
- throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
-
case Join(left, right, joinType, _) =>
joinType match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 432097d621..8d7a6bc4b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -363,7 +363,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
}
/**
- * Insert some data into a table.
+ * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
+ * concrete implementations during analysis.
*
* @param table the logical plan representing the table. In the future this should be a
* [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
@@ -374,25 +375,24 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
* Map('a' -> Some('1'), 'b' -> Some('2')),
* and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
* would have Map('a' -> Some('1'), 'b' -> None).
- * @param child the logical plan representing data to write to.
+ * @param query the logical plan representing data to write to.
* @param overwrite overwrite existing table or partitions.
* @param ifNotExists If true, only write if the table or partition does not exist.
*/
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
- child: LogicalPlan,
+ query: LogicalPlan,
overwrite: Boolean,
ifNotExists: Boolean)
extends LogicalPlan {
-
- override def children: Seq[LogicalPlan] = child :: Nil
- override def output: Seq[Attribute] = Seq.empty
-
assert(overwrite || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
- override lazy val resolved: Boolean = childrenResolved && table.resolved
+ // We don't want `table` in children as sometimes we don't want to transform it.
+ override def children: Seq[LogicalPlan] = query :: Nil
+ override def output: Seq[Attribute] = Seq.empty
+ override lazy val resolved: Boolean = false
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ff1f0177e8..81657d9e47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -265,7 +265,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
InsertIntoTable(
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
- child = df.logicalPlan,
+ query = df.logicalPlan,
overwrite = mode == SaveMode.Overwrite,
ifNotExists = false)).toRdd
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 73e2ffdf00..678241656c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -36,7 +36,6 @@ class SparkPlanner(
extraStrategies ++ (
FileSourceStrategy ::
DataSourceStrategy ::
- DDLStrategy ::
SpecialLimits ::
Aggregation ::
JoinSelection ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index fafb919670..e3ec343479 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -405,32 +405,4 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}
-
- object DDLStrategy extends Strategy {
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
- val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
- ExecutedCommandExec(cmd) :: Nil
-
- case CreateTable(tableDesc, mode, None) =>
- val cmd =
- CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
- ExecutedCommandExec(cmd) :: Nil
-
- // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
- // `CreateTables`
-
- case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
- val cmd =
- CreateDataSourceTableAsSelectCommand(
- tableDesc,
- mode,
- query)
- ExecutedCommandExec(cmd) :: Nil
-
- case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil
-
- case _ => Nil
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bb903a2662..bc4b5b6258 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -111,10 +111,12 @@ case class CreateTableLikeCommand(
* [AS select_statement];
* }}}
*/
-case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
+case class CreateTableCommand(
+ table: CatalogTable,
+ ignoreIfExists: Boolean) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- sparkSession.sessionState.catalog.createTable(table, ifNotExists)
+ sparkSession.sessionState.catalog.createTable(table, ignoreIfExists)
Seq.empty[Row]
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 19db293132..d8a5158287 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -45,7 +45,8 @@ import org.apache.spark.unsafe.types.UTF8String
* Replaces generic operations with specific variants that are designed to work with Spark
* SQL Data Sources.
*
- * Note that, this rule must be run after [[PreprocessTableInsertion]].
+ * Note that, this rule must be run after `PreprocessTableCreation` and
+ * `PreprocessTableInsertion`.
*/
case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
@@ -130,6 +131,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
}
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+ CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+
+ case CreateTable(tableDesc, mode, Some(query))
+ if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
+ CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+
+ case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
+ parts, query, overwrite, false) if parts.isEmpty =>
+ InsertIntoDataSourceCommand(l, query, overwrite)
+
case InsertIntoTable(
l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
@@ -273,10 +285,6 @@ object DataSourceStrategy extends Strategy with Logging {
Map.empty,
None) :: Nil
- case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
- part, query, overwrite, false) if part.isEmpty =>
- ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
-
case _ => Nil
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index d10fa2c9ff..110d503f91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -20,15 +20,23 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.types._
+/**
+ * Create a table and optionally insert some data into it. Note that this plan is unresolved and
+ * has to be replaced by the concrete implementations during analysis.
+ *
+ * @param tableDesc the metadata of the table to be created.
+ * @param mode the data writing mode
+ * @param query an optional logical plan representing data to write into the created table.
+ */
case class CreateTable(
tableDesc: CatalogTable,
mode: SaveMode,
- query: Option[LogicalPlan]) extends Command {
+ query: Option[LogicalPlan]) extends LogicalPlan {
assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")
if (query.isEmpty) {
@@ -37,7 +45,9 @@ case class CreateTable(
"create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
}
- override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
+ override def children: Seq[LogicalPlan] = query.toSeq
+ override def output: Seq[Attribute] = Seq.empty
+ override lazy val resolved: Boolean = false
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 623d47b4c9..e053a0e9e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -21,12 +21,11 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
-import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
+import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{AtomicType, StructType}
/**
@@ -65,10 +64,10 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
}
/**
- * Analyze [[CreateTable]] and do some normalization and checking.
- * For CREATE TABLE AS SELECT, the SELECT query is also analyzed.
+ * Preprocess [[CreateTable]], to do some normalization and checking.
*/
-case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+ private val catalog = sparkSession.sessionState.catalog
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// When we CREATE TABLE without specifying the table schema, we should fail the query if
@@ -91,16 +90,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
// bucket spec, etc. match the existing table, and adjust the columns order of the given query
// if necessary.
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
- if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
+ if query.resolved && catalog.tableExists(tableDesc.identifier) =>
// This is guaranteed by the parser and `DataFrameWriter`
assert(tableDesc.provider.isDefined)
- // Analyze the query in CTAS and then we can do the normalization and checking.
- val qe = sparkSession.sessionState.executePlan(query)
- qe.assertAnalyzed()
- val analyzedQuery = qe.analyzed
-
- val catalog = sparkSession.sessionState.catalog
val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
@@ -121,7 +114,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
s"`${specifiedProvider.getSimpleName}`.")
}
- if (analyzedQuery.schema.length != existingTable.schema.length) {
+ if (query.schema.length != existingTable.schema.length) {
throw new AnalysisException(
s"The column number of the existing table $tableName" +
s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
@@ -135,8 +128,8 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
// adjust the column order of the given dataframe according to it, or throw exception
// if the column names do not match.
val adjustedColumns = tableCols.map { col =>
- analyzedQuery.resolve(Seq(col), resolver).getOrElse {
- val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ")
+ query.resolve(Seq(col), resolver).getOrElse {
+ val inputColumns = query.schema.map(_.name).mkString(", ")
throw new AnalysisException(
s"cannot resolve '$col' given input columns: [$inputColumns]")
}
@@ -172,10 +165,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
""".stripMargin)
}
- val newQuery = if (adjustedColumns != analyzedQuery.output) {
- Project(adjustedColumns, analyzedQuery)
+ val newQuery = if (adjustedColumns != query.output) {
+ Project(adjustedColumns, query)
} else {
- analyzedQuery
+ query
}
c.copy(
@@ -191,15 +184,12 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
// * partition columns' type must be AtomicType.
// * sort columns' type must be orderable.
// * reorder table schema or output of query plan, to put partition columns at the end.
- case c @ CreateTable(tableDesc, _, query) =>
+ case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) =>
if (query.isDefined) {
assert(tableDesc.schema.isEmpty,
"Schema may not be specified in a Create Table As Select (CTAS) statement")
- val qe = sparkSession.sessionState.executePlan(query.get)
- qe.assertAnalyzed()
- val analyzedQuery = qe.analyzed
-
+ val analyzedQuery = query.get
val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc)
val output = analyzedQuery.output
@@ -319,16 +309,15 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
insert.partition, partColNames, tblName, conf.resolver)
- val expectedColumns = {
- val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
- insert.table.output.filterNot(a => staticPartCols.contains(a.name))
- }
+ val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
+ val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
- if (expectedColumns.length != insert.child.schema.length) {
+ if (expectedColumns.length != insert.query.schema.length) {
throw new AnalysisException(
- s"Cannot insert into table $tblName because the number of columns are different: " +
- s"need ${expectedColumns.length} columns, " +
- s"but query has ${insert.child.schema.length} columns.")
+ s"$tblName requires that the data to be inserted have the same number of columns as the " +
+ s"target table: target table has ${insert.table.output.size} column(s) but the " +
+ s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " +
+ s"including ${staticPartCols.size} partition column(s) having constant value(s).")
}
if (normalizedPartSpec.nonEmpty) {
@@ -353,7 +342,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
private def castAndRenameChildOutput(
insert: InsertIntoTable,
expectedOutput: Seq[Attribute]): InsertIntoTable = {
- val newChildOutput = expectedOutput.zip(insert.child.output).map {
+ val newChildOutput = expectedOutput.zip(insert.query.output).map {
case (expected, actual) =>
if (expected.dataType.sameType(actual.dataType) &&
expected.name == actual.name &&
@@ -368,15 +357,15 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
}
}
- if (newChildOutput == insert.child.output) {
+ if (newChildOutput == insert.query.output) {
insert
} else {
- insert.copy(child = Project(newChildOutput, insert.child))
+ insert.copy(query = Project(newChildOutput, insert.query))
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
- case i @ InsertIntoTable(table, _, child, _, _) if table.resolved && child.resolved =>
+ case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
table match {
case relation: CatalogRelation =>
val metadata = relation.catalogTable
@@ -387,7 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
preprocess(i, tblName, Nil)
- case other => i
+ case _ => i
}
}
}
@@ -398,10 +387,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
object HiveOnlyCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
- case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
- throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
- throw new AnalysisException("Hive support is required to CREATE Hive TABLE")
+ throw new AnalysisException("Hive support is required to CREATE Hive TABLE (AS SELECT)")
case _ => // OK
}
}
@@ -410,63 +397,40 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
/**
* A rule to do various checks before inserting into or writing to a data source table.
*/
-case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
- extends (LogicalPlan => Unit) {
+object PreWriteCheck extends (LogicalPlan => Unit) {
def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
- case logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
- // Right now, we do not support insert into a data source table with partition specs.
- if (partition.nonEmpty) {
- failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
- } else {
- // Get all input data source relations of the query.
- val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation, _, _) => src
- }
- if (srcRelations.contains(t)) {
- failAnalysis(
- "Cannot insert overwrite into table that is also being read from.")
- } else {
- // OK
- }
+ case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) =>
+ // Get all input data source relations of the query.
+ val srcRelations = query.collect {
+ case LogicalRelation(src, _, _) => src
}
-
- case logical.InsertIntoTable(
- LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) =>
- // We need to make sure the partition columns specified by users do match partition
- // columns of the relation.
- val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
- val specifiedPartitionColumns = part.keySet
- if (existingPartitionColumns != specifiedPartitionColumns) {
- failAnalysis("Specified partition columns " +
- s"(${specifiedPartitionColumns.mkString(", ")}) " +
- "do not match the partition columns of the table. Please use " +
- s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
+ if (srcRelations.contains(relation)) {
+ failAnalysis("Cannot insert into table that is also being read from.")
} else {
// OK
}
- PartitioningUtils.validatePartitionColumn(
- r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
+ relation match {
+ case _: HadoopFsRelation => // OK
- // Get all input data source relations of the query.
- val srcRelations = query.collect {
- case LogicalRelation(src: BaseRelation, _, _) => src
- }
- if (srcRelations.contains(r)) {
- failAnalysis(
- "Cannot insert overwrite into table that is also being read from.")
- } else {
- // OK
+ // Right now, we do not support insert into a non-file-based data source table with
+ // partition specs.
+ case _: InsertableRelation if partition.nonEmpty =>
+ failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
+
+ case _ => failAnalysis(s"$relation does not allow insertion.")
}
- case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
- // The relation in l is not an InsertableRelation.
- failAnalysis(s"$l does not allow insertion.")
+ case InsertIntoTable(t, _, _, _, _)
+ if !t.isInstanceOf[LeafNode] ||
+ t.isInstanceOf[Range] ||
+ t == OneRowRelation ||
+ t.isInstanceOf[LocalRelation] =>
+ failAnalysis(s"Inserting into an RDD-based table is not allowed.")
case _ => // OK
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index a5ebe4780f..6908560511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -118,12 +118,11 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new ResolveSQLOnFile(sparkSession) :: Nil
override val postHocResolutionRules =
- AnalyzeCreateTable(sparkSession) ::
+ PreprocessTableCreation(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) :: Nil
- override val extendedCheckRules =
- Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
+ override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f6d1ee2287..bcb707c8fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1555,13 +1555,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
var e = intercept[AnalysisException] {
sql("CREATE TABLE t SELECT 1 as a, 1 as b")
}.getMessage
- assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT"))
+ assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
e = intercept[AnalysisException] {
sql("CREATE TABLE t SELECT a, b from t1")
}.getMessage
- assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT"))
+ assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 13284ba649..5b215ca07f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -113,7 +113,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
|INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt
""".stripMargin)
}.getMessage
- assert(message.contains("the number of columns are different")
+ assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)")
)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index faa76b73fd..677da0dbdc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -247,16 +247,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!plan.resolved || plan.analyzed) {
- return plan
- }
-
plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
- // Inserting into partitioned table is not supported in Parquet data source (yet).
- if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
- InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
+ case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Parquet data source (yet).
+ if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
@@ -285,16 +281,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!plan.resolved || plan.analyzed) {
- return plan
- }
-
plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
- // Inserting into partitioned table is not supported in Orc data source (yet).
- if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
- InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+ case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Orc data source (yet).
+ if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 44ef5cce2e..c9be1b9d10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -68,6 +68,8 @@ private[sql] class HiveSessionCatalog(
// and HiveCatalog. We should still do it at some point...
private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
+ // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
+ // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 413712e0c6..273cf85df3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -60,20 +60,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override lazy val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
new ResolveHiveSerdeTable(sparkSession) ::
new FindDataSourceTable(sparkSession) ::
new FindHiveSerdeTable(sparkSession) ::
new ResolveSQLOnFile(sparkSession) :: Nil
override val postHocResolutionRules =
- AnalyzeCreateTable(sparkSession) ::
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ PreprocessTableCreation(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
HiveAnalysis :: Nil
- override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
+ override val extendedCheckRules = Seq(PreWriteCheck)
}
}
@@ -89,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
experimentalMethods.extraStrategies ++ Seq(
FileSourceStrategy,
DataSourceStrategy,
- DDLStrategy,
SpecialLimits,
InMemoryScans,
HiveTableScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0f293c21fa..f45532cc38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.HiveSerDe
@@ -109,13 +109,17 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
/**
* Replaces generic operations with specific variants that are designed to work with Hive.
*
- * Note that, this rule must be run after `PreprocessTableInsertion`.
+ * Note that, this rule must be run after `PreprocessTableCreation` and
+ * `PreprocessTableInsertion`.
*/
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+ case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
+ CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6f43b83607..0bd08877a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -52,7 +52,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
private def analyzeCreateTable(sql: String): CatalogTable = {
TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
- case CreateTable(tableDesc, mode, _) => tableDesc
+ case CreateTableCommand(tableDesc, _) => tableDesc
}.head
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index e3ddaf7254..71ce5a7c4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -376,7 +376,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
val e = intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
}
- assert(e.message.contains("the number of columns are different"))
+ assert(e.message.contains(
+ "target table has 4 column(s) but the inserted data has 5 column(s)"))
}
testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e9239ea56f..1a1b2571b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,13 +307,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
- df.queryExecution.sparkPlan match {
- case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+ df.queryExecution.analyzed match {
+ case cmd: InsertIntoHadoopFsRelationCommand =>
assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
- s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
- s"However, found a ${o.toString} ")
+ s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
}
checkAnswer(
@@ -338,13 +336,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
- df.queryExecution.sparkPlan match {
- case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+ df.queryExecution.analyzed match {
+ case cmd: InsertIntoHadoopFsRelationCommand =>
assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
- s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
- s"However, found a ${o.toString} ")
+ s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
}
checkAnswer(