aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaSparkContext.scala4
-rw-r--r--docs/scala-programming-guide.md4
-rw-r--r--docs/streaming-programming-guide.md10
-rw-r--r--streaming/src/main/scala/spark/streaming/Checkpoint.scala1
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala40
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala36
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala1
8 files changed, 76 insertions, 22 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index df23710d46..7503b1a5ea 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -64,7 +64,7 @@ class SparkContext(
val appName: String,
val sparkHome: String = null,
val jars: Seq[String] = Nil,
- environment: Map[String, String] = Map())
+ val environment: Map[String, String] = Map())
extends Logging {
// Ensure logging is initialized before we spawn any threads
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index f75fc27c7b..5f18b1e15b 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -31,8 +31,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
+ * or an HDFS, HTTP, HTTPS, or FTP URL.
*/
def this(master: String, appName: String, sparkHome: String, jarFile: String) =
this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index b98718a553..2315aadbdf 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -38,10 +38,10 @@ The first thing a Spark program must do is to create a `SparkContext` object, wh
This is done through the following constructor:
{% highlight scala %}
-new SparkContext(master, jobName, [sparkHome], [jars])
+new SparkContext(master, appName, [sparkHome], [jars])
{% endhighlight %}
-The `master` parameter is a string specifying a [Mesos](running-on-mesos.html) cluster to connect to, or a special "local" string to run in local mode, as described below. `jobName` is a name for your job, which will be shown in the Mesos web UI when running on a cluster. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
+The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later.
In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable. For example, to run on four cores, use
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 7e913b783d..42a4a5619d 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -17,16 +17,12 @@ This guide shows some how to start programming with DStreams.
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
{% highlight scala %}
-new StreamingContext(master, jobName, batchDuration)
+new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}
-The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing mode. The `jobName` is the name of the streaming job, which is the same as the jobName used in SparkContext. It is used to identify this job in the cluster UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such the cluster can keep up with the processing of the data streams. Starting with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion.
-
-This constructor creates a SparkContext object using the given `master` and `jobName` parameters. However, if you already have a SparkContext or you need to create a custom SparkContext by specifying list of JARs, then a StreamingContext can be created from the existing SparkContext, by using
-{% highlight scala %}
-new StreamingContext(sparkContext, batchDuration)
-{% endhighlight %}
+The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
+This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`.
# Attaching Input Sources - InputDStreams
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e7a392fbbf..e303e33e5e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,6 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val framework = ssc.sc.appName
val sparkHome = ssc.sc.sparkHome
val jars = ssc.sc.jars
+ val environment = ssc.sc.environment
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b..31b5d2c8bc 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -6,7 +6,7 @@ import akka.zeromq.Subscribe
import spark.streaming.dstream._
-import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark._
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.streaming.receivers.ZeroMQReceiver
@@ -14,18 +14,18 @@ import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
-
import scala.collection.mutable.Queue
+import scala.collection.Map
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
+import java.util.UUID
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-import java.util.UUID
import twitter4j.Status
/**
@@ -44,7 +44,9 @@ class StreamingContext private (
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
+ def this(sparkContext: SparkContext, batchDuration: Duration) = {
+ this(sparkContext, null, batchDuration)
+ }
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
@@ -52,8 +54,17 @@ class StreamingContext private (
* @param appName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
- def this(master: String, appName: String, batchDuration: Duration) =
- this(StreamingContext.createNewSparkContext(master, appName), null, batchDuration)
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String = null,
+ jars: Seq[String] = Nil,
+ environment: Map[String, String] = Map()) = {
+ this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
+ null, batchDuration)
+ }
+
/**
* Re-create a StreamingContext from a checkpoint file.
@@ -65,15 +76,20 @@ class StreamingContext private (
initLogging()
if (sc_ == null && cp_ == null) {
- throw new Exception("Streaming Context cannot be initilalized with " +
+ throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
}
+ if (MetadataCleaner.getDelaySeconds < 0) {
+ throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
+ + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
+ }
+
protected[streaming] val isCheckpointPresent = (cp_ != null)
protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
- new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
+ new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment)
} else {
sc_
}
@@ -478,8 +494,12 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
- protected[streaming] def createNewSparkContext(master: String, appName: String): SparkContext = {
-
+ protected[streaming] def createNewSparkContext(
+ master: String,
+ appName: String,
+ sparkHome: String,
+ jars: Seq[String],
+ environment: Map[String, String]): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f3b40b5b88..b528ebbc19 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -45,6 +45,42 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jars: Array[String],
+ environment: JMap[String, String]) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
+
+ /**
+ * Creates a StreamingContext using an existing SparkContext.
* @param sparkContext The underlying JavaSparkContext to use
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc27..ddd9becf32 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -13,6 +13,7 @@ import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._