Diffstat (limited to 'sql/hive/src')
7 files changed, 30 insertions, 36 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index faa76b73fd..677da0dbdc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -247,16 +247,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
-      if (!plan.resolved || plan.analyzed) {
-        return plan
-      }
-
       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
+        case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+          InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
 
         // Read path
         case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
@@ -285,16 +281,12 @@
     }
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
-      if (!plan.resolved || plan.analyzed) {
-        return plan
-      }
-
       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Orc data source (yet).
-          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+        case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Orc data source (yet).
+          if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
 
         // Read path
         case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
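The HiveMetastoreCatalog change above replaces a whole-plan early return (`!plan.resolved || plan.analyzed`) with a narrower `query.resolved` guard on the write-path pattern, so the conversion can run late as a post-hoc rule. A minimal, self-contained sketch of that guard style, using simplified stand-in types rather than the real Catalyst `LogicalPlan`/`InsertIntoTable` classes:

    // Simplified stand-ins for Catalyst plan nodes; not the real Spark API.
    sealed trait Plan { def resolved: Boolean }
    case class Relation(name: String, partitioned: Boolean) extends Plan {
      def resolved: Boolean = true
    }
    case class Converted(inner: Relation) extends Plan {
      def resolved: Boolean = true
    }
    case class InsertInto(table: Plan, query: Plan) extends Plan {
      def resolved: Boolean = table.resolved && query.resolved
    }
    case object Unresolved extends Plan { def resolved: Boolean = false }

    object ConversionSketch {
      // Rather than bailing out when the whole plan is unresolved, the rule
      // only requires the pieces it actually inspects (the relation and the
      // inserted query) to be resolved, so it can safely fire late in analysis.
      def convertWritePath(plan: Plan): Plan = plan match {
        case InsertInto(r: Relation, query) if query.resolved && !r.partitioned =>
          InsertInto(Converted(r), query)
        case other => other
      }
    }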
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 44ef5cce2e..c9be1b9d10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -68,6 +68,8 @@ private[sql] class HiveSessionCatalog(
   // and HiveCatalog. We should still do it at some point...
   private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
 
+  // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
+  // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 413712e0c6..273cf85df3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -60,20 +60,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   override lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.OrcConversions ::
         new ResolveHiveSerdeTable(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
         new FindHiveSerdeTable(sparkSession) ::
         new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
-        AnalyzeCreateTable(sparkSession) ::
+        catalog.ParquetConversions ::
+        catalog.OrcConversions ::
+        PreprocessTableCreation(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         HiveAnalysis :: Nil
 
-      override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
+      override val extendedCheckRules = Seq(PreWriteCheck)
     }
   }
 
@@ -89,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       experimentalMethods.extraStrategies ++ Seq(
         FileSourceStrategy,
         DataSourceStrategy,
-        DDLStrategy,
         SpecialLimits,
         InMemoryScans,
         HiveTableScans,
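The HiveSessionState change moves the two conversions from `extendedResolutionRules` to the front of `postHocResolutionRules`, matching the ordering comment added in HiveSessionCatalog. A toy illustration of why position in that list matters, assuming a trivial string-based "plan" instead of Catalyst's (not the real API):

    object RuleOrderingSketch extends App {
      type Plan = String
      type Rule = Plan => Plan

      // Rewrites a metastore relation into a data-source relation.
      val parquetConversions: Rule =
        p => if (p == "MetastoreRelation") "ParquetRelation" else p
      // Only knows how to handle the converted, data-source form.
      val preprocessInsertion: Rule =
        p => if (p == "ParquetRelation") "ParquetRelation(preprocessed)" else p

      // Post-hoc rules run in list order via a fold over the plan.
      def runPostHoc(rules: Seq[Rule], plan: Plan): Plan =
        rules.foldLeft(plan)((p, rule) => rule(p))

      // Conversions first: the relation is rewritten before preprocessing sees it.
      assert(runPostHoc(Seq(parquetConversions, preprocessInsertion),
        "MetastoreRelation") == "ParquetRelation(preprocessed)")
      // Reversed: preprocessing never fires on the unconverted relation.
      assert(runPostHoc(Seq(preprocessInsertion, parquetConversions),
        "MetastoreRelation") == "ParquetRelation")
    }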
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0f293c21fa..f45532cc38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.HiveSerDe
 
@@ -109,13 +109,17 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 /**
  * Replaces generic operations with specific variants that are designed to work with Hive.
  *
- * Note that, this rule must be run after `PreprocessTableInsertion`.
+ * Note that, this rule must be run after `PreprocessTableCreation` and
+ * `PreprocessTableInsertion`.
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
+    case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
+      CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6f43b83607..0bd08877a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -52,7 +52,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
 
   private def analyzeCreateTable(sql: String): CatalogTable = {
     TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
-      case CreateTable(tableDesc, mode, _) => tableDesc
+      case CreateTableCommand(tableDesc, _) => tableDesc
     }.head
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index e3ddaf7254..71ce5a7c4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -376,7 +376,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     val e = intercept[AnalysisException] {
       sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
     }
-    assert(e.message.contains("the number of columns are different"))
+    assert(e.message.contains(
+      "target table has 4 column(s) but the inserted data has 5 column(s)"))
   }
 
   testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
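With `DDLStrategy` removed from the planner (see the HiveSessionState diff), `HiveAnalysis` now turns both flavors of a Hive `CreateTable` into commands during analysis: a plain create becomes `CreateTableCommand`, a CTAS becomes `CreateHiveTableAsSelectCommand`. A self-contained sketch of that dispatch, with simplified stand-ins for Spark's `CreateTable`, `SaveMode`, and command classes:

    // Simplified stand-ins; not the real Spark classes.
    sealed trait SaveMode
    case object ErrorIfExists extends SaveMode
    case object Ignore extends SaveMode

    case class TableDesc(name: String, provider: String)
    case class CreateTable(desc: TableDesc, mode: SaveMode, query: Option[String])

    sealed trait Command
    case class CreateTableCommand(desc: TableDesc, ignoreIfExists: Boolean) extends Command
    case class CreateHiveTableAsSelect(desc: TableDesc, query: String, mode: SaveMode) extends Command

    object HiveAnalysisSketch {
      def isHiveTable(desc: TableDesc): Boolean = desc.provider == "hive"

      // A plain CREATE TABLE becomes CreateTableCommand, with ignoreIfExists
      // derived from the save mode; a CTAS (query present) becomes
      // CreateHiveTableAsSelect. Non-Hive tables are left for other rules.
      def apply(plan: CreateTable): Option[Command] = plan match {
        case CreateTable(desc, mode, None) if isHiveTable(desc) =>
          Some(CreateTableCommand(desc, ignoreIfExists = mode == Ignore))
        case CreateTable(desc, mode, Some(query)) if isHiveTable(desc) =>
          Some(CreateHiveTableAsSelect(desc, query, mode))
        case _ => None
      }
    }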
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e9239ea56f..1a1b2571b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,13 +307,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
-    df.queryExecution.sparkPlan match {
-      case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+    df.queryExecution.analyzed match {
+      case cmd: InsertIntoHadoopFsRelationCommand =>
         assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[HadoopFsRelation].getCanonicalName} and " +
-        s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
-        s"However, found a ${o.toString} ")
+        s"${classOf[HadoopFsRelation].getCanonicalName}. However, found a ${o.toString}")
     }
 
     checkAnswer(
@@ -338,13 +336,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       """.stripMargin)
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
-    df.queryExecution.sparkPlan match {
-      case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+    df.queryExecution.analyzed match {
+      case cmd: InsertIntoHadoopFsRelationCommand =>
         assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
       case o => fail("test_insert_parquet should be converted to a " +
-        s"${classOf[HadoopFsRelation].getCanonicalName} and " +
-        s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
-        s"However, found a ${o.toString} ")
+        s"${classOf[HadoopFsRelation].getCanonicalName}. However, found a ${o.toString}")
     }
 
     checkAnswer(
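The parquetSuites changes switch the assertion from the physical plan (`sparkPlan`, where the command was wrapped in `ExecutedCommandExec`) to the analyzed plan, where the command now appears directly. The match-or-fail idiom those tests use, sketched with stand-in types rather than the real Spark test classes:

    // Stand-ins for analyzed plan nodes; not the real Spark API.
    sealed trait AnalyzedPlan
    case class InsertIntoHadoopFsRelationCmd(catalogTable: Option[String]) extends AnalyzedPlan
    case class OtherPlan(description: String) extends AnalyzedPlan

    object ConversionCheckSketch {
      // Match the expected command type; anything else fails with a message
      // naming what was actually found, mirroring the fail(...) calls above.
      def checkConverted(plan: AnalyzedPlan): Unit = plan match {
        case cmd: InsertIntoHadoopFsRelationCmd =>
          assert(cmd.catalogTable.contains("test_insert_parquet"))
        case o =>
          sys.error(s"expected an InsertIntoHadoopFsRelationCmd, found: $o")
      }
    }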