diff options
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 3 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 24 |
2 files changed, 26 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 23a7071086..0dfe7dba1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -186,7 +186,8 @@ case class DataSource( userSpecifiedSchema = Some(dataSchema), className = className, options = - new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation())) + new CaseInsensitiveMap( + options.filterKeys(_ != "path") + ("basePath" -> path))).resolveRelation())) } new FileStreamSource( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 73d1b1b1d5..64cddf0dee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -281,6 +281,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { Utils.deleteRecursively(tmp) } + + test("reading from json files inside partitioned directory") { + val src = { + val base = Utils.createTempDir(namePrefix = "streaming.src") + new File(base, "type=X") + } + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") + src.mkdirs() + + + // Add a file so that we can infer its schema + stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") + + val textSource = createFileStreamSource("json", src.getCanonicalPath) + + // FileStreamSource should infer the column "c" + val filtered = textSource.toDF().filter($"c" contains "keep") + + testStream(filtered)( + AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp), + CheckAnswer("keep2", "keep3", "keep5", "keep6") + ) + } + test("read from parquet files") { val src = Utils.createTempDir(namePrefix = "streaming.src") val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") |