aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-11-04 10:56:32 -0800
committerYin Huai <yhuai@databricks.com>2015-11-04 10:56:32 -0800
commitde289bf279e14e47859b5fbcd70e97b9d0759f14 (patch)
treedfe89d28139e29428269e106fcab291da2f72764 /sql
parent987df4bfcafeca3633453c2d2f8e14d221fcef33 (diff)
downloadspark-de289bf279e14e47859b5fbcd70e97b9d0759f14.tar.gz
spark-de289bf279e14e47859b5fbcd70e97b9d0759f14.tar.bz2
spark-de289bf279e14e47859b5fbcd70e97b9d0759f14.zip
[SPARK-10304][SQL] Following up checking valid dir structure for partition discovery
This patch follows up #8840. Author: Liang-Chi Hsieh <viirya@appier.com> Closes #9459 from viirya/detect_invalid_part_dir_following.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala16
2 files changed, 29 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 16dc23661c..86bc3a1b6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -81,6 +81,8 @@ private[sql] object PartitioningUtils {
parsePartition(path, defaultPartitionName, typeInference)
}.unzip
+ // We create pairs of (path -> path's partition value) here.
+ // If the corresponding partition value is None, the pair will be skipped.
val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
if (pathsWithPartitionValues.isEmpty) {
@@ -89,11 +91,21 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
+
+ // Check if there is conflicting directory structure.
+ // For the paths such as:
+ // var paths = Seq(
+ // "hdfs://host:9000/invalidPath",
+ // "hdfs://host:9000/path/a=10/b=20",
+ // "hdfs://host:9000/path/a=10.5/b=hello")
+ // the following will be recognised as conflicting directory structures:
+ // "hdfs://host:9000/invalidPath"
+ // "hdfs://host:9000/path"
val basePaths = optBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
- basePaths.mkString("\n\t", "\n\t", "\n\n"))
+ basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 67b6a37fa5..61cc0da508 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+ // Invalid
+ // Conflicting directory structure:
+ // "hdfs://host:9000/tmp/tables/partitionedTable"
+ // "hdfs://host:9000/tmp/tables/nonPartitionedTable1"
+ // "hdfs://host:9000/tmp/tables/nonPartitionedTable2"
+ paths = Seq(
+ "hdfs://host:9000/tmp/tables/partitionedTable",
+ "hdfs://host:9000/tmp/tables/partitionedTable/p=1/",
+ "hdfs://host:9000/tmp/tables/nonPartitionedTable1",
+ "hdfs://host:9000/tmp/tables/nonPartitionedTable2")
+
+ exception = intercept[AssertionError] {
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ }
+ assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") {