about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-11-03 07:41:50 -0800
committerDavies Liu <davies.liu@gmail.com>2015-11-03 07:41:50 -0800
commitd6035d97c91fe78b1336ade48134252915263ea6 (patch)
treeee75febb06b35ff99f3cb2111061770a92821c12 /sql
parent57446eb69ceb6b8856ab22b54abb22b47b80f841 (diff)
downloadspark-d6035d97c91fe78b1336ade48134252915263ea6.tar.gz
spark-d6035d97c91fe78b1336ade48134252915263ea6.tar.bz2
spark-d6035d97c91fe78b1336ade48134252915263ea6.zip
[SPARK-10304] [SQL] Partition discovery should throw an exception if the dir structure is invalid
JIRA: https://issues.apache.org/jira/browse/SPARK-10304 This patch detects if the structure of partition directories is not valid. The test cases are from #8547. Thanks zhzhan. cc liancheng Author: Liang-Chi Hsieh <viirya@appier.com> Closes #8840 from viirya/detect_invalid_part_dir.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala36
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala36
2 files changed, 59 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 0a2007e158..628c5e1893 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -77,9 +77,11 @@ private[sql] object PartitioningUtils {
defaultPartitionName: String,
typeInference: Boolean): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
- val pathsWithPartitionValues = paths.flatMap { path =>
- parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
- }
+ val (partitionValues, optBasePaths) = paths.map { path =>
+ parsePartition(path, defaultPartitionName, typeInference)
+ }.unzip
+
+ val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
if (pathsWithPartitionValues.isEmpty) {
// This dataset is not partitioned.
@@ -87,6 +89,12 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
+ val basePaths = optBasePaths.flatMap(x => x)
+ assert(
+ basePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\n" +
+ basePaths.mkString("\n\t", "\n\t", "\n\n"))
+
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
// Creates the StructType which represents the partition columns.
@@ -110,12 +118,12 @@ private[sql] object PartitioningUtils {
}
/**
- * Parses a single partition, returns column names and values of each partition column. For
- * example, given:
+ * Parses a single partition, returns column names and values of each partition column, also
+ * the base path. For example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
- * it returns:
+ * it returns the partition:
* {{{
* PartitionValues(
* Seq("a", "b", "c"),
@@ -124,34 +132,40 @@ private[sql] object PartitioningUtils {
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
+ * and the base path:
+ * {{{
+ * /path/to/partition
+ * }}}
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String,
- typeInference: Boolean): Option[PartitionValues] = {
+ typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
var chopped = path
+ var basePath = path
while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
if (chopped.getName.toLowerCase == "_temporary") {
- return None
+ return (None, None)
}
val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)
+ basePath = chopped
chopped = chopped.getParent
- finished = maybeColumn.isEmpty || chopped.getParent == null
+ finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
}
if (columns.isEmpty) {
- None
+ (None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
- Some(PartitionValues(columnNames, values))
+ (Some(PartitionValues(columnNames, values)), Some(basePath))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 3a23b8ed66..67b6a37fa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
check(defaultPartitionName, Literal.create(null, NullType))
}
+ test("parse invalid partitioned directories") {
+ // Invalid
+ var paths = Seq(
+ "hdfs://host:9000/invalidPath",
+ "hdfs://host:9000/path/a=10/b=20",
+ "hdfs://host:9000/path/a=10.5/b=hello")
+
+ var exception = intercept[AssertionError] {
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ }
+ assert(exception.getMessage().contains("Conflicting directory structures detected"))
+
+ // Valid
+ paths = Seq(
+ "hdfs://host:9000/path/_temporary",
+ "hdfs://host:9000/path/a=10/b=20",
+ "hdfs://host:9000/path/_temporary/path")
+
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+
+ // Invalid
+ paths = Seq(
+ "hdfs://host:9000/path/_temporary",
+ "hdfs://host:9000/path/a=10/b=20",
+ "hdfs://host:9000/path/path1")
+
+ exception = intercept[AssertionError] {
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ }
+ assert(exception.getMessage().contains("Conflicting directory structures detected"))
+ }
+
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
- assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
+ assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), defaultPartitionName, true).get
+ parsePartition(new Path(path), defaultPartitionName, true)
}.getMessage
assert(message.contains(expected))