aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-28 21:38:07 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-28 21:38:07 -0500
commit0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7 (patch)
treee0e15acdac75aab1348e62a09f8f08bb8ab61b8d /streaming
parenta8f316386a429c6d73e3e3824ea6eb28b0381cb5 (diff)
downloadspark-0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7.tar.gz
spark-0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7.tar.bz2
spark-0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7.zip
Add a StreamingContext constructor that takes a conf object
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala21
1 file changed, 20 insertions, 1 deletion
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 079841ad9d..9d2033fd11 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -71,6 +71,15 @@ class StreamingContext private (
}
/**
+ * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
+ * @param conf A standard Spark application configuration
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(conf: SparkConf, batchDuration: Duration) = {
+ this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
+ }
+
+ /**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your job, to display on the cluster web UI
@@ -577,6 +586,16 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
+ protected[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
+ // Set the default cleaner delay to an hour if not already set.
+ // This should be sufficient for even 1 second batch intervals.
+ val sc = new SparkContext(conf)
+ if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
+ MetadataCleaner.setDelaySeconds(sc.conf, 3600)
+ }
+ sc
+ }
+
protected[streaming] def createNewSparkContext(
master: String,
appName: String,
@@ -586,7 +605,7 @@ object StreamingContext {
{
val sc = new SparkContext(master, appName, sparkHome, jars, environment)
// Set the default cleaner delay to an hour if not already set.
- // This should be sufficient for even 1 second interval.
+ // This should be sufficient for even 1 second batch intervals.
if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) {
MetadataCleaner.setDelaySeconds(sc.conf, 3600)
}