author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-25 15:13:30 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-25 15:13:30 -0800
commit    490f056cddc3dc02066a1e2414be6576d6441d51
tree      f97a548bc4bd37d57e10b6c232015fdb32e27d2a /streaming
parent    568bdaf8ae784c9b832f564cb99f1b81ad487f73
Allow passing sparkHome and JARs to StreamingContext constructor
Also warns if spark.cleaner.ttl is not set in the constructor variant where you pass in your own SparkContext.
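
A minimal sketch of the new constructor in use, assuming a hypothetical master URL, application name, and paths:

    import spark.streaming.{Seconds, StreamingContext}

    // New-style construction: sparkHome, JARs, and environment variables can
    // now be passed directly instead of pre-building a SparkContext.
    val ssc = new StreamingContext(
      "spark://master:7077",           // cluster master URL (hypothetical)
      "MyStreamingApp",                // app name shown on the cluster web UI
      Seconds(1),                      // batch interval
      "/opt/spark",                    // SPARK_HOME on the worker nodes
      Seq("target/my-job.jar"),        // JARs to ship to the cluster
      Map("MY_ENV_VAR" -> "value"))    // env variables for workers (hypothetical)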
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala                      1
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               40
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  36
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala       1
4 files changed, 68 insertions(+), 10 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e7a392fbbf..e303e33e5e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,6 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val framework = ssc.sc.appName
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
+ val environment = ssc.sc.environment
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b..31b5d2c8bc 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -6,7 +6,7 @@ import akka.zeromq.Subscribe
import spark.streaming.dstream._
-import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark._
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.streaming.receivers.ZeroMQReceiver
@@ -14,18 +14,18 @@ import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
-
import scala.collection.mutable.Queue
+import scala.collection.Map
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
+import java.util.UUID
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-import java.util.UUID
import twitter4j.Status
/**
@@ -44,7 +44,9 @@ class StreamingContext private (
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
+ def this(sparkContext: SparkContext, batchDuration: Duration) = {
+ this(sparkContext, null, batchDuration)
+ }
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
@@ -52,8 +54,17 @@ class StreamingContext private (
* @param appName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(master: String, appName: String, batchDuration: Duration) =
- this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration)
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map()) = {
+ this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
+ null, batchDuration)
+ }
+
/**
* Re-create a StreamingContext from a checkpoint file.
@@ -65,15 +76,20 @@ class StreamingContext private (
initLogging()
if (sc_ == null && cp_ == null) {
- throw new Exception("Streaming Context cannot be initilalized with " +
+ throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
+ }
+
protected[streaming] val isCheckpointPresent = (cp_ != null)
protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
- new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
+ new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
} else {
sc_
}
@@ -478,8 +494,12 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
- protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = {
-
+ protected[streaming] def createNewSparkContext(
+ master: String,
+ appName: String,
+ sparkHome: String,
+ jars: Seq[String],
+ environment: Map[String, String]): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
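
The new check matters most when reusing an existing SparkContext: spark.cleaner.ttl must be set before that context is created, or the StreamingContext constructor now throws a SparkException. A minimal sketch, assuming the system-property configuration mechanism of this era of Spark and a one-hour TTL matching the default mentioned in the comment above:

    import spark.SparkContext
    import spark.streaming.{Seconds, StreamingContext}

    // Set the cleaner TTL (in seconds) before the SparkContext exists;
    // the StreamingContext constructor rejects contexts created without it.
    System.setProperty("spark.cleaner.ttl", "3600")

    val sc = new SparkContext("local[2]", "ExistingContextApp")
    val ssc = new StreamingContext(sc, Seconds(1))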
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f3b40b5b88..b528ebbc19 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -45,6 +45,42 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String],
+ environment: JMap[String, String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
+
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
* @param sparkContext The underlying JavaSparkContext to use
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
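
The Java API mirrors the new Scala overloads. A sketch of the five-argument constructor, written in Scala for consistency with the examples above (master URL and paths are hypothetical):

    import spark.streaming.Duration
    import spark.streaming.api.java.JavaStreamingContext

    // Java-friendly constructor taking sparkHome and an Array of JARs.
    val jssc = new JavaStreamingContext(
      "spark://master:7077",          // master URL (hypothetical)
      "MyJavaStreamingApp",           // app name
      new Duration(1000),             // 1-second batch interval
      "/opt/spark",                   // SPARK_HOME on the slave nodes
      Array("target/my-job.jar"))     // JARs to ship to the cluster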
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc27..ddd9becf32 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -13,6 +13,7 @@ import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._