diff options
author | Yin Huai <yhuai@databricks.com> | 2016-06-19 21:45:53 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-06-19 21:45:53 -0700 |
commit | 6d0f921aedfdd3b7e8472b6776d0c7d8299190bd (patch) | |
tree | 29f6f34219596d37c44927ff386a77550854bf41 /sql/core | |
parent | 4f17fddcd57adeae0d7e31bd14423283d4b625e9 (diff) | |
download | spark-6d0f921aedfdd3b7e8472b6776d0c7d8299190bd.tar.gz spark-6d0f921aedfdd3b7e8472b6776d0c7d8299190bd.tar.bz2 spark-6d0f921aedfdd3b7e8472b6776d0c7d8299190bd.zip |
[SPARK-16036][SPARK-16037][SPARK-16034][SQL] Follow up code clean up and improvement
## What changes were proposed in this pull request?
This PR is the follow-up PR for https://github.com/apache/spark/pull/13754/files and https://github.com/apache/spark/pull/13749. I will comment inline to explain my changes.
## How was this patch tested?
Existing tests.
Author: Yin Huai <yhuai@databricks.com>
Closes #13766 from yhuai/caseSensitivity.
Diffstat (limited to 'sql/core')
5 files changed, 40 insertions, 28 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6fc9749c7..ca3972d62d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -245,29 +245,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { if (partitioningColumns.isDefined) { throw new AnalysisException( "insertInto() can't be used together with partitionBy(). " + - "Partition columns are defined by the table into which is being inserted." + "Partition columns have already be defined for the table. " + + "It is not necessary to use partitionBy()." ) } - val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap) - val overwrite = mode == SaveMode.Overwrite - - // A partitioned relation's schema can be different from the input logicalPlan, since - // partition columns are all moved after data columns. We Project to adjust the ordering. - // TODO: this belongs to the analyzer. - val input = normalizedParCols.map { parCols => - val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr => - parCols.contains(attr.name) - } - Project(inputDataCols ++ inputPartCols, df.logicalPlan) - }.getOrElse(df.logicalPlan) - df.sparkSession.sessionState.executePlan( InsertIntoTable( - UnresolvedRelation(tableIdent), - partitions.getOrElse(Map.empty[String, Option[String]]), - input, - overwrite, + table = UnresolvedRelation(tableIdent), + partition = Map.empty[String, Option[String]], + child = df.logicalPlan, + overwrite = mode == SaveMode.Overwrite, ifNotExists = false)).toRdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index f274fc77da..557445c2bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -435,7 +435,7 @@ case class DataSource( // If we are appending to a table that already exists, make sure the partitioning matches // up. If we fail to load the table for whatever reason, ignore the check. if (mode == SaveMode.Append) { - val existingColumns = Try { + val existingPartitionColumns = Try { resolveRelation() .asInstanceOf[HadoopFsRelation] .location @@ -444,13 +444,14 @@ case class DataSource( .fieldNames .toSeq }.getOrElse(Seq.empty[String]) + // TODO: Case sensitivity. val sameColumns = - existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase) - if (existingColumns.size > 0 && !sameColumns) { + existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase()) + if (existingPartitionColumns.size > 0 && !sameColumns) { throw new AnalysisException( s"""Requested partitioning does not match existing partitioning. |Existing partitioning columns: - | ${existingColumns.mkString(", ")} + | ${existingPartitionColumns.mkString(", ")} |Requested partitioning columns: | ${partitionColumns.mkString(", ")} |""".stripMargin) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 5963c53a1b..10425af3e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -67,7 +67,7 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo * table. It also does data type casting and field renaming, to make sure that the columns to be * inserted have the correct data type and fields have the correct names. */ -private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] { +private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { private def preprocess( insert: InsertIntoTable, tblName: String, @@ -84,7 +84,13 @@ private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] { if (insert.partition.nonEmpty) { // the query's partitioning must match the table's partitioning // this is set for queries like: insert into ... partition (one = "a", two = <expr>) - if (insert.partition.keySet != partColNames.toSet) { + val samePartitionColumns = + if (conf.caseSensitiveAnalysis) { + insert.partition.keySet == partColNames.toSet + } else { + insert.partition.keySet.map(_.toLowerCase) == partColNames.map(_.toLowerCase).toSet + } + if (!samePartitionColumns) { throw new AnalysisException( s""" |Requested partitioning does not match the table $tblName: @@ -94,7 +100,8 @@ private[sql] object PreprocessTableInsertion extends Rule[LogicalPlan] { } expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert) } else { - // All partition columns are dynamic because this InsertIntoTable had no partitioning + // All partition columns are dynamic because because the InsertIntoTable command does + // not explicitly specify partitioning columns. expectedColumns.map(castAndRenameChildOutput(insert, _)).getOrElse(insert) .copy(partition = partColNames.map(_ -> None).toMap) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index b033e19ddf..5300cfa8a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -111,7 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val analyzer: Analyzer = { new Analyzer(catalog, conf) { override val extendedResolutionRules = - PreprocessTableInsertion :: + PreprocessTableInsertion(conf) :: new FindDataSourceTable(sparkSession) :: DataSourceAnalysis :: (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 8827649d0a..f40ddcc95a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1337,8 +1337,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(sql("select * from partitionedTable").collect().size == 1) // Inserts new data successfully when partition columns are correctly specified in // partitionBy(...). - df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable") - assert(sql("select * from partitionedTable").collect().size == 2) + // TODO: Right now, partition columns are always treated in a case-insensitive way. + // See the write method in DataSource.scala. + Seq((4, 5, 6)).toDF("a", "B", "c") + .write + .mode("append") + .partitionBy("a", "B") + .saveAsTable("partitionedTable") + + Seq((7, 8, 9)).toDF("a", "b", "c") + .write + .mode("append") + .partitionBy("a", "b") + .saveAsTable("partitionedTable") + + checkAnswer( + sql("select a, b, c from partitionedTable"), + Row(1, 2, 3) :: Row(4, 5, 6) :: Row(7, 8, 9) :: Nil + ) } } } |