Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  36
1 file changed, 3 insertions(+), 33 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 29afe5751b..ecfcafe69c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -104,20 +104,12 @@ case class DataSource(
* dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
* this information, therefore calls to this method should be very cheap, i.e. there won't
* be any further inference in any triggers.
- * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
- * existing table's partitioning scheme. This is achieved by not providing
- * `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
- * exit, if we don't care about the schema of the original table.
*
* @param format the file format object for this DataSource
- * @param justPartitioning Whether to exit early and provide just the schema partitioning.
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
- * columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
- * `null`.
+ * columns.
*/
- private def getOrInferFileFormatSchema(
- format: FileFormat,
- justPartitioning: Boolean = false): (StructType, StructType) = {
+ private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
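The hunk above removes a boolean parameter that changed the method's return contract: with `justPartitioning = true`, the first element of the returned pair was `null`. A minimal, self-contained Scala sketch of the before-and-after shape (names and schemas here are illustrative, not code from this patch):

import org.apache.spark.sql.types.StructType

object SchemaContractSketch {
  // Before: a flag-dependent contract. When `justPartitioning` is true,
  // the first half of the pair is null, and every caller has to remember
  // which half of the tuple is safe to use.
  def schemasBefore(justPartitioning: Boolean): (StructType, StructType) = {
    val partitionSchema = new StructType().add("dt", "string")
    if (justPartitioning) {
      return (null, partitionSchema)
    }
    val dataSchema = new StructType().add("value", "long")
    (dataSchema, partitionSchema)
  }

  // After: both schemas are always computed and returned, so the result
  // can be destructured unconditionally.
  def schemasAfter(): (StructType, StructType) = {
    val partitionSchema = new StructType().add("dt", "string")
    val dataSchema = new StructType().add("value", "long")
    (dataSchema, partitionSchema)
  }

  def main(args: Array[String]): Unit = {
    val (dataSchema, partitionSchema) = schemasAfter()
    println(s"data: $dataSchema, partition: $partitionSchema")
  }
}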
@@ -174,9 +166,7 @@ case class DataSource(
StructType(partitionFields)
}
}
- if (justPartitioning) {
- return (null, partitionSchema)
- }
+
val dataSchema = userSpecifiedSchema.map { schema =>
val equality = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
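The surviving branch filters partition columns out of the user-specified schema using the session's resolver. A small sketch of that filtering in isolation; the `equality` function here is a stand-in for `sparkSession.sessionState.conf.resolver`, which is case-insensitive by default:

import org.apache.spark.sql.types.StructType

object PartitionFilterSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for Spark's default case-insensitive resolver.
    val equality: (String, String) => Boolean =
      (a, b) => a.equalsIgnoreCase(b)

    val userSpecifiedSchema = new StructType()
      .add("value", "long")
      .add("DT", "string")
    val partitionSchema = new StructType().add("dt", "string")

    // Same shape as the retained code: keep only fields that do not
    // resolve to a partition column.
    val dataSchema = StructType(userSpecifiedSchema.filterNot { f =>
      partitionSchema.exists(p => equality(p.name, f.name))
    })

    println(dataSchema) // only `value` remains; `DT` matched `dt`
  }
}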
@@ -434,26 +424,6 @@ case class DataSource(
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

- // If we are appending to a table that already exists, make sure the partitioning matches
- // up. If we fail to load the table for whatever reason, ignore the check.
- if (mode == SaveMode.Append) {
- val existingPartitionColumns = Try {
- getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
- }.getOrElse(Seq.empty[String])
- // TODO: Case sensitivity.
- val sameColumns =
- existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
- if (existingPartitionColumns.nonEmpty && !sameColumns) {
- throw new AnalysisException(
- s"""Requested partitioning does not match existing partitioning.
- |Existing partitioning columns:
- | ${existingPartitionColumns.mkString(", ")}
- |Requested partitioning columns:
- | ${partitionColumns.mkString(", ")}
- |""".stripMargin)
- }
- }
-
// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
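For reference, the deleted append-mode guard boils down to the comparison sketched below. This is a simplified, standalone version: it raises a plain IllegalArgumentException because `AnalysisException`'s constructor is not public outside Spark's `sql` package, and the lower-casing mirrors the removed code's `TODO: Case sensitivity` workaround.

object AppendPartitionCheckSketch {
  // Simplified form of the removed check: when appending, the existing
  // table's partition columns (if they could be loaded at all) had to
  // match the requested ones, compared case-insensitively.
  def checkPartitioning(existing: Seq[String], requested: Seq[String]): Unit = {
    val sameColumns =
      existing.map(_.toLowerCase()) == requested.map(_.toLowerCase())
    if (existing.nonEmpty && !sameColumns) {
      throw new IllegalArgumentException(
        s"""Requested partitioning does not match existing partitioning.
           |Existing partitioning columns:
           |  ${existing.mkString(", ")}
           |Requested partitioning columns:
           |  ${requested.mkString(", ")}
           |""".stripMargin)
    }
  }

  def main(args: Array[String]): Unit = {
    checkPartitioning(Seq("dt"), Seq("dt"))      // ok: columns match
    checkPartitioning(Seq.empty, Seq("country")) // ok: existing table unreadable, check skipped
    checkPartitioning(Seq("dt"), Seq("country")) // throws: mismatch
  }
}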