aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorpetermaxlee <petermaxlee@gmail.com>2016-09-20 19:08:07 -0700
committerReynold Xin <rxin@databricks.com>2016-09-20 19:08:07 -0700
commit976f3b1227c1a9e0b878e010531285fdba57b6a7 (patch)
tree4071c38e8b08b7cb4b4ec35ff7af881184b9a45f /sql/core/src/test
parent7e418e99cff4cf512ab2a9fa74221c4655048c8d (diff)
downloadspark-976f3b1227c1a9e0b878e010531285fdba57b6a7.tar.gz
spark-976f3b1227c1a9e0b878e010531285fdba57b6a7.tar.bz2
spark-976f3b1227c1a9e0b878e010531285fdba57b6a7.zip
[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
## What changes were proposed in this pull request? This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235. This is a resubmission of 15126, which was based on work by frreiss in #15067, but fixed the test case along with some typos. ## How was this patch tested? A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request. Author: petermaxlee <petermaxlee@gmail.com> Closes #15166 from petermaxlee/SPARK-17513-2.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala24
1 file changed, 24 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9d58315c20..88f1f188ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
)
}
+ testQuietly("StreamExecution metadata garbage collection") {
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map(6 / _)
+
+ // Run 3 batches, and then assert that only 1 metadata file is left at the end
+ // since the first 2 should have been purged.
+ testStream(mapped)(
+ AddData(inputData, 1, 2),
+ CheckAnswer(6, 3),
+ AddData(inputData, 1, 2),
+ CheckAnswer(6, 3, 6, 3),
+ AddData(inputData, 4, 6),
+ CheckAnswer(6, 3, 6, 3, 1, 1),
+
+ AssertOnQuery("metadata log should contain only one file") { q =>
+ val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+ val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+ val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
+ assert(toTest.size == 1 && toTest.head == "2")
+ true
+ }
+ )
+ }
+
/**
* A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
*