diff options
author | Cheng Lian <lian@databricks.com> | 2015-05-18 11:59:44 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-05-18 11:59:44 -0700 |
commit | 010a1c278037130a69dcc79427d2b0380a2c82d8 (patch) | |
tree | fd6de419ce487cf1dfcad292eafd155e7f3712a9 | |
parent | e1ac2a955be64b8df197195e3b225271cfa8201f (diff) | |
download | spark-010a1c278037130a69dcc79427d2b0380a2c82d8.tar.gz spark-010a1c278037130a69dcc79427d2b0380a2c82d8.tar.bz2 spark-010a1c278037130a69dcc79427d2b0380a2c82d8.zip |
[SPARK-7570] [SQL] Ignores _temporary during partition discovery
<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6091)
<!-- Reviewable:end -->
Author: Cheng Lian <lian@databricks.com>
Closes #6091 from liancheng/spark-7570 and squashes the following commits:
8ff07e8 [Cheng Lian] Ignores _temporary during partition discovery
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala | 15 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala | 31 |
2 files changed, 27 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index d1f0cdab55..8f8138d6eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -23,8 +23,7 @@ import java.math.{BigDecimal => JBigDecimal} import scala.collection.mutable.ArrayBuffer import scala.util.Try -import com.google.common.cache.{CacheBuilder, Cache} -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.Path import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} @@ -69,7 +68,7 @@ private[sql] object PartitioningUtils { private[sql] def parsePartitions( paths: Seq[Path], defaultPartitionName: String): PartitionSpec = { - val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName))) + val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName))) val fields = { val (PartitionValues(columnNames, literals)) = partitionValues.head columnNames.zip(literals).map { case (name, Literal(_, dataType)) => @@ -103,13 +102,19 @@ private[sql] object PartitioningUtils { */ private[sql] def parsePartition( path: Path, - defaultPartitionName: String): PartitionValues = { + defaultPartitionName: String): Option[PartitionValues] = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null var chopped = path while (!finished) { + // Sometimes (e.g., when speculative task is enabled), temporary directories may be left + // uncleaned. Here we simply ignore them. + if (chopped.getName == "_temporary") { + return None + } + val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName) maybeColumn.foreach(columns += _) chopped = chopped.getParent @@ -117,7 +122,7 @@ private[sql] object PartitioningUtils { } val (columnNames, values) = columns.reverse.unzip - PartitionValues(columnNames, values) + Some(PartitionValues(columnNames, values)) } private def parsePartitionColumn( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 8079c46071..1927114b8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -54,44 +54,47 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { } test("parse partition") { - def check(path: String, expected: PartitionValues): Unit = { + def check(path: String, expected: Option[PartitionValues]): Unit = { assert(expected === parsePartition(new Path(path), defaultPartitionName)) } def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = { val message = intercept[T] { - parsePartition(new Path(path), defaultPartitionName) + parsePartition(new Path(path), defaultPartitionName).get }.getMessage assert(message.contains(expected)) } - check( - "file:///", + check("file:///", Some { PartitionValues( ArrayBuffer.empty[String], - ArrayBuffer.empty[Literal])) + ArrayBuffer.empty[Literal]) + }) - check( - "file://path/a=10", + check("file://path/a=10", Some { PartitionValues( ArrayBuffer("a"), - ArrayBuffer(Literal.create(10, IntegerType)))) + ArrayBuffer(Literal.create(10, IntegerType))) + }) - check( - "file://path/a=10/b=hello/c=1.5", + check("file://path/a=10/b=hello/c=1.5", Some { PartitionValues( ArrayBuffer("a", "b", "c"), ArrayBuffer( Literal.create(10, IntegerType), Literal.create("hello", StringType), - Literal.create(1.5, FloatType)))) + Literal.create(1.5, FloatType))) + }) - check( - "file://path/a=10/b_hello/c=1.5", + check("file://path/a=10/b_hello/c=1.5", Some { PartitionValues( ArrayBuffer("c"), - ArrayBuffer(Literal.create(1.5, FloatType)))) + ArrayBuffer(Literal.create(1.5, FloatType))) + }) + + check("file://path/a=10/_temporary/c=1.5", None) + check("file://path/a=10/c=1.5/_temporary", None) checkThrows[AssertionError]("file://path/=10", "Empty partition column name") checkThrows[AssertionError]("file://path/a=", "Empty partition column value") |