author: Tathagata Das <tathagata.das1565@gmail.com> 2016-07-11 18:41:36 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2016-07-11 18:41:36 -0700
commit: e50efd53f073890d789a8448f850cc219cca7708 (patch)
tree: 42a546926d2b6105d3a7ea28a56ea73c88a33206
parent: 91a443b849e4d1ccc50a32b25fdd2bb502cf9b84 (diff)
[SPARK-16430][SQL][STREAMING] Fixed bug in the maxFilesPerTrigger in FileStreamSource
## What changes were proposed in this pull request?

An incorrect list of files was being allocated to a batch. This caused a file to be read multiple times across batches.

## How was this patch tested?

Added unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14143 from tdas/SPARK-16430-1.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala     | 35
2 files changed, 36 insertions(+), 5 deletions(-)
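The essence of the fix, sketched below as a minimal reconstruction (the derivation of batchFiles is not shown in the hunk; the maxFilesPerBatch name and the take-based cap are assumptions based on the surrounding FileStreamSource code):

// Sketch only: cap the newly discovered files at 'maxFilesPerTrigger' and
// record exactly that capped list in the metadata log.
val batchFiles = maxFilesPerBatch.map(n => newFiles.take(n)).getOrElse(newFiles)
if (batchFiles.nonEmpty) {
  maxBatchId += 1
  // The bug: the old code logged the uncapped `newFiles` here, so files beyond
  // the cap were attributed to this batch and then read again in later batches.
  metadataLog.add(maxBatchId, batchFiles)
}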
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 72b335a42e..0cfad659dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -73,8 +73,8 @@ class FileStreamSource(
logTrace(s"Number of seen files = ${seenFiles.size}")
if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, newFiles)
- logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+ metadataLog.add(maxBatchId, batchFiles)
+ logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}
new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
.map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+ s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}
}
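For context, 'maxFilesPerTrigger' is supplied as a source option on the stream reader; a minimal usage sketch (the input path is illustrative, not part of this patch):

// Cap each micro-batch at 100 newly arrived files.
val lines = spark.readStream
  .option("maxFilesPerTrigger", "100")
  .text("/data/in")  // illustrative input directory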
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f99c..47260a23c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
checkAnswer(df, data.map(_.toString).toDF("value"))
}
+ def checkAllData(data: Seq[Int]): Unit = {
+ val schema = StructType(Seq(StructField("value", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.makeRDD(memorySink.allData), schema)
+ checkAnswer(df, data.map(_.toString).toDF("value"))
+ }
+
/** Check how many batches have executed since the last time this check was made */
var lastBatchId = -1L
def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
checkLastBatchData(3) // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+ checkAllData(1 to 3)
lastBatchId = memorySink.latestBatchId.get
fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
createFile(7) // 6 and 7 should be in the last batch
}
q.processAllAvailable()
- checkLastBatchData(6, 7)
checkNumBatchesSinceLastCheck(2)
+ checkLastBatchData(6, 7)
+ checkAllData(1 to 7)
fileSource.withBatchingLocked {
createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
createFile(12) // 12 should be in the last batch
}
q.processAllAvailable()
- checkLastBatchData(12)
checkNumBatchesSinceLastCheck(3)
+ checkLastBatchData(12)
+ checkAllData(1 to 12)
+
+ q.stop()
+ }
+ }
+
+ test("max files per trigger - incorrect values") {
+ withTempDir { case src =>
+ def testMaxFilePerTriggerValue(value: String): Unit = {
+ val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+ val e = intercept[IllegalArgumentException] {
+ testStream(df)()
+ }
+ Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+ assert(e.getMessage.contains(s))
+ }
+ }
+
+ testMaxFilePerTriggerValue("not-a-integer")
+ testMaxFilePerTriggerValue("-1")
+ testMaxFilePerTriggerValue("0")
+ testMaxFilePerTriggerValue("10.1")
}
}
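As a worked example of the batching the test above asserts (the suite appears to run with maxFilesPerTrigger = 2, which these hunks imply but do not show): the five files created in the last block split into three triggers.

// Illustration only: grouping 5 pending files with a cap of 2 yields 3 batches.
val pending = Seq(8, 9, 10, 11, 12)
val batches = pending.grouped(2).toSeq  // Seq(Seq(8, 9), Seq(10, 11), Seq(12))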