| author | petermaxlee <petermaxlee@gmail.com> | 2016-09-19 22:19:51 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-09-19 22:19:51 -0700 |
| commit | be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b | |
| tree | a5ca485bc454b64cbc31dc2dc35b36004c9eda19 /sql/core/src/test/scala/org | |
| parent | 26145a5af9a88053c0eaf280206ca2621c8919f6 | |
# [SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata
## What changes were proposed in this pull request?
This PR modifies StreamExecution such that it discards metadata for batches that have already been fully processed. I used the purge method that was added as part of SPARK-17235.
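The purge semantics can be illustrated without Spark. The sketch below is a hypothetical in-memory stand-in for a metadata log, assuming (as in SPARK-17235's `MetadataLog.purge(thresholdBatchId)`) that purging removes all entries with a batch id strictly below the threshold; `InMemoryMetadataLog` and `retainedBatchIds` are illustrative names, not Spark APIs:

```scala
import scala.collection.mutable

// Hypothetical in-memory metadata log, sketching the purge semantics
// described above: purge(threshold) drops every batch with id < threshold.
class InMemoryMetadataLog[T] {
  private val batches = mutable.SortedMap.empty[Long, T]

  // Record metadata for a batch; returns false if the batch already exists.
  def add(batchId: Long, metadata: T): Boolean =
    if (batches.contains(batchId)) false
    else { batches(batchId) = metadata; true }

  // Discard metadata for all batches older than thresholdBatchId.
  def purge(thresholdBatchId: Long): Unit =
    batches.keys.takeWhile(_ < thresholdBatchId).toList.foreach(batches.remove)

  def retainedBatchIds: Seq[Long] = batches.keys.toSeq
}

val log = new InMemoryMetadataLog[String]
(0L to 2L).foreach { id =>
  log.add(id, s"offsets-for-batch-$id")
  log.purge(id) // after completing batch id, discard everything older
}
println(log.retainedBatchIds) // List(2) — only the latest batch survives
```

After three batches (ids 0, 1, 2) only the entry for batch 2 remains, which mirrors the assertion in the test case below that a single metadata file named "2" is left on disk.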
This is based on work by frreiss in #15067, with the test case fixed and some typos corrected.
## How was this patch tested?
A new test case in StreamingQuerySuite. The test case would fail without the changes in this pull request.
Author: petermaxlee <petermaxlee@gmail.com>
Author: frreiss <frreiss@us.ibm.com>
Closes #15126 from petermaxlee/SPARK-17513.
Diffstat (limited to 'sql/core/src/test/scala/org')
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 24 +
1 file changed, 24 insertions(+), 0 deletions(-)
```diff
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9d58315c20..d3e2cab1b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     )
   }
 
+  testQuietly("StreamExecution metadata garbage collection") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(6 / _)
+
+    // Run 3 batches, and then assert that only 1 metadata file is left at the end
+    // since the first 2 should have been purged.
+    testStream(mapped)(
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3, 6, 3),
+      AddData(inputData, 4, 6),
+      CheckAnswer(6, 3, 6, 3, 1, 1),
+
+      AssertOnQuery("metadata log should contain only one file") { q =>
+        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
+        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+        val toTest = logFileNames // Workaround for SPARK-17475
+        assert(toTest.size == 1 && toTest.head == "2")
+        true
+      }
+    )
+  }
+
   /**
    * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
    *
```
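For reference, the values the test's `CheckAnswer` calls expect follow directly from JVM integer division in `mapped = inputData.toDS().map(6 / _)`; a quick plain-Scala check, independent of Spark:

```scala
// The six inputs fed across the three batches in the test, and the
// integer-division results that CheckAnswer accumulates.
val inputs = Seq(1, 2, 1, 2, 4, 6)
val outputs = inputs.map(6 / _)
println(outputs) // List(6, 3, 6, 3, 1, 1)
```

Note that 6 / 4 and 6 / 6 both truncate to 1, which is why the final `CheckAnswer` ends with two 1s.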