author    Tathagata Das <tathagata.das1565@gmail.com>  2012-12-01 13:15:06 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2012-12-01 13:15:06 -0800
commit    477de94894b7d8eeed281d33c12bcb2269d117c7 (patch)
tree      8626fa8815147b07592e6b355a4bf8052992eec4 /streaming/src
parent    62965c5d8e3f4f0246ac2c8814ac75ea82b3f238 (diff)
Minor modifications.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                 15
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala         8
3 files changed, 22 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 8efda2074d..28a3e2dfc7 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -146,6 +146,8 @@ extends Serializable with Logging {
}
protected[streaming] def validate() {
+ assert(rememberDuration != null, "Remember duration is set to null")
+
assert(
!mustCheckpoint || checkpointInterval != null,
"The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " +
@@ -180,13 +182,24 @@ extends Serializable with Logging {
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
)
+ val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble
+ assert(
+ metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000,
+ "It seems you are doing some DStream window operation or setting a checkpoint interval " +
+ "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
+ "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
+ "delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " +
+ "the Java property 'spark.cleanup.delay' to more than " +
+ math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
+ )
+
dependencies.foreach(_.validate())
logInfo("Slide time = " + slideTime)
logInfo("Storage level = " + storageLevel)
logInfo("Checkpoint interval = " + checkpointInterval)
logInfo("Remember duration = " + rememberDuration)
- logInfo("Initialized " + this)
+ logInfo("Initialized and validated " + this)
}
protected[streaming] def setContext(s: StreamingContext) {
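
The new assertion ties the remember duration to the 'spark.cleanup.delay' property, which is measured in minutes. A minimal sketch, not part of this commit, that mirrors the same arithmetic; the object and method names are hypothetical:

object CleanupDelayCheck {
  // Smallest 'spark.cleanup.delay' value (in minutes) that passes the check,
  // mirroring math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt above.
  def minimumCleanupDelayMinutes(rememberDurationMs: Long): Int =
    math.ceil(rememberDurationMs.toDouble / 60000.0).toInt

  def main(args: Array[String]): Unit = {
    val rememberDurationMs = 90 * 1000L // e.g. a 90 second window
    val delayMinutes = System.getProperty("spark.cleanup.delay", "-1").toDouble
    // Same condition as the assertion: a negative delay disables the check.
    val sufficient = delayMinutes < 0 || rememberDurationMs < delayMinutes * 60 * 1000
    println("cleanup delay sufficient: " + sufficient + "; minimum required: " +
      minimumCleanupDelayMinutes(rememberDurationMs) + " minutes")
  }
}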
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index bb852cbcca..f63a9e0011 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -118,8 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
- val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n")
- throw new Exception("Neither previous window has value for key, nor new values found\n" + info)
+ throw new Exception("Neither previous window has value for key, nor new values found. " +
+ "Are you sure your key class hashes consistently?")
}
// Reduce the new values
newValues.reduce(reduceF) // return
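
The replacement message hints that this state usually comes from a key class whose hashCode disagrees with equals, so the same logical key is grouped differently across batches. A hypothetical illustration, not from this commit, of a key class that hashes consistently:

// Hand-written keys must define equals and hashCode together.
class DeviceKey(val id: String) {
  override def equals(other: Any): Boolean = other match {
    case that: DeviceKey => this.id == that.id
    case _ => false
  }
  // Must agree with equals: equal keys yield equal hash codes.
  override def hashCode: Int = id.hashCode
}

// A Scala case class derives consistent equals and hashCode automatically.
case class DeviceKeyCC(id: String)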
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 63d8766749..9c19f6588d 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
+import spark.util.MetadataCleaner
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -268,8 +269,11 @@ class StreamingContext private (
object StreamingContext {
def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
- if (System.getProperty("spark.cleanup.delay", "-1").toDouble < 0) {
- System.setProperty("spark.cleanup.delay", "60")
+
+ // Set the default cleaner delay to an hour (3600 seconds) if not already set.
+ // This should be sufficient even for a 1 second batch interval.
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ MetadataCleaner.setDelaySeconds(3600)
}
new SparkContext(master, frameworkName)
}
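
For applications with windows longer than the default allows, the same two accessors seen in this commit (getDelaySeconds and setDelaySeconds) can raise the delay before the SparkContext is created. A sketch under that assumption; the window length and headroom factor are hypothetical:

import spark.util.MetadataCleaner

object ConfigureCleanerDelay {
  def main(args: Array[String]): Unit = {
    val windowSeconds = 10 * 60 // a hypothetical 10 minute window
    // Leave headroom past the window so its RDDs are not cleaned up early.
    if (MetadataCleaner.getDelaySeconds < 2 * windowSeconds) {
      MetadataCleaner.setDelaySeconds(2 * windowSeconds)
    }
    println("metadata cleaner delay: " + MetadataCleaner.getDelaySeconds + " seconds")
  }
}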