-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala                      | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala  | 16
2 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 16dc23661c..86bc3a1b6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -81,6 +81,8 @@ private[sql] object PartitioningUtils {
parsePartition(path, defaultPartitionName, typeInference)
}.unzip
+ // We create pairs of (path -> path's partition value) here.
+ // If the corresponding partition value is None, the pair will be skipped.
val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
if (pathsWithPartitionValues.isEmpty) {
@@ -89,11 +91,21 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
+
+ // Check if there are conflicting directory structures.
+ // For paths such as:
+ //   var paths = Seq(
+ //     "hdfs://host:9000/invalidPath",
+ //     "hdfs://host:9000/path/a=10/b=20",
+ //     "hdfs://host:9000/path/a=10.5/b=hello")
+ // the following base paths will be detected as conflicting:
+ //   "hdfs://host:9000/invalidPath"
+ //   "hdfs://host:9000/path"
val basePaths = optBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
- basePaths.mkString("\n\t", "\n\t", "\n\n"))
+ basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
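As an illustration of the change above, the sketch below (a standalone approximation, not Spark's parsePartitions; inferBasePath is a hypothetical simplification of the per-path base-path inference) reproduces the example from the new comment and shows why mkString is now applied to basePaths.distinct: each suspicious base path appears only once in the error message.

// A minimal standalone sketch of the check above. `inferBasePath` is a
// hypothetical simplification of what the partition parsing infers per path;
// it is not Spark's implementation.
object BasePathConflictSketch {
  // Strip trailing "column=value" segments to recover a path's base directory.
  def inferBasePath(path: String): String =
    path.split("/").takeWhile(seg => !seg.contains("=")).mkString("/")

  def main(args: Array[String]): Unit = {
    val paths = Seq(
      "hdfs://host:9000/invalidPath",
      "hdfs://host:9000/path/a=10/b=20",
      "hdfs://host:9000/path/a=10.5/b=hello")

    val basePaths = paths.map(inferBasePath)
    // Mirrors the patched assertion: de-duplicate before reporting, so each
    // suspicious base path is listed only once in the error message.
    assert(basePaths.distinct.size == 1,
      "Conflicting directory structures detected. Suspicious paths:" +
        basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
  }
}

Running this sketch fails the assertion and lists the two distinct base paths ("hdfs://host:9000/invalidPath" and "hdfs://host:9000/path"), matching the example in the added comment.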
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 67b6a37fa5..61cc0da508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+ // Invalid: partitioned and non-partitioned tables share one root.
+ // Conflicting directory structures:
+ //   "hdfs://host:9000/tmp/tables/partitionedTable"
+ //   "hdfs://host:9000/tmp/tables/nonPartitionedTable1"
+ //   "hdfs://host:9000/tmp/tables/nonPartitionedTable2"
+ paths = Seq(
+ "hdfs://host:9000/tmp/tables/partitionedTable",
+ "hdfs://host:9000/tmp/tables/partitionedTable/p=1/",
+ "hdfs://host:9000/tmp/tables/nonPartitionedTable1",
+ "hdfs://host:9000/tmp/tables/nonPartitionedTable2")
+
+ exception = intercept[AssertionError] {
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ }
+ assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") {