diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-27 15:08:49 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-27 15:08:49 -0800 |
commit | b18d70870a33a4783c6b3b787bef9b0eec30bce0 (patch) | |
tree | 28718cee54ccce4b96dea9affa1654dea8f0d1fd /streaming/src | |
parent | 0fe2fc4d5e1b3c3cb32052a43d227bdaf29dd488 (diff) | |
download | spark-b18d70870a33a4783c6b3b787bef9b0eec30bce0.tar.gz spark-b18d70870a33a4783c6b3b787bef9b0eec30bce0.tar.bz2 spark-b18d70870a33a4783c6b3b787bef9b0eec30bce0.zip |
Modified bunch HashMaps in Spark to use TimeStampedHashMap and made various modules use CleanupTask to periodically clean up metadata.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 4a41f2f516..58123dc82c 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -43,7 +43,7 @@ class StreamingContext private ( * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(master: String, frameworkName: String, batchDuration: Time) = - this(new SparkContext(master, frameworkName), null, batchDuration) + this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** * Recreates the StreamingContext from a checkpoint file. @@ -214,11 +214,8 @@ class StreamingContext private ( "Checkpoint directory has been set, but the graph checkpointing interval has " + "not been set. Please use StreamingContext.checkpoint() to set the interval." ) - - } - /** * This function starts the execution of the streams. */ @@ -265,6 +262,14 @@ class StreamingContext private ( object StreamingContext { + + def createNewSparkContext(master: String, frameworkName: String): SparkContext = { + if (System.getProperty("spark.cleanup.delay", "-1").toInt < 0) { + System.setProperty("spark.cleanup.delay", "60") + } + new SparkContext(master, frameworkName) + } + implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } |