author     Cheng Lian <lian@databricks.com>    2015-05-21 10:56:17 -0700
committer  Yin Huai <yhuai@databricks.com>     2015-05-21 10:56:17 -0700
commit     8730fbb47b09fcf955fe16dd03b75596db6d53b6 (patch)
tree       eeb6bd4c9d95011f52ce8a7f522dc8e1e32608ff /sql/core
parent     13348e21b6b1c0df42c18b82b86c613291228863 (diff)
[SPARK-7749] [SQL] Fixes partition discovery for non-partitioned tables
When no partition columns can be found, we should have an empty `PartitionSpec`, rather than a `PartitionSpec` with empty partition columns. This PR together with #6285 should fix SPARK-7749.

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #6287 from liancheng/spark-7749 and squashes the following commits:

a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory
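A minimal sketch (not part of the patch) of the two shapes the commit message distinguishes, written against the `PartitionSpec` and `Partition` types touched below; illustrative only, since both types are `private[sql]` and the snippet assumes it runs inside that package:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType}

    // Before this patch: a non-partitioned table still yielded a spec with an
    // empty partition schema but one Partition per leaf directory, each
    // carrying an empty Row.
    val specWithEmptyColumns = PartitionSpec(
      StructType(Seq.empty[StructField]),
      Seq(Partition(Row(), "hdfs://host:9000/path1")))

    // After this patch: when no partition columns can be inferred,
    // parsePartitions returns the shared empty spec instead.
    val nonPartitioned = PartitionSpec.emptySpec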
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala                 84
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                         7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala     49
3 files changed, 100 insertions(+), 40 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index 8f8138d6eb..e0ead23d78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -33,6 +33,10 @@ private[sql] case class Partition(values: Row, path: String)
private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
+private[sql] object PartitionSpec {
+ val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+}
+
private[sql] object PartitioningUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
// depend on Hive.
@@ -68,20 +72,37 @@ private[sql] object PartitioningUtils {
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String): PartitionSpec = {
- val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
- val fields = {
- val (PartitionValues(columnNames, literals)) = partitionValues.head
- columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
- StructField(name, dataType, nullable = true)
- }
+ // First, we need to parse every partition's path and see if we can find partition values.
+ val pathsWithPartitionValues = paths.flatMap { path =>
+ parsePartition(path, defaultPartitionName).map(path -> _)
}
- val partitions = partitionValues.zip(paths).map {
- case (PartitionValues(_, literals), path) =>
- Partition(Row(literals.map(_.value): _*), path.toString)
- }
+ if (pathsWithPartitionValues.isEmpty) {
+ // This dataset is not partitioned.
+ PartitionSpec.emptySpec
+ } else {
+ // This dataset is partitioned. We need to check whether all partitions have the same
+ // partition columns and resolve potential type conflicts.
+ val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+
+ // Creates the StructType which represents the partition columns.
+ val fields = {
+ val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
+ columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+ // We always assume partition columns are nullable since we've no idea whether null values
+ // will be appended in the future.
+ StructField(name, dataType, nullable = true)
+ }
+ }
+
+ // Finally, we create `Partition`s based on paths and resolved partition values.
+ val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
+ case (PartitionValues(_, literals), (path, _)) =>
+ Partition(Row.fromSeq(literals.map(_.value)), path.toString)
+ }
- PartitionSpec(StructType(fields), partitions)
+ PartitionSpec(StructType(fields), partitions)
+ }
}
/**
@@ -111,7 +132,7 @@ private[sql] object PartitioningUtils {
while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
- if (chopped.getName == "_temporary") {
+ if (chopped.getName.toLowerCase == "_temporary") {
return None
}
@@ -121,8 +142,12 @@ private[sql] object PartitioningUtils {
finished = maybeColumn.isEmpty || chopped.getParent == null
}
- val (columnNames, values) = columns.reverse.unzip
- Some(PartitionValues(columnNames, values))
+ if (columns.isEmpty) {
+ None
+ } else {
+ val (columnNames, values) = columns.reverse.unzip
+ Some(PartitionValues(columnNames, values))
+ }
}
private def parsePartitionColumn(
@@ -156,20 +181,25 @@ private[sql] object PartitioningUtils {
private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
// Column names of all partitions must match
val distinctPartitionsColNames = values.map(_.columnNames).distinct
- assert(distinctPartitionsColNames.size == 1, {
- val list = distinctPartitionsColNames.mkString("\t", "\n", "")
- s"Conflicting partition column names detected:\n$list"
- })
-
- // Resolves possible type conflicts for each column
- val columnCount = values.head.columnNames.size
- val resolvedValues = (0 until columnCount).map { i =>
- resolveTypeConflicts(values.map(_.literals(i)))
- }
- // Fills resolved literals back to each partition
- values.zipWithIndex.map { case (d, index) =>
- d.copy(literals = resolvedValues.map(_(index)))
+ if (distinctPartitionsColNames.isEmpty) {
+ Seq.empty
+ } else {
+ assert(distinctPartitionsColNames.size == 1, {
+ val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+ s"Conflicting partition column names detected:\n$list"
+ })
+
+ // Resolves possible type conflicts for each column
+ val columnCount = values.head.columnNames.size
+ val resolvedValues = (0 until columnCount).map { i =>
+ resolveTypeConflicts(values.map(_.literals(i)))
+ }
+
+ // Fills resolved literals back to each partition
+ values.zipWithIndex.map { case (d, index) =>
+ d.copy(literals = resolvedValues.map(_(index)))
+ }
}
}
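A quick usage sketch of the reworked method (mirroring the new test case added further down, and again assuming it runs inside the `org.apache.spark.sql.sources` package): a listing with no `column=value` path components now resolves straight to the empty spec.

    import org.apache.hadoop.fs.Path

    // Neither path carries a column=value component, so no PartitionValues are
    // inferred and parsePartitions falls back to PartitionSpec.emptySpec.
    val spec = PartitioningUtils.parsePartitions(
      Seq(new Path("hdfs://host:9000/path1"), new Path("hdfs://host:9000/path2")),
      PartitioningUtils.DEFAULT_PARTITION_NAME)
    assert(spec == PartitionSpec.emptySpec)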
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6a917bf38b..fcbac0d457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -462,12 +462,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private def discoverPartitions(): PartitionSpec = {
val leafDirs = fileStatusCache.leafDirs.keys.toSeq
-
- if (leafDirs.nonEmpty) {
- PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
- } else {
- PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
- }
+ PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 1927114b8d..907dbb0119 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{Partition, PartitionSpec}
+import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLContext}
@@ -66,12 +66,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
assert(message.contains(expected))
}
- check("file:///", Some {
- PartitionValues(
- ArrayBuffer.empty[String],
- ArrayBuffer.empty[Literal])
- })
-
check("file://path/a=10", Some {
PartitionValues(
ArrayBuffer("a"),
@@ -93,6 +87,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
ArrayBuffer(Literal.create(1.5, FloatType)))
})
+ check("file:///", None)
+ check("file:///path/_temporary", None)
+ check("file:///path/_temporary/c=1.5", None)
+ check("file:///path/_temporary/path", None)
check("file://path/a=10/_temporary/c=1.5", None)
check("file://path/a=10/c=1.5/_temporary", None)
@@ -125,6 +123,25 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
check(Seq(
+ "hdfs://host:9000/path/_temporary",
+ "hdfs://host:9000/path/a=10/b=20",
+ "hdfs://host:9000/path/a=10.5/b=hello",
+ "hdfs://host:9000/path/a=10.5/_temporary",
+ "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+ "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+ "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+ "hdfs://host:9000/path/_temporary/path",
+ "hdfs://host:9000/path/a=11/_temporary/path",
+ "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", FloatType),
+ StructField("b", StringType))),
+ Seq(
+ Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
+ Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+ check(Seq(
s"hdfs://host:9000/path/a=10/b=20",
s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
PartitionSpec(
@@ -145,6 +162,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
Seq(
Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+ check(Seq(
+ s"hdfs://host:9000/path1",
+ s"hdfs://host:9000/path2"),
+ PartitionSpec.emptySpec)
}
test("read partitioned table - normal case") {
@@ -334,4 +356,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
}
}
}
+
+ test("SPARK-7749 Non-partitioned table should have empty partition spec") {
+ withTempPath { dir =>
+ (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
+ val queryExecution = read.parquet(dir.getCanonicalPath).queryExecution
+ queryExecution.analyzed.collectFirst {
+ case LogicalRelation(relation: ParquetRelation2) =>
+ assert(relation.partitionSpec === PartitionSpec.emptySpec)
+ }.getOrElse {
+ fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
+ }
+ }
+ }
}