aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala9
1 file changed, 5 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 28a3e2dfc7..d2e9de110e 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -182,14 +182,15 @@ extends Serializable with Logging {
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
)
- val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble
+ val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
+ logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
- metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000,
+ metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
- "delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " +
- "the Java property 'spark.cleanup.delay' to more than " +
+ "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
+ "the Java property 'spark.cleaner.delay' to more than " +
math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
)