path: root/sql/core/src/test
author     Tathagata Das <tathagata.das1565@gmail.com>   2016-07-11 18:41:36 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-07-11 18:41:36 -0700
commit     e50efd53f073890d789a8448f850cc219cca7708 (patch)
tree       42a546926d2b6105d3a7ea28a56ea73c88a33206 /sql/core/src/test
parent     91a443b849e4d1ccc50a32b25fdd2bb502cf9b84 (diff)
download   spark-e50efd53f073890d789a8448f850cc219cca7708.tar.gz
           spark-e50efd53f073890d789a8448f850cc219cca7708.tar.bz2
           spark-e50efd53f073890d789a8448f850cc219cca7708.zip
[SPARK-16430][SQL][STREAMING] Fixed bug in the maxFilesPerTrigger in FileStreamSource
## What changes were proposed in this pull request?

An incorrect list of files was being allocated to a batch. This caused a file to be read multiple times across batches.

## How was this patch tested?

Added unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14143 from tdas/SPARK-16430-1.
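To make the described fix concrete, here is a minimal sketch, under a simplified model of the source, of how maxFilesPerTrigger is meant to allocate files to batches: each trigger should pick up only files that no earlier batch has consumed, capped at the configured limit, so no file is read twice. The names below (`MaxFilesPerTriggerSketch`, `nextBatch`, `seenFiles`) are illustrative and are not part of FileStreamSource.

```scala
// Minimal, hypothetical sketch of the intended maxFilesPerTrigger semantics;
// not the actual FileStreamSource implementation.
object MaxFilesPerTriggerSketch {
  def nextBatch(
      allFiles: Seq[String],      // files currently visible in the source directory
      seenFiles: Set[String],     // files already allocated to earlier batches
      maxFilesPerTrigger: Int): (Seq[String], Set[String]) = {
    // Drop files that earlier batches already consumed, then cap the batch size.
    // Allocating from the wrong list is what lets a file appear in several batches.
    val newFiles = allFiles.filterNot(seenFiles)
    val batch = newFiles.take(maxFilesPerTrigger)
    (batch, seenFiles ++ batch)   // remember what this batch consumed
  }
}
```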
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 35
1 file changed, 33 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f99c..47260a23c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
+      def checkAllData(data: Seq[Int]): Unit = {
+        val schema = StructType(Seq(StructField("value", StringType)))
+        val df = spark.createDataFrame(
+          spark.sparkContext.makeRDD(memorySink.allData), schema)
+        checkAnswer(df, data.map(_.toString).toDF("value"))
+      }
+
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
       def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
 
       checkLastBatchData(3) // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+      checkAllData(1 to 3)
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(7) // 6 and 7 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(6, 7)
       checkNumBatchesSinceLastCheck(2)
+      checkLastBatchData(6, 7)
+      checkAllData(1 to 7)
 
       fileSource.withBatchingLocked {
         createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(12) // 12 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(12)
       checkNumBatchesSinceLastCheck(3)
+      checkLastBatchData(12)
+      checkAllData(1 to 12)
+
+      q.stop()
+    }
+  }
+
+  test("max files per trigger - incorrect values") {
+    withTempDir { case src =>
+      def testMaxFilePerTriggerValue(value: String): Unit = {
+        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+        val e = intercept[IllegalArgumentException] {
+          testStream(df)()
+        }
+        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
+      }
+
+      testMaxFilePerTriggerValue("not-a-integer")
+      testMaxFilePerTriggerValue("-1")
+      testMaxFilePerTriggerValue("0")
+      testMaxFilePerTriggerValue("10.1")
     }
   }
 
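For reference, the new test asserts that maxFilesPerTrigger must be a positive integer: values such as "not-a-integer", "-1", "0", and "10.1" raise an IllegalArgumentException whose message mentions the option name, the offending value, and "positive integer". A hedged usage sketch of the valid case (the input path below is a placeholder):

```scala
// Hypothetical usage; "/tmp/stream-input" is a placeholder path.
val stream = spark.readStream
  .option("maxFilesPerTrigger", "2") // at most 2 newly discovered files per micro-batch
  .text("/tmp/stream-input")
```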