author     Tathagata Das <tathagata.das1565@gmail.com>    2016-04-20 12:22:51 -0700
committer  Michael Armbrust <michael@databricks.com>      2016-04-20 12:22:51 -0700
commit     cb8ea9e1f34b9af287b3d10e47f24de4307c63ba
tree       2435b823fe87310d868563268af9330310f157bd /sql
parent     acc7e592c4ee5b4a6f42945329fc289fd11e1793
[SPARK-14741][SQL] Fixed error in reading json file stream inside a partitioned directory
## What changes were proposed in this pull request?
Consider the following directory structure:
`dir/col=X/some-files`
If we create a text-format streaming DataFrame on `dir/col=X/`, it should not treat `col` as a partitioning column. The streaming DataFrame itself does not, but the batch DataFrames generated for each trigger pick up `col` as a partitioning column, causing a mismatch between the streaming source's schema and the generated DataFrame's schema. This leads to a runtime failure:
```
18:55:11.262 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
java.lang.AssertionError: assertion failed: Invalid batch: c#2 != c#7,type#8
```
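The assertion compares the attributes of the streaming source's schema with those of the generated batch plan. For illustration, a minimal sketch of the divergence (the `spark` session and all paths here are assumptions for this example, not part of the patch):

```scala
// Hypothetical paths; `spark` is an existing session object.
// The streaming source is defined on dir/col=X/, so schema inference at
// stream-definition time sees only the data column:
val sourceSchema = spark.read.json("/data/dir/col=X").schema
// => StructType(StructField("c", StringType, true))

// Before this fix, each micro-batch DataFrame was resolved from the
// discovered leaf files with no base path, so partition inference walked
// up the directory tree, treated col=X as a partition, and produced the
// wider schema StructType(StructField("c", ...), StructField("col", ...)),
// triggering the "Invalid batch" assertion above.
```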
The reason is that the partition-inference code has no notion of a base path above which it should not search for partitions. This PR ensures that each batch DataFrame is generated with `basePath` set to the original path on which the file stream source is defined.
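A hedged sketch of the batch-side behavior the fix relies on (`basePath` is the option understood by Spark's file-based sources; the paths are invented for illustration):

```scala
// Hypothetical paths. Setting "basePath" pins partition discovery so it
// never searches above the stream's original directory:
val batchDF = spark.read
  .option("basePath", "/data/dir/col=X")   // the stream's defining path
  .json("/data/dir/col=X/some-file.json")  // a discovered leaf file
// batchDF.schema now matches the streaming source schema: only the data
// column `c`, with no spurious `col` partition column.
```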
## How was this patch tested?
New unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #12517 from tdas/SPARK-14741.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala    3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala    24
2 files changed, 26 insertions(+), 1 deletion(-)
```
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 23a7071086..0dfe7dba1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -186,7 +186,8 @@ case class DataSource(
           userSpecifiedSchema = Some(dataSchema),
           className = className,
           options =
-            new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation()))
+            new CaseInsensitiveMap(
+              options.filterKeys(_ != "path") + ("basePath" -> path))).resolveRelation()))
       }
 
       new FileStreamSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 73d1b1b1d5..64cddf0dee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -281,6 +281,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(tmp)
   }
+
+  test("reading from json files inside partitioned directory") {
+    val src = {
+      val base = Utils.createTempDir(namePrefix = "streaming.src")
+      new File(base, "type=X")
+    }
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
+    src.mkdirs()
+
+
+    // Add a file so that we can infer its schema
+    stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+
+    val textSource = createFileStreamSource("json", src.getCanonicalPath)
+
+    // FileStreamSource should infer the column "c"
+    val filtered = textSource.toDF().filter($"c" contains "keep")
+
+    testStream(filtered)(
+      AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+      CheckAnswer("keep2", "keep3", "keep5", "keep6")
+    )
+  }
+
   test("read from parquet files") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
```
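For reference, a standalone sketch of what the patched `DataSource` line does to the options map (a plain Scala `Map` stands in for `CaseInsensitiveMap`, and the path value is invented; `filterKeys` compiles as written on Scala 2.11/2.12, the versions Spark used at the time):

```scala
// Mirrors options.filterKeys(_ != "path") + ("basePath" -> path) from the
// patch: drop the stream's "path" entry and pin "basePath" to it instead.
val options = Map("path" -> "/data/dir/col=X")
val path = options("path")
val batchOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
// => Map("basePath" -> "/data/dir/col=X"): the batch relation is resolved
//    without "path" but with partition discovery pinned to the stream's
//    original directory.
```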