From 28538596558b7f69f9d22eb0902d0e609d98be88 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sat, 23 Apr 2016 21:53:05 -0700
Subject: [SPARK-14833][SQL][STREAMING][TEST] Refactor StreamTests to test for
 source fault-tolerance correctly.

## What changes were proposed in this pull request?

Currently, StreamTest only allows testing of a streaming Dataset that explicitly wraps a source object. This is different from the actual production code path, where the source object is created dynamically through a DataSource object every time a query is started. So the fault-tolerance tests in FileStreamSourceSuite and FileStreamSourceStressTestSuite were not really exercising the actual code path, as they simply reused the same FileStreamSource object.

This PR fixes StreamTest and the FileStreamSource suites to test this correctly.

Instead of maintaining a mapping of source --> expected offset in StreamTest (which requires reusing the source object), StreamTest now maintains a mapping of source index --> expected offset, so that the tests are independent of the source object.

Summary of changes
- StreamTest refactored to keep track of offsets by source index instead of by source object.
- AddData, AddTextFileData and AddParquetFileData updated to find the FileStreamSource object from the active query, so that they work with sources created when the query is started.
- Unit tests in the FileStreamSource suites refactored to test using DataFrames/Datasets generated through the public API, rather than reusing the same FileStreamSource object. This correctly tests fault tolerance.

The refactoring changed a lot of indentation in FileStreamSourceSuite, so it's recommended to review the diff with whitespace changes hidden: https://github.com/apache/spark/pull/12592/files?w=1

## How was this patch tested?

Refactored unit tests.

Author: Tathagata Das

Closes #12592 from tdas/SPARK-14833.
---
 .../scala/org/apache/spark/sql/StreamTest.scala    |  71 ++--
 .../sql/streaming/FileStreamSourceSuite.scala      | 359 +++++++++++----------
 2 files changed, 233 insertions(+), 197 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 242ea9cb27..c014f61679 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -67,12 +67,6 @@ import org.apache.spark.util.Utils
  */
 trait StreamTest extends QueryTest with Timeouts {
 
-  implicit class RichSource(s: Source) {
-    def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingExecutionRelation(s))
-
-    def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingExecutionRelation(s))
-  }
-
   /** How long to wait for an active stream to catch up when checking a result. */
   val streamingTimeout = 10.seconds
 
@@ -93,22 +87,21 @@ trait StreamTest extends QueryTest with Timeouts {
     AddDataMemory(source, data)
   }
 
-  /** A trait that can be extended when testing other sources. */
+  /** A trait that can be extended when testing a source. */
   trait AddData extends StreamAction {
-    def source: Source
-
     /**
-     * Called to trigger adding the data. Should return the offset that will denote when this
-     * new data has been processed.
+     * Called to add data to a source. It should find the source to add data to from the
+     * active query, and then return the source object the data was added to, as well as
+     * the offset of the added data.
      */
-    def addData(): Offset
+    def addData(query: Option[StreamExecution]): (Source, Offset)
   }
 
   case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
     override def toString: String = s"AddData to $source: ${data.mkString(",")}"
 
-    override def addData(): Offset = {
-      source.addData(data)
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      (source, source.addData(data))
     }
   }
 
@@ -199,7 +192,7 @@ trait StreamTest extends QueryTest with Timeouts {
     var currentPlan: LogicalPlan = stream.logicalPlan
     var currentStream: StreamExecution = null
     var lastStream: StreamExecution = null
-    val awaiting = new mutable.HashMap[Source, Offset]()
+    val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
     val sink = new MemorySink(stream.schema)
 
     @volatile
@@ -372,15 +365,53 @@ trait StreamTest extends QueryTest with Timeouts {
             verify({ a.run(); true }, s"Assert failed: ${a.message}")
 
          case a: AddData =>
-            awaiting.put(a.source, a.addData())
+            try {
+              // Add data and get the source where it was added, and the expected offset of the
+              // added data.
+              val queryToUse = Option(currentStream).orElse(Option(lastStream))
+              val (source, offset) = a.addData(queryToUse)
+
+              def findSourceIndex(plan: LogicalPlan): Option[Int] = {
+                plan
+                  .collect { case StreamingExecutionRelation(s, _) => s }
+                  .zipWithIndex
+                  .find(_._1 == source)
+                  .map(_._2)
+              }
+
+              // Try to find the index of the source to which data was added. Either get the index
+              // from the current active query or the original input logical plan.
+              val sourceIndex =
+                queryToUse.flatMap { query =>
+                  findSourceIndex(query.logicalPlan)
+                }.orElse {
+                  findSourceIndex(stream.logicalPlan)
+                }.getOrElse {
+                  throw new IllegalArgumentException(
+                    "Could not find the index of the source to which data was added")
+                }
+
+              // Store the expected offset of added data to wait for it later
+              awaiting.put(sourceIndex, offset)
+            } catch {
+              case NonFatal(e) =>
+                failTest("Error adding data", e)
+            }
 
          case CheckAnswerRows(expectedAnswer, lastOnly) =>
            verify(currentStream != null, "stream not running")
-
-            // Block until all data added has been processed
-            awaiting.foreach { case (source, offset) =>
+            // Get the map of source index to the current source objects
+            val indexToSource = currentStream
+              .logicalPlan
+              .collect { case StreamingExecutionRelation(s, _) => s }
+              .zipWithIndex
+              .map(_.swap)
+              .toMap
+
+            // Block until all data added has been processed for all the sources
+            awaiting.foreach { case (sourceIndex, offset) =>
              failAfter(streamingTimeout) {
-                currentStream.awaitOffset(source, offset)
+                currentStream.awaitOffset(indexToSource(sourceIndex), offset)
              }
            }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 45dca2fadf..8ff82a3c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -30,38 +30,59 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
 
   import testImplicits._
 
-  case class AddTextFileData(source: FileStreamSource, content: String, src: File, tmp: File)
-    extends AddData {
-
-    override def addData(): Offset = {
-      source.withBatchingLocked {
-        val file = Utils.tempFileWith(new File(tmp, "text"))
-        stringToFile(file, content).renameTo(new File(src, file.getName))
-        source.currentOffset
-      } + 1
+  /**
+   * A subclass of [[AddData]] for adding data to files. This is meant to use the
+   * [[FileStreamSource]] actually being used in the execution.
+   */
+  abstract class AddFileData extends AddData {
+    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active file stream source")
+
+      val sources = query.get.logicalPlan.collect {
+        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+          source.asInstanceOf[FileStreamSource]
+      }
+      if (sources.isEmpty) {
+        throw new Exception(
+          "Could not find file source in the StreamExecution logical plan to add data to")
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the file source in the StreamExecution logical plan as there " +
+          "are multiple file sources:\n\t" + sources.mkString("\n\t"))
+      }
+      val source = sources.head
+      val newOffset = source.withBatchingLocked {
+        addData(source)
+        source.currentOffset + 1
+      }
+      logInfo(s"Added data to $source at offset $newOffset")
+      (source, newOffset)
     }
+
+    protected def addData(source: FileStreamSource): Unit
   }
 
-  case class AddParquetFileData(
-      source: FileStreamSource,
-      df: DataFrame,
-      src: File,
-      tmp: File) extends AddData {
-
-    override def addData(): Offset = {
-      source.withBatchingLocked {
-        AddParquetFileData.writeToFile(df, src, tmp)
-        source.currentOffset
-      } + 1
+  case class AddTextFileData(content: String, src: File, tmp: File)
+    extends AddFileData {
+
+    override def addData(source: FileStreamSource): Unit = {
+      val file = Utils.tempFileWith(new File(tmp, "text"))
+      stringToFile(file, content).renameTo(new File(src, file.getName))
+    }
+  }
+
+  case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
+    override def addData(source: FileStreamSource): Unit = {
+      AddParquetFileData.writeToFile(data, src, tmp)
     }
   }
 
   object AddParquetFileData {
-    def apply(
-        source: FileStreamSource,
-        seq: Seq[String],
-        src: File,
-        tmp: File): AddParquetFileData = new AddParquetFileData(source, seq.toDS().toDF(), src, tmp)
+    def apply(seq: Seq[String], src: File, tmp: File): AddParquetFileData = {
+      AddParquetFileData(seq.toDS().toDF(), src, tmp)
+    }
 
     def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
       val file = Utils.tempFileWith(new File(tmp, "parquet"))
@@ -71,11 +92,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
   }
 
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
-  def createFileStreamSource(
+  def createFileStream(
       format: String,
       path: String,
-      schema: Option[StructType] = None): FileStreamSource = {
-    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+      schema: Option[StructType] = None): DataFrame = {
+
     val reader =
       if (schema.isDefined) {
         sqlContext.read.format(format).schema(schema.get)
@@ -83,14 +104,18 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
         sqlContext.read.format(format)
       }
     reader.stream(path)
-      .queryExecution.analyzed
+  }
+
+  protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+    df.queryExecution.analyzed
       .collect { case StreamingRelation(dataSource, _, _) =>
         // There is only one source in our tests so just set sourceId to 0
        dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
       }.head
   }
 
-  def withTempDirs(body: (File, File) => Unit) {
+  protected def withTempDirs(body: (File, File) => Unit) {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
     try {
@@ -108,6 +133,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
 
   import testImplicits._
 
+  /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
+  private def createFileStreamSource(
+      format: String,
+      path: String,
+      schema: Option[StructType] = None): FileStreamSource = {
+    getSourceFromFileStream(createFileStream(format, path, schema))
+  }
+
   private def createFileStreamSourceAndGetSchema(
       format: Option[String],
       path: Option[String],
@@ -226,104 +259,88 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
   }
 
   test("read from text files") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    val filtered = textSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val textStream = createFileStream("text", src.getCanonicalPath)
+      val filtered = textStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 
   test("read from json files") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
-    val filtered = textSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddTextFileData(
-        textSource,
-        "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
-        src,
-        tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddTextFileData(
-        textSource,
-        "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
-        src,
-        tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddTextFileData(
-        textSource,
-        "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
-        src,
-        tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData(
+          "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
+          src,
+          tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData(
+          "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
+          src,
+          tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData(
+          "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
+          src,
+          tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 
   test("read from json files with inferring schema") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    // Add a file so that we can infer its schema
-    stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+    withTempDirs { case (src, tmp) =>
 
-    val textSource = createFileStreamSource("json", src.getCanonicalPath)
+      // Add a file so that we can infer its schema
+      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
 
-    // FileStreamSource should infer the column "c"
-    val filtered = textSource.toDF().filter($"c" contains "keep")
+      val fileStream = createFileStream("json", src.getCanonicalPath)
+      assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
 
-    testStream(filtered)(
-      AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6")
-    )
+      // FileStreamSource should infer the column "c"
+      val filtered = fileStream.filter($"c" contains "keep")
 
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+      testStream(filtered)(
+        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6")
+      )
+    }
   }
 
   test("reading from json files inside partitioned directory") {
-    val src = {
-      val base = Utils.createTempDir(namePrefix = "streaming.src")
-      new File(base, "type=X")
-    }
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-    src.mkdirs()
+    withTempDirs { case (baseSrc, tmp) =>
+      val src = new File(baseSrc, "type=X")
+      src.mkdirs()
+      // Add a file so that we can infer its schema
+      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
 
-    // Add a file so that we can infer its schema
-    stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
-
-    val textSource = createFileStreamSource("json", src.getCanonicalPath)
+      val fileStream = createFileStream("json", src.getCanonicalPath)
 
-    // FileStreamSource should infer the column "c"
-    val filtered = textSource.toDF().filter($"c" contains "keep")
+      // FileStreamSource should infer the column "c"
+      val filtered = fileStream.filter($"c" contains "keep")
 
-    testStream(filtered)(
-      AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6")
-    )
+      testStream(filtered)(
+        AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6")
+      )
+    }
   }
 
@@ -333,52 +350,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       // Add a file so that we can infer its schema
       stringToFile(new File(src, "existing"), "{'k': 'value0'}")
 
-      val textSource = createFileStreamSource("json", src.getCanonicalPath)
+      val fileStream = createFileStream("json", src.getCanonicalPath)
 
       // FileStreamSource should infer the column "k"
-      val text = textSource.toDF()
-      assert(text.schema === StructType(Seq(StructField("k", StringType))))
+      assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
 
       // After creating DF and before starting stream, add data with different schema
      // Should not affect the inferred schema any more
       stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
 
-      testStream(text)(
+      testStream(fileStream)(
         // Should not pick up column v in the file added before start
-        AddTextFileData(textSource, "{'k': 'value2'}", src, tmp),
+        AddTextFileData("{'k': 'value2'}", src, tmp),
         CheckAnswer("value0", "value1", "value2"),
 
         // Should read data in column k, and ignore v
-        AddTextFileData(textSource, "{'k': 'value3', 'v': 'new'}", src, tmp),
+        AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3"),
 
         // Should ignore rows that do not have the necessary k column
-        AddTextFileData(textSource, "{'v': 'value4'}", src, tmp),
+        AddTextFileData("{'v': 'value4'}", src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3", null))
     }
   }
 
   test("read from parquet files") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
-    val filtered = fileSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddParquetFileData(fileSource, Seq("drop1", "keep2", "keep3"), src, tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddParquetFileData(fileSource, Seq("drop4", "keep5", "keep6"), src, tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddParquetFileData(fileSource, Seq("drop7", "keep8", "keep9"), src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema))
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddParquetFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 
   test("read from parquet files with changing schema") {
@@ -387,69 +399,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       // Add a file so that we can infer its schema
       AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
 
-      val fileSource = createFileStreamSource("parquet", src.getCanonicalPath)
-      val parquetData = fileSource.toDF()
+      val fileStream = createFileStream("parquet", src.getCanonicalPath)
 
      // FileStreamSource should infer the column "k"
-      assert(parquetData.schema === StructType(Seq(StructField("k", StringType))))
+      assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
 
      // After creating DF and before starting stream, add data with different schema
      // Should not affect the inferred schema any more
       AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
 
-      testStream(parquetData)(
+      testStream(fileStream)(
         // Should not pick up column v in the file added before start
-        AddParquetFileData(fileSource, Seq("value2").toDF("k"), src, tmp),
+        AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
         CheckAnswer("value0", "value1", "value2"),
 
         // Should read data in column k, and ignore v
-        AddParquetFileData(fileSource, Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+        AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3"),
 
         // Should ignore rows that do not have the necessary k column
-        AddParquetFileData(fileSource, Seq("value5").toDF("v"), src, tmp),
+        AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
         CheckAnswer("value0", "value1", "value2", "value3", null)
       )
     }
   }
 
   test("file stream source without schema") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-
-    // Only "text" doesn't need a schema
-    createFileStreamSource("text", src.getCanonicalPath)
+    withTempDir { src =>
+      // Only "text" doesn't need a schema
+      createFileStream("text", src.getCanonicalPath)
 
-    // Both "json" and "parquet" require a schema if no existing file to infer
-    intercept[AnalysisException] {
-      createFileStreamSource("json", src.getCanonicalPath)
-    }
-    intercept[AnalysisException] {
-      createFileStreamSource("parquet", src.getCanonicalPath)
+      // Both "json" and "parquet" require a schema if no existing file to infer
+      intercept[AnalysisException] {
+        createFileStream("json", src.getCanonicalPath)
+      }
+      intercept[AnalysisException] {
+        createFileStream("parquet", src.getCanonicalPath)
+      }
     }
-
-    Utils.deleteRecursively(src)
   }
 
   test("fault tolerance") {
-    val src = Utils.createTempDir(namePrefix = "streaming.src")
-    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    val filtered = textSource.toDF().filter($"value" contains "keep")
-
-    testStream(filtered)(
-      AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
-      CheckAnswer("keep2", "keep3"),
-      StopStream,
-      AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
-      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
-      AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
-      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
-    )
-
-    Utils.deleteRecursively(src)
-    Utils.deleteRecursively(tmp)
+    withTempDirs { case (src, tmp) =>
+      val fileStream = createFileStream("text", src.getCanonicalPath)
+      val filtered = fileStream.filter($"value" contains "keep")
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream,
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
   }
 }
 
@@ -461,10 +466,10 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
 
-    val textSource = createFileStreamSource("text", src.getCanonicalPath)
-    val ds = textSource.toDS[String]().map(_.toInt + 1)
+    val fileStream = createFileStream("text", src.getCanonicalPath)
+    val ds = fileStream.as[String].map(_.toInt + 1)
     runStressTest(ds, data => {
-      AddTextFileData(textSource, data.mkString("\n"), src, tmp)
+      AddTextFileData(data.mkString("\n"), src, tmp)
     })
 
     Utils.deleteRecursively(src)
-- 
cgit v1.2.3