author     Tathagata Das <tathagata.das1565@gmail.com>  2015-03-11 18:48:21 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-03-11 18:48:21 -0700
commit     cd3b68d93a01f11bd3d5a441b341cb33d227e900 (patch)
tree       a427f6dbdae218857ec6e8de066b76bf0f43f8ed /docs
parent     51a79a770a8356bd0ed244af5ca7f1c44c9437d2 (diff)
[SPARK-6128][Streaming][Documentation] Updates to Spark Streaming Programming Guide
Updates to the documentation are as follows:

- Added information on Kafka Direct API and Kafka Python API
- Added joins to the main streaming guide
- Improved details on the fault-tolerance semantics

Generated docs located here:
http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#fault-tolerance-semantics

More things to add:
- Configuration for Kafka receive rate
- May be add concurrentJobs

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4956 from tdas/streaming-guide-update-1.3 and squashes the following commits:

819408c [Tathagata Das] Minor fixes.
debe484 [Tathagata Das] Added DataFrames and MLlib
380cf8d [Tathagata Das] Fix link
04167a6 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-guide-update-1.3
0b77486 [Tathagata Das] Updates based on Josh's comments.
86c4c2a [Tathagata Das] Updated streaming guides
82de92a [Tathagata Das] Add Kafka to Python api docs
Diffstat (limited to 'docs')
-rw-r--r--  docs/configuration.md                 14
-rw-r--r--  docs/streaming-flume-integration.md    2
-rw-r--r--  docs/streaming-kafka-integration.md  151
-rw-r--r--  docs/streaming-programming-guide.md  470
4 files changed, 528 insertions, 109 deletions
diff --git a/docs/configuration.md b/docs/configuration.md
index ae90fe1f8f..a7116fbece 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1345,9 +1345,9 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.streaming.receiver.maxRate</code></td>
- <td>infinite</td>
+ <td>not set</td>
<td>
- Maximum number records per second at which each receiver will receive data.
+ Maximum rate (number of records per second) at which each receiver will receive data.
Effectively, each stream will consume at most this number of records per second.
Setting this configuration to 0 or a negative number will put no limit on the rate.
See the <a href="streaming-programming-guide.html#deploying-applications">deployment guide</a>
@@ -1375,6 +1375,16 @@ Apart from these, the following properties are also available, and may be useful
higher memory usage in Spark.
</td>
</tr>
+<tr>
+ <td><code>spark.streaming.kafka.maxRatePerPartition</code></td>
+ <td>not set</td>
+ <td>
+ Maximum rate (number of records per second) at which data will be read from each Kafka
+ partition when using the new Kafka direct stream API. See the
+ <a href="streaming-kafka-integration.html">Kafka Integration guide</a>
+ for more details.
+ </td>
+</tr>
</table>
#### Cluster Managers
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index 40e17246fe..c8ab146bca 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide
[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.
+<span class="badge" style="background-color: grey">Python API</span> Flume is not yet available in the Python API.
+
## Approach 1: Flume-style Push-based Approach
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index 77c0abbbac..64714f0b79 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -2,58 +2,155 @@
layout: global
title: Spark Streaming + Kafka Integration Guide
---
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka.
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this: the older approach using Receivers and Kafka's high-level API, and a new experimental approach (introduced in Spark 1.3) that does not use Receivers. They have different programming models, performance characteristics, and semantic guarantees, so read on for more details.
-1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+## Approach 1: Receiver-based Approach
+This approach uses a Receiver to receive the data. The Receiver is implemented using Kafka's high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then the jobs launched by Spark Streaming process the data.
+
+However, under the default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability)). To ensure zero data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g., HDFS), so that all the data can be recovered on failure. See the [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write Ahead Logs.
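+
+As a rough sketch (assuming `sparkConf` is the SparkConf used to create the StreamingContext, and `checkpointDirectory` is a hypothetical fault-tolerant directory, e.g. on HDFS), enabling the write ahead log looks like this:
+
+        // enable the write ahead log before creating the StreamingContext
+        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
+        // checkpointing must also be enabled on a fault-tolerant file system
+        streamingContext.checkpoint(checkpointDirectory)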
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}
-2. **Programming:** In the streaming application code, import `KafkaUtils` and create input DStream as follows.
+ For Python applications, you will have to add the above library and its dependencies when deploying your application. See the *Deploying* subsection below.
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and create an input DStream as follows.
<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.kafka._
- val kafkaStream = KafkaUtils.createStream(
- streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume])
+ val kafkaStream = KafkaUtils.createStream(streamingContext,
+ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
- See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+ You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.kafka.*;
- JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
- streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]);
+ JavaPairReceiverInputDStream<String, String> kafkaStream =
+ KafkaUtils.createStream(streamingContext,
+ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
- See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+ You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+
+ </div>
+ <div data-lang="python" markdown="1">
+ from pyspark.streaming.kafka import KafkaUtils
+
+ kafkaStream = KafkaUtils.createStream(streamingContext, \
+ [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
+
+ By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
</div>
</div>
- *Points to remember:*
+ **Points to remember:**
- Topic partitions in Kafka do not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in `KafkaUtils.createStream()` only increases the number of threads using which topics are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.
- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
-3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
-
-Note that the Kafka receiver used by default is an
-[*unreliable* receiver](streaming-programming-guide.html#receiver-reliability) section in the
-programming guide). In Spark 1.2, we have added an experimental *reliable* Kafka receiver that
-provides stronger
-[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
-data loss on failures. This receiver is automatically used when the write ahead log
-(also introduced in Spark 1.2) is enabled
-(see [Deployment](#deploying-applications.html) section in the programming guide). This
-may reduce the receiving throughput of individual Kafka receivers compared to the unreliable
-receivers, but this can be corrected by running
-[more receivers in parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
-to increase aggregate throughput. Additionally, it is recommended that the replication of the
-received data within Spark be disabled when the write ahead log is enabled as the log is already stored
-in a replicated storage system. This can be done by setting the storage level for the input
-stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+ - If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, set the storage level of the input stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
+
+3. **Deploying:** As with any Spark application, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.
+
+ For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+ For Python applications which lack SBT/Maven project management, `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is,
+
+ ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
+
+ Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kafka-assembly` from the
+ [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.
+
+## Approach 2: Direct Approach (No Receivers)
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to reading files from a file system). Note that this is an experimental feature in Spark 1.3 and is only available in the Scala and Java API.
+
+This approach has the following advantages over the receiver-based approach (i.e. Approach 1).
+
+- *Simplified Parallelism:* No need to create multiple input Kafka streams and union them. With `directStream`, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
+
+- *Efficiency:* Achieving zero data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs.
+
+- *Exactly-once semantics:* The first approach uses Kafka's high-level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between the data reliably received by Spark Streaming and the offsets tracked by Zookeeper. Hence, in this second approach, we use the simple Kafka API that does not use Zookeeper; offsets are tracked only by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, so each record is received by Spark Streaming effectively exactly once despite failures.
+
+Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** This approach is supported only in Scala/Java applications. Link your SBT/Maven project with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and create an input DStream as follows.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ import org.apache.spark.streaming.kafka._
+
+ val directKafkaStream = KafkaUtils.createDirectStream[
+ [key class], [value class], [key decoder class], [value decoder class] ](
+ streamingContext, [map of Kafka parameters], [set of topics to consume])
+
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
+ </div>
+ <div data-lang="java" markdown="1">
+ import org.apache.spark.streaming.kafka.*;
+
+ JavaPairInputDStream<String, String> directKafkaStream =
+ KafkaUtils.createDirectStream(streamingContext,
+ [key class], [value class], [key decoder class], [value decoder class],
+ [map of Kafka parameters], [set of topics to consume]);
+
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+ and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+
+ </div>
+ </div>
+
+ In the Kafka parameters, you must specify either `metadata.broker.list` or `bootstrap.servers`.
+ By default, it will start consuming from the latest offset of each Kafka partition. If you set the configuration `auto.offset.reset` in the Kafka parameters to `smallest`, then it will start consuming from the smallest offset.
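+
+ For example, here is a minimal sketch of the Kafka parameters map in Scala (the broker addresses and offset reset value are illustrative):
+
+        val kafkaParams = Map[String, String](
+          "metadata.broker.list" -> "broker1:9092,broker2:9092",
+          "auto.offset.reset" -> "smallest")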
+
+ You can also start consuming from any arbitrary offset using other variations of `KafkaUtils.createDirectStream`. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ directKafkaStream.foreachRDD { rdd =>
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ // offsetRanges.length = # of Kafka partitions being consumed
+ ...
+ }
+ </div>
+ <div data-lang="java" markdown="1">
+ directKafkaStream.foreachRDD(
+ new Function<JavaPairRDD<String, String>, Void>() {
+ @Override
+ public Void call(JavaPairRDD<String, String> rdd) throws IOException {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ // offsetRanges.length = # of Kafka partitions being consumed
+ ...
+ return null;
+ }
+ }
+ );
+ </div>
+ </div>
+
+ You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
+
+ Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, configurations of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the `spark.streaming.kafka.*` [configurations](configuration.html). An important one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate at which each Kafka partition will be read by this direct API.
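+
+ For example, a sketch of capping the per-partition read rate on the SparkConf used to create the streaming context (the rate value is illustrative):
+
+        import org.apache.spark.SparkConf
+
+        val sparkConf = new SparkConf()
+          .set("spark.streaming.kafka.maxRatePerPartition", "1000")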
+
+3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and then launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation.
\ No newline at end of file
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 062ac2648d..6d6229625f 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -432,7 +432,7 @@ some of the common ones are as follows.
</table>
For an up-to-date list, please refer to the
-[Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
+[Maven repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
for the full list of supported sources and artifacts.
***
@@ -662,8 +662,7 @@ methods for creating DStreams from files and Akka actors as input sources.
For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence do not require allocating cores.
- <span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
- `fileStream` is not available in the Python API, only `textFileStream` is available.
+ <span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.
- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
@@ -682,8 +681,9 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
### Advanced Sources
{:.no_toc}
-<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
-these sources are not available in the Python API.
+
+<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.3,
+out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources to the Python API in the future.
This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
@@ -723,6 +723,12 @@ and it in the classpath.
Some of these advanced sources are as follows.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.1.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
+
+- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
+
+- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
+
- **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using
[Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by
@@ -732,17 +738,10 @@ Some of these advanced sources are as follows.
([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala)
and [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
-- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can received data from Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
-
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data from Kafka 0.8.0. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
-
-- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
-
### Custom Sources
{:.no_toc}
-<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
-these sources are not available in the Python API.
+<span class="badge" style="background-color: grey">Python API</span> This is not yet supported in Python.
Input DStreams can also be created out of custom data sources. All you have to do is implement a
user-defined **receiver** (see next section to understand what that is) that can receive data from
@@ -846,7 +845,7 @@ Some of the common ones are as follows.
<tr><td></td><td></td></tr>
</table>
-The last two transformations are worth highlighting again.
+A few of these transformations are worth discussing in more detail.
#### UpdateStateByKey Operation
{:.no_toc}
@@ -997,7 +996,7 @@ In fact, you can also use [machine learning](mllib-guide.html) and
#### Window Operations
{:.no_toc}
-Finally, Spark Streaming also provides *windowed computations*, which allow you to apply
+Spark Streaming also provides *windowed computations*, which allow you to apply
transformations over a sliding window of data. This following figure illustrates this sliding
window.
@@ -1120,6 +1119,100 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
<tr><td></td><td></td></tr>
</table>
+#### Join Operations
+{:.no_toc}
+Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.
+
+
+##### Stream-stream joins
+{:.no_toc}
+Streams can be very easily joined with other streams.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream1: DStream[(String, String)] = ...
+val stream2: DStream[(String, String)] = ...
+val joinedStream = stream1.join(stream2)
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairDStream<String, String> stream1 = ...
+JavaPairDStream<String, String> stream2 = ...
+JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+stream1 = ...
+stream2 = ...
+joinedStream = stream1.join(stream2)
+{% endhighlight %}
+</div>
+</div>
+Here, in each batch interval, the RDD generated by `stream1` will be joined with the RDD generated by `stream2`. You can also do `leftOuterJoin`, `rightOuterJoin`, `fullOuterJoin`. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val windowedStream1 = stream1.window(Seconds(20))
+val windowedStream2 = stream2.window(Minutes(1))
+val joinedStream = windowedStream1.join(windowedStream2)
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
+JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
+JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+windowedStream1 = stream1.window(20)
+windowedStream2 = stream2.window(60)
+joinedStream = windowedStream1.join(windowedStream2)
+{% endhighlight %}
+</div>
+</div>
+
+##### Stream-dataset joins
+{:.no_toc}
+This has already been shown earlier while explaining the `DStream.transform` operation. Here is yet another example of joining a windowed stream with a dataset.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataset: RDD[(String, String)] = ...
+val windowedStream = stream.window(Seconds(20))...
+val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairRDD<String, String> dataset = ...
+JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
+JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream.transformToPair(
+ new Function<JavaPairRDD<String, String>, JavaPairRDD<String, Tuple2<String, String>>>() {
+ @Override
+ public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> rdd) {
+ return rdd.join(dataset);
+ }
+ }
+);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+dataset = ... # some RDD
+windowedStream = stream.window(20)
+joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
+{% endhighlight %}
+</div>
+</div>
+
+In fact, you can also dynamically change the dataset you want to join against. The function provided to `transform` is evaluated every batch interval and therefore will use the current dataset that the `dataset` reference points to.
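+
+For example (a sketch; `loadLatestDataset()` is a hypothetical function that builds the RDD to join against):
+
+{% highlight scala %}
+var dataset: RDD[(String, String)] = loadLatestDataset()
+
+// refresh the reference from time to time, e.g. from a scheduled thread;
+// the transform function of the next batch will pick up the new reference
+dataset = loadLatestDataset()
+
+val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
+{% endhighlight %}
+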
The complete list of DStream transformations is available in the API documentation. For the Scala API,
see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
@@ -1327,6 +1420,178 @@ Note that the connections in the pool should be lazily created on demand and tim
***
+## DataFrame and SQL Operations
+You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+ @transient private var instance: SQLContext = null
+
+ // Instantiate SQLContext on demand
+ def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext)
+ }
+ instance
+ }
+}
+
+...
+
+/** Case class for converting RDD to DataFrame */
+case class Row(word: String)
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+val words: DStream[String] = ...
+
+words.foreachRDD { rdd =>
+
+ // Get the singleton instance of SQLContext
+ val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+ import sqlContext.implicits._
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ // Do word count on DataFrame using SQL and print it
+ val wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+}
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+ static private transient SQLContext instance = null;
+ static public SQLContext getInstance(SparkContext sparkContext) {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext);
+ }
+ return instance;
+ }
+}
+
+...
+
+/** Java Bean class for converting RDD to DataFrame */
+public class JavaRow implements java.io.Serializable {
+ private String word;
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+}
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+JavaDStream<String> words = ...
+
+words.foreachRDD(
+ new Function2<JavaRDD<String>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd, Time time) {
+ SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
+ public JavaRow call(String word) {
+ JavaRow record = new JavaRow();
+ record.setWord(word);
+ return record;
+ }
+ });
+ DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words");
+
+ // Do word count on table using SQL and print it
+ DataFrame wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word");
+ wordCountsDataFrame.show();
+ return null;
+ }
+ }
+);
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Lazily instantiated global instance of SQLContext
+def getSqlContextInstance(sparkContext):
+ if ('sqlContextSingletonInstance' not in globals()):
+ globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
+ return globals()['sqlContextSingletonInstance']
+
+...
+
+# DataFrame operations inside your streaming program
+
+words = ... # DStream of strings
+
+def process(time, rdd):
+ print "========= %s =========" % str(time)
+ try:
+ # Get the singleton instance of SQLContext
+ sqlContext = getSqlContextInstance(rdd.context)
+
+ # Convert RDD[String] to RDD[Row] to DataFrame
+ rowRdd = rdd.map(lambda w: Row(word=w))
+ wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+
+ # Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ # Do word count on table using SQL and print it
+ wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+ except:
+ pass
+
+words.foreachRDD(process)
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py).
+
+</div>
+</div>
+
+You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call `streamingContext.remember(Minutes(5))` (in Scala, or equivalent in other languages).
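+
+For example, a rough Scala sketch (assuming the `words` temporary table registered in the example above, and a `sqlContext` obtained from the singleton shown above; the query runs on a separate thread):
+
+{% highlight scala %}
+// keep at least 5 minutes of data around for asynchronous queries
+streamingContext.remember(Minutes(5))
+
+// from a different thread, query the most recently registered table
+val asyncCounts = sqlContext.sql("select word, count(*) as total from words group by word")
+asyncCounts.show()
+{% endhighlight %}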
+
+See the [DataFrames and SQL](sql-programming-guide.html) guide to learn more about DataFrames.
+
+***
+
+## MLlib Operations
+You can also easily use machine learning algorithms provided by [MLlib](mllib-guide.html). First of all, there are streaming machine learning algorithms (e.g. [Streaming Linear Regression](mllib-linear-methods.html#streaming-linear-regression), [Streaming KMeans](mllib-clustering.html#streaming-k-means), etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a model offline (i.e. using historical data) and then apply the model online on streaming data. See the [MLlib](mllib-guide.html) guide for more details.
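+
+For example, here is a rough Scala sketch of streaming linear regression (assuming `trainingStream` is a `DStream[LabeledPoint]`, `testStream` is a DStream of feature `Vector`s, and `numFeatures` is known):
+
+{% highlight scala %}
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+
+val model = new StreamingLinearRegressionWithSGD()
+  .setInitialWeights(Vectors.zeros(numFeatures))
+
+model.trainOn(trainingStream)        // keep updating the model as new batches arrive
+model.predictOn(testStream).print()  // apply the latest model to new data
+{% endhighlight %}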
+
+***
+
## Caching / Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is,
using `persist()` method on a DStream will automatically persist every RDD of that DStream in
@@ -1580,9 +1845,8 @@ To run a Spark Streaming applications, you need to have the following.
+ *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this
with Mesos.
-
-- *[Experimental in Spark 1.2] Configuring write ahead logs* - In Spark 1.2,
- we have introduced a new experimental feature of write ahead logs for achieving strong
+- *[Since Spark 1.2] Configuring write ahead logs* - Since Spark 1.2,
+ we have introduced _write ahead logs_ for achieving strong
fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into
a write ahead log in the configuration checkpoint directory. This prevents data loss on driver
recovery, thus ensuring zero data loss (discussed in detail in the
@@ -1668,7 +1932,7 @@ improve the performance of you application. At a high level, you need to conside
2. Setting the right batch size such that the batches of data can be processed as fast as they
are received (that is, data processing keeps up with the data ingestion).
-## Reducing the Processing Time of each Batch
+## Reducing the Batch Processing Times
There are a number of optimizations that can be done in Spark to minimize the processing time of
each batch. These have been discussed in detail in [Tuning Guide](tuning.html). This section
highlights some of the most important ones.
@@ -1740,16 +2004,15 @@ documentation), or set the `spark.default.parallelism`
### Data Serialization
{:.no_toc}
-The overhead of data serialization can be significant, especially when sub-second batch sizes are
- to be achieved. There are two aspects to it.
+The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data that are being serialized.
+
+* **Input data**: By default, the input data received through Receivers is stored in the executors' memory with [StorageLevel.MEMORY_AND_DISK_SER_2](api/scala/index.html#org.apache.spark.storage.StorageLevel$). That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all the input data necessary for the streaming computation. This serialization obviously has overheads -- the receiver must deserialize the received data and re-serialize it using Spark's serialization format.
-* **Serialization of RDD data in Spark**: Please refer to the detailed discussion on data
- serialization in the [Tuning Guide](tuning.html). However, note that unlike Spark, by default
- RDDs are persisted as serialized byte arrays to minimize pauses related to GC.
+* **Persisted RDDs generated by Streaming Operations**: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as it would be processed multiple times. However, unlike in Spark core, these RDDs are by default persisted with [StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$) (i.e. serialized) to minimize GC overheads.
-* **Serialization of input data**: To ingest external data into Spark, data received as bytes
- (say, from the network) needs to deserialized from bytes and re-serialized into Spark's
- serialization format. Hence, the deserialization overhead of input data may be a bottleneck.
+In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the [Spark Tuning Guide](tuning.html#data-serialization) for more details. Consider registering custom classes, and disabling object reference tracking for Kryo (see Kryo-related configurations in the [Configuration Guide](configuration.html#compression-and-serialization)).
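+
+For example, a sketch of the relevant Kryo settings on the SparkConf (`MyClass` is a placeholder for your own record classes):
+
+{% highlight scala %}
+import org.apache.spark.SparkConf
+
+val conf = new SparkConf()
+  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  .set("spark.kryo.referenceTracking", "false")
+  .registerKryoClasses(Array(classOf[MyClass]))
+{% endhighlight %}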
+
+In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization of persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overhead.
### Task Launching Overheads
{:.no_toc}
@@ -1769,7 +2032,7 @@ thus allowing sub-second batch size to be viable.
***
-## Setting the Right Batch Size
+## Setting the Right Batch Interval
For a Spark Streaming application running on a cluster to be stable, the system should be able to
process data as fast as it is being received. In other words, batches of data should be processed
as fast as they are being generated. Whether this is true for an application can be found by
@@ -1801,40 +2064,40 @@ temporary data rate increases maybe fine as long as the delay reduces back to a
## Memory Tuning
Tuning the memory usage and GC behavior of Spark applications have been discussed in great detail
-in the [Tuning Guide](tuning.html). It is recommended that you read that. In this section,
-we highlight a few customizations that are strongly recommended to minimize GC related pauses
-in Spark Streaming applications and achieving more consistent batch processing times.
-
-* **Default persistence level of DStreams**: Unlike RDDs, the default persistence level of DStreams
-serializes the data in memory (that is,
-[StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$) for
-DStream compared to
-[StorageLevel.MEMORY_ONLY](api/scala/index.html#org.apache.spark.storage.StorageLevel$) for RDDs).
-Even though keeping the data serialized incurs higher serialization/deserialization overheads,
-it significantly reduces GC pauses.
-
-* **Clearing persistent RDDs**: By default, all persistent RDDs generated by Spark Streaming will
- be cleared from memory based on Spark's built-in policy (LRU). If `spark.cleaner.ttl` is set,
- then persistent RDDs that are older than that value are periodically cleared. As mentioned
- [earlier](#operation), this needs to be careful set based on operations used in the Spark
- Streaming program. However, a smarter unpersisting of RDDs can be enabled by setting the
- [configuration property](configuration.html#spark-properties) `spark.streaming.unpersist` to
- `true`. This makes the system to figure out which RDDs are not necessary to be kept around and
- unpersists them. This is likely to reduce
- the RDD memory usage of Spark, potentially improving GC behavior as well.
-
-* **Concurrent garbage collector**: Using the concurrent mark-and-sweep GC further
-minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the
+in the [Tuning Guide](tuning.html#memory-tuning). It is strongly recommended that you read that. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.
+
+The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on the last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes' worth of data in memory. Or if you want to use `updateStateByKey` with a large number of keys, then the necessary memory will be high. In contrast, if you want to do a simple map-filter-store operation, then the necessary memory will be low.
+
+In general, since the data received through receivers is stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. It's best to try and see the memory usage on a small scale and estimate accordingly.
+
+Another aspect of memory tuning is garbage collection. For a streaming application that requires low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.
+
+There are a few parameters that can help you tune the memory usage and GC overheads.
+
+* **Persistence Level of DStreams**: As mentioned earlier in the [Data Serialization](#data-serialization) section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration `spark.rdd.compress`), at the cost of CPU time.
+
+* **Clearing old data**: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data.
+Data can be retained for a longer duration (e.g. for interactively querying older data) by setting `streamingContext.remember`.
+
+* **CMS Garbage Collector**: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the
overall processing throughput of the system, its use is still recommended to achieve more
-consistent batch processing times.
+consistent batch processing times. Make sure you set the CMS GC on both the driver (using `--driver-java-options` in `spark-submit`) and the executors (using [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`). A sketch of the relevant `spark-submit` flags is shown after this list.
+
+* **Other tips**: To further reduce GC overheads, here are some more tips to try.
+ - Use Tachyon for off-heap storage of persisted RDDs. See more detail in the [Spark Programming Guide](programming-guide.html#rdd-persistence).
+ - Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.
+
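+As a rough sketch, the `spark-submit` flags for enabling CMS GC on both the driver and the executors (mentioned in the CMS bullet above) would look something like this:
+
+    ./bin/spark-submit --driver-java-options "-XX:+UseConcMarkSweepGC" \
+      --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" ...
+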
***************************************************************************************************
***************************************************************************************************
# Fault-tolerance Semantics
In this section, we will discuss the behavior of Spark Streaming applications in the event
-of node failures. To understand this, let us remember the basic fault-tolerance semantics of
-Spark's RDDs.
+of failures.
+
+## Background
+{:.no_toc}
+To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark's RDDs.
1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
remembers the lineage of deterministic operations that were used on a fault-tolerant input
@@ -1868,13 +2131,43 @@ Furthermore, there are two kinds of failures that we should be concerned about:
With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
-## Semantics with files as input source
+## Definitions
+{:.no_toc}
+The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.).
+
+1. *At most once*: Each record will be either processed once or not processed at all.
+2. *At least once*: Each record will be processed one or more times. This is stronger than *at-most once* as it ensures that no data will be lost, but there may be duplicates.
+3. *Exactly once*: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.
+
+## Basic Semantics
+{:.no_toc}
+In any stream processing system, broadly speaking, there are three steps in processing the data.
+
+1. *Receiving the data*: The data is received from sources using Receivers or otherwise.
+
+1. *Transforming the data*: The received data is transformed using DStream and RDD transformations.
+
+1. *Pushing out the data*: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.
+
+If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.
+
+1. *Receiving the data*: Different input sources provide different guarantees. This is discussed in detail in the next subsection.
+
+1. *Transforming the data*: All data that has been received will be processed _exactly once_, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.
+
+1. *Pushing out the data*: Output operations by default ensure _at-least once_ semantics; whether they are exactly once depends on the type of output operation (idempotent or not) and the semantics of the downstream system (supports transactions or not). However, users can implement their own transaction mechanisms to achieve _exactly-once_ semantics. This is discussed in more detail later in the section.
+
+## Semantics of Received Data
+{:.no_toc}
+Different input sources provide different guarantees, ranging from _at-least once_ to _exactly once_. Read on for more details.
+
+### With Files
{:.no_toc}
If all of the input data is already present in a fault-tolerant file system like
HDFS, Spark Streaming can always recover from any failure and process all the data. This gives
*exactly-once* semantics, that all the data will be processed exactly once no matter what fails.
-## Semantics with input sources based on receivers
+### With Receiver-based Sources
{:.no_toc}
For input sources based on receivers, the fault-tolerance semantics depend on both the failure
scenario and the type of receiver.
@@ -1893,10 +2186,9 @@ receivers, data received but not replicated can get lost. If the driver node fai
then besides these losses, all the past data that was received and replicated in memory will be
lost. This will affect the results of the stateful transformations.
-To avoid this loss of past received data, Spark 1.2 introduces an experimental feature of _write
+To avoid this loss of past received data, Spark 1.2 introduced _write
ahead logs_ which saves the received data to fault-tolerant storage. With the [write ahead logs
-enabled](#deploying-applications) and reliable receivers, there is zero data loss and
-exactly-once semantics.
+enabled](#deploying-applications) and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.
The following table summarizes the semantics under failures:
@@ -1908,23 +2200,30 @@ The following table summarizes the semantics under failures:
</tr>
<tr>
<td>
- <b>Spark 1.1 or earlier, or</b><br/>
- <b>Spark 1.2 without write ahead log</b>
+ <i>Spark 1.1 or earlier,</i> OR<br/>
+ <i>Spark 1.2 or later without write ahead logs</i>
</td>
<td>
Buffered data lost with unreliable receivers<br/>
- Zero data loss with reliable receivers and files<br/>
+ Zero data loss with reliable receivers<br/>
+ At-least once semantics
</td>
<td>
Buffered data lost with unreliable receivers<br/>
Past data lost with all receivers<br/>
- Zero data loss with files
- </td>
+ Undefined semantics
+ </td>
</tr>
<tr>
- <td><b>Spark 1.2 with write ahead log</b></td>
- <td>Zero data loss with reliable receivers and files</td>
- <td>Zero data loss with reliable receivers and files</td>
+ <td><i>Spark 1.2 or later with write ahead logs</i></td>
+ <td>
+ Zero data loss with reliable receivers<br/>
+ At-least once semantics
+ </td>
+ <td>
+ Zero data loss with reliable receivers and files<br/>
+ At-least once semantics
+ </td>
</tr>
<tr>
<td></td>
@@ -1933,17 +2232,24 @@ The following table summarizes the semantics under failures:
</tr>
</table>
+### With Kafka Direct API
+{:.no_toc}
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.3) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
+
## Semantics of output operations
{:.no_toc}
-Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation
- always leads to the same result. As a result, all DStream transformations are guaranteed to have
- _exactly-once_ semantics. That is, the final transformed result will be same even if there were
- was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_
- semantics, that is, the transformed data may get written to an external entity more than once in
- the event of a worker failure. While this is acceptable for saving to HDFS using the
- `saveAs***Files` operations (as the file will simply get over-written by the same data),
- additional transactions-like mechanisms may be necessary to achieve exactly-once semantics
- for output operations.
+Output operations (like `foreachRDD`) have _at-least once_ semantics, that is,
+the transformed data may get written to an external entity more than once in
+the event of a worker failure. While this is acceptable for saving to file systems using the
+`saveAs***Files` operations (as the file will simply get overwritten with the same data),
+additional effort may be necessary to achieve exactly-once semantics. There are two approaches.
+
+- *Idempotent updates*: Multiple attempts always write the same data. For example, `saveAs***Files` always writes the same data to the generated files.
+
+- *Transactional updates*: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
+
+ - Use the batch time (available in `foreachRDD`) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
+ - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Otherwise, if this was already committed, skip the update.
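+
+A rough Scala sketch of this pattern (assuming a hypothetical helper `generateUniqueId`; the transactional commit itself depends on your external system):
+
+{% highlight scala %}
+dstream.foreachRDD { (rdd, time) =>
+  rdd.foreachPartition { partitionIterator =>
+    val partitionId = TaskContext.get.partitionId()
+    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
+    // use this uniqueId to transactionally commit the data in partitionIterator,
+    // committing only if uniqueId has not been committed before
+  }
+}
+{% endhighlight %}
+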
***************************************************************************************************
@@ -2001,7 +2307,11 @@ package and renamed for better clarity.
***************************************************************************************************
# Where to Go from Here
-
+* Additional guides
+ - [Kafka Integration Guide](streaming-kafka-integration.html)
+ - [Flume Integration Guide](streaming-flume-integration.html)
+ - [Kinesis Integration Guide](streaming-kinesis-integration.html)
+ - [Custom Receiver Guide](streaming-custom-receivers.html)
* API documentation
- Scala docs
* [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) and
@@ -2023,8 +2333,8 @@ package and renamed for better clarity.
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
- Python docs
- * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
- * [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+ * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) and [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+ * [KafkaUtils](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)