diff options
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 40 |
1 files changed, 21 insertions, 19 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 89de15acf5..73d1b1b1d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -63,6 +63,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { format: String, path: String, schema: Option[StructType] = None): FileStreamSource = { + val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath val reader = if (schema.isDefined) { sqlContext.read.format(format).schema(schema.get) @@ -71,8 +72,10 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext { } reader.stream(path) .queryExecution.analyzed - .collect { case StreamingRelation(s: FileStreamSource, _) => s } - .head + .collect { case StreamingRelation(dataSource, _, _) => + // There is only one source in our tests so just set sourceId to 0 + dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource] + }.head } val valueSchema = new StructType().add("value", StringType) @@ -96,9 +99,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { reader.stream() } df.queryExecution.analyzed - .collect { case StreamingRelation(s: FileStreamSource, _) => s } - .head - .schema + .collect { case StreamingRelation(dataSource, _, _) => + dataSource.sourceSchema() + }.head._2 } test("FileStreamSource schema: no path") { @@ -202,8 +205,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from text files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -224,8 +227,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from json files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema)) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -258,8 +261,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from json files with inferring schema") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") // Add a file so that we can infer its schema stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") @@ -279,8 +282,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from parquet files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema)) val filtered = fileSource.toDF().filter($"value" contains "keep") @@ -301,7 +304,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("file stream source without schema") { - val src = Utils.createTempDir("streaming.src") + val src = Utils.createTempDir(namePrefix = "streaming.src") // Only "text" doesn't need a schema createFileStreamSource("text", src.getCanonicalPath) @@ -318,8 +321,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("fault tolerance") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -338,7 +341,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { Utils.deleteRecursively(src) Utils.deleteRecursively(tmp) } - } class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext { @@ -346,8 +348,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ import testImplicits._ test("file source stress test") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val ds = textSource.toDS[String]().map(_.toInt + 1) |