author     Yin Huai <yhuai@databricks.com>     2015-11-13 18:36:56 +0800
committer  Cheng Lian <lian@databricks.com>    2015-11-13 18:36:56 +0800
commit     7b5d9051cf91c099458d092a6705545899134b3b (patch)
tree       c6368373308a4d8b207ea7c18cd1e948ed5f0836
parent     ec80c0c2fc63360ee6b5872c24e6c67779ac63f4 (diff)
[SPARK-11678][SQL] Partition discovery should stop at the root path of the table.
https://issues.apache.org/jira/browse/SPARK-11678

This PR passes the root paths of the table to the partition discovery logic, so that partition discovery stops at those root paths instead of walking all the way up to the root of the file system.

Author: Yin Huai <yhuai@databricks.com>

Closes #9651 from yhuai/SPARK-11678.
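For illustration, a minimal sketch of the user-visible effect (the paths and the DataFrame `df` are hypothetical; `df` is assumed to contain a column `b`):

    // The table lives under a directory whose name itself looks like a partition segment.
    val tablePath = "/data/key=value/table"
    df.write.format("parquet").partitionBy("b").save(tablePath)

    // Partition discovery now stops at tablePath (the root path passed to the reader),
    // so only "b" is recovered as a partition column and no spurious "key" column appears.
    val readBack = sqlContext.read.format("parquet").load(tablePath)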
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala | 68
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 132
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 1
10 files changed, 235 insertions, 51 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 86bc3a1b6d..81962f8d63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -75,10 +75,11 @@ private[sql] object PartitioningUtils {
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String,
- typeInference: Boolean): PartitionSpec = {
+ typeInference: Boolean,
+ basePaths: Set[Path]): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
- val (partitionValues, optBasePaths) = paths.map { path =>
- parsePartition(path, defaultPartitionName, typeInference)
+ val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
+ parsePartition(path, defaultPartitionName, typeInference, basePaths)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -101,11 +102,15 @@ private[sql] object PartitioningUtils {
// It will be recognised as conflicting directory structure:
// "hdfs://host:9000/invalidPath"
// "hdfs://host:9000/path"
- val basePaths = optBasePaths.flatMap(x => x)
+ val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x)
assert(
- basePaths.distinct.size == 1,
+ discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
- basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
+ discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
+ "If provided paths are partition directories, please set " +
+ "\"basePath\" in the options of the data source to specify the " +
+ "root directory of the table. If there are multiple root directories, " +
+ "please load them separately and then union them.")
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
@@ -131,7 +136,7 @@ private[sql] object PartitioningUtils {
/**
* Parses a single partition, returns column names and values of each partition column, also
- * the base path. For example, given:
+ * the path where partition discovery stops. For example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
@@ -144,40 +149,63 @@ private[sql] object PartitioningUtils {
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
- * and the base path:
+ * and the path where the discovery stops is:
* {{{
- * /path/to/partition
+ * hdfs://<host>:<port>/path/to/partition
* }}}
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String,
- typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
+ typeInference: Boolean,
+ basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
- var chopped = path
- var basePath = path
+ // currentPath is the current path that we will use to parse partition column values.
+ var currentPath: Path = path
while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
- // uncleaned. Here we simply ignore them.
- if (chopped.getName.toLowerCase == "_temporary") {
+ // uncleaned. Here we simply ignore them.
+ if (currentPath.getName.toLowerCase == "_temporary") {
return (None, None)
}
- val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
- maybeColumn.foreach(columns += _)
- basePath = chopped
- chopped = chopped.getParent
- finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
+ if (basePaths.contains(currentPath)) {
+ // If currentPath is one of the base paths, we should stop.
+ finished = true
+ } else {
+ // Let's say currentPath is "/table/a=1/"; then currentPath.getName gives us "a=1".
+ // Once we get the string, we try to parse it and find the partition column and value.
+ val maybeColumn =
+ parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
+ maybeColumn.foreach(columns += _)
+
+ // Now, we determine if we should stop.
+ // When we hit any of the following cases, we will stop:
+ // - In this iteration, we could not parse a partition column name and value,
+ // i.e. maybeColumn is None, and columns is not empty. We check whether columns is
+ // empty to handle cases like /table/a=1/_temporary/something (we still need to find
+ // a=1 in this case).
+ // - After we get the new currentPath, this new currentPath represents the top-level dir,
+ // i.e. currentPath.getParent == null. For the example of "/table/a=1/",
+ // the top-level dir is "/table".
+ finished =
+ (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null
+
+ if (!finished) {
+ // For the above example, currentPath will be "/table/".
+ currentPath = currentPath.getParent
+ }
+ }
}
if (columns.isEmpty) {
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
- (Some(PartitionValues(columnNames, values)), Some(basePath))
+ (Some(PartitionValues(columnNames, values)), Some(currentPath))
}
}
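To make the new stopping rule concrete, here is a minimal standalone sketch of the walk-up loop above (an illustration only: partition values are kept as raw strings, the _temporary special case and type inference are omitted, and paths are assumed to be fully qualified on both sides, as the real code guarantees via makeQualified):

    import org.apache.hadoop.fs.Path
    import scala.collection.mutable.ArrayBuffer

    // Walk from the leaf directory towards the root, collecting "name=value" segments,
    // and stop as soon as a user-supplied base path (or the top-level dir) is reached.
    def collectPartitionValues(leaf: Path, basePaths: Set[Path]): Seq[(String, String)] = {
      val columns = ArrayBuffer.empty[(String, String)]
      var currentPath = leaf
      var finished = currentPath.getParent == null
      while (!finished) {
        if (basePaths.contains(currentPath)) {
          finished = true
        } else {
          val maybeColumn = currentPath.getName.split("=", 2) match {
            case Array(name, value) if value.nonEmpty => Some(name -> value)
            case _ => None
          }
          maybeColumn.foreach(columns += _)
          finished = (maybeColumn.isEmpty && columns.nonEmpty) || currentPath.getParent == null
          if (!finished) currentPath = currentPath.getParent
        }
      }
      columns.reverse.toSeq
    }

    // collectPartitionValues(
    //   new Path("hdfs://host:9000/table/a=42/b=hello"),
    //   Set(new Path("hdfs://host:9000/table")))
    // returns Seq("a" -> "42", "b" -> "hello")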
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 85b52f04c8..dca638b7f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -56,13 +56,14 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
new JSONRelation(
- None,
- samplingRatio,
- primitivesAsString,
- dataSchema,
- None,
- partitionColumns,
- paths)(sqlContext)
+ inputRDD = None,
+ samplingRatio = samplingRatio,
+ primitivesAsString = primitivesAsString,
+ maybeDataSchema = dataSchema,
+ maybePartitionSpec = None,
+ userDefinedPartitionColumns = partitionColumns,
+ paths = paths,
+ parameters = parameters)(sqlContext)
}
}
@@ -73,8 +74,10 @@ private[sql] class JSONRelation(
val maybeDataSchema: Option[StructType],
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
- override val paths: Array[String] = Array.empty[String])(@transient val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec) {
+ override val paths: Array[String] = Array.empty[String],
+ parameters: Map[String, String] = Map.empty[String, String])
+ (@transient val sqlContext: SQLContext)
+ extends HadoopFsRelation(maybePartitionSpec, parameters) {
/** Constraints to be imposed on schema to be stored. */
private def checkConstraints(schema: StructType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 21337b2932..cb0aab8cc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -109,7 +109,7 @@ private[sql] class ParquetRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec)
+ extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {
private[sql] def this(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 4b8b8e4e74..fbd387bc2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -71,9 +71,10 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
private[sql] class TextRelation(
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
- override val paths: Array[String] = Array.empty[String])
+ override val paths: Array[String] = Array.empty[String],
+ parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec) {
+ extends HadoopFsRelation(maybePartitionSpec, parameters) {
/** Data schema is always a single column, named "text". */
override def dataSchema: StructType = new StructType().add("value", StringType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2be6cd4533..b3d3bdf50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -416,12 +416,19 @@ abstract class OutputWriter {
* @since 1.4.0
*/
@Experimental
-abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](
+ maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {
override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
- def this() = this(None)
+ def this() = this(None, Map.empty[String, String])
+
+ def this(parameters: Map[String, String]) = this(None, parameters)
+
+ private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) =
+ this(maybePartitionSpec, Map.empty[String, String])
private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -519,13 +526,37 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
/**
- * Base paths of this relation. For partitioned relations, it should be either root directories
+ * Paths of this relation. For partitioned relations, these should be the root directories
* of all partition directories.
*
* @since 1.4.0
*/
def paths: Array[String]
+ /**
+ * Contains a set of paths that are considered the base directories of the input datasets.
+ * The partition discovery logic will stop as soon as it reaches any base path.
+ * By default, the paths of the dataset provided by users are the base paths.
+ * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
+ * will be `/path/something=true/`, and the returned DataFrame will not contain a column named
+ * `something`. If users want to override the base path, they can set `basePath` in the options
+ * to pass the new base path to the data source.
+ * For the above example, if the user-provided base path is `/path/`, the returned
+ * DataFrame will have the column `something`.
+ */
+ private def basePaths: Set[Path] = {
+ val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
+ userDefinedBasePath.getOrElse {
+ // If the user does not provide basePath, we will just use paths.
+ val pathSet = paths.toSet
+ pathSet.map(p => new Path(p))
+ }.map { hdfsPath =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+ }
+
override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray
override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum
@@ -559,7 +590,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
userDefinedPartitionColumns match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
- leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = false,
+ basePaths = basePaths)
// Without auto inference, all of value in the `row` should be null or in StringType,
// we need to cast into the data type that user specified.
@@ -577,8 +611,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
case _ =>
// user did not provide a partitioning schema
- PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
+ PartitioningUtils.parsePartitions(
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(),
+ basePaths = basePaths)
}
}
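To make the basePaths behavior documented above concrete, a hedged usage sketch (the directory layout is illustrative):

    // Without basePath, the provided path itself is treated as the base, so the
    // "something=true" segment is not interpreted as a partition and no column
    // named "something" appears in the result.
    val df1 = sqlContext.read.parquet("/path/something=true/")

    // With basePath pointing one level up, discovery continues past "something=true",
    // and df2's schema contains the partition column "something".
    val df2 = sqlContext.read
      .option("basePath", "/path/")
      .parquet("/path/something=true/")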
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2ac87ad6cd..458786f77a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -294,7 +294,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
- sqlContext.read.parquet(path).filter("part = 1"),
+ sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"),
(1 to 3).map(i => Row(i, i.toString, 1)))
}
}
@@ -311,7 +311,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
- sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+ sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"),
(2 to 3).map(i => Row(i, i.toString, 1)))
}
}
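The test adjustments above follow directly from the new stopping rule: reading the partition directory itself no longer surfaces the partition column, so filters on it must be applied against a read of the table root (or with an explicit basePath). A hedged sketch of the distinction, with an illustrative layout:

    val tableDir = "/tmp/table"  // hypothetical root containing part=1, part=2, ...

    // Discovery stops at the provided path, so "part" is not discovered as a column here.
    val withoutPart = sqlContext.read.parquet(s"$tableDir/part=1")

    // Reading the table root keeps "part" available, so the filter below is valid.
    val withPart = sqlContext.read.parquet(tableDir).filter("part = 1")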
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 61cc0da508..71e9034d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -66,7 +66,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10.5/b=hello")
var exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path])
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -76,7 +76,37 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/_temporary/path")
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/")))
+
+ // Valid
+ paths = Seq(
+ "hdfs://host:9000/path/something=true/table/",
+ "hdfs://host:9000/path/something=true/table/_temporary",
+ "hdfs://host:9000/path/something=true/table/a=10/b=20",
+ "hdfs://host:9000/path/something=true/table/_temporary/path")
+
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/something=true/table")))
+
+ // Valid
+ paths = Seq(
+ "hdfs://host:9000/path/table=true/",
+ "hdfs://host:9000/path/table=true/_temporary",
+ "hdfs://host:9000/path/table=true/a=10/b=20",
+ "hdfs://host:9000/path/table=true/_temporary/path")
+
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/table=true")))
// Invalid
paths = Seq(
@@ -85,7 +115,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/path1")
exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/")))
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -101,19 +135,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/tmp/tables/nonPartitionedTable2")
exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/tmp/tables/")))
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
- assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
+ val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1
+ assert(expected === actual)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), defaultPartitionName, true)
+ parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])
}.getMessage
assert(message.contains(expected))
@@ -152,8 +191,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
test("parse partitions") {
- def check(paths: Seq[String], spec: PartitionSpec): Unit = {
- assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
+ def check(
+ paths: Seq[String],
+ spec: PartitionSpec,
+ rootPaths: Set[Path] = Set.empty[Path]): Unit = {
+ val actualSpec =
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ rootPaths)
+ assert(actualSpec === spec)
}
check(Seq(
@@ -232,7 +280,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("parse partitions with type inference disabled") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
- assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
+ val actualSpec =
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path])
+ assert(actualSpec === spec)
}
check(Seq(
@@ -590,6 +640,70 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
+ test("SPARK-11678: Partition discovery stops at the root path of the dataset") {
+ withTempPath { dir =>
+ val tablePath = new File(dir, "key=value")
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
+ Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+ checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+ }
+
+ withTempPath { dir =>
+ val path = new File(dir, "key=value")
+ val tablePath = new File(path, "table")
+
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
+ Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+ checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+ }
+ }
+
+ test("use basePath to specify the root dir of a partitioned table.") {
+ withTempPath { dir =>
+ val tablePath = new File(dir, "table")
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ val twoPartitionsDF =
+ sqlContext
+ .read
+ .option("basePath", tablePath.getCanonicalPath)
+ .parquet(
+ s"${tablePath.getCanonicalPath}/b=1",
+ s"${tablePath.getCanonicalPath}/b=2")
+
+ checkAnswer(twoPartitionsDF, df.filter("b != 3"))
+
+ intercept[AssertionError] {
+ sqlContext
+ .read
+ .parquet(
+ s"${tablePath.getCanonicalPath}/b=1",
+ s"${tablePath.getCanonicalPath}/b=2")
+ }
+ }
+ }
+
test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
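As the new assertion message suggests, datasets with multiple unrelated root directories should be loaded separately and combined afterwards. A hedged sketch (paths are illustrative, and compatible schemas are assumed):

    // Each call supplies a single root, so discovery has one consistent base path per read.
    val t1 = sqlContext.read.parquet("hdfs://host:9000/data/tableA")
    val t2 = sqlContext.read.parquet("hdfs://host:9000/data/tableB")

    // Combine the results explicitly instead of passing both roots to one read.
    val combined = t1.unionAll(t2)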
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 45de567039..1136670b7a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -157,7 +157,7 @@ private[sql] class OrcRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec)
+ extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {
private[sql] def this(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index bdc48a383b..01960fd290 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -89,7 +89,7 @@ class SimpleTextRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
- extends HadoopFsRelation {
+ extends HadoopFsRelation(parameters) {
import sqlContext.sparkContext
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 100b97137c..665e87e3e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -486,6 +486,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
val df = sqlContext.read
.format(dataSourceName)
.option("dataSchema", dataSchema.json)
+ .option("basePath", file.getCanonicalPath)
.load(s"${file.getCanonicalPath}/p1=*/p2=???")
val expectedPaths = Set(
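For completeness, the option also composes with glob patterns, as the test above exercises. A hedged sketch using the Parquet reader instead of the test's custom data source (paths are illustrative):

    // basePath anchors partition discovery at the table root even when the load
    // path itself is a glob over partition directories.
    val globbed = sqlContext.read
      .option("basePath", "/tmp/table")
      .parquet("/tmp/table/p1=*/p2=???")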