From f039d964d152c0aeb5b71eb5188a9a7fd4b5aef3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 20 Sep 2016 16:12:35 +0800 Subject: Revert "[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata" This reverts commit be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b. --- .../sql/execution/streaming/MetadataLog.scala | 1 - .../sql/execution/streaming/StreamExecution.scala | 7 ------- .../spark/sql/streaming/StreamingQuerySuite.scala | 24 ---------------------- 3 files changed, 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala index 9e2604c9c0..78d6be17df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala @@ -24,7 +24,6 @@ package org.apache.spark.sql.execution.streaming * - Allow the user to query the latest batch id. * - Allow the user to query the metadata object of a specified batch id. * - Allow the user to query metadata objects in a range of batch ids. - * - Allow the user to remove obsolete metadata */ trait MetadataLog[T] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 220f77dc24..a1aae61107 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -290,13 +290,6 @@ class StreamExecution( assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId.") - - // Now that we have logged the new batch, no further processing will happen for - // the previous batch, and it is safe to discard the old metadata. - // Note that purge is exclusive, i.e. it purges everything before currentBatchId. - // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in - // flight at the same time), this cleanup logic will need to change. - offsetLog.purge(currentBatchId) } else { awaitBatchLock.lock() try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index d3e2cab1b8..9d58315c20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -125,30 +125,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } - testQuietly("StreamExecution metadata garbage collection") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map(6 / _) - - // Run 3 batches, and then assert that only 1 metadata file is left at the end - // since the first 2 should have been purged. - testStream(mapped)( - AddData(inputData, 1, 2), - CheckAnswer(6, 3), - AddData(inputData, 1, 2), - CheckAnswer(6, 3, 6, 3), - AddData(inputData, 4, 6), - CheckAnswer(6, 3, 6, 3, 1, 1), - - AssertOnQuery("metadata log should contain only one file") { q => - val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString) - val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) - val toTest = logFileNames // Workaround for SPARK-17475 - assert(toTest.size == 1 && toTest.head == "2") - true - } - ) - } - /** * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`. * -- cgit v1.2.3