diff options
Diffstat (limited to 'sql/core')
5 files changed, 37 insertions, 21 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5f17fdf946..d3273025b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql.execution.datasources @@ -432,7 +432,7 @@ case class DataSource( } val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumn( data.schema, partitionColumns, caseSensitive) // If we are appending to a table that already exists, make sure the partitioning matches diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 74f2993754..2340ff0afe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -339,7 +339,7 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) - def validatePartitionColumnDataTypes( + def validatePartitionColumn( schema: StructType, partitionColumns: Seq[String], caseSensitive: Boolean): Unit = { @@ -350,6 +350,10 @@ private[sql] object PartitioningUtils { case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") } } + + if (partitionColumns.size == schema.fields.size) { + throw new AnalysisException(s"Cannot use all columns for partition columns") + } } def partitionColumnsSchema( @@ -359,7 +363,7 @@ private[sql] object PartitioningUtils { val equality = columnNameEquality(caseSensitive) StructType(partitionColumns.map { col => schema.find(f => equality(f.name, col)).getOrElse { - throw new RuntimeException(s"Partition column $col not found in schema $schema") + throw new AnalysisException(s"Partition column $col not found in schema $schema") } }).asNullable } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 9afd715016..7ac62fb191 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumn( r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) // Get all input data source relations of the query. @@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // OK } - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumn( c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) for { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index e191010329..efb04912d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -91,7 +91,7 @@ class FileStreamSinkWriter( hadoopConf: Configuration, options: Map[String, String]) extends Serializable with Logging { - PartitioningUtils.validatePartitionColumnDataTypes( + PartitioningUtils.validatePartitionColumn( data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis) private val serializableConf = new SerializableConfiguration(hadoopConf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index 431a943304..bf6063a4c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -572,4 +572,16 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { cq.awaitTermination(2000L) } + + test("prevent all column partitioning") { + withTempDir { dir => + val path = dir.getCanonicalPath + intercept[AnalysisException] { + spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path) + } + intercept[AnalysisException] { + spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path) + } + } + } } |