author     Wenchen Fan <wenchen@databricks.com>   2016-09-20 16:12:35 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-09-20 16:12:35 +0800
commit     f039d964d152c0aeb5b71eb5188a9a7fd4b5aef3
tree       edf2965b4903c10504183006e0b60ee011dc286c
parent     be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b
Revert "[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata"
This reverts commit be9d57fc9d8b10e4234c01c06ed43fd7dd12c07b.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala     |  1 -
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala |  7 -------
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala       | 24 ------------------------
3 files changed, 0 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 9e2604c9c0..78d6be17df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -24,7 +24,6 @@ package org.apache.spark.sql.execution.streaming
  *  - Allow the user to query the latest batch id.
  *  - Allow the user to query the metadata object of a specified batch id.
  *  - Allow the user to query metadata objects in a range of batch ids.
- *  - Allow the user to remove obsolete metadata
  */
 trait MetadataLog[T] {
 
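The doc comment above lists the operations a MetadataLog supports, including the "remove obsolete metadata" capability that this revert takes back out. As a rough sketch of the trait's shape (method names and signatures here are illustrative assumptions based on that comment, not quoted from the Spark source):

// Illustrative sketch only; signatures are assumptions, not the exact Spark API.
trait MetadataLogSketch[T] {
  // Store a metadata object for a batch; returns false if the batch already exists.
  def add(batchId: Long, metadata: T): Boolean

  // Query the metadata object of a specified batch id.
  def get(batchId: Long): Option[T]

  // Query metadata objects in a (possibly open-ended) range of batch ids.
  def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]

  // Query the latest batch id together with its metadata.
  def getLatest(): Option[(Long, T)]

  // The capability removed by this revert: drop all metadata for batches
  // strictly before thresholdBatchId (the purge is exclusive).
  def purge(thresholdBatchId: Long): Unit
}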
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 220f77dc24..a1aae61107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -290,13 +290,6 @@ class StreamExecution(
       assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
         s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       logInfo(s"Committed offsets for batch $currentBatchId.")
-
-      // Now that we have logged the new batch, no further processing will happen for
-      // the previous batch, and it is safe to discard the old metadata.
-      // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-      // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-      // flight at the same time), this cleanup logic will need to change.
-      offsetLog.purge(currentBatchId)
     } else {
       awaitBatchLock.lock()
       try {
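The removed comment spells out the purge contract: it is exclusive, so purging at currentBatchId discards metadata for all earlier batches while keeping the offsets that were just committed. A minimal sketch of that semantics over a directory of numerically named batch files (purgeBefore is a hypothetical helper for illustration, not the HDFSMetadataLog implementation):

import java.io.File

// Hypothetical illustration of the exclusive-purge semantics described in the
// removed comment: delete every batch file whose id is < thresholdBatchId,
// keeping the file for the batch that was just committed.
def purgeBefore(metadataDir: File, thresholdBatchId: Long): Unit = {
  Option(metadataDir.listFiles()).getOrElse(Array.empty[File])
    .filter { f =>
      // Batch files are assumed to be named by their batch id, e.g. "0", "1", "2".
      f.getName.nonEmpty && f.getName.forall(_.isDigit) && f.getName.toLong < thresholdBatchId
    }
    .foreach(_.delete())
}

// After batch 2 is committed, purgeBefore(dir, 2) removes "0" and "1" but keeps "2",
// which is exactly what the removed test in StreamingQuerySuite below asserted.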
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index d3e2cab1b8..9d58315c20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -125,30 +125,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     )
   }
 
-  testQuietly("StreamExecution metadata garbage collection") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(6 / _)
-
-    // Run 3 batches, and then assert that only 1 metadata file is left at the end
-    // since the first 2 should have been purged.
-    testStream(mapped)(
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AddData(inputData, 4, 6),
-      CheckAnswer(6, 3, 6, 3, 1, 1),
-
-      AssertOnQuery("metadata log should contain only one file") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
-        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
-        val toTest = logFileNames // Workaround for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
-        true
-      }
-    )
-  }
-
   /**
    * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
    *