author     Tathagata Das <tathagata.das1565@gmail.com>    2016-04-23 21:53:05 -0700
committer  Shixiong Zhu <shixiong@databricks.com>         2016-04-23 21:53:05 -0700
commit     28538596558b7f69f9d22eb0902d0e609d98be88 (patch)
tree       916725e859cfa51bd7f8697d0fe1e571bcc0df64
parent     ba5e0b87a043e46e9599695c82d90e7572185aa5 (diff)
[SPARK-14833][SQL][STREAMING][TEST] Refactor StreamTests to test for source fault-tolerance correctly.
## What changes were proposed in this pull request?

The current StreamTest only allows testing a streaming Dataset that explicitly wraps a source object. This differs from the actual production code path, where the source object is created dynamically through a DataSource object every time a query is started. So all the fault-tolerance testing in FileSourceSuite and FileSourceStressSuite is not really testing the actual code path, as those tests just reuse the same FileStreamSource object. This PR fixes StreamTest and the FileSource***Suite to test this correctly.

Instead of maintaining a mapping of source --> expected offset in StreamTest (which requires reuse of the source object), it now maintains a mapping of source index --> offset, so that it is independent of the source object.

Summary of changes:
- StreamTest refactored to keep track of offsets by source index instead of by source object.
- AddData, AddTextFileData and AddParquetFileData updated to find the FileStreamSource object from an active query, so that they work with sources created when the query is started.
- Unit tests in FileSource***Suite refactored to test using DataFrames/Datasets generated with public APIs, rather than reusing the same FileStreamSource. This correctly tests fault tolerance.

The refactoring changed a lot of indentation in FileSourceSuite, so it's recommended to review with whitespace changes hidden: https://github.com/apache/spark/pull/12592/files?w=1

## How was this patch tested?

Refactored unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12592 from tdas/SPARK-14833.
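For illustration, here is a rough before/after sketch of how a test body changes under this refactoring (a minimal sketch based on the diff below, assuming the StreamTest / FileStreamSourceTest helpers are in scope and that `src` and `tmp` are the usual temp directories; it is not additional code from this patch):

```scala
// Before: the test builds and holds a concrete FileStreamSource, so every
// StopStream / StartStream cycle keeps reusing the same source object.
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filteredOld = textSource.toDF().filter($"value" contains "keep")
testStream(filteredOld)(
  AddTextFileData(textSource, "drop1\nkeep2", src, tmp),
  StopStream,
  StartStream,                  // restarted query still uses the old source object
  CheckAnswer("keep2")
)

// After: the test only holds a streaming DataFrame. AddTextFileData locates the
// FileStreamSource inside the active StreamExecution, and StreamTest tracks the
// expected offset by source index, so a restart exercises a freshly created
// source, matching the production code path.
val fileStream = createFileStream("text", src.getCanonicalPath)
val filteredNew = fileStream.filter($"value" contains "keep")
testStream(filteredNew)(
  AddTextFileData("drop1\nkeep2", src, tmp),
  StopStream,
  StartStream,                  // restarted query creates a new FileStreamSource
  CheckAnswer("keep2")
)
```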
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala                          71
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala    359
2 files changed, 233 insertions, 197 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 242ea9cb27..c014f61679 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -67,12 +67,6 @@ import org.apache.spark.util.Utils
*/
trait StreamTest extends QueryTest with Timeouts {
- implicit class RichSource(s: Source) {
- def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingExecutionRelation(s))
-
- def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingExecutionRelation(s))
- }
-
/** How long to wait for an active stream to catch up when checking a result. */
val streamingTimeout = 10.seconds
@@ -93,22 +87,21 @@ trait StreamTest extends QueryTest with Timeouts {
AddDataMemory(source, data)
}
- /** A trait that can be extended when testing other sources. */
+ /** A trait that can be extended when testing a source. */
trait AddData extends StreamAction {
- def source: Source
-
/**
- * Called to trigger adding the data. Should return the offset that will denote when this
- * new data has been processed.
+ * Called to add data to a source. It should find the source to add data to from
+ * the active query, and then return the source object to which the data was added,
+ * as well as the offset of the added data.
*/
- def addData(): Offset
+ def addData(query: Option[StreamExecution]): (Source, Offset)
}
case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
override def toString: String = s"AddData to $source: ${data.mkString(",")}"
- override def addData(): Offset = {
- source.addData(data)
+ override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+ (source, source.addData(data))
}
}
@@ -199,7 +192,7 @@ trait StreamTest extends QueryTest with Timeouts {
var currentPlan: LogicalPlan = stream.logicalPlan
var currentStream: StreamExecution = null
var lastStream: StreamExecution = null
- val awaiting = new mutable.HashMap[Source, Offset]()
+ val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
val sink = new MemorySink(stream.schema)
@volatile
@@ -372,15 +365,53 @@ trait StreamTest extends QueryTest with Timeouts {
verify({ a.run(); true }, s"Assert failed: ${a.message}")
case a: AddData =>
- awaiting.put(a.source, a.addData())
+ try {
+ // Add data and get the source where it was added, and the expected offset of the
+ // added data.
+ val queryToUse = Option(currentStream).orElse(Option(lastStream))
+ val (source, offset) = a.addData(queryToUse)
+
+ def findSourceIndex(plan: LogicalPlan): Option[Int] = {
+ plan
+ .collect { case StreamingExecutionRelation(s, _) => s }
+ .zipWithIndex
+ .find(_._1 == source)
+ .map(_._2)
+ }
+
+ // Try to find the index of the source to which data was added. Either get the index
+ // from the current active query or the original input logical plan.
+ val sourceIndex =
+ queryToUse.flatMap { query =>
+ findSourceIndex(query.logicalPlan)
+ }.orElse {
+ findSourceIndex(stream.logicalPlan)
+ }.getOrElse {
+ throw new IllegalArgumentException(
+ "Could find index of the source to which data was added")
+ }
+
+ // Store the expected offset of added data to wait for it later
+ awaiting.put(sourceIndex, offset)
+ } catch {
+ case NonFatal(e) =>
+ failTest("Error adding data", e)
+ }
case CheckAnswerRows(expectedAnswer, lastOnly) =>
verify(currentStream != null, "stream not running")
-
- // Block until all data added has been processed
- awaiting.foreach { case (source, offset) =>
+ // Get the map of source index to the current source objects
+ val indexToSource = currentStream
+ .logicalPlan
+ .collect { case StreamingExecutionRelation(s, _) => s }
+ .zipWithIndex
+ .map(_.swap)
+ .toMap
+
+ // Block until all the added data has been processed for all the sources
+ awaiting.foreach { case (sourceIndex, offset) =>
failAfter(streamingTimeout) {
- currentStream.awaitOffset(source, offset)
+ currentStream.awaitOffset(indexToSource(sourceIndex), offset)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 45dca2fadf..8ff82a3c74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -30,38 +30,59 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
import testImplicits._
- case class AddTextFileData(source: FileStreamSource, content: String, src: File, tmp: File)
- extends AddData {
-
- override def addData(): Offset = {
- source.withBatchingLocked {
- val file = Utils.tempFileWith(new File(tmp, "text"))
- stringToFile(file, content).renameTo(new File(src, file.getName))
- source.currentOffset
- } + 1
+ /**
+ * A subclass of [[AddData]] for adding data to files. This is meant to use the
+ * [[FileStreamSource]] actually being used in the execution.
+ */
+ abstract class AddFileData extends AddData {
+ override def addData(query: Option[StreamExecution]): (Source, Offset) = {
+ require(
+ query.nonEmpty,
+ "Cannot add data when there is no query for finding the active file stream source")
+
+ val sources = query.get.logicalPlan.collect {
+ case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
+ source.asInstanceOf[FileStreamSource]
+ }
+ if (sources.isEmpty) {
+ throw new Exception(
+ "Could not find file source in the StreamExecution logical plan to add data to")
+ } else if (sources.size > 1) {
+ throw new Exception(
+ "Could not select the file source in the StreamExecution logical plan as there" +
+ "are multiple file sources:\n\t" + sources.mkString("\n\t"))
+ }
+ val source = sources.head
+ val newOffset = source.withBatchingLocked {
+ addData(source)
+ source.currentOffset + 1
+ }
+ logInfo(s"Added data to $source at offset $newOffset")
+ (source, newOffset)
}
+
+ protected def addData(source: FileStreamSource): Unit
}
- case class AddParquetFileData(
- source: FileStreamSource,
- df: DataFrame,
- src: File,
- tmp: File) extends AddData {
-
- override def addData(): Offset = {
- source.withBatchingLocked {
- AddParquetFileData.writeToFile(df, src, tmp)
- source.currentOffset
- } + 1
+ case class AddTextFileData(content: String, src: File, tmp: File)
+ extends AddFileData {
+
+ override def addData(source: FileStreamSource): Unit = {
+ val file = Utils.tempFileWith(new File(tmp, "text"))
+ stringToFile(file, content).renameTo(new File(src, file.getName))
+ }
+ }
+
+ case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
+ override def addData(source: FileStreamSource): Unit = {
+ AddParquetFileData.writeToFile(data, src, tmp)
}
}
object AddParquetFileData {
- def apply(
- source: FileStreamSource,
- seq: Seq[String],
- src: File,
- tmp: File): AddParquetFileData = new AddParquetFileData(source, seq.toDS().toDF(), src, tmp)
+ def apply(seq: Seq[String], src: File, tmp: File): AddParquetFileData = {
+ AddParquetFileData(seq.toDS().toDF(), src, tmp)
+ }
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val file = Utils.tempFileWith(new File(tmp, "parquet"))
@@ -71,11 +92,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
}
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
- def createFileStreamSource(
+ def createFileStream(
format: String,
path: String,
- schema: Option[StructType] = None): FileStreamSource = {
- val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+ schema: Option[StructType] = None): DataFrame = {
+
val reader =
if (schema.isDefined) {
sqlContext.read.format(format).schema(schema.get)
@@ -83,14 +104,18 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
sqlContext.read.format(format)
}
reader.stream(path)
- .queryExecution.analyzed
+ }
+
+ protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
+ val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+ df.queryExecution.analyzed
.collect { case StreamingRelation(dataSource, _, _) =>
// There is only one source in our tests so just set sourceId to 0
dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
}.head
}
- def withTempDirs(body: (File, File) => Unit) {
+ protected def withTempDirs(body: (File, File) => Unit) {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
try {
@@ -108,6 +133,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
import testImplicits._
+ /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
+ private def createFileStreamSource(
+ format: String,
+ path: String,
+ schema: Option[StructType] = None): FileStreamSource = {
+ getSourceFromFileStream(createFileStream(format, path, schema))
+ }
+
private def createFileStreamSourceAndGetSchema(
format: Option[String],
path: Option[String],
@@ -226,104 +259,88 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from text files") {
- val src = Utils.createTempDir(namePrefix = "streaming.src")
- val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
- val textSource = createFileStreamSource("text", src.getCanonicalPath)
- val filtered = textSource.toDF().filter($"value" contains "keep")
-
- testStream(filtered)(
- AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
- CheckAnswer("keep2", "keep3"),
- StopStream,
- AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
- StartStream,
- CheckAnswer("keep2", "keep3", "keep5", "keep6"),
- AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
- )
-
- Utils.deleteRecursively(src)
- Utils.deleteRecursively(tmp)
+ withTempDirs { case (src, tmp) =>
+ val textStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = textStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream,
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ StartStream,
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
}
test("read from json files") {
- val src = Utils.createTempDir(namePrefix = "streaming.src")
- val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
- val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
- val filtered = textSource.toDF().filter($"value" contains "keep")
-
- testStream(filtered)(
- AddTextFileData(
- textSource,
- "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
- src,
- tmp),
- CheckAnswer("keep2", "keep3"),
- StopStream,
- AddTextFileData(
- textSource,
- "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
- src,
- tmp),
- StartStream,
- CheckAnswer("keep2", "keep3", "keep5", "keep6"),
- AddTextFileData(
- textSource,
- "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
- src,
- tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
- )
-
- Utils.deleteRecursively(src)
- Utils.deleteRecursively(tmp)
+ withTempDirs { case (src, tmp) =>
+ val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData(
+ "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
+ src,
+ tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream,
+ AddTextFileData(
+ "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
+ src,
+ tmp),
+ StartStream,
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddTextFileData(
+ "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
+ src,
+ tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
}
test("read from json files with inferring schema") {
- val src = Utils.createTempDir(namePrefix = "streaming.src")
- val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
+ withTempDirs { case (src, tmp) =>
- val textSource = createFileStreamSource("json", src.getCanonicalPath)
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
- // FileStreamSource should infer the column "c"
- val filtered = textSource.toDF().filter($"c" contains "keep")
+ val fileStream = createFileStream("json", src.getCanonicalPath)
+ assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
- testStream(filtered)(
- AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6")
- )
+ // FileStreamSource should infer the column "c"
+ val filtered = fileStream.filter($"c" contains "keep")
- Utils.deleteRecursively(src)
- Utils.deleteRecursively(tmp)
+ testStream(filtered)(
+ AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
}
test("reading from json files inside partitioned directory") {
- val src = {
- val base = Utils.createTempDir(namePrefix = "streaming.src")
- new File(base, "type=X")
- }
- val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
- src.mkdirs()
+ withTempDirs { case (baseSrc, tmp) =>
+ val src = new File(baseSrc, "type=X")
+ src.mkdirs()
+ // Add a file so that we can infer its schema
+ stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
- // Add a file so that we can infer its schema
- stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
-
- val textSource = createFileStreamSource("json", src.getCanonicalPath)
+ val fileStream = createFileStream("json", src.getCanonicalPath)
- // FileStreamSource should infer the column "c"
- val filtered = textSource.toDF().filter($"c" contains "keep")
+ // FileStreamSource should infer the column "c"
+ val filtered = fileStream.filter($"c" contains "keep")
- testStream(filtered)(
- AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6")
- )
+ testStream(filtered)(
+ AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6")
+ )
+ }
}
@@ -333,52 +350,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'k': 'value0'}")
- val textSource = createFileStreamSource("json", src.getCanonicalPath)
+ val fileStream = createFileStream("json", src.getCanonicalPath)
// FileStreamSource should infer the column "k"
- val text = textSource.toDF()
- assert(text.schema === StructType(Seq(StructField("k", StringType))))
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
// After creating DF and before starting stream, add data with different schema
// Should not affect the inferred schema any more
stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
- testStream(text)(
+ testStream(fileStream)(
// Should not pick up column v in the file added before start
- AddTextFileData(textSource, "{'k': 'value2'}", src, tmp),
+ AddTextFileData("{'k': 'value2'}", src, tmp),
CheckAnswer("value0", "value1", "value2"),
// Should read data in column k, and ignore v
- AddTextFileData(textSource, "{'k': 'value3', 'v': 'new'}", src, tmp),
+ AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
CheckAnswer("value0", "value1", "value2", "value3"),
// Should ignore rows that do not have the necessary k column
- AddTextFileData(textSource, "{'v': 'value4'}", src, tmp),
+ AddTextFileData("{'v': 'value4'}", src, tmp),
CheckAnswer("value0", "value1", "value2", "value3", null))
}
}
test("read from parquet files") {
- val src = Utils.createTempDir(namePrefix = "streaming.src")
- val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
- val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
- val filtered = fileSource.toDF().filter($"value" contains "keep")
-
- testStream(filtered)(
- AddParquetFileData(fileSource, Seq("drop1", "keep2", "keep3"), src, tmp),
- CheckAnswer("keep2", "keep3"),
- StopStream,
- AddParquetFileData(fileSource, Seq("drop4", "keep5", "keep6"), src, tmp),
- StartStream,
- CheckAnswer("keep2", "keep3", "keep5", "keep6"),
- AddParquetFileData(fileSource, Seq("drop7", "keep8", "keep9"), src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
- )
-
- Utils.deleteRecursively(src)
- Utils.deleteRecursively(tmp)
+ withTempDirs { case (src, tmp) =>
+ val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema))
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddParquetFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream,
+ AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
+ StartStream,
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
}
test("read from parquet files with changing schema") {
@@ -387,69 +399,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
// Add a file so that we can infer its schema
AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
- val fileSource = createFileStreamSource("parquet", src.getCanonicalPath)
- val parquetData = fileSource.toDF()
+ val fileStream = createFileStream("parquet", src.getCanonicalPath)
// FileStreamSource should infer the column "k"
- assert(parquetData.schema === StructType(Seq(StructField("k", StringType))))
+ assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
// After creating DF and before starting stream, add data with different schema
// Should not affect the inferred schema any more
AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
- testStream(parquetData)(
+ testStream(fileStream)(
// Should not pick up column v in the file added before start
- AddParquetFileData(fileSource, Seq("value2").toDF("k"), src, tmp),
+ AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
CheckAnswer("value0", "value1", "value2"),
// Should read data in column k, and ignore v
- AddParquetFileData(fileSource, Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+ AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
CheckAnswer("value0", "value1", "value2", "value3"),
// Should ignore rows that do not have the necessary k column
- AddParquetFileData(fileSource, Seq("value5").toDF("v"), src, tmp),
+ AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
CheckAnswer("value0", "value1", "value2", "value3", null)
)
}
}
test("file stream source without schema") {
- val src = Utils.createTempDir(namePrefix = "streaming.src")
-
- // Only "text" doesn't need a schema
- createFileStreamSource("text", src.getCanonicalPath)
+ withTempDir { src =>
+ // Only "text" doesn't need a schema
+ createFileStream("text", src.getCanonicalPath)
- // Both "json" and "parquet" require a schema if no existing file to infer
- intercept[AnalysisException] {
- createFileStreamSource("json", src.getCanonicalPath)
- }
- intercept[AnalysisException] {
- createFileStreamSource("parquet", src.getCanonicalPath)
+ // Both "json" and "parquet" require a schema if no existing file to infer
+ intercept[AnalysisException] {
+ createFileStream("json", src.getCanonicalPath)
+ }
+ intercept[AnalysisException] {
+ createFileStream("parquet", src.getCanonicalPath)
+ }
}
-
- Utils.deleteRecursively(src)
}
test("fault tolerance") {
- val src = Utils.createTempDir(namePrefix = "streaming.src")
- val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
-
- val textSource = createFileStreamSource("text", src.getCanonicalPath)
- val filtered = textSource.toDF().filter($"value" contains "keep")
-
- testStream(filtered)(
- AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
- CheckAnswer("keep2", "keep3"),
- StopStream,
- AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
- StartStream,
- CheckAnswer("keep2", "keep3", "keep5", "keep6"),
- AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
- CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
- )
-
- Utils.deleteRecursively(src)
- Utils.deleteRecursively(tmp)
+ withTempDirs { case (src, tmp) =>
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream,
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ StartStream,
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
}
}
@@ -461,10 +466,10 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
- val textSource = createFileStreamSource("text", src.getCanonicalPath)
- val ds = textSource.toDS[String]().map(_.toInt + 1)
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val ds = fileStream.as[String].map(_.toInt + 1)
runStressTest(ds, data => {
- AddTextFileData(textSource, data.mkString("\n"), src, tmp)
+ AddTextFileData(data.mkString("\n"), src, tmp)
})
Utils.deleteRecursively(src)