-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala | 68
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 132
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 1
10 files changed, 235 insertions, 51 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 86bc3a1b6d..81962f8d63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -75,10 +75,11 @@ private[sql] object PartitioningUtils {
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String,
- typeInference: Boolean): PartitionSpec = {
+ typeInference: Boolean,
+ basePaths: Set[Path]): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
- val (partitionValues, optBasePaths) = paths.map { path =>
- parsePartition(path, defaultPartitionName, typeInference)
+ val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
+ parsePartition(path, defaultPartitionName, typeInference, basePaths)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -101,11 +102,15 @@ private[sql] object PartitioningUtils {
// It will be recognised as conflicting directory structure:
// "hdfs://host:9000/invalidPath"
// "hdfs://host:9000/path"
- val basePaths = optBasePaths.flatMap(x => x)
+ val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x)
assert(
- basePaths.distinct.size == 1,
+ discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
- basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
+ discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
+ "If provided paths are partition directories, please set " +
+ "\"basePath\" in the options of the data source to specify the " +
+ "root directory of the table. If there are multiple root directories, " +
+ "please load them separately and then union them.")
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
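
To make the guidance in the new error message concrete, here is a hypothetical spark-shell style sketch of the two suggested remedies (Spark 1.x DataFrameReader API; the paths and the in-scope sqlContext are assumed purely for illustration):

// Remedy 1: the provided paths are partition directories of a single table, so
// point "basePath" at the table root and let discovery treat them as partitions.
val singleTable = sqlContext.read
  .option("basePath", "hdfs://host:9000/path")
  .parquet("hdfs://host:9000/path/a=10", "hdfs://host:9000/path/a=10.5")

// Remedy 2: the paths are genuinely separate root directories, so load them
// separately and union the results (both sides must share the same schema).
val unioned = sqlContext.read.parquet("hdfs://host:9000/tableA")
  .unionAll(sqlContext.read.parquet("hdfs://host:9000/tableB"))
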
@@ -131,7 +136,7 @@ private[sql] object PartitioningUtils {
/**
* Parses a single partition, returns column names and values of each partition column, also
- * the base path. For example, given:
+ * the path at which partition discovery stops. For example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
@@ -144,40 +149,63 @@ private[sql] object PartitioningUtils {
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
- * and the base path:
+ * and the path where the discovery stops is:
* {{{
- * /path/to/partition
+ * hdfs://<host>:<port>/path/to/partition
* }}}
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String,
- typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
+ typeInference: Boolean,
+ basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
- var chopped = path
- var basePath = path
+ // currentPath is the path we are currently using to parse a partition column value.
+ var currentPath: Path = path
while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
- // uncleaned. Here we simply ignore them.
- if (chopped.getName.toLowerCase == "_temporary") {
+ // uncleaned. Here we simply ignore them.
+ if (currentPath.getName.toLowerCase == "_temporary") {
return (None, None)
}
- val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
- maybeColumn.foreach(columns += _)
- basePath = chopped
- chopped = chopped.getParent
- finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
+ if (basePaths.contains(currentPath)) {
+ // If currentPath is one of the base paths, we should stop.
+ finished = true
+ } else {
+ // Let's say currentPath is "/table/a=1/"; currentPath.getName will give us a=1.
+ // Once we get that string, we try to parse it to find the partition column and its value.
+ val maybeColumn =
+ parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
+ maybeColumn.foreach(columns += _)
+
+ // Now, we determine whether we should stop.
+ // We stop when we hit either of the following cases:
+ // - In this iteration, we could not parse a partition column and its value,
+ //   i.e. maybeColumn is None, and columns is not empty. Requiring columns to be
+ //   non-empty keeps us walking up while no partition column has been found yet,
+ //   e.g. for /table/a=1/_temporary/something we still need to find a=1.
+ // - currentPath has no parent, i.e. currentPath.getParent == null, which means we
+ //   have already reached the root directory and cannot walk up any further.
+ finished =
+ (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null
+
+ if (!finished) {
+ // For the example of "/table/a=1/" above, currentPath will become "/table".
+ currentPath = currentPath.getParent
+ }
+ }
}
if (columns.isEmpty) {
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
- (Some(PartitionValues(columnNames, values)), Some(basePath))
+ (Some(PartitionValues(columnNames, values)), Some(currentPath))
}
}
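
To illustrate the loop above outside of Spark, the following is a minimal, standalone sketch of walking a leaf directory upwards until a base path is reached. It is not the actual implementation: it skips type inference and the _temporary handling, only org.apache.hadoop.fs.Path from the code above is assumed, and the object and method names are invented.

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path

object PartitionWalkSketch {
  // Walks from a leaf directory up towards the filesystem root, collecting raw
  // "name=value" directory fragments, and stops early at any base path.
  def partitionFragments(leafDir: Path, basePaths: Set[Path]): Seq[(String, String)] = {
    val columns = ArrayBuffer.empty[(String, String)]
    var current = leafDir
    while (current != null && !basePaths.contains(current)) {
      current.getName.split("=", 2) match {
        case Array(name, value) => columns += name -> value
        case _ => // not a "name=value" directory, keep walking up
      }
      current = current.getParent
    }
    // Directories were visited bottom-up, so reverse to get table order.
    columns.reverse.toSeq
  }

  def main(args: Array[String]): Unit = {
    val base = Set(new Path("hdfs://host:9000/path/something=true/table"))
    val leaf = new Path("hdfs://host:9000/path/something=true/table/a=10/b=20")
    // Prints List((a,10), (b,20)); "something=true" is skipped because the walk
    // stops at the base path.
    println(partitionFragments(leaf, base))
  }
}
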
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 85b52f04c8..dca638b7f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -56,13 +56,14 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
new JSONRelation(
- None,
- samplingRatio,
- primitivesAsString,
- dataSchema,
- None,
- partitionColumns,
- paths)(sqlContext)
+ inputRDD = None,
+ samplingRatio = samplingRatio,
+ primitivesAsString = primitivesAsString,
+ maybeDataSchema = dataSchema,
+ maybePartitionSpec = None,
+ userDefinedPartitionColumns = partitionColumns,
+ paths = paths,
+ parameters = parameters)(sqlContext)
}
}
@@ -73,8 +74,10 @@ private[sql] class JSONRelation(
val maybeDataSchema: Option[StructType],
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
- override val paths: Array[String] = Array.empty[String])(@transient val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec) {
+ override val paths: Array[String] = Array.empty[String],
+ parameters: Map[String, String] = Map.empty[String, String])
+ (@transient val sqlContext: SQLContext)
+ extends HadoopFsRelation(maybePartitionSpec, parameters) {
/** Constraints to be imposed on schema to be stored. */
private def checkConstraints(schema: StructType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 21337b2932..cb0aab8cc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -109,7 +109,7 @@ private[sql] class ParquetRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec)
+ extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {
private[sql] def this(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 4b8b8e4e74..fbd387bc2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -71,9 +71,10 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
private[sql] class TextRelation(
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
- override val paths: Array[String] = Array.empty[String])
+ override val paths: Array[String] = Array.empty[String],
+ parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec) {
+ extends HadoopFsRelation(maybePartitionSpec, parameters) {
/** Data schema is always a single column, named "text". */
override def dataSchema: StructType = new StructType().add("value", StringType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2be6cd4533..b3d3bdf50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -416,12 +416,19 @@ abstract class OutputWriter {
* @since 1.4.0
*/
@Experimental
-abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](
+ maybePartitionSpec: Option[PartitionSpec],
+ parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {
override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
- def this() = this(None)
+ def this() = this(None, Map.empty[String, String])
+
+ def this(parameters: Map[String, String]) = this(None, parameters)
+
+ private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) =
+ this(maybePartitionSpec, Map.empty[String, String])
private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -519,13 +526,37 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
/**
- * Base paths of this relation. For partitioned relations, it should be either root directories
+ * Paths of this relation. For partitioned relations, these should be the root directories
* of all partition directories.
*
* @since 1.4.0
*/
def paths: Array[String]
+ /**
+ * Contains a set of paths that are considered the base directories of the input datasets.
+ * The partition discovery logic will stop when it reaches any base path.
+ * By default, the dataset paths provided by users are used as base paths.
+ * For example, if a user calls `sqlContext.read.parquet("/path/something=true/")`, the base path
+ * will be `/path/something=true/`, and the returned DataFrame will not contain a column
+ * `something`. If users want to override the base path, they can set `basePath` in the options
+ * to pass the new base path to the data source.
+ * For the above example, if the user-provided base path is `/path/`, the returned
+ * DataFrame will have a column `something`.
+ */
+ private def basePaths: Set[Path] = {
+ val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
+ userDefinedBasePath.getOrElse {
+ // If the user does not provide basePath, we will just use paths.
+ val pathSet = paths.toSet
+ pathSet.map(p => new Path(p))
+ }.map { hdfsPath =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+ }
+
override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray
override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum
@@ -559,7 +590,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
userDefinedPartitionColumns match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
- leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = false,
+ basePaths = basePaths)
// Without auto inference, all values in the `row` should be null or of StringType,
// we need to cast into the data type that user specified.
@@ -577,8 +611,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
case _ =>
// user did not provide a partitioning schema
- PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
+ PartitioningUtils.parsePartitions(
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(),
+ basePaths = basePaths)
}
}
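
As a usage-level companion to the basePaths Scaladoc above, here is a hypothetical spark-shell style sketch (invented paths, assuming an in-scope sqlContext) showing how the "basePath" option decides whether a directory such as "something=true" surfaces as a partition column:

// Default: the given path itself is the base path, so "something" is NOT a column.
val withoutColumn = sqlContext.read
  .parquet("/data/table/something=true")

// Overridden: "/data/table" is the base path, so "something" IS discovered as a
// partition column of the returned DataFrame.
val withColumn = sqlContext.read
  .option("basePath", "/data/table")
  .parquet("/data/table/something=true")
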
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 2ac87ad6cd..458786f77a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -294,7 +294,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
- sqlContext.read.parquet(path).filter("part = 1"),
+ sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"),
(1 to 3).map(i => Row(i, i.toString, 1)))
}
}
@@ -311,7 +311,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
- sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+ sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"),
(2 to 3).map(i => Row(i, i.toString, 1)))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 61cc0da508..71e9034d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -66,7 +66,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10.5/b=hello")
var exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path])
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -76,7 +76,37 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/_temporary/path")
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/")))
+
+ // Valid
+ paths = Seq(
+ "hdfs://host:9000/path/something=true/table/",
+ "hdfs://host:9000/path/something=true/table/_temporary",
+ "hdfs://host:9000/path/something=true/table/a=10/b=20",
+ "hdfs://host:9000/path/something=true/table/_temporary/path")
+
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/something=true/table")))
+
+ // Valid
+ paths = Seq(
+ "hdfs://host:9000/path/table=true/",
+ "hdfs://host:9000/path/table=true/_temporary",
+ "hdfs://host:9000/path/table=true/a=10/b=20",
+ "hdfs://host:9000/path/table=true/_temporary/path")
+
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/table=true")))
// Invalid
paths = Seq(
@@ -85,7 +115,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/path1")
exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/path/")))
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -101,19 +135,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/tmp/tables/nonPartitionedTable2")
exception = intercept[AssertionError] {
- parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ Set(new Path("hdfs://host:9000/tmp/tables/")))
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
- assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
+ val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1
+ assert(expected === actual)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), defaultPartitionName, true)
+ parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])
}.getMessage
assert(message.contains(expected))
@@ -152,8 +191,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
test("parse partitions") {
- def check(paths: Seq[String], spec: PartitionSpec): Unit = {
- assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
+ def check(
+ paths: Seq[String],
+ spec: PartitionSpec,
+ rootPaths: Set[Path] = Set.empty[Path]): Unit = {
+ val actualSpec =
+ parsePartitions(
+ paths.map(new Path(_)),
+ defaultPartitionName,
+ true,
+ rootPaths)
+ assert(actualSpec === spec)
}
check(Seq(
@@ -232,7 +280,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("parse partitions with type inference disabled") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
- assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
+ val actualSpec =
+ parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path])
+ assert(actualSpec === spec)
}
check(Seq(
@@ -590,6 +640,70 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
+ test("SPARK-11678: Partition discovery stops at the root path of the dataset") {
+ withTempPath { dir =>
+ val tablePath = new File(dir, "key=value")
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
+ Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+ checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+ }
+
+ withTempPath { dir =>
+ val path = new File(dir, "key=value")
+ val tablePath = new File(path, "table")
+
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
+ Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+ checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+ }
+ }
+
+ test("use basePath to specify the root dir of a partitioned table.") {
+ withTempPath { dir =>
+ val tablePath = new File(dir, "table")
+ val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+ df.write
+ .format("parquet")
+ .partitionBy("b", "c", "d")
+ .save(tablePath.getCanonicalPath)
+
+ val twoPartitionsDF =
+ sqlContext
+ .read
+ .option("basePath", tablePath.getCanonicalPath)
+ .parquet(
+ s"${tablePath.getCanonicalPath}/b=1",
+ s"${tablePath.getCanonicalPath}/b=2")
+
+ checkAnswer(twoPartitionsDF, df.filter("b != 3"))
+
+ intercept[AssertionError] {
+ sqlContext
+ .read
+ .parquet(
+ s"${tablePath.getCanonicalPath}/b=1",
+ s"${tablePath.getCanonicalPath}/b=2")
+ }
+ }
+ }
+
test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 45de567039..1136670b7a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -157,7 +157,7 @@ private[sql] class OrcRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
- extends HadoopFsRelation(maybePartitionSpec)
+ extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {
private[sql] def this(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index bdc48a383b..01960fd290 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -89,7 +89,7 @@ class SimpleTextRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
- extends HadoopFsRelation {
+ extends HadoopFsRelation(parameters) {
import sqlContext.sparkContext
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 100b97137c..665e87e3e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -486,6 +486,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
val df = sqlContext.read
.format(dataSourceName)
.option("dataSchema", dataSchema.json)
+ .option("basePath", file.getCanonicalPath)
.load(s"${file.getCanonicalPath}/p1=*/p2=???")
val expectedPaths = Set(