-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala                   7
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                 15
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala        8
4 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index 71ac39864e..2541b26255 100644
--- a/core/src/main/scala/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -5,7 +5,7 @@ import java.util.{TimerTask, Timer}
import spark.Logging
class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
- val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
+ val delaySeconds = MetadataCleaner.getDelaySeconds
val periodSeconds = math.max(10, delaySeconds / 10)
val timer = new Timer(name + " cleanup timer", true)
val task = new TimerTask {
@@ -30,3 +30,8 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging
timer.cancel()
}
}
+
+object MetadataCleaner {
+ def getDelaySeconds = (System.getProperty("spark.cleaner.delay", "-100").toDouble * 60).toInt
+ def setDelaySeconds(delay: Long) { System.setProperty("spark.cleaner.delay", delay.toString) }
+}
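
For context, a minimal usage sketch of the new MetadataCleaner companion object, assuming only the code above: the 'spark.cleaner.delay' property is interpreted in minutes, so setDelaySeconds stores minutes despite its name, and getDelaySeconds converts to seconds.

// Sketch: round-tripping the cleaner delay through the new companion object.
// The property value is in minutes; getDelaySeconds multiplies by 60.
MetadataCleaner.setDelaySeconds(60)          // stores "60" (minutes)
val delay = MetadataCleaner.getDelaySeconds  // (60.0 * 60).toInt == 3600 seconds
assert(delay == 3600)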
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 8efda2074d..28a3e2dfc7 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -146,6 +146,8 @@ extends Serializable with Logging {
}
protected[streaming] def validate() {
+ assert(rememberDuration != null, "Remember duration is set to null")
+
assert(
!mustCheckpoint || checkpointInterval != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
@@ -180,13 +182,24 @@ extends Serializable with Logging {
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
)
+ val metadataCleanerDelay = System.getProperty("spark.cleaner.delay", "-1").toDouble
+ assert(
+ metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 60 * 1000,
+ "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+ "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+ "than " + rememberDuration.milliseconds + " milliseconds. But Spark's metadata cleaner " +
+ "delay is set to " + metadataCleanerDelay + " minutes, which is not sufficient. Please set " +
+ "the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
+ )
+
dependencies.foreach(_.validate())
logInfo("Slide time = " + slideTime)
logInfo("Storage level = " + storageLevel)
logInfo("Checkpoint interval = " + checkpointInterval)
logInfo("Remember duration = " + rememberDuration)
- logInfo("Initialized " + this)
+ logInfo("Initialized and validated " + this)
}
protected[streaming] def setContext(s: StreamingContext) {
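
The arithmetic behind the new cleaner-delay assert, as a standalone sketch; the 30-second figure is an illustrative assumption, not taken from the patch.

// A DStream remembering RDDs for a 30-second window needs
// 'spark.cleaner.delay' (in minutes) of at least ceil(30000 / 60000.0) = 1.
val rememberMillis = 30 * 1000L
val requiredMinutes = math.ceil(rememberMillis.toDouble / 60000.0).toInt // == 1
System.setProperty("spark.cleaner.delay", requiredMinutes.toString)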
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index bb852cbcca..f63a9e0011 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -118,8 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
- val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n")
- throw new Exception("Neither previous window has value for key, nor new values found\n" + info)
+ throw new Exception("Neither previous window has value for key, nor new values found. " +
+ "Are you sure your key class hashes consistently?")
}
// Reduce the new values
newValues.reduce(reduceF) // return
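
The new message points at inconsistent key hashing. A hypothetical sketch of the kind of key class that can produce this state, where equal keys fail to meet in the cogroup of old and new values:

// Hypothetical broken key: equals is overridden but hashCode is not, so
// two equal keys usually land in different partitions across windows and
// a key can appear with neither a previous reduced value nor new values.
class SensorId(val id: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: SensorId => that.id == id
    case _ => false
  }
  // Missing: override def hashCode: Int = id
}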
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 63d8766749..9c19f6588d 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
+import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -268,8 +269,11 @@ class StreamingContext private (
object StreamingContext {
def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
- if (System.getProperty("spark.cleanup.delay", "-1").toDouble < 0) {
- System.setProperty("spark.cleanup.delay", "60")
+
+ // Set the default cleaner delay to an hour if not already set.
+ // This should be sufficient even for a 1 second batch interval.
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ MetadataCleaner.setDelaySeconds(60)
}
new SparkContext(master, frameworkName)
}
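
Because createNewSparkContext only supplies a default, an application can still pick its own delay before the context is created; a sketch under the same minutes interpretation (the 120 figure is illustrative):

// Override the cleaner delay before any SparkContext exists; the
// default-setting branch above then leaves the property untouched.
if (MetadataCleaner.getDelaySeconds < 0) {
  MetadataCleaner.setDelaySeconds(120) // 120 minutes, i.e. two hours
}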