author     Tathagata Das <tathagata.das1565@gmail.com>    2016-07-11 18:41:36 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2016-07-11 18:41:36 -0700
commit     e50efd53f073890d789a8448f850cc219cca7708 (patch)
tree       42a546926d2b6105d3a7ea28a56ea73c88a33206
parent     91a443b849e4d1ccc50a32b25fdd2bb502cf9b84 (diff)
[SPARK-16430][SQL][STREAMING] Fixed bug in the maxFilesPerTrigger in FileStreamSource
## What changes were proposed in this pull request?
An incorrect list of files was being allocated to each batch, which caused a file to be read multiple times across batches.
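
To make the failure mode concrete, here is a minimal sketch (plain Scala, not the actual Spark source; `newFiles`, `batchFiles`, `metadataLog`, and `maxBatchId` mirror the names in the diff below) of how committing the wrong list re-reads files:

```scala
// Minimal sketch of the batching logic, assuming maxFilesPerTrigger = 2.
val newFiles = Seq("f1", "f2", "f3", "f4", "f5") // all newly discovered files
val maxFilesPerTrigger = 2

// Only the first maxFilesPerTrigger files belong to the current batch.
val batchFiles = newFiles.take(maxFilesPerTrigger)

// Buggy behavior: the full newFiles list was committed to the batch's
// metadata log, so f3..f5 were read in this batch AND again in the
// later batches they were actually assigned to.
// metadataLog.add(maxBatchId, newFiles)     // before the fix
// metadataLog.add(maxBatchId, batchFiles)   // after the fix

println(batchFiles) // List(f1, f2) -- the only files this batch should read
```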
## How was this patch tested?
Added unit tests
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #14143 from tdas/SPARK-16430-1.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala      | 35
2 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 72b335a42e..0cfad659dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -73,8 +73,8 @@ class FileStreamSource(
     logTrace(s"Number of seen files = ${seenFiles.size}")
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, newFiles)
-      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+      metadataLog.add(maxBatchId, batchFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
 
     new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
       .map { str =>
         Try(str.toInt).toOption.filter(_ > 0).getOrElse {
           throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
         }
       }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f99c..47260a23c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
+      def checkAllData(data: Seq[Int]): Unit = {
+        val schema = StructType(Seq(StructField("value", StringType)))
+        val df = spark.createDataFrame(
+          spark.sparkContext.makeRDD(memorySink.allData), schema)
+        checkAnswer(df, data.map(_.toString).toDF("value"))
+      }
+
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
       def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
 
       checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+      checkAllData(1 to 3)
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(7)   // 6 and 7 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(6, 7)
       checkNumBatchesSinceLastCheck(2)
+      checkLastBatchData(6, 7)
+      checkAllData(1 to 7)
 
       fileSource.withBatchingLocked {
         createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(12)   // 12 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(12)
       checkNumBatchesSinceLastCheck(3)
+      checkLastBatchData(12)
+      checkAllData(1 to 12)
+
+      q.stop()
+    }
+  }
+
+  test("max files per trigger - incorrect values") {
+    withTempDir { case src =>
+      def testMaxFilePerTriggerValue(value: String): Unit = {
+        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+        val e = intercept[IllegalArgumentException] {
+          testStream(df)()
+        }
+        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
+      }
+
+      testMaxFilePerTriggerValue("not-a-integer")
+      testMaxFilePerTriggerValue("-1")
+      testMaxFilePerTriggerValue("0")
+      testMaxFilePerTriggerValue("10.1")
     }
   }
 
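
For reference, the option is set from user code the same way the new test exercises it; a minimal usage sketch (the session setup and input path are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session and input directory.
val spark = SparkSession.builder().appName("maxFilesPerTrigger-demo").getOrCreate()

// With the fix, each micro-batch picks up at most 10 new files from the
// directory, and a given file is assigned to exactly one batch.
val lines = spark.readStream
  .option("maxFilesPerTrigger", "10")
  .text("/tmp/streaming-input")
```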