diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-01-06 20:50:31 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-06 20:50:31 -0800 |
commit | 8e19c7663a067d55b32af68d62da42c7cd5d6009 (patch) | |
tree | 331132f4bc4dcc48d94acda2ff5d456af849ab77 /streaming/src/main | |
parent | 6b6d02be0d4e2ce562dddfb391b3302f79de8276 (diff) | |
download | spark-8e19c7663a067d55b32af68d62da42c7cd5d6009.tar.gz spark-8e19c7663a067d55b32af68d62da42c7cd5d6009.tar.bz2 spark-8e19c7663a067d55b32af68d62da42c7cd5d6009.zip |
[SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0
This PR removes `spark.cleaner.ttl` and the associated TTL-based metadata cleaning code.
Now that we have the `ContextCleaner` and a timer to trigger periodic GCs, I don't think that `spark.cleaner.ttl` is necessary anymore. The TTL-based cleaning isn't enabled by default, isn't included in our end-to-end tests, and has been a source of user confusion when it is misconfigured. If the TTL is set too low, data which is still being used may be evicted / deleted, leading to hard to diagnose bugs.
For all of these reasons, I think that we should remove this functionality in Spark 2.0. Additional benefits of doing this include marginally reduced memory usage, since we no longer need to store timetsamps in hashmaps, and a handful fewer threads.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #10534 from JoshRosen/remove-ttl-based-cleaning.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 3 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 14 |
2 files changed, 2 insertions, 15 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 61b230ab6f..b186d29761 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec +import org.apache.spark.util.Utils import org.apache.spark.streaming.scheduler.JobGenerator -import org.apache.spark.util.{MetadataCleaner, Utils} private[streaming] class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) @@ -40,7 +40,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray - val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll def createSparkConf(): SparkConf = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 91a43e14a8..c59348a89d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -32,7 +32,7 @@ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext.rddToFileName import org.apache.spark.streaming.scheduler.Job import org.apache.spark.streaming.ui.UIUtils -import org.apache.spark.util.{CallSite, MetadataCleaner, Utils} +import org.apache.spark.util.{CallSite, Utils} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -271,18 +271,6 @@ abstract class DStream[T: ClassTag] ( checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." ) - val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) - logInfo("metadataCleanupDelay = " + metadataCleanerDelay) - require( - metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, - "It seems you are doing some DStream window operation or setting a checkpoint interval " + - "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + - "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + - "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + - "set the Java cleaner delay to more than " + - math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." - ) - dependencies.foreach(_.validateAtStart()) logInfo("Slide time = " + slideDuration) |