aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala9
1 files changed, 8 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index be023273db..614a6261e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -47,6 +47,13 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contains glob patterns
}
+ private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
+ if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
+ Map("basePath" -> path)
+ } else {
+ Map()
+ }}
+
private val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
@@ -136,7 +143,7 @@ class FileStreamSource(
paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
- options = sourceOptions.optionMapWithoutPath)
+ options = optionsWithPartitionBasePath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
checkFilesExist = false)))
}