Diffstat:
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 99 +-
 1 file changed, 97 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a02a36c004..55c95ae285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
import java.io.File
-import org.scalatest.concurrent.Eventually._
+import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class FileStreamSourceTest extends StreamTest with SharedSQLContext {
+class FileStreamSourceTest extends StreamTest with SharedSQLContext with PrivateMethodTester {
import testImplicits._
@@ -804,6 +804,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
+
+ test("compacat metadata log") {
+ val _sources = PrivateMethod[Seq[Source]]('sources)
+ val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)
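+ // `sources` (on StreamExecution) and `metadataLog` (on FileStreamSource)
+ // are private, so they are reached via PrivateMethodTester's invokePrivate.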
+
+ def verify(execution: StreamExecution)
+ (batchId: Long, expectedBatches: Int): Boolean = {
+ import CompactibleFileStreamLog._
+
+ val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+ val metadataLog = fileSource invokePrivate _metadataLog()
+
+ if (isCompactionBatch(batchId, 2)) {
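+ // With compactInterval = 2, compaction happens on batch ids 1, 3, 5, ...
+ // (those where (batchId + 1) % compactInterval == 0).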
+ val path = metadataLog.batchIdToPath(batchId)
+
+ // The path of a compaction batch should end with the compact suffix.
+ assert(path.getName.endsWith(COMPACT_FILE_SUFFIX))
+
+ // A compaction batch should include all entries from the beginning of the log.
+ val entries = metadataLog.get(batchId)
+ assert(entries.isDefined)
+ assert(entries.get.length === metadataLog.allFiles().length)
+ assert(metadataLog.get(None, Some(batchId)).flatMap(_._2).length === entries.get.length)
+ }
+
+ assert(metadataLog.allFiles().sortBy(_.batchId) ===
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).sortBy(_.batchId))
+
+ metadataLog.get(None, Some(batchId)).flatMap(_._2).length === expectedBatches
+ }
+
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2"
+ ) {
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ AssertOnQuery(verify(_)(0L, 1)),
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AssertOnQuery(verify(_)(1L, 2)),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
+ AssertOnQuery(verify(_)(2L, 3)),
+ StopStream,
+ StartStream(),
+ AssertOnQuery(verify(_)(2L, 3)),
+ AddTextFileData("drop10\nkeep11", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
+ AssertOnQuery(verify(_)(3L, 4)),
+ AddTextFileData("drop12\nkeep13", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"),
+ AssertOnQuery(verify(_)(4L, 5))
+ )
+ }
+ }
+ }
+
+ test("get arbitrary batch from FileStreamSource") {
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+ // Force deletion of the old logs
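+ // (the delay is in ms, so superseded log files are removed almost
+ // immediately after each compaction)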
+ SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+ ) {
+ val fileStream = createFileStream("text", src.getCanonicalPath)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("keep1", src, tmp),
+ CheckAnswer("keep1"),
+ AddTextFileData("keep2", src, tmp),
+ CheckAnswer("keep1", "keep2"),
+ AddTextFileData("keep3", src, tmp),
+ CheckAnswer("keep1", "keep2", "keep3"),
+ AssertOnQuery("check getBatch") { execution: StreamExecution =>
+ val _sources = PrivateMethod[Seq[Source]]('sources)
+ val fileSource =
+ (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
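+ // getBatch(start, end) returns the files added in the half-open
+ // offset range (start, end]; start = None reads from the beginning.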
+ assert(fileSource.getBatch(None, LongOffset(2)).as[String].collect() ===
+ List("keep1", "keep2", "keep3"))
+ assert(fileSource.getBatch(Some(LongOffset(0)), LongOffset(2)).as[String].collect() ===
+ List("keep2", "keep3"))
+ assert(fileSource.getBatch(Some(LongOffset(1)), LongOffset(2)).as[String].collect() ===
+ List("keep3"))
+ true
+ }
+ )
+ }
+ }
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {