about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala24
2 files changed, 26 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 23a7071086..0dfe7dba1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -186,7 +186,8 @@ case class DataSource(
userSpecifiedSchema = Some(dataSchema),
className = className,
options =
- new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation()))
+ new CaseInsensitiveMap(
+ options.filterKeys(_ != "path") + ("basePath" -> path))).resolveRelation()))
}
new FileStreamSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 73d1b1b1d5..64cddf0dee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -281,6 +281,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Utils.deleteRecursively(tmp)
}
+
+ test("reading from json files inside partitioned directory") {
+ val src = {
+ val base = Utils.createTempDir(namePrefix = "streaming.src")
+ new File(base, "type=X")
+ }
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
+ src.mkdirs()
+
+
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+
+ val textSource = createFileStreamSource("json", src.getCanonicalPath)
+
+ // FileStreamSource should infer the column "c"
+ val filtered = textSource.toDF().filter($"c" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
+
test("read from parquet files") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")