aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-06-24 02:17:12 -0700
committerCheng Lian <lian@databricks.com>2015-06-24 02:17:12 -0700
commitcc465fd92482737c21971d82e30d4cf247acf932 (patch)
treef95d331889694cb87836c5a99b21e9072e38566e /sql/core
parent09fcf96b8f881988a4bc7fe26a3f6ed12dfb6adb (diff)
downloadspark-cc465fd92482737c21971d82e30d4cf247acf932.tar.gz
spark-cc465fd92482737c21971d82e30d4cf247acf932.tar.bz2
spark-cc465fd92482737c21971d82e30d4cf247acf932.zip
[SPARK-8138] [SQL] Improves error message when conflicting partition columns are found
This PR improves the error message shown when conflicting partition column names are detected. This can be particularly annoying and confusing when there are a large number of partitions while a handful of them happened to contain unexpected temporary file(s). Now all suspicious directories are listed as below:
```
java.lang.AssertionError: assertion failed: Conflicting partition column names detected:
        Partition column name list #0: b, c, d
        Partition column name list #1: b, c
        Partition column name list #2: b
For partitioned table directories, data files should only live in leaf directories.
Please check the following directories for unexpected files:
        file:/tmp/foo/b=0
        file:/tmp/foo/b=1
        file:/tmp/foo/b=1/c=1
        file:/tmp/foo/b=0/c=0
```
Author: Cheng Lian <lian@databricks.com>

Closes #6610 from liancheng/part-errmsg and squashes the following commits:

7d05f2c [Cheng Lian] Fixes Scala style issue
a149250 [Cheng Lian] Adds test case for the error message
6b74dd8 [Cheng Lian] Also lists suspicious non-leaf partition directories
a935eb8 [Cheng Lian] Improves error message when conflicting partition columns are found
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala47
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala45
2 files changed, 82 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c6f535dde7..8b2a45d8e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
- val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+ val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
// Creates the StructType which represents the partition columns.
val fields = {
@@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
* StringType
* }}}
*/
- private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
- // Column names of all partitions must match
- val distinctPartitionsColNames = values.map(_.columnNames).distinct
-
- if (distinctPartitionsColNames.isEmpty) {
+ private[sql] def resolvePartitions(
+ pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+ if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
- assert(distinctPartitionsColNames.size == 1, {
- val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
- s"Conflicting partition column names detected:\n$list"
- })
+ val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
+ assert(
+ distinctPartColNames.size == 1,
+ listConflictingPartitionColumns(pathsWithPartitionValues))
// Resolves possible type conflicts for each column
+ val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
@@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
}
}
+ private[sql] def listConflictingPartitionColumns(
+ pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+ val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
+
+ def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
+ seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
+
+ val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
+ case (path, partValues) => partValues.columnNames -> path
+ })
+
+ val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
+ case (names, index) =>
+ s"Partition column name list #$index: $names"
+ }
+
+ // Lists out those non-leaf partition directories that also contain files
+ val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
+
+ s"Conflicting partition column names detected:\n" +
+ distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
+ "For partitioned table directories, data files should only live in leaf directories.\n" +
+ "And directories at the same level should have the same partition column name.\n" +
+ "Please check the following directories for unexpected files or " +
+ "inconsistent partition column names:\n" +
+ suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+ }
+
/**
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
* [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 01df189d1f..d0ebb11b06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
}
}
+
+ test("listConflictingPartitionColumns") {
+ def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
+ val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
+ s"\tPartition column name list #$index: $list"
+ }.mkString("\n", "\n", "\n")
+
+ // scalastyle:off
+ s"""Conflicting partition column names detected:
+ |$conflictingColNameLists
+ |For partitioned table directories, data files should only live in leaf directories.
+ |And directories at the same level should have the same partition column name.
+ |Please check the following directories for unexpected files or inconsistent partition column names:
+ |${paths.map("\t" + _).mkString("\n", "\n", "")}
+ """.stripMargin.trim
+ // scalastyle:on
+ }
+
+ assert(
+ listConflictingPartitionColumns(
+ Seq(
+ (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
+ (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
+ makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
+
+ assert(
+ listConflictingPartitionColumns(
+ Seq(
+ (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
+ (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
+ makeExpectedMessage(
+ Seq("a"),
+ Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+
+ assert(
+ listConflictingPartitionColumns(
+ Seq(
+ (new Path("file:/tmp/foo/a=1"),
+ PartitionValues(Seq("a"), Seq(Literal(1)))),
+ (new Path("file:/tmp/foo/a=1/b=foo"),
+ PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
+ makeExpectedMessage(
+ Seq("a", "a, b"),
+ Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+ }
}