aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-programming-guide.md
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-06-24 23:57:47 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-06-24 23:57:47 -0700
commitc89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6 (patch)
tree01f9f42f30a4aa2f73cb200c89a71254bf74d80e /docs/streaming-programming-guide.md
parent48c7e373c62b2e8cf48157ceb0d92c38c3a40652 (diff)
parent78ffe164b33c6b11a2e511442605acd2f795a1b5 (diff)
downloadspark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.gz
spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.tar.bz2
spark-c89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6.zip
Merge branch 'master' into streaming
Conflicts: .gitignore
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--docs/streaming-programming-guide.md33
1 files changed, 16 insertions, 17 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 0e618a06c7..f5788dc467 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -7,9 +7,9 @@ title: Spark Streaming Programming Guide
{:toc}
# Overview
-A Spark Streaming application is very similar to a Spark application; it consists of a *driver program* that runs the user's `main` function and continuous executes various *parallel operations* on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collection of elements) representing a continuous stream of data. DStreams can created from live incoming data (such as data from a socket, Kafka, etc.) or it can be generated by transformation of existing DStreams using parallel operators like map, reduce, and window. The basic processing model is as follows:
-(i) While a Spark Streaming driver program is running, the system receives data from various sources and and divides the data into batches. Each batch of data is treated as a RDD, that is a immutable and parallel collection of data. These input data RDDs are automatically persisted in memory (serialized by default) and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively referred to as an InputDStream.
-(ii) Data received by InputDStreams are processed processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.
+A Spark Streaming application is very similar to a Spark application; it consists of a *driver program* that runs the user's `main` function and continuous executes various *parallel operations* on input streams of data. The main abstraction Spark Streaming provides is a *discretized stream* (DStream), which is a continuous sequence of RDDs (distributed collections of elements) representing a continuous stream of data. DStreams can be created from live incoming data (such as data from a socket, Kafka, etc.) or can be generated by transformong existing DStreams using parallel operators like `map`, `reduce`, and `window`. The basic processing model is as follows:
+(i) While a Spark Streaming driver program is running, the system receives data from various sources and and divides it into batches. Each batch of data is treated as an RDD, that is, an immutable parallel collection of data. These input RDDs are saved in memory and replicated to two nodes for fault-tolerance. This sequence of RDDs is collectively called an InputDStream.
+(ii) Data received by InputDStreams are processed using DStream operations. Since all data is represented as RDDs and all DStream operations as RDD operations, data is automatically recovered in the event of node failures.
This guide shows some how to start programming with DStreams.
@@ -17,16 +17,12 @@ This guide shows some how to start programming with DStreams.
The first thing a Spark Streaming program must do is create a `StreamingContext` object, which tells Spark how to access a cluster. A `StreamingContext` can be created by using
{% highlight scala %}
-new StreamingContext(master, jobName, batchDuration)
+new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
{% endhighlight %}
-The `master` parameter is either the [Mesos master URL](running-on-mesos.html) (for running on a cluster)or the special "local" string (for local mode) that is used to create a Spark Context. For more information about this please refer to the [Spark programming guide](scala-programming-guide.html). The `jobName` is the name of the streaming job, which is the same as the jobName used in SparkContext. It is used to identify this job in the Mesos web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such the cluster can keep up with the processing of the data streams. Starting with something conservative like 5 seconds maybe a good start. See [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion.
-
-This constructor creates a SparkContext object using the given `master` and `jobName` parameters. However, if you already have a SparkContext or you need to create a custom SparkContext by specifying list of JARs, then a StreamingContext can be created from the existing SparkContext, by using
-{% highlight scala %}
-new StreamingContext(sparkContext, batchDuration)
-{% endhighlight %}
+The `master` parameter is a standard [Spark cluster URL](scala-programming-guide.html#master-urls) and can be "local" for local testing. The `appName` is a name of your program, which will be shown on your cluster's web UI. The `batchDuration` is the size of the batches (as explained earlier). This must be set carefully such the cluster can keep up with the processing of the data streams. Start with something conservative like 5 seconds. See the [Performance Tuning](#setting-the-right-batch-size) section for a detailed discussion. Finally, `sparkHome` and `jars` are necessary when running on a cluster to specify the location of your code, as described in the [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
+This constructor creates a SparkContext for your job as well, which can be accessed with `streamingContext.sparkContext`.
# Attaching Input Sources - InputDStreams
@@ -87,7 +83,7 @@ DStreams support many of the transformations available on normal Spark RDD's:
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. <br />
- <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
+ <b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.
</td>
</tr>
<tr>
@@ -136,7 +132,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo
<td> <b>groupByKeyAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>])
</td>
<td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window. <br />
-<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluser) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
+<b>Note:</b> By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
</tr>
<tr>
<td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>]) </td>
@@ -286,7 +282,9 @@ For input streams that receive data from the network (that is, subclasses of Net
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the [Performance Tuning](#memory-tuning) section. More information on different persistence levels can be found in [Spark Programming Guide](scala-programming-guide.html#rdd-persistence).
# RDD Checkpointing within DStreams
-DStreams created by stateful operations like `updateStateByKey` require the RDDs in the DStream to be periodically saved to HDFS files for checkpointing. This is because, unless checkpointed, the lineage of operations of the state RDDs can increase indefinitely (since each RDD in the DStream depends on the previous RDD). This leads to two problems - (i) the size of Spark tasks increase proportionally with the RDD lineage leading higher task launch times, (ii) no limit on the amount of recomputation required on failure. Checkpointing RDDs at some interval by writing them to HDFS allows the lineage to be truncated. Note that checkpointing also incurs the cost of saving to HDFS which may cause the corresponding batch to take longer to process. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too slowly causes the lineage and task sizes to grow which may have detrimental effects. Typically, a checkpoint interval of 5 - 10 times of sliding interval of a DStream is good setting to try.
+A _stateful operation_ is one which operates over multiple batches of data. This includes all window-based operations and the `updateStateByKey` operation.
+
+Because stateful operations have a dependency on previous batches of data, they continuously accumulate metadata over time. To clear this metadata, streaming supports periodic _checkpointing_ by saving intermediate data to HDFS. Note that checkpointing also incurs the cost of saving to HDFS which may cause the corresponding batch to take longer to process. Hence, the interval of checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too slowly causes the lineage and task sizes to grow which may have detrimental effects. Typically, a checkpoint interval of 5 - 10 times of sliding interval of a DStream is good setting to try.
To enable checkpointing, the developer has to provide the HDFS path to which RDD will be saved. This is done by using
@@ -380,15 +378,15 @@ val ssc = new StreamingContext(checkpointDirectory)
On calling `ssc.start()` on this new context, the following steps are taken by the system
-1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it was restarted. This is also done for those time steps that were scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc.
+1. Schedule the transformations and output operations for all the time steps between the time when the driver failed and when it last checkpointed. This is also done for those time steps that were previously scheduled but not processed due to the failure. This will make the system recompute all the intermediate data from the checkpointed RDD files, etc.
1. Restart the network receivers, if any, and continue receiving new data.
In the current _alpha_ release, there are two different failure behaviors based on which input sources are used.
1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure.
-1. _Using any input source that receives data through a network_ - As aforesaid, the received input data is replicated in memory to multiple nodes. Since, all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible and driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely.
+1. _Using any input source that receives data through a network_ - The received input data is replicated in memory to multiple nodes. Since, all the data in the Spark worker's memory is lost when the Spark driver fails, the past input data will not be accessible and driver recovers. Hence, if stateful and window-based operations are used (like `updateStateByKey`, `window`, `countByValueAndWindow`, etc.), then the intermediate state will not be recovered completely.
-In future releases, this behaviour will be fixed for all input sources, that is, all data will be recovered irrespective of which input sources are used. Note that for non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams, the system, upon restarting, will continue to receive and process new data.
+In future releases, we will support full recoverability for all input sources. Note that for non-stateful transformations like `map`, `count`, and `reduceByKey`, with _all_ input streams, the system, upon restarting, will continue to receive and process new data.
To better understand the behavior of the system under driver failure with a HDFS source, lets consider what will happen with a file input stream Specifically, in the case of the file input stream, it will correctly identify new files that were created while the driver was down and process them in the same way as it would have if the driver had not failed. To explain further in the case of file input stream, we shall use an example. Lets say, files are being generated every second, and a Spark Streaming program reads every new file and output the number of lines in the file. This is what the sequence of outputs would be with and without a driver failure.
@@ -514,5 +512,6 @@ JavaPairDStream<String, Integer> wordCounts = words.map(
# Where to Go from Here
-* Documentation - [Scala](api/streaming/index.html#spark.streaming.package) and [Java](api/streaming/index.html#spark.streaming.api.java.package)
+* API docs - [Scala](api/streaming/index.html#spark.streaming.package) and [Java](api/streaming/index.html#spark.streaming.api.java.package)
* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples)
+* [Paper describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf)