aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-11-27 15:08:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-11-27 15:08:49 -0800
commitb18d70870a33a4783c6b3b787bef9b0eec30bce0 (patch)
tree28718cee54ccce4b96dea9affa1654dea8f0d1fd /streaming/src
parent0fe2fc4d5e1b3c3cb32052a43d227bdaf29dd488 (diff)
downloadspark-b18d70870a33a4783c6b3b787bef9b0eec30bce0.tar.gz
spark-b18d70870a33a4783c6b3b787bef9b0eec30bce0.tar.bz2
spark-b18d70870a33a4783c6b3b787bef9b0eec30bce0.zip
Modified a bunch of HashMaps in Spark to use TimeStampedHashMap and made various modules use CleanupTask to periodically clean up metadata.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala13
1 file changed, 9 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 4a41f2f516..58123dc82c 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -43,7 +43,7 @@ class StreamingContext private (
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(master: String, frameworkName: String, batchDuration: Time) =
- this(new SparkContext(master, frameworkName), null, batchDuration)
+ this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
* Recreates the StreamingContext from a checkpoint file.
@@ -214,11 +214,8 @@ class StreamingContext private (
"Checkpoint directory has been set, but the graph checkpointing interval has " +
"not been set. Please use StreamingContext.checkpoint() to set the interval."
)
-
-
}
-
/**
* This function starts the execution of the streams.
*/
@@ -265,6 +262,14 @@ class StreamingContext private (
object StreamingContext {
+
+ def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
+ if (System.getProperty("spark.cleanup.delay", "-1").toInt < 0) {
+ System.setProperty("spark.cleanup.delay", "60")
+ }
+ new SparkContext(master, frameworkName)
+ }
+
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}