diff options
author | Matei Zaharia <matei@databricks.com> | 2013-12-28 21:38:07 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2013-12-28 21:38:07 -0500 |
commit | 0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7 (patch) | |
tree | e0e15acdac75aab1348e62a09f8f08bb8ab61b8d /streaming/src | |
parent | a8f316386a429c6d73e3e3824ea6eb28b0381cb5 (diff) | |
download | spark-0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7.tar.gz spark-0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7.tar.bz2 spark-0900d5c72aaf6670bb9fcce8e0c0cfa976adcdf7.zip |
Add a StreamingContext constructor that takes a conf object
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 21 |
1 file changed, 20 insertions, 1 deletion
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 079841ad9d..9d2033fd11 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -71,6 +71,15 @@ class StreamingContext private ( } /** + * Create a StreamingContext by providing the configuration necessary for a new SparkContext. + * @param conf A standard Spark application configuration + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(conf: SparkConf, batchDuration: Duration) = { + this(StreamingContext.createNewSparkContext(conf), null, batchDuration) + } + + /** * Create a StreamingContext by providing the details necessary for creating a new SparkContext. * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). * @param appName A name for your job, to display on the cluster web UI @@ -577,6 +586,16 @@ object StreamingContext { new PairDStreamFunctions[K, V](stream) } + protected[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { + // Set the default cleaner delay to an hour if not already set. + // This should be sufficient for even 1 second batch intervals. + val sc = new SparkContext(conf) + if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) { + MetadataCleaner.setDelaySeconds(sc.conf, 3600) + } + sc + } + protected[streaming] def createNewSparkContext( master: String, appName: String, @@ -586,7 +605,7 @@ object StreamingContext { { val sc = new SparkContext(master, appName, sparkHome, jars, environment) // Set the default cleaner delay to an hour if not already set. - // This should be sufficient for even 1 second interval. + // This should be sufficient for even 1 second batch intervals. 
if (MetadataCleaner.getDelaySeconds(sc.conf) < 0) { MetadataCleaner.setDelaySeconds(sc.conf, 3600) }