author    Dongjoon Hyun <dongjoon@apache.org>  2016-06-10 12:43:27 -0700
committer Michael Armbrust <michael@databricks.com>  2016-06-10 12:43:27 -0700
commit    2413fce9d6812a91eeffb4435c2b5b361d23214b (patch)
tree      91d746b031e47725765e5e82f314de3970f56132
parent    7d7a0a5e0749909e97d90188707cc9220a1bb73a (diff)
[SPARK-15743][SQL] Prevent saving with all-column partitioning
## What changes were proposed in this pull request?

When saving datasets to storage, `partitionBy` provides an easy way to construct the directory structure. However, if a user chooses all columns as partition columns, exceptions occur.

- **ORC with all-column partitioning**: `AnalysisException` on a **future read** due to schema inference failure.

```scala
scala> spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

scala> spark.read.format("orc").load("/tmp/data").collect()
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at /tmp/data. It must be specified manually;
```

- **Parquet with all-column partitioning**: `InvalidSchemaException` on **write execution** due to a Parquet limitation.

```scala
scala> spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
[Stage 0:> (0 + 8) / 8]16/06/02 16:51:17 ERROR Utils: Aborting task
org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
... (lots of error messages)
```

Although some formats like JSON support all-column partitioning without any problem, it does not seem like a good idea to create lots of empty directories. This PR prevents saving with all-column partitioning by consistently raising `AnalysisException` before executing the save operation.

## How was this patch tested?

Newly added `PartitioningUtilsSuite`.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13486 from dongjoon-hyun/SPARK-15743.
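As a quick illustration of the behavior this patch introduces (not part of the change itself; the spark-shell style write call mirrors the examples above, and the error message comes from the new check in `PartitioningUtils` in the diff below):

```scala
// A single-column Dataset, so partitioning by "id" would use every column.
val df = spark.range(10)

// After this patch, every format fails fast with
//   org.apache.spark.sql.AnalysisException: Cannot use all columns for partition columns
// raised before the save is executed, instead of writing empty directories (JSON),
// failing at write time (Parquet), or failing on a later read (ORC).
df.write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
```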
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  | 32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala  | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala  | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala  | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala  | 12
5 files changed, 37 insertions, 21 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f17fdf946..d3273025b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.spark.sql.execution.datasources
@@ -432,7 +432,7 @@ case class DataSource(
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)
// If we are appending to a table that already exists, make sure the partitioning matches
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 74f2993754..2340ff0afe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -339,7 +339,7 @@ private[sql] object PartitioningUtils {
private val upCastingOrder: Seq[DataType] =
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
- def validatePartitionColumnDataTypes(
+ def validatePartitionColumn(
schema: StructType,
partitionColumns: Seq[String],
caseSensitive: Boolean): Unit = {
@@ -350,6 +350,10 @@ private[sql] object PartitioningUtils {
case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
}
}
+
+ if (partitionColumns.size == schema.fields.size) {
+ throw new AnalysisException(s"Cannot use all columns for partition columns")
+ }
}
def partitionColumnsSchema(
@@ -359,7 +363,7 @@ private[sql] object PartitioningUtils {
val equality = columnNameEquality(caseSensitive)
StructType(partitionColumns.map { col =>
schema.find(f => equality(f.name, col)).getOrElse {
- throw new RuntimeException(s"Partition column $col not found in schema $schema")
+ throw new AnalysisException(s"Partition column $col not found in schema $schema")
}
}).asNullable
}
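Taken together, the two `PartitioningUtils.scala` hunks above suggest the renamed validator reads roughly as follows. This is a sketch reconstructed from the visible diff, not the committed file: the iteration via `partitionColumnsSchema(...).foreach` is assumed from context, and the method is shown out of its enclosing `private[sql] object PartitioningUtils` (so it relies on that file's existing imports such as `AnalysisException`, `StructType`, and `AtomicType`).

```scala
def validatePartitionColumn(
    schema: StructType,
    partitionColumns: Seq[String],
    caseSensitive: Boolean): Unit = {
  // Existing check: every partition column must resolve to an atomic type.
  // (The surrounding foreach is assumed; only its body appears in the hunk.)
  partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach { field =>
    field.dataType match {
      case _: AtomicType => // OK
      case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
    }
  }

  // New check from this patch: reject the degenerate case where every column
  // is a partition column, so all formats fail consistently before the save runs.
  if (partitionColumns.size == schema.fields.size) {
    throw new AnalysisException(s"Cannot use all columns for partition columns")
  }
}
```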
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 9afd715016..7ac62fb191 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// OK
}
- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
// Get all input data source relations of the query.
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
// OK
}
- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
for {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e191010329..efb04912d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -91,7 +91,7 @@ class FileStreamSinkWriter(
hadoopConf: Configuration,
options: Map[String, String]) extends Serializable with Logging {
- PartitioningUtils.validatePartitionColumnDataTypes(
+ PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
private val serializableConf = new SerializableConfiguration(hadoopConf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 431a943304..bf6063a4c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -572,4 +572,16 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
cq.awaitTermination(2000L)
}
+
+ test("prevent all column partitioning") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ intercept[AnalysisException] {
+ spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
+ }
+ intercept[AnalysisException] {
+ spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+ }
+ }
+ }
}