From aff53021cf828cd7c139d8ec230d45593078b73a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 7 Feb 2017 00:36:57 +0800 Subject: [SPARK-19080][SQL] simplify data source analysis ## What changes were proposed in this pull request? The current way of resolving `InsertIntoTable` and `CreateTable` is convoluted: sometimes we replace them with concrete implementation commands during analysis, sometimes during planning phase. And the error checking logic is also a mess: we may put it in extended analyzer rules, or extended checking rules, or `CheckAnalysis`. This PR simplifies the data source analysis: 1. `InsertIntoTable` and `CreateTable` are always unresolved and need to be replaced by concrete implementation commands during analysis. 2. The error checking logic is mainly in 2 rules: `PreprocessTableCreation` and `PreprocessTableInsertion`. ## How was this patch tested? existing test. Author: Wenchen Fan Closes #16269 from cloud-fan/ddl. --- .../sql/catalyst/analysis/CheckAnalysis.scala | 26 +---- .../analysis/UnsupportedOperationChecker.scala | 3 - .../plans/logical/basicLogicalOperators.scala | 16 +-- .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../apache/spark/sql/execution/SparkPlanner.scala | 1 - .../spark/sql/execution/SparkStrategies.scala | 28 ----- .../spark/sql/execution/command/tables.scala | 6 +- .../execution/datasources/DataSourceStrategy.scala | 18 ++- .../spark/sql/execution/datasources/ddl.scala | 18 ++- .../spark/sql/execution/datasources/rules.scala | 130 ++++++++------------- .../apache/spark/sql/internal/SessionState.scala | 5 +- .../spark/sql/execution/command/DDLSuite.scala | 4 +- .../org/apache/spark/sql/sources/InsertSuite.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 24 ++-- .../apache/spark/sql/hive/HiveSessionCatalog.scala | 2 + .../apache/spark/sql/hive/HiveSessionState.scala | 9 +- .../org/apache/spark/sql/hive/HiveStrategies.scala | 10 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 2 +- .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 3 +- .../org/apache/spark/sql/hive/parquetSuites.scala | 16 +-- 20 files changed, 126 insertions(+), 199 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index f13a1f6d5d..2a3d3a173c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -376,28 +376,6 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) - case InsertIntoTable(t, _, _, _, _) - if !t.isInstanceOf[LeafNode] || - t.isInstanceOf[Range] || - t == OneRowRelation || - t.isInstanceOf[LocalRelation] => - failAnalysis(s"Inserting into an RDD-based table is not allowed.") - - case i @ InsertIntoTable(table, partitions, query, _, _) => - val numStaticPartitions = partitions.values.count(_.isDefined) - if (table.output.size != (query.output.size + numStaticPartitions)) { - failAnalysis( - s"$table requires that the data to be inserted have the same number of " + - s"columns as the target table: target table has ${table.output.size} " + - s"column(s) but the inserted data has " + - s"${query.output.size + numStaticPartitions} column(s), including " + - s"$numStaticPartitions partition column(s) having constant value(s).") - } - - case o if !o.resolved => - failAnalysis( - 
s"unresolved operator ${operator.simpleString}") - case o if o.expressions.exists(!_.deterministic) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] => @@ -413,6 +391,10 @@ trait CheckAnalysis extends PredicateHelper { } } extendedCheckRules.foreach(_(plan)) + plan.foreachUp { + case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}") + case _ => + } plan.foreach(_.setAnalyzed()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index f4d016cb96..e4fd737b35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -111,9 +111,6 @@ object UnsupportedOperationChecker { throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") - case _: InsertIntoTable => - throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") - case Join(left, right, joinType, _) => joinType match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 432097d621..8d7a6bc4b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -363,7 +363,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { } /** - * Insert some data into a table. + * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the + * concrete implementations during analysis. * * @param table the logical plan representing the table. In the future this should be a * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables @@ -374,25 +375,24 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { * Map('a' -> Some('1'), 'b' -> Some('2')), * and `INSERT INTO tbl PARTITION (a=1, b) AS ...` * would have Map('a' -> Some('1'), 'b' -> None). - * @param child the logical plan representing data to write to. + * @param query the logical plan representing data to write to. * @param overwrite overwrite existing table or partitions. * @param ifNotExists If true, only write if the table or partition does not exist. */ case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], - child: LogicalPlan, + query: LogicalPlan, overwrite: Boolean, ifNotExists: Boolean) extends LogicalPlan { - - override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = Seq.empty - assert(overwrite || !ifNotExists) assert(partition.values.forall(_.nonEmpty) || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && table.resolved + // We don't want `table` in children as sometimes we don't want to transform it. 
+ override def children: Seq[LogicalPlan] = query :: Nil + override def output: Seq[Attribute] = Seq.empty + override lazy val resolved: Boolean = false } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ff1f0177e8..81657d9e47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -265,7 +265,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { InsertIntoTable( table = UnresolvedRelation(tableIdent), partition = Map.empty[String, Option[String]], - child = df.logicalPlan, + query = df.logicalPlan, overwrite = mode == SaveMode.Overwrite, ifNotExists = false)).toRdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 73e2ffdf00..678241656c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -36,7 +36,6 @@ class SparkPlanner( extraStrategies ++ ( FileSourceStrategy :: DataSourceStrategy :: - DDLStrategy :: SpecialLimits :: Aggregation :: JoinSelection :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index fafb919670..e3ec343479 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -405,32 +405,4 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => Nil } } - - object DDLStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => - val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore) - ExecutedCommandExec(cmd) :: Nil - - case CreateTable(tableDesc, mode, None) => - val cmd = - CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) - ExecutedCommandExec(cmd) :: Nil - - // CREATE TABLE ... AS SELECT ... 
for hive serde table is handled in hive module, by rule - // `CreateTables` - - case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) => - val cmd = - CreateDataSourceTableAsSelectCommand( - tableDesc, - mode, - query) - ExecutedCommandExec(cmd) :: Nil - - case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil - - case _ => Nil - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index bb903a2662..bc4b5b6258 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -111,10 +111,12 @@ case class CreateTableLikeCommand( * [AS select_statement]; * }}} */ -case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand { +case class CreateTableCommand( + table: CatalogTable, + ignoreIfExists: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - sparkSession.sessionState.catalog.createTable(table, ifNotExists) + sparkSession.sessionState.catalog.createTable(table, ignoreIfExists) Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 19db293132..d8a5158287 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -45,7 +45,8 @@ import org.apache.spark.unsafe.types.UTF8String * Replaces generic operations with specific variants that are designed to work with Spark * SQL Data Sources. * - * Note that, this rule must be run after [[PreprocessTableInsertion]]. + * Note that, this rule must be run after `PreprocessTableCreation` and + * `PreprocessTableInsertion`. 
*/ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { @@ -130,6 +131,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) => + CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) + + case CreateTable(tableDesc, mode, Some(query)) + if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => + CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) + + case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _), + parts, query, overwrite, false) if parts.isEmpty => + InsertIntoDataSourceCommand(l, query, overwrite) + case InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and @@ -273,10 +285,6 @@ object DataSourceStrategy extends Strategy with Logging { Map.empty, None) :: Nil - case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), - part, query, overwrite, false) if part.isEmpty => - ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil - case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index d10fa2c9ff..110d503f91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -20,15 +20,23 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} import org.apache.spark.sql.types._ +/** + * Create a table and optionally insert some data into it. Note that this plan is unresolved and + * has to be replaced by the concrete implementations during analysis. + * + * @param tableDesc the metadata of the table to be created. + * @param mode the data writing mode + * @param query an optional logical plan representing data to write into the created table. 
+ */ case class CreateTable( tableDesc: CatalogTable, mode: SaveMode, - query: Option[LogicalPlan]) extends Command { + query: Option[LogicalPlan]) extends LogicalPlan { assert(tableDesc.provider.isDefined, "The table to be created must have a provider.") if (query.isEmpty) { @@ -37,7 +45,9 @@ case class CreateTable( "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.") } - override def innerChildren: Seq[QueryPlan[_]] = query.toSeq + override def children: Seq[LogicalPlan] = query.toSeq + override def output: Seq[Attribute] = Seq.empty + override lazy val resolved: Boolean = false } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 623d47b4c9..e053a0e9e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -21,12 +21,11 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering} -import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} +import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{AtomicType, StructType} /** @@ -65,10 +64,10 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { } /** - * Analyze [[CreateTable]] and do some normalization and checking. - * For CREATE TABLE AS SELECT, the SELECT query is also analyzed. + * Preprocess [[CreateTable]], to do some normalization and checking. */ -case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { +case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private val catalog = sparkSession.sessionState.catalog def apply(plan: LogicalPlan): LogicalPlan = plan transform { // When we CREATE TABLE without specifying the table schema, we should fail the query if @@ -91,16 +90,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl // bucket spec, etc. match the existing table, and adjust the columns order of the given query // if necessary. case c @ CreateTable(tableDesc, SaveMode.Append, Some(query)) - if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) => + if query.resolved && catalog.tableExists(tableDesc.identifier) => // This is guaranteed by the parser and `DataFrameWriter` assert(tableDesc.provider.isDefined) - // Analyze the query in CTAS and then we can do the normalization and checking. 
- val qe = sparkSession.sessionState.executePlan(query) - qe.assertAnalyzed() - val analyzedQuery = qe.analyzed - - val catalog = sparkSession.sessionState.catalog val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase) val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db)) val tableName = tableIdentWithDB.unquotedString @@ -121,7 +114,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl s"`${specifiedProvider.getSimpleName}`.") } - if (analyzedQuery.schema.length != existingTable.schema.length) { + if (query.schema.length != existingTable.schema.length) { throw new AnalysisException( s"The column number of the existing table $tableName" + s"(${existingTable.schema.catalogString}) doesn't match the data schema" + @@ -135,8 +128,8 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl // adjust the column order of the given dataframe according to it, or throw exception // if the column names do not match. val adjustedColumns = tableCols.map { col => - analyzedQuery.resolve(Seq(col), resolver).getOrElse { - val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ") + query.resolve(Seq(col), resolver).getOrElse { + val inputColumns = query.schema.map(_.name).mkString(", ") throw new AnalysisException( s"cannot resolve '$col' given input columns: [$inputColumns]") } @@ -172,10 +165,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl """.stripMargin) } - val newQuery = if (adjustedColumns != analyzedQuery.output) { - Project(adjustedColumns, analyzedQuery) + val newQuery = if (adjustedColumns != query.output) { + Project(adjustedColumns, query) } else { - analyzedQuery + query } c.copy( @@ -191,15 +184,12 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl // * partition columns' type must be AtomicType. // * sort columns' type must be orderable. // * reorder table schema or output of query plan, to put partition columns at the end. 
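The last normalization listed above (putting partition columns at the end of the table schema) can be pictured with a small standalone helper. This is a simplified sketch on a plain `StructType` with made-up column names; `putPartitionColumnsAtEnd` is not the actual logic of `PreprocessTableCreation`.

```scala
// Simplified sketch: move the partition columns of a schema to the end, keeping the
// relative order of the remaining columns.
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

def putPartitionColumnsAtEnd(schema: StructType, partitionCols: Seq[String]): StructType = {
  val (parts, normal) = schema.partition(f => partitionCols.contains(f.name))
  StructType(normal ++ parts)
}

val schema = new StructType()
  .add("p", IntegerType)   // partition column declared first by the user
  .add("a", StringType)
  .add("b", StringType)

putPartitionColumnsAtEnd(schema, Seq("p")).fieldNames  // Array(a, b, p)
```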
- case c @ CreateTable(tableDesc, _, query) => + case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) => if (query.isDefined) { assert(tableDesc.schema.isEmpty, "Schema may not be specified in a Create Table As Select (CTAS) statement") - val qe = sparkSession.sessionState.executePlan(query.get) - qe.assertAnalyzed() - val analyzedQuery = qe.analyzed - + val analyzedQuery = query.get val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc) val output = analyzedQuery.output @@ -319,16 +309,15 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec( insert.partition, partColNames, tblName, conf.resolver) - val expectedColumns = { - val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet - insert.table.output.filterNot(a => staticPartCols.contains(a.name)) - } + val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet + val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name)) - if (expectedColumns.length != insert.child.schema.length) { + if (expectedColumns.length != insert.query.schema.length) { throw new AnalysisException( - s"Cannot insert into table $tblName because the number of columns are different: " + - s"need ${expectedColumns.length} columns, " + - s"but query has ${insert.child.schema.length} columns.") + s"$tblName requires that the data to be inserted have the same number of columns as the " + + s"target table: target table has ${insert.table.output.size} column(s) but the " + + s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " + + s"including ${staticPartCols.size} partition column(s) having constant value(s).") } if (normalizedPartSpec.nonEmpty) { @@ -353,7 +342,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { private def castAndRenameChildOutput( insert: InsertIntoTable, expectedOutput: Seq[Attribute]): InsertIntoTable = { - val newChildOutput = expectedOutput.zip(insert.child.output).map { + val newChildOutput = expectedOutput.zip(insert.query.output).map { case (expected, actual) => if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name && @@ -368,15 +357,15 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { } } - if (newChildOutput == insert.child.output) { + if (newChildOutput == insert.query.output) { insert } else { - insert.copy(child = Project(newChildOutput, insert.child)) + insert.copy(query = Project(newChildOutput, insert.query)) } } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(table, _, child, _, _) if table.resolved && child.resolved => + case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved => table match { case relation: CatalogRelation => val metadata = relation.catalogTable @@ -387,7 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { case LogicalRelation(_: InsertableRelation, _, catalogTable) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") preprocess(i, tblName, Nil) - case other => i + case _ => i } } } @@ -398,10 +387,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) => - throw new 
AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT") case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => - throw new AnalysisException("Hive support is required to CREATE Hive TABLE") + throw new AnalysisException("Hive support is required to CREATE Hive TABLE (AS SELECT)") case _ => // OK } } @@ -410,63 +397,40 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) { /** * A rule to do various checks before inserting into or writing to a data source table. */ -case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) - extends (LogicalPlan => Unit) { +object PreWriteCheck extends (LogicalPlan => Unit) { def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): Unit = { plan.foreach { - case logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) => - // Right now, we do not support insert into a data source table with partition specs. - if (partition.nonEmpty) { - failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") - } else { - // Get all input data source relations of the query. - val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation, _, _) => src - } - if (srcRelations.contains(t)) { - failAnalysis( - "Cannot insert overwrite into table that is also being read from.") - } else { - // OK - } + case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) => + // Get all input data source relations of the query. + val srcRelations = query.collect { + case LogicalRelation(src, _, _) => src } - - case logical.InsertIntoTable( - LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) => - // We need to make sure the partition columns specified by users do match partition - // columns of the relation. - val existingPartitionColumns = r.partitionSchema.fieldNames.toSet - val specifiedPartitionColumns = part.keySet - if (existingPartitionColumns != specifiedPartitionColumns) { - failAnalysis("Specified partition columns " + - s"(${specifiedPartitionColumns.mkString(", ")}) " + - "do not match the partition columns of the table. Please use " + - s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.") + if (srcRelations.contains(relation)) { + failAnalysis("Cannot insert into table that is also being read from.") } else { // OK } - PartitioningUtils.validatePartitionColumn( - r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) + relation match { + case _: HadoopFsRelation => // OK - // Get all input data source relations of the query. - val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation, _, _) => src - } - if (srcRelations.contains(r)) { - failAnalysis( - "Cannot insert overwrite into table that is also being read from.") - } else { - // OK + // Right now, we do not support insert into a non-file-based data source table with + // partition specs. + case _: InsertableRelation if partition.nonEmpty => + failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") + + case _ => failAnalysis(s"$relation does not allow insertion.") } - case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => - // The relation in l is not an InsertableRelation. 
- failAnalysis(s"$l does not allow insertion.") + case InsertIntoTable(t, _, _, _, _) + if !t.isInstanceOf[LeafNode] || + t.isInstanceOf[Range] || + t == OneRowRelation || + t.isInstanceOf[LocalRelation] => + failAnalysis(s"Inserting into an RDD-based table is not allowed.") case _ => // OK } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index a5ebe4780f..6908560511 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -118,12 +118,11 @@ private[sql] class SessionState(sparkSession: SparkSession) { new ResolveSQLOnFile(sparkSession) :: Nil override val postHocResolutionRules = - AnalyzeCreateTable(sparkSession) :: + PreprocessTableCreation(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: Nil - override val extendedCheckRules = - Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck) + override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f6d1ee2287..bcb707c8fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1555,13 +1555,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { var e = intercept[AnalysisException] { sql("CREATE TABLE t SELECT 1 as a, 1 as b") }.getMessage - assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT")) + assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)")) spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1") e = intercept[AnalysisException] { sql("CREATE TABLE t SELECT a, b from t1") }.getMessage - assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT")) + assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 13284ba649..5b215ca07f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -113,7 +113,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { |INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt """.stripMargin) }.getMessage - assert(message.contains("the number of columns are different") + assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)") ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index faa76b73fd..677da0dbdc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -247,16 +247,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.resolved || plan.analyzed) { - return plan - } - plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, 
child, overwrite, ifNotExists) - // Inserting into partitioned table is not supported in Parquet data source (yet). - if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => - InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists) + case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Parquet data source (yet). + if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists) // Read path case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => @@ -285,16 +281,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } override def apply(plan: LogicalPlan): LogicalPlan = { - if (!plan.resolved || plan.analyzed) { - return plan - } - plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) - // Inserting into partitioned table is not supported in Orc data source (yet). - if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => - InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists) + case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Orc data source (yet). + if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists) // Read path case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 44ef5cce2e..c9be1b9d10 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -68,6 +68,8 @@ private[sql] class HiveSessionCatalog( // and HiveCatalog. We should still do it at some point... private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession) + // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e. + // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`. 
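The ordering note above can be read as a simple run-once, in-order pass over the post-hoc rules. The sketch below is conceptual (`runPostHocRules` is not the analyzer's actual batch executor), but it shows why a conversion rule that produces `LogicalRelation`s has to run before the rules that pattern-match on them.

```scala
// Conceptual sketch: post-hoc resolution rules are applied once, in order.
// ParquetConversions/OrcConversions rewrite MetastoreRelation into LogicalRelation,
// which is what PreprocessTableInsertion and DataSourceAnalysis later match on.
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

def runPostHocRules(rules: Seq[Rule[LogicalPlan]], plan: LogicalPlan): LogicalPlan =
  rules.foldLeft(plan)((current, rule) => rule(current))

// Conceptually, matching the HiveSessionState wiring further down in this patch:
// runPostHocRules(Seq(catalog.ParquetConversions, catalog.OrcConversions,
//   PreprocessTableCreation(session), PreprocessTableInsertion(conf),
//   DataSourceAnalysis(conf), HiveAnalysis), resolvedPlan)
```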
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 413712e0c6..273cf85df3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -60,20 +60,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override lazy val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = - catalog.ParquetConversions :: - catalog.OrcConversions :: new ResolveHiveSerdeTable(sparkSession) :: new FindDataSourceTable(sparkSession) :: new FindHiveSerdeTable(sparkSession) :: new ResolveSQLOnFile(sparkSession) :: Nil override val postHocResolutionRules = - AnalyzeCreateTable(sparkSession) :: + catalog.ParquetConversions :: + catalog.OrcConversions :: + PreprocessTableCreation(sparkSession) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: HiveAnalysis :: Nil - override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) + override val extendedCheckRules = Seq(PreWriteCheck) } } @@ -89,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) experimentalMethods.extraStrategies ++ Seq( FileSourceStrategy, DataSourceStrategy, - DDLStrategy, SpecialLimits, InMemoryScans, HiveTableScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 0f293c21fa..f45532cc38 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion} +import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.HiveSerDe @@ -109,13 +109,17 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { /** * Replaces generic operations with specific variants that are designed to work with Hive. * - * Note that, this rule must be run after `PreprocessTableInsertion`. + * Note that, this rule must be run after `PreprocessTableCreation` and + * `PreprocessTableInsertion`. 
*/ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) => InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) + case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => + CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) + case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => CreateHiveTableAsSelectCommand(tableDesc, query, mode) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 6f43b83607..0bd08877a3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -52,7 +52,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle private def analyzeCreateTable(sql: String): CatalogTable = { TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect { - case CreateTable(tableDesc, mode, _) => tableDesc + case CreateTableCommand(tableDesc, _) => tableDesc }.head } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index e3ddaf7254..71ce5a7c4a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -376,7 +376,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef val e = intercept[AnalysisException] { sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3") } - assert(e.message.contains("the number of columns are different")) + assert(e.message.contains( + "target table has 4 column(s) but the inserted data has 5 column(s)")) } testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index e9239ea56f..1a1b2571b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -307,13 +307,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") - df.queryExecution.sparkPlan match { - case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) => + df.queryExecution.analyzed match { + case cmd: InsertIntoHadoopFsRelationCommand => assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet")) case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[HadoopFsRelation ].getCanonicalName} and " + - s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " + - s"However, found a ${o.toString} ") + s"${classOf[HadoopFsRelation ].getCanonicalName}. 
However, found a ${o.toString}") } checkAnswer( @@ -338,13 +336,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { """.stripMargin) val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") - df.queryExecution.sparkPlan match { - case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) => + df.queryExecution.analyzed match { + case cmd: InsertIntoHadoopFsRelationCommand => assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet")) case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[HadoopFsRelation ].getCanonicalName} and " + - s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." + - s"However, found a ${o.toString} ") + s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}") } checkAnswer( -- cgit v1.2.3