author    gatorsmile <gatorsmile@gmail.com>  2016-05-04 18:47:27 -0700
committer Yin Huai <yhuai@databricks.com>    2016-05-04 18:47:27 -0700
commit    ef55e46c9225ddceebeaf19398519cbe651c1728 (patch)
tree      ac9e67ee7b122ba41468e3ca3fa1455cd44d389d
parent    8fb1463d6a832f187f323d97635e5bec1e93c6f3 (diff)
[SPARK-14993][SQL] Fix Partition Discovery Inconsistency when Input is a Path to Parquet File
#### What changes were proposed in this pull request?

When we load a dataset, if we set the path to `/path/a=1`, we do not take `a` as a partitioning column. However, if we set the path to `/path/a=1/file.parquet`, we take `a` as a partitioning column and it shows up in the schema. This PR fixes that behavior inconsistency.

The base paths are a set of paths considered the base directories of the input datasets. Partition discovery stops when it reaches any base path. By default, the paths provided by users are the base paths. Below are three typical cases:

**Case 1** `sqlContext.read.parquet("/path/something=true/")`: the base path is `/path/something=true/`, and the returned DataFrame does not contain a column `something`.

**Case 2** `sqlContext.read.parquet("/path/something=true/a.parquet")`: the base path is still `/path/something=true/`, and the returned DataFrame likewise does not contain a column `something`.

**Case 3** `sqlContext.read.parquet("/path/")`: the base path is `/path/`, and the returned DataFrame has the column `something`.

Users can also override the base path by setting `basePath` in the options passed to the data source. For example, with `sqlContext.read.option("basePath", "/path/").parquet("/path/something=true/")`, the returned DataFrame has the column `something`.

Related PRs:
- https://github.com/apache/spark/pull/9651
- https://github.com/apache/spark/pull/10211

#### How was this patch tested?

Added a couple of test cases.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12828 from gatorsmile/readPartitionedTable.
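For context, a minimal spark-shell sketch of the behaviors described above. It assumes a pre-existing partitioned layout such as `/tmp/data/something=true/part-0.parquet`; the directory and file names are illustrative, not part of this patch.

```scala
// Case 1: the base path is the partition directory itself, so `something`
// is NOT discovered as a partitioning column.
val df1 = sqlContext.read.parquet("/tmp/data/something=true/")

// Case 2: pointing at a file inside the partition directory now behaves
// the same as Case 1 (this is the inconsistency the patch fixes).
val df2 = sqlContext.read.parquet("/tmp/data/something=true/part-0.parquet")

// Case 3: the base path is the parent directory, so `something` IS discovered.
val df3 = sqlContext.read.parquet("/tmp/data/")

// Overriding the base path keeps `something` even when the input path
// points below it.
val df4 = sqlContext.read
  .option("basePath", "/tmp/data/")
  .parquet("/tmp/data/something=true/")

df1.printSchema()  // no `something` column
df4.printSchema()  // includes `something`
```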
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala           | 42
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 60
 2 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9d997d6285..2c44b399cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -133,23 +133,37 @@ abstract class PartitioningAwareFileCatalog(
/**
* Contains a set of paths that are considered as the base dirs of the input datasets.
* The partitioning discovery logic will make sure it will stop when it reaches any
- * base path. By default, the paths of the dataset provided by users will be base paths.
- * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
- * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
- * `something`. If users want to override the basePath. They can set `basePath` in the options
- * to pass the new base path to the data source.
- * For the above example, if the user-provided base path is `/path/`, the returned
+ * base path.
+ *
+ * By default, the paths of the dataset provided by users will be base paths.
+ * Below are three typical examples,
+ * Case 1) `sqlContext.read.parquet("/path/something=true/")`: the base path will be
+ * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
+ * Case 2) `sqlContext.read.parquet("/path/something=true/a.parquet")`: the base path will be
+ * still `/path/something=true/`, and the returned DataFrame will also not contain a column of
+ * `something`.
+ * Case 3) `sqlContext.read.parquet("/path/")`: the base path will be `/path/`, and the returned
* DataFrame will have the column of `something`.
+ *
+ * Users also can override the basePath by setting `basePath` in the options to pass the new base
+ * path to the data source.
+ * For example, `sqlContext.read.option("basePath", "/path/").parquet("/path/something=true/")`,
+ * and the returned DataFrame will have the column of `something`.
*/
private def basePaths: Set[Path] = {
- val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
- userDefinedBasePath.getOrElse {
- // If the user does not provide basePath, we will just use paths.
- paths.toSet
- }.map { hdfsPath =>
- // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
- val fs = hdfsPath.getFileSystem(hadoopConf)
- hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ parameters.get("basePath").map(new Path(_)) match {
+ case Some(userDefinedBasePath) =>
+ val fs = userDefinedBasePath.getFileSystem(hadoopConf)
+ if (!fs.isDirectory(userDefinedBasePath)) {
+ throw new IllegalArgumentException("Option 'basePath' must be a directory")
+ }
+ Set(fs.makeQualified(userDefinedBasePath))
+
+ case None =>
+ paths.map { path =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
+ if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
}
}
}
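One behavioral addition worth noting in the hunk above: a user-supplied `basePath` must now point to a directory, or the read fails fast with the `IllegalArgumentException` added in `basePaths`. A hedged sketch of the failure mode at the call site (paths are illustrative, not from this patch):

```scala
// Assuming /tmp/data/something=true/a.parquet exists as a file: passing it
// as `basePath` now throws, because basePaths checks fs.isDirectory first.
val df = sqlContext.read
  .option("basePath", "/tmp/data/something=true/a.parquet")  // a file, not a dir
  .parquet("/tmp/data/something=true/a.parquet")
// => java.lang.IllegalArgumentException: Option 'basePath' must be a directory
```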
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 5bffb307ec..cb2c2522b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -191,6 +191,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
}
+ test("parse partition with base paths") {
+ // when the basePaths is the same as the path to a leaf directory
+ val partitionSpec1: Option[PartitionValues] = parsePartition(
+ path = new Path("file://path/a=10"),
+ defaultPartitionName = defaultPartitionName,
+ typeInference = true,
+ basePaths = Set(new Path("file://path/a=10")))._1
+
+ assert(partitionSpec1.isEmpty)
+
+ // when the basePaths is the path to a base directory of leaf directories
+ val partitionSpec2: Option[PartitionValues] = parsePartition(
+ path = new Path("file://path/a=10"),
+ defaultPartitionName = defaultPartitionName,
+ typeInference = true,
+ basePaths = Set(new Path("file://path")))._1
+
+ assert(partitionSpec2 ==
+ Option(PartitionValues(
+ ArrayBuffer("a"),
+ ArrayBuffer(Literal.create(10, IntegerType)))))
+ }
+
test("parse partitions") {
def check(
paths: Seq[String],
@@ -413,6 +436,43 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
+ test("read partitioned table using different path options") {
+ withTempDir { base =>
+ val pi = 1
+ val ps = "foo"
+ val path = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
+ makeParquetFile(
+ (1 to 10).map(i => ParquetData(i, i.toString)), path)
+
+ // when the input is the base path containing partitioning directories
+ val baseDf = sqlContext.read.parquet(base.getCanonicalPath)
+ assert(baseDf.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
+
+ // when the input is a path to the leaf directory containing a parquet file
+ val partDf = sqlContext.read.parquet(path.getCanonicalPath)
+ assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
+
+ path.listFiles().foreach { f =>
+ if (f.getName.toLowerCase().endsWith(".parquet")) {
+ // when the input is a path to a parquet file
+ val df = sqlContext.read.parquet(f.getCanonicalPath)
+ assert(df.schema.map(_.name) === Seq("intField", "stringField"))
+ }
+ }
+
+ path.listFiles().foreach { f =>
+ if (f.getName.toLowerCase().endsWith(".parquet")) {
+ // when the input is a path to a parquet file but `basePath` is overridden to
+ // the base path containing partitioning directories
+ val df = sqlContext
+ .read.option("basePath", base.getCanonicalPath)
+ .parquet(f.getCanonicalPath)
+ assert(df.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
+ }
+ }
+ }
+ }
+
test("read partitioned table - partition key included in Parquet file") {
withTempDir { base =>
for {