Diffstat (limited to 'docs/streaming-programming-guide.md')
 docs/streaming-programming-guide.md | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 50 insertions(+), 14 deletions(-)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 3d4bce4966..41f170580f 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -233,7 +233,7 @@ $ ./bin/run-example streaming.NetworkWordCount localhost 9999
</div>
<div data-lang="java" markdown="1">
{% highlight bash %}
-$ ./bin/run-example JavaNetworkWordCount localhost 9999
+$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
{% endhighlight %}
</div>
</div>
@@ -262,7 +262,7 @@ hello world
{% highlight bash %}
# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
-$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
+$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
@@ -285,12 +285,22 @@ need to know to write your streaming applications.
## Linking
-To write your own Spark Streaming program, you will have to add the following dependency to your
- SBT or Maven project:
+Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
+
+<div class="codetabs">
+<div data-lang="Maven" markdown="1">
- groupId = org.apache.spark
- artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}}
- version = {{site.SPARK_VERSION}}
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_{{site.SCALA_BINARY_VERSION}}</artifactId>
+ <version>{{site.SPARK_VERSION}}</version>
+ </dependency>
+</div>
+<div data-lang="SBT" markdown="1">
+
+ libraryDependencies += "org.apache.spark" % "spark-streaming_{{site.SCALA_BINARY_VERSION}}" % "{{site.SPARK_VERSION}}"
+</div>
+</div>
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark
Streaming core
@@ -302,7 +312,7 @@ some of the common ones are as follows.
<tr><th>Source</th><th>Artifact</th></tr>
<tr><td> Kafka </td><td> spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] </td></tr>
<tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
@@ -373,7 +383,7 @@ or a special __"local[\*]"__ string to run in local mode. In practice, when runn
you will not want to hardcode `master` in the program,
but rather [launch the application with `spark-submit`](submitting-applications.html) and
receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming
-in-process. Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`.
+in-process. Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`.
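
For example, a minimal local-testing setup might look like the following sketch (the application name and batch interval here are illustrative, not prescribed by the guide):

{% highlight java %}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// "local[*]" runs Spark Streaming in-process, using as many threads as there are logical cores
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("LocalStreamingTest");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

// The JavaSparkContext created internally by the streaming context
JavaSparkContext sc = ssc.sparkContext();
{% endhighlight %}
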
The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
@@ -447,11 +457,12 @@ Spark Streaming has two categories of streaming sources.
- *Basic sources*: Sources directly available in the StreamingContext API. Examples: file systems, socket connections, and Akka actors.
- *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section.
-Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. A receiver is run within a Spark worker/executor as a long-running task, hence it occupies one of the cores allocated to the Spark Streaming application. Hence, it is important to remember that Spark Streaming application needs to be allocated enough cores to process the received data, as well as, to run the receiver(s). Therefore, few important points to remember are:
+Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. So every input DStream receives a single stream of data. Note that in a streaming application, you can create multiple input DStreams to receive multiple streams of data in parallel. This is discussed later in the [Performance Tuning](#level-of-parallelism-in-data-receiving) section.
+
+A receiver is run within a Spark worker/executor as a long-running task, so it occupies one of the cores allocated to the Spark Streaming application. It is therefore important to remember that a Spark Streaming application needs to be allocated enough cores to process the received data, as well as to run the receiver(s). A few important points to remember are:
##### Points to remember:
{:.no_toc}
-
- If the number of cores allocated to the application is less than or equal to the number of input DStreams / receivers, then the system will receive data, but not be able to process it.
- When running locally, if your master URL is set to "local", then there is only one core to run tasks. That is insufficient for programs with even one input DStream (file streams are okay) as the receiver will occupy that core and there will be no core left to process the data (see the sketch below).
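
As a sketch of the second point (the application name here is illustrative), using "local[2]" rather than "local" leaves one core for the receiver and one core for processing:

{% highlight java %}
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// "local[2]" provides one core for the single receiver and one core to process the received data;
// plain "local" would leave no core free for processing.
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
{% endhighlight %}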
@@ -1089,9 +1100,34 @@ parallelizing the data receiving. Note that each input DStream
creates a single receiver (running on a worker machine) that receives a single stream of data.
Receiving multiple data streams can therefore be achieved by creating multiple input DStreams
and configuring them to receive different partitions of the data stream from the source(s).
-For example, a single Kafka input stream receiving two topics of data can be split into two
+For example, a single Kafka input DStream receiving two topics of data can be split into two
Kafka input streams, each receiving only one topic. This would run two receivers on two workers,
-thus allowing data to be received in parallel, and increasing overall throughput.
+thus allowing data to be received in parallel, and increasing overall throughput. These multiple
+DStreams can then be unioned together to create a single DStream, and the transformations that were
+being applied on the single input DStream can be applied on the unified stream. This is done as follows.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val numStreams = 5
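+// Create numStreams Kafka input DStreams so that the data is received in parallel by numStreams receivers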
+val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
+val unifiedStream = streamingContext.union(kafkaStreams)
+unifiedStream.print()
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+int numStreams = 5;
+List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
+for (int i = 0; i < numStreams; i++) {
+ kafkaStreams.add(KafkaUtils.createStream(...));
+}
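+// JavaStreamingContext.union takes the first stream and a list of the remaining streams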
+JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
+unifiedStream.print();
+{% endhighlight %}
+</div>
+</div>
+
Another parameter that should be considered is the receiver's blocking interval. For most receivers,
the received data is coalesced together into large blocks of data before storing inside Spark's memory.
@@ -1107,7 +1143,7 @@ before further processing.
### Level of Parallelism in Data Processing
{:.no_toc}
-Cluster resources maybe under-utilized if the number of parallel tasks used in any stage of the
+Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the
computation is not high enough. For example, for distributed reduce operations like `reduceByKey`
and `reduceByKeyAndWindow`, the default number of parallel tasks is decided by the
[config property](configuration.html#spark-properties) `spark.default.parallelism`. You can pass the level of