author    Josh Rosen <joshrosen@databricks.com>    2016-01-06 20:50:31 -0800
committer Reynold Xin <rxin@databricks.com>        2016-01-06 20:50:31 -0800
commit    8e19c7663a067d55b32af68d62da42c7cd5d6009 (patch)
tree      331132f4bc4dcc48d94acda2ff5d456af849ab77 /streaming/src/main
parent    6b6d02be0d4e2ce562dddfb391b3302f79de8276 (diff)
[SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0
This PR removes `spark.cleaner.ttl` and the associated TTL-based metadata cleaning code. Now that we have the `ContextCleaner` and a timer to trigger periodic GCs, I don't think that `spark.cleaner.ttl` is necessary anymore. The TTL-based cleaning isn't enabled by default, isn't included in our end-to-end tests, and has been a source of user confusion when it is misconfigured: if the TTL is set too low, data which is still being used may be evicted or deleted, leading to hard-to-diagnose bugs. For all of these reasons, I think that we should remove this functionality in Spark 2.0. Additional benefits of doing this include marginally reduced memory usage, since we no longer need to store timestamps in hashmaps, and a handful of fewer threads.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10534 from JoshRosen/remove-ttl-based-cleaning.
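For context, a minimal sketch of what this change means for user-facing configuration. This is illustrative only; `spark.cleaner.ttl` is the key being removed, while `spark.cleaner.periodicGC.interval` is assumed here to be the periodic-GC knob referred to above.

    import org.apache.spark.{SparkConf, SparkContext}

    // Before this change: TTL-based cleaning had to be opted into, and a TTL
    // set lower than any window or checkpoint interval could evict metadata
    // that was still in use, producing hard-to-diagnose failures.
    val oldConf = new SparkConf()
      .setAppName("ttl-example")
      .set("spark.cleaner.ttl", "3600")   // removed in Spark 2.0 by this PR

    // After this change: unreachable RDDs, shuffles and broadcasts are cleaned
    // up by the ContextCleaner; the periodic-GC interval (assumed key below)
    // can be tuned if full GCs are too infrequent for the cleaner's weak
    // references to be processed.
    val newConf = new SparkConf()
      .setAppName("ttl-example")
      .set("spark.cleaner.periodicGC.interval", "30min")

    val sc = new SparkContext(newConf)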
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala      |  3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 14
2 files changed, 2 insertions(+), 15 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 61b230ab6f..b186d29761 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.Utils
import org.apache.spark.streaming.scheduler.JobGenerator
-import org.apache.spark.util.{MetadataCleaner, Utils}
private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
@@ -40,7 +40,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
- val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll
def createSparkConf(): SparkConf = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 91a43e14a8..c59348a89d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
+import org.apache.spark.util.{CallSite, Utils}
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -271,18 +271,6 @@ abstract class DStream[T: ClassTag] (
checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
)
- val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
- logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
- require(
- metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
- "It seems you are doing some DStream window operation or setting a checkpoint interval " +
- "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
- "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
- "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
- "set the Java cleaner delay to more than " +
- math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
- )
-
dependencies.foreach(_.validateAtStart())
logInfo("Slide time = " + slideDuration)