author      Wenchen Fan <wenchen@databricks.com>    2017-02-07 00:36:57 +0800
committer   Wenchen Fan <wenchen@databricks.com>    2017-02-07 00:36:57 +0800
commit      aff53021cf828cd7c139d8ec230d45593078b73a (patch)
tree        d5add8b21cf710eb30329b040bbcd3ee721fd00c /sql/hive
parent      0f16ff5b0ec8cd828774ba5ddb276d7b06dbe273 (diff)
[SPARK-19080][SQL] simplify data source analysis
## What changes were proposed in this pull request?

The current way of resolving `InsertIntoTable` and `CreateTable` is convoluted: sometimes we replace them with concrete implementation commands during analysis, sometimes during the planning phase. The error checking logic is also scattered: it may live in extended analyzer rules, extended checking rules, or `CheckAnalysis`.

This PR simplifies the data source analysis:

1. `InsertIntoTable` and `CreateTable` are always unresolved and must be replaced by concrete implementation commands during analysis.
2. The error checking logic lives mainly in two rules: `PreprocessTableCreation` and `PreprocessTableInsertion`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16269 from cloud-fan/ddl.
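For context before reading the diff: the `HiveStrategies.scala` hunk below makes `HiveAnalysis` the single post-hoc rule that turns unresolved Hive `CreateTable` and `InsertIntoTable` operators into concrete commands. The sketch below reassembles that rule from the hunk; the `package` line, the `SaveMode` import, and the explanatory comments are not visible in the hunk and are assumptions added for illustration.

```scala
package org.apache.spark.sql.hive  // assumed: same package as the patched file

import org.apache.spark.sql.SaveMode  // assumed import, not shown in the hunk
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._

// Runs as a post-hoc resolution rule, after PreprocessTableCreation and
// PreprocessTableInsertion have validated the plan, so the operators it sees
// are still the generic unresolved ones.
object HiveAnalysis extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // INSERT INTO a metastore table becomes a Hive-specific insert command.
    case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
      InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)

    // CREATE TABLE without a query becomes a plain CreateTableCommand.
    case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
      CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)

    // CREATE TABLE ... AS SELECT becomes a Hive CTAS command.
    case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
      CreateHiveTableAsSelectCommand(tableDesc, query, mode)
  }
}
```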
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala    | 24
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala      |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala        |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala          | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala     |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala|  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala           | 16
7 files changed, 30 insertions, 36 deletions
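The behavioural effect that the updated `parquetSuites.scala` assertions (at the bottom of this diff) rely on is that the conversion to a data source insert is now visible in the analyzed plan, not only after planning. A minimal, hypothetical way to observe this from a user session might look like the sketch below; the table name and session setup are made up, and it assumes an unpartitioned Hive Parquet table with the default `spark.sql.hive.convertMetastoreParquet=true`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand

object Spark19080Demo {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; requires Hive support on the classpath.
    val spark = SparkSession.builder()
      .appName("spark-19080-demo")
      .enableHiveSupport()
      .getOrCreate()

    // Unpartitioned Hive Parquet table, so the metastore relation is eligible
    // for conversion to the Parquet data source path.
    spark.sql("CREATE TABLE IF NOT EXISTS demo_parquet (a INT) STORED AS PARQUET")
    val df = spark.sql("INSERT INTO TABLE demo_parquet SELECT 1")

    // With this patch, the conversion shows up in the analyzed plan directly.
    df.queryExecution.analyzed match {
      case cmd: InsertIntoHadoopFsRelationCommand =>
        println(s"insert target: ${cmd.catalogTable.map(_.identifier.table)}")
      case other =>
        println(s"not converted, analyzed plan was: $other")
    }

    spark.stop()
  }
}
```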
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index faa76b73fd..677da0dbdc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -247,16 +247,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!plan.resolved || plan.analyzed) {
- return plan
- }
-
plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
- // Inserting into partitioned table is not supported in Parquet data source (yet).
- if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
- InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
+ case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Parquet data source (yet).
+ if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+ InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
@@ -285,16 +281,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!plan.resolved || plan.analyzed) {
- return plan
- }
-
plan transformUp {
// Write path
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
- // Inserting into partitioned table is not supported in Orc data source (yet).
- if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
- InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+ case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+ // Inserting into partitioned table is not supported in Orc data source (yet).
+ if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+ InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 44ef5cce2e..c9be1b9d10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -68,6 +68,8 @@ private[sql] class HiveSessionCatalog(
// and HiveCatalog. We should still do it at some point...
private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
+ // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
+ // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 413712e0c6..273cf85df3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -60,20 +60,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override lazy val analyzer: Analyzer = {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
new ResolveHiveSerdeTable(sparkSession) ::
new FindDataSourceTable(sparkSession) ::
new FindHiveSerdeTable(sparkSession) ::
new ResolveSQLOnFile(sparkSession) :: Nil
override val postHocResolutionRules =
- AnalyzeCreateTable(sparkSession) ::
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ PreprocessTableCreation(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
HiveAnalysis :: Nil
- override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
+ override val extendedCheckRules = Seq(PreWriteCheck)
}
}
@@ -89,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
experimentalMethods.extraStrategies ++ Seq(
FileSourceStrategy,
DataSourceStrategy,
- DDLStrategy,
SpecialLimits,
InMemoryScans,
HiveTableScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0f293c21fa..f45532cc38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.HiveSerDe
@@ -109,13 +109,17 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
/**
* Replaces generic operations with specific variants that are designed to work with Hive.
*
- * Note that, this rule must be run after `PreprocessTableInsertion`.
+ * Note that, this rule must be run after `PreprocessTableCreation` and
+ * `PreprocessTableInsertion`.
*/
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+ case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
+ CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6f43b83607..0bd08877a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -52,7 +52,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
private def analyzeCreateTable(sql: String): CatalogTable = {
TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
- case CreateTable(tableDesc, mode, _) => tableDesc
+ case CreateTableCommand(tableDesc, _) => tableDesc
}.head
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index e3ddaf7254..71ce5a7c4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -376,7 +376,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
val e = intercept[AnalysisException] {
sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
}
- assert(e.message.contains("the number of columns are different"))
+ assert(e.message.contains(
+ "target table has 4 column(s) but the inserted data has 5 column(s)"))
}
testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e9239ea56f..1a1b2571b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,13 +307,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
- df.queryExecution.sparkPlan match {
- case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+ df.queryExecution.analyzed match {
+ case cmd: InsertIntoHadoopFsRelationCommand =>
assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
- s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
- s"However, found a ${o.toString} ")
+ s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
}
checkAnswer(
@@ -338,13 +336,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
- df.queryExecution.sparkPlan match {
- case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+ df.queryExecution.analyzed match {
+ case cmd: InsertIntoHadoopFsRelationCommand =>
assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
- s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
- s"However, found a ${o.toString} ")
+ s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
}
checkAnswer(