diff options
author | Liang-Chi Hsieh <viirya@appier.com> | 2015-11-04 10:56:32 -0800 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-11-04 10:56:32 -0800 |
commit | de289bf279e14e47859b5fbcd70e97b9d0759f14 (patch) | |
tree | dfe89d28139e29428269e106fcab291da2f72764 /sql | |
parent | 987df4bfcafeca3633453c2d2f8e14d221fcef33 (diff) | |
download | spark-de289bf279e14e47859b5fbcd70e97b9d0759f14.tar.gz spark-de289bf279e14e47859b5fbcd70e97b9d0759f14.tar.bz2 spark-de289bf279e14e47859b5fbcd70e97b9d0759f14.zip |
[SPARK-10304][SQL] Following up checking valid dir structure for partition discovery
This patch follows up #8840.
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes #9459 from viirya/detect_invalid_part_dir_following.
Diffstat (limited to 'sql')
2 files changed, 29 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 16dc23661c..86bc3a1b6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -81,6 +81,8 @@ private[sql] object PartitioningUtils { parsePartition(path, defaultPartitionName, typeInference) }.unzip + // We create pairs of (path -> path's partition value) here + // If the corresponding partition value is None, the pair will be skiped val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _)) if (pathsWithPartitionValues.isEmpty) { @@ -89,11 +91,21 @@ private[sql] object PartitioningUtils { } else { // This dataset is partitioned. We need to check whether all partitions have the same // partition columns and resolve potential type conflicts. + + // Check if there is conflicting directory structure. + // For the paths such as: + // var paths = Seq( + // "hdfs://host:9000/invalidPath", + // "hdfs://host:9000/path/a=10/b=20", + // "hdfs://host:9000/path/a=10.5/b=hello") + // It will be recognised as conflicting directory structure: + // "hdfs://host:9000/invalidPath" + // "hdfs://host:9000/path" val basePaths = optBasePaths.flatMap(x => x) assert( basePaths.distinct.size == 1, "Conflicting directory structures detected. Suspicious paths:\b" + - basePaths.mkString("\n\t", "\n\t", "\n\n")) + basePaths.distinct.mkString("\n\t", "\n\t", "\n\n")) val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 67b6a37fa5..61cc0da508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -88,6 +88,22 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) } assert(exception.getMessage().contains("Conflicting directory structures detected")) + + // Invalid + // Conflicting directory structure: + // "hdfs://host:9000/tmp/tables/partitionedTable" + // "hdfs://host:9000/tmp/tables/nonPartitionedTable1" + // "hdfs://host:9000/tmp/tables/nonPartitionedTable2" + paths = Seq( + "hdfs://host:9000/tmp/tables/partitionedTable", + "hdfs://host:9000/tmp/tables/partitionedTable/p=1/", + "hdfs://host:9000/tmp/tables/nonPartitionedTable1", + "hdfs://host:9000/tmp/tables/nonPartitionedTable2") + + exception = intercept[AssertionError] { + parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) + } + assert(exception.getMessage().contains("Conflicting directory structures detected")) } test("parse partition") { |