aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-programming-guide.md
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-03-11 18:48:21 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-03-11 18:48:21 -0700
commitcd3b68d93a01f11bd3d5a441b341cb33d227e900 (patch)
treea427f6dbdae218857ec6e8de066b76bf0f43f8ed /docs/streaming-programming-guide.md
parent51a79a770a8356bd0ed244af5ca7f1c44c9437d2 (diff)
downloadspark-cd3b68d93a01f11bd3d5a441b341cb33d227e900.tar.gz
spark-cd3b68d93a01f11bd3d5a441b341cb33d227e900.tar.bz2
spark-cd3b68d93a01f11bd3d5a441b341cb33d227e900.zip
[SPARK-6128][Streaming][Documentation] Updates to Spark Streaming Programming Guide
Updates to the documentation are as follows: - Added information on Kafka Direct API and Kafka Python API - Added joins to the main streaming guide - Improved details on the fault-tolerance semantics Generated docs located here http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#fault-tolerance-semantics More things to add: - Configuration for Kafka receive rate - May be add concurrentJobs Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #4956 from tdas/streaming-guide-update-1.3 and squashes the following commits: 819408c [Tathagata Das] Minor fixes. debe484 [Tathagata Das] Added DataFrames and MLlib 380cf8d [Tathagata Das] Fix link 04167a6 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-guide-update-1.3 0b77486 [Tathagata Das] Updates based on Josh's comments. 86c4c2a [Tathagata Das] Updated streaming guides 82de92a [Tathagata Das] Add Kafka to Python api docs
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--docs/streaming-programming-guide.md470
1 files changed, 390 insertions, 80 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 062ac2648d..6d6229625f 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -432,7 +432,7 @@ some of the common ones are as follows.
</table>
For an up-to-date list, please refer to the
-[Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
+[Maven repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
for the full list of supported sources and artifacts.
***
@@ -662,8 +662,7 @@ methods for creating DStreams from files and Akka actors as input sources.
For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
- <span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
- `fileStream` is not available in the Python API, only `textFileStream` is available.
+ <span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.
- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
@@ -682,8 +681,9 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
### Advanced Sources
{:.no_toc}
-<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
-these sources are not available in the Python API.
+
+<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.3,
+out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources in the Python API in future.
This category of sources require interfacing with external non-Spark libraries, some of them with
complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts
@@ -723,6 +723,12 @@ and it in the classpath.
Some of these advanced sources are as follows.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.1.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
+
+- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
+
+- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
+
- **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using
[Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by
@@ -732,17 +738,10 @@ Some of these advanced sources are as follows.
([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala)
and [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
-- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can received data from Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
-
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data from Kafka 0.8.0. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
-
-- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
-
### Custom Sources
{:.no_toc}
-<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.2,
-these sources are not available in the Python API.
+<span class="badge" style="background-color: grey">Python API</span> This is not yet supported in Python.
Input DStreams can also be created out of custom data sources. All you have to do is implement an
user-defined **receiver** (see next section to understand what that is) that can receive data from
@@ -846,7 +845,7 @@ Some of the common ones are as follows.
<tr><td></td><td></td></tr>
</table>
-The last two transformations are worth highlighting again.
+A few of these transformations are worth discussing in more detail.
#### UpdateStateByKey Operation
{:.no_toc}
@@ -997,7 +996,7 @@ In fact, you can also use [machine learning](mllib-guide.html) and
#### Window Operations
{:.no_toc}
-Finally, Spark Streaming also provides *windowed computations*, which allow you to apply
+Spark Streaming also provides *windowed computations*, which allow you to apply
transformations over a sliding window of data. This following figure illustrates this sliding
window.
@@ -1120,6 +1119,100 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
<tr><td></td><td></td></tr>
</table>
+#### Join Operations
+{:.no_toc}
+Finally, its worth highlighting how easily you can perform different kinds of joins in Spark Streaming.
+
+
+##### Stream-stream joins
+{:.no_toc}
+Streams can be very easily joined with other streams.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream1: DStream[String, String] = ...
+val stream2: DStream[String, String] = ...
+val joinedStream = stream1.join(stream2)
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairDStream<String, String> stream1 = ...
+JavaPairDStream<String, String> stream2 = ...
+JavaPairDStream<String, String> joinedStream = stream1.join(stream2);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+stream1 = ...
+stream2 = ...
+joinedStream = stream1.join(stream2)
+{% endhighlight %}
+</div>
+</div>
+Here, in each batch interval, the RDD generated by `stream1` will be joined with the RDD generated by `stream2`. You can also do `leftOuterJoin`, `rightOuterJoin`, `fullOuterJoin`. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val windowedStream1 = stream1.window(Seconds(20))
+val windowedStream2 = stream2.window(Minutes(1))
+val joinedStream = windowedStream1.join(windowedStream2)
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
+JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
+JavaPairDStream<String, String> joinedStream = windowedStream1.join(windowedStream2);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+windowedStream1 = stream1.window(20)
+windowedStream2 = stream2.window(60)
+joinedStream = windowedStream1.join(windowedStream2)
+{% endhighlight %}
+</div>
+</div>
+
+##### Stream-dataset joins
+{:.no_toc}
+This has already been shown earlier while explain `DStream.transform` operation. Here is yet another example of joining a windowed stream with a dataset.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataset: RDD[String, String] = ...
+val windowedStream = stream.window(Seconds(20))...
+val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairRDD<String, String> dataset = ...
+JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
+JavaPairDStream<String, String> joinedStream = windowedStream.transform(
+ new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
+ @Override
+ public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
+ return rdd.join(dataset);
+ }
+ }
+);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+dataset = ... # some RDD
+windowedStream = stream.window(20)
+joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
+{% endhighlight %}
+</div>
+</div>
+
+In fact, you can also dynamically change the dataset you want to join against. The function provided to `transform` is evaluated every batch interval and therefore will use the current dataset that `dataset` reference points to.
The complete list of DStream transformations is available in the API documentation. For the Scala API,
see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
@@ -1327,6 +1420,178 @@ Note that the connections in the pool should be lazily created on demand and tim
***
+## DataFrame and SQL Operations
+You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+ @transient private var instance: SQLContext = null
+
+ // Instantiate SQLContext on demand
+ def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext)
+ }
+ instance
+ }
+}
+
+...
+
+/** Case class for converting RDD to DataFrame */
+case class Row(word: String)
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+val words: DStream[String] = ...
+
+words.foreachRDD { rdd =>
+
+ // Get the singleton instance of SQLContext
+ val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+ import sqlContext.implicits._
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ // Do word count on DataFrame using SQL and print it
+ val wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+}
+
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+ static private transient SQLContext instance = null;
+ static public SQLContext getInstance(SparkContext sparkContext) {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext);
+ }
+ return instance;
+ }
+}
+
+...
+
+/** Java Bean class for converting RDD to DataFrame */
+public class JavaRow implements java.io.Serializable {
+ private String word;
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+}
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+JavaDStream<String> words = ...
+
+words.foreachRDD(
+ new Function2<JavaRDD<String>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd, Time time) {
+ SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
+ public JavaRow call(String word) {
+ JavaRow record = new JavaRow();
+ record.setWord(word);
+ return record;
+ }
+ });
+ DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words");
+
+ // Do word count on table using SQL and print it
+ DataFrame wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word");
+ wordCountsDataFrame.show();
+ return null;
+ }
+ }
+);
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Lazily instantiated global instance of SQLContext
+def getSqlContextInstance(sparkContext):
+ if ('sqlContextSingletonInstance' not in globals()):
+ globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
+ return globals()['sqlContextSingletonInstance']
+
+...
+
+# DataFrame operations inside your streaming program
+
+words = ... # DStream of strings
+
+def process(time, rdd):
+ print "========= %s =========" % str(time)
+ try:
+ # Get the singleton instance of SQLContext
+ sqlContext = getSqlContextInstance(rdd.context)
+
+ # Convert RDD[String] to RDD[Row] to DataFrame
+ rowRdd = rdd.map(lambda w: Row(word=w))
+ wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+
+ # Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ # Do word count on table using SQL and print it
+ wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+ except:
+ pass
+
+words.foreachRDD(process)
+{% endhighlight %}
+
+See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py).
+
+</div>
+</div>
+
+You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember sufficient amount of streaming data such that query can run. Otherwise the StreamingContext, which is unaware of the any asynchronous SQL queries, will delete off old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call `streamingContext.remember(Minutes(5))` (in Scala, or equivalent in other languages).
+
+See the [DataFrames and SQL](sql-programming-guide.html) guide to learn more about DataFrames.
+
+***
+
+## MLlib Operations
+You can also easily use machine learning algorithms provided by [MLlib](mllib-guide.html). First of all, there are streaming machine learning algorithms (e.g. (Streaming Linear Regression](mllib-linear-methods.html#streaming-linear-regression), [Streaming KMeans](mllib-clustering.html#streaming-k-means), etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a learning model offline (i.e. using historical data) and then apply the model online on streaming data. See the [MLlib](mllib-guide.html) guide for more details.
+
+***
+
## Caching / Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is,
using `persist()` method on a DStream will automatically persist every RDD of that DStream in
@@ -1580,9 +1845,8 @@ To run a Spark Streaming applications, you need to have the following.
+ *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this
with Mesos.
-
-- *[Experimental in Spark 1.2] Configuring write ahead logs* - In Spark 1.2,
- we have introduced a new experimental feature of write ahead logs for achieving strong
+- *[Since Spark 1.2] Configuring write ahead logs* - Since Spark 1.2,
+ we have introduced _write ahead logs_ for achieving strong
fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into
a write ahead log in the configuration checkpoint directory. This prevents data loss on driver
recovery, thus ensuring zero data loss (discussed in detail in the
@@ -1668,7 +1932,7 @@ improve the performance of you application. At a high level, you need to conside
2. Setting the right batch size such that the batches of data can be processed as fast as they
are received (that is, data processing keeps up with the data ingestion).
-## Reducing the Processing Time of each Batch
+## Reducing the Batch Processing Times
There are a number of optimizations that can be done in Spark to minimize the processing time of
each batch. These have been discussed in detail in [Tuning Guide](tuning.html). This section
highlights some of the most important ones.
@@ -1740,16 +2004,15 @@ documentation), or set the `spark.default.parallelism`
### Data Serialization
{:.no_toc}
-The overhead of data serialization can be significant, especially when sub-second batch sizes are
- to be achieved. There are two aspects to it.
+The overheads of data serialization can be reduce by tuning the serialization formats. In case of streaming, there are two types of data that are being serialized.
+
+* **Input data**: By default, the input data received through Receivers is stored in the executors' memory with [StorageLevel.MEMORY_AND_DISK_SER_2](api/scala/index.html#org.apache.spark.storage.StorageLevel$). That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is unsufficient to hold all the input data necessary for the streaming computation. This serialization obviously has overheads -- the receiver must deserialize the received data and re-serialize it using Spark's serialization format.
-* **Serialization of RDD data in Spark**: Please refer to the detailed discussion on data
- serialization in the [Tuning Guide](tuning.html). However, note that unlike Spark, by default
- RDDs are persisted as serialized byte arrays to minimize pauses related to GC.
+* **Persisted RDDs generated by Streaming Operations**: RDDs generated by streaming computations may be persisted in memory. For example, window operation persist data in memory as they would be processed multiple times. However, unlike Spark, by default RDDs are persisted with [StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$) (i.e. serialized) to minimize GC overheads.
-* **Serialization of input data**: To ingest external data into Spark, data received as bytes
- (say, from the network) needs to deserialized from bytes and re-serialized into Spark's
- serialization format. Hence, the deserialization overhead of input data may be a bottleneck.
+In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the [Spark Tuning Guide](tuning.html#data-serialization)) for more details. Consider registering custom classes, and disabling object reference tracking for Kryo (see Kryo-related configurations in the [Configuration Guide](configuration.html#compression-and-serialization)).
+
+In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overheads.
### Task Launching Overheads
{:.no_toc}
@@ -1769,7 +2032,7 @@ thus allowing sub-second batch size to be viable.
***
-## Setting the Right Batch Size
+## Setting the Right Batch Interval
For a Spark Streaming application running on a cluster to be stable, the system should be able to
process data as fast as it is being received. In other words, batches of data should be processed
as fast as they are being generated. Whether this is true for an application can be found by
@@ -1801,40 +2064,40 @@ temporary data rate increases maybe fine as long as the delay reduces back to a
## Memory Tuning
Tuning the memory usage and GC behavior of Spark applications have been discussed in great detail
-in the [Tuning Guide](tuning.html). It is recommended that you read that. In this section,
-we highlight a few customizations that are strongly recommended to minimize GC related pauses
-in Spark Streaming applications and achieving more consistent batch processing times.
-
-* **Default persistence level of DStreams**: Unlike RDDs, the default persistence level of DStreams
-serializes the data in memory (that is,
-[StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$) for
-DStream compared to
-[StorageLevel.MEMORY_ONLY](api/scala/index.html#org.apache.spark.storage.StorageLevel$) for RDDs).
-Even though keeping the data serialized incurs higher serialization/deserialization overheads,
-it significantly reduces GC pauses.
-
-* **Clearing persistent RDDs**: By default, all persistent RDDs generated by Spark Streaming will
- be cleared from memory based on Spark's built-in policy (LRU). If `spark.cleaner.ttl` is set,
- then persistent RDDs that are older than that value are periodically cleared. As mentioned
- [earlier](#operation), this needs to be careful set based on operations used in the Spark
- Streaming program. However, a smarter unpersisting of RDDs can be enabled by setting the
- [configuration property](configuration.html#spark-properties) `spark.streaming.unpersist` to
- `true`. This makes the system to figure out which RDDs are not necessary to be kept around and
- unpersists them. This is likely to reduce
- the RDD memory usage of Spark, potentially improving GC behavior as well.
-
-* **Concurrent garbage collector**: Using the concurrent mark-and-sweep GC further
-minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the
+in the [Tuning Guide](tuning.html#memory-tuning). It is strongly recommended that you read that. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.
+
+The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes of worth of data in memory. Or if you want to use `updateStateByKey` with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then necessary memory will be low.
+
+In general, since the data received through receivers are stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to the disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. Its best to try and see the memory usage on a small scale and estimate accordingly.
+
+Another aspect of memory tuning is garbage collection. For a streaming application that require low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.
+
+There are a few parameters that can help you tune the memory usage and GC overheads.
+
+* **Persistence Level of DStreams**: As mentioned earlier in the [Data Serialization](#data-serialization) section, the input data and RDDs are by default persisted as serialized bytes. This reduces both, the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration `spark.rdd.compress`), at the cost of CPU time.
+
+* **Clearing old data**: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using window operation of 10 minutes, then Spark Streaming will keep around last 10 minutes of data, and actively throw away older data.
+Data can be retained for longer duration (e.g. interactively querying older data) by setting `streamingContext.remember`.
+
+* **CMS Garbage Collector**: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the
overall processing throughput of the system, its use is still recommended to achieve more
-consistent batch processing times.
+consistent batch processing times. Make sure you set the CMS GC on both the driver (using `--driver-java-options` in `spark-submit`) and the executors (using [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`).
+
+* **Other tips**: To further reduce GC overheads, here are some more tips to try.
+ - Use Tachyon for off-heap storage of persisted RDDs. See more detail in the [Spark Programming Guide](programming-guide.html#rdd-persistence).
+ - Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.
+
***************************************************************************************************
***************************************************************************************************
# Fault-tolerance Semantics
In this section, we will discuss the behavior of Spark Streaming applications in the event
-of node failures. To understand this, let us remember the basic fault-tolerance semantics of
-Spark's RDDs.
+of failures.
+
+## Background
+{:.no_toc}
+To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark's RDDs.
1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
remembers the lineage of deterministic operations that were used on a fault-tolerant input
@@ -1868,13 +2131,43 @@ Furthermore, there are two kinds of failures that we should be concerned about:
With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
-## Semantics with files as input source
+## Definitions
+{:.no_toc}
+The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)
+
+1. *At most once*: Each record will be either processed once or not processed at all.
+2. *At least once*: Each record will be processed one or more times. This is stronger than *at-most once* as it ensure that no data will be lost. But there may be duplicates.
+3. *Exactly once*: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.
+
+## Basic Semantics
+{:.no_toc}
+In any stream processing system, broadly speaking, there are three steps in processing the data.
+
+1. *Receiving the data*: The data is received from sources using Receivers or otherwise.
+
+1. *Transforming the data*: The data received data is transformed using DStream and RDD transformations.
+
+1. *Pushing out the data*: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.
+
+If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.
+
+1. *Receiving the data*: Different input sources provided different guarantees. This is discussed in detail in the next subsection.
+
+1. *Transforming the data*: All data that has been received will be processed _exactly once_, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.
+
+1. *Pushing out the data*: Output operations by default ensure _at-least once_ semantics because it depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). But users can implement their own transaction mechanisms to achieve _exactly-once_ semantics. This is discussed in more details later in the section.
+
+## Semantics of Received Data
+{:.no_toc}
+Different input sources provide different guarantees, ranging from _at-least once_ to _exactly once_. Read for more details.
+
+### With Files
{:.no_toc}
If all of the input data is already present in a fault-tolerant files system like
HDFS, Spark Streaming can always recover from any failure and process all the data. This gives
*exactly-once* semantics, that all the data will be processed exactly once no matter what fails.
-## Semantics with input sources based on receivers
+### With Receiver-based Sources
{:.no_toc}
For input sources based on receivers, the fault-tolerance semantics depend on both the failure
scenario and the type of receiver.
@@ -1893,10 +2186,9 @@ receivers, data received but not replicated can get lost. If the driver node fai
then besides these losses, all the past data that was received and replicated in memory will be
lost. This will affect the results of the stateful transformations.
-To avoid this loss of past received data, Spark 1.2 introduces an experimental feature of _write
+To avoid this loss of past received data, Spark 1.2 introduced _write
ahead logs_ which saves the received data to fault-tolerant storage. With the [write ahead logs
-enabled](#deploying-applications) and reliable receivers, there is zero data loss and
-exactly-once semantics.
+enabled](#deploying-applications) and reliable receivers, there is zero data loss. In terms of semantics, it provides at-least once guarantee.
The following table summarizes the semantics under failures:
@@ -1908,23 +2200,30 @@ The following table summarizes the semantics under failures:
</tr>
<tr>
<td>
- <b>Spark 1.1 or earlier, or</b><br/>
- <b>Spark 1.2 without write ahead log</b>
+ <i>Spark 1.1 or earlier,</i> OR<br/>
+ <i>Spark 1.2 or later without write ahead logs</i>
</td>
<td>
Buffered data lost with unreliable receivers<br/>
- Zero data loss with reliable receivers and files<br/>
+ Zero data loss with reliable receivers<br/>
+ At-least once semantics
</td>
<td>
Buffered data lost with unreliable receivers<br/>
Past data lost with all receivers<br/>
- Zero data loss with files
- </td>
+ Undefined semantics
+ </td>
</tr>
<tr>
- <td><b>Spark 1.2 with write ahead log</b></td>
- <td>Zero data loss with reliable receivers and files</td>
- <td>Zero data loss with reliable receivers and files</td>
+ <td><i>Spark 1.2 or later with write ahead logs</i></td>
+ <td>
+ Zero data loss with reliable receivers<br/>
+ At-least once semantics
+ </td>
+ <td>
+ Zero data loss with reliable receivers and files<br/>
+ At-least once semantics
+ </td>
</tr>
<tr>
<td></td>
@@ -1933,17 +2232,24 @@ The following table summarizes the semantics under failures:
</tr>
</table>
+### With Kafka Direct API
+{:.no_toc}
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.3) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
+
## Semantics of output operations
{:.no_toc}
-Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation
- always leads to the same result. As a result, all DStream transformations are guaranteed to have
- _exactly-once_ semantics. That is, the final transformed result will be same even if there were
- was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_
- semantics, that is, the transformed data may get written to an external entity more than once in
- the event of a worker failure. While this is acceptable for saving to HDFS using the
- `saveAs***Files` operations (as the file will simply get over-written by the same data),
- additional transactions-like mechanisms may be necessary to achieve exactly-once semantics
- for output operations.
+Output operations (like `foreachRDD`) have _at-least once_ semantics, that is,
+the transformed data may get written to an external entity more than once in
+the event of a worker failure. While this is acceptable for saving to file systems using the
+`saveAs***Files` operations (as the file will simply get overwritten with the same data),
+additional effort may be necessary to achieve exactly-once semantics. There are two approaches.
+
+- *Idempotent updates*: Multiple attempts always write the same data. For example, `saveAs***Files` always writes the same data to the generated files.
+
+- *Transactional updates*: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
+
+ - Use the batch time (available in `foreachRDD`) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob data in the streaming application.
+ - Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else if this was already committed, skip the update.
***************************************************************************************************
@@ -2001,7 +2307,11 @@ package and renamed for better clarity.
***************************************************************************************************
# Where to Go from Here
-
+* Additional guides
+ - [Kafka Integration Guide](streaming-kafka-integration.html)
+ - [Flume Integration Guide](streaming-flume-integration.html)
+ - [Kinesis Integration Guide](streaming-kinesis-integration.html)
+ - [Custom Receiver Guide](streaming-custom-receivers.html)
* API documentation
- Scala docs
* [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) and
@@ -2023,8 +2333,8 @@ package and renamed for better clarity.
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
- Python docs
- * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
- * [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+ * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) and [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+ * [KafkaUtils](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
* More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)