-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala      | 35
2 files changed, 36 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 72b335a42e..0cfad659dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -73,8 +73,8 @@ class FileStreamSource(
logTrace(s"Number of seen files = ${seenFiles.size}")
if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, newFiles)
- logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+ metadataLog.add(maxBatchId, batchFiles)
+ logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
.map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+ s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}
}
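The two hunks above make two fixes: the source now commits and logs batchFiles (the list actually selected for the batch when maxFilesPerTrigger caps it) instead of the full newFiles list, and the error message now names the user-facing option 'maxFilesPerTrigger' rather than the internal 'maxFilesPerBatch'. For orientation, a minimal sketch of how a caller would set this option (the input path is an illustrative placeholder, not part of this change):

    // Stream a directory of text files, capping each micro-batch at 5 new files.
    // "/tmp/stream-input" is a hypothetical path.
    val df = spark.readStream
      .option("maxFilesPerTrigger", "5")
      .text("/tmp/stream-input")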
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f99c..47260a23c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
checkAnswer(df, data.map(_.toString).toDF("value"))
}

+ def checkAllData(data: Seq[Int]): Unit = {
+ val schema = StructType(Seq(StructField("value", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.makeRDD(memorySink.allData), schema)
+ checkAnswer(df, data.map(_.toString).toDF("value"))
+ }
+
/** Check how many batches have executed since the last time this check was made */
var lastBatchId = -1L
def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}

checkLastBatchData(3) // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+ checkAllData(1 to 3)
lastBatchId = memorySink.latestBatchId.get

fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
createFile(7) // 6 and 7 should be in the last batch
}
q.processAllAvailable()
- checkLastBatchData(6, 7)
checkNumBatchesSinceLastCheck(2)
+ checkLastBatchData(6, 7)
+ checkAllData(1 to 7)

fileSource.withBatchingLocked {
createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
createFile(12) // 12 should be in the last batch
}
q.processAllAvailable()
- checkLastBatchData(12)
checkNumBatchesSinceLastCheck(3)
+ checkLastBatchData(12)
+ checkAllData(1 to 12)
+
+ q.stop()
+ }
+ }
+
+ test("max files per trigger - incorrect values") {
+ withTempDir { case src =>
+ def testMaxFilePerTriggerValue(value: String): Unit = {
+ val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+ val e = intercept[IllegalArgumentException] {
+ testStream(df)()
+ }
+ Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+ assert(e.getMessage.contains(s))
+ }
+ }
+
+ testMaxFilePerTriggerValue("not-a-integer")
+ testMaxFilePerTriggerValue("-1")
+ testMaxFilePerTriggerValue("0")
+ testMaxFilePerTriggerValue("10.1")
}
}
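The new negative tests exercise the option validation from the first file's second hunk: Try(str.toInt).toOption.filter(_ > 0) rejects non-integers ("not-a-integer", "10.1") through the failed parse and non-positive values ("-1", "0") through the filter. A self-contained sketch of that pattern, written as a standalone helper rather than the actual FileStreamSource code:

    import scala.util.Try

    // Parse a string as a strictly positive Int; otherwise fail with a
    // descriptive error. Mirrors the validation pattern in the diff above.
    def parsePositiveInt(option: String, str: String): Int =
      Try(str.toInt).toOption.filter(_ > 0).getOrElse {
        throw new IllegalArgumentException(
          s"Invalid value '$str' for option '$option', must be a positive integer")
      }

    parsePositiveInt("maxFilesPerTrigger", "5")    // returns 5
    parsePositiveInt("maxFilesPerTrigger", "0")    // throws IllegalArgumentException
    parsePositiveInt("maxFilesPerTrigger", "10.1") // throws: "10.1".toInt fails to parse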