From a975a19f21e71f448b3fdb2ed4461e28ef439900 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 5 May 2014 15:28:19 -0700 Subject: [SPARK-1504], [SPARK-1505], [SPARK-1558] Updated Spark Streaming guide - SPARK-1558: Updated custom receiver guide to match it with the new API - SPARK-1504: Added deployment and monitoring subsection to streaming - SPARK-1505: Added migration guide for migrating from 0.9.x and below to Spark 1.0 - Updated various Java streaming examples to use JavaReceiverInputDStream to highlight the API change. - Removed the requirement for cleaner ttl from streaming guide Author: Tathagata Das Closes #652 from tdas/doc-fix and squashes the following commits: cb4f4b7 [Tathagata Das] Possible fix for flaky graceful shutdown test. ab71f7f [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into doc-fix 8d6ff9b [Tathagata Das] Addded migration guide to Spark Streaming. 7d171df [Tathagata Das] Added reference to JavaReceiverInputStream in examples and streaming guide. 49edd7c [Tathagata Das] Change java doc links to use Java docs. 11528d7 [Tathagata Das] Updated links on index page. ff80970 [Tathagata Das] More updates to streaming guide. 4dc42e9 [Tathagata Das] Added monitoring and other documentation in the streaming guide. 14c6564 [Tathagata Das] Updated custom receiver guide. --- docs/streaming-programming-guide.md | 200 ++++++++++++++++++++++++++++-------- 1 file changed, 157 insertions(+), 43 deletions(-) (limited to 'docs/streaming-programming-guide.md') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index b22bb45828..e8b718b303 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -136,7 +136,7 @@ The complete code can be found in the Spark Streaming example
First, we create a -[JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext) object, +[JavaStreamingContext](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html) object, which is the main entry point for all streaming functionality. Besides Spark's configuration, we specify that any DStream would be processed in 1 second batches. @@ -155,7 +155,7 @@ by specifying the IP address and port of the data server. {% highlight java %} // Create a DStream that will connect to serverIP:serverPort, like localhost:9999 -JavaDStream lines = jssc.socketTextStream("localhost", 9999); +JavaReceiverInputDStream lines = jssc.socketTextStream("localhost", 9999); {% endhighlight %} This `lines` DStream represents the stream of data that will be received from the data @@ -863,6 +863,51 @@ For DStreams that must be checkpointed (that is, DStreams created by `updateStat `reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by default set to a multiple of the DStream's sliding interval such that its at least 10 seconds. +## Deployment +A Spark Streaming application is deployed on a cluster in the same way as any other Spark application. +Please refer to the [deployment guide](cluster-overview.html) for more details. + +If a running Spark Streaming application needs to be upgraded (with new application code), then +there are two possible mechanism. + +- The upgraded Spark Streaming application is started and run in parallel to the existing application. +Once the new one (receiving the same data as the old one) has been warmed up and ready +for prime time, the old one be can be brought down. Note that this can be done for data sources that support +sending the data to two destinations (i.e., the earlier and upgraded applications). + +- The existing application is shutdown gracefully (see +[`StreamingContext.stop(...)`](api/scala/index.html#org.apache.spark.streaming.StreamingContext) +or [`JavaStreamingContext.stop(...)`](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html) +for graceful shutdown options) which ensure data that have been received is completely +processed before shutdown. Then the +upgraded application can be started, which will start processing from the same point where the earlier +application left off. Note that this can be done only with input sources that support source-side buffering +(like Kafka, and Flume) as data needs to be buffered while the previous application down and +the upgraded application is not yet up. + +## Monitoring +Beyond Spark's [monitoring capabilities](monitoring.html), there are additional capabilities +specific to Spark Streaming. When a StreamingContext is used, the +[Spark web UI](monitoring.html#web-interfaces) shows +an additional `Streaming` tab which shows statistics about running receivers (whether +receivers are active, number of records received, receiver error, etc.) +and completed batches (batch processing times, queueing delays, etc.). This can be used to +monitor the progress of the streaming application. + +The following two metrics in web UI is particularly important - +*Processing Time* and *Scheduling Delay* (under *Batch Processing Statistics*). The first is the +time to process each batch of data, and the second is the time a batch waits in a queue +for the processing of previous batches to finish. If the batch processing time is consistently more +than the batch interval and/or the queueing delay keeps increasing, then it indicates the system is +not able to process the batches as fast they are being generated and falling behind. +In that case, consider +[reducing](#reducing-the-processing-time-of-each-batch) the batch processing time. + +The progress of a Spark Streaming program can also be monitored using the +[StreamingListener](api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface, +which allows you to get receiver status and processing times. Note that this is a developer API +and it is likely to be improved upon (i.e., more information reported) in the future. + *************************************************************************************************** # Performance Tuning @@ -875,7 +920,8 @@ improve the performance of you application. At a high level, you need to conside Reducing the processing time of each batch of data by efficiently using cluster resources.
  • - Setting the right batch size such that the data processing can keep up with the data ingestion. + Setting the right batch size such that the batches of data can be processed as fast as they + are received (that is, data processing keeps up with the data ingestion).
  • @@ -884,7 +930,30 @@ There are a number of optimizations that can be done in Spark to minimize the pr each batch. These have been discussed in detail in [Tuning Guide](tuning.html). This section highlights some of the most important ones. -### Level of Parallelism +### Level of Parallelism in Data Receiving +Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to deserialized +and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider +parallelizing the data receiving. Note that each input DStream +creates a single receiver (running on a worker machine) that receives a single stream of data. +Receiving multiple data streams can therefore be achieved by creating multiple input DStreams +and configuring them to receive different partitions of the data stream from the source(s). +For example, a single Kafka input stream receiving two topics of data can be split into two +Kafka input streams, each receiving only one topic. This would run two receivers on two workers, +thus allowing data to received in parallel, and increasing overall throughput. + +Another parameter that should be considered is the receiver's blocking interval. For most receivers, +the received data is coalesced together into large blocks of data before storing inside Spark's memory. +The number of blocks in each batch determines the number of tasks that will be used to process those +the received data in a map-like transformation. This blocking interval is determined by the +[configuration parameter](configuration.html) `spark.streaming.blockInterval` and the default value +is 200 milliseconds. + +An alternative to receiving data with multiple input streams / receivers is to explicitly repartition +the input data stream (using `inputStream.repartition()`). +This distributes the received batches of data across all the machines in the cluster +before further processing. + +### Level of Parallelism in Data Processing Cluster resources maybe under-utilized if the number of parallel tasks used in any stage of the computation is not high enough. For example, for distributed reduce operations like `reduceByKey` and `reduceByKeyAndWindow`, the default number of parallel tasks is 8. You can pass the level of @@ -921,16 +990,22 @@ These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable. ## Setting the Right Batch Size -For a Spark Streaming application running on a cluster to be stable, the processing of the data -streams must keep up with the rate of ingestion of the data streams. Depending on the type of -computation, the batch size used may have significant impact on the rate of ingestion that can be -sustained by the Spark Streaming application on a fixed cluster resources. For example, let us +For a Spark Streaming application running on a cluster to be stable, the system should be able to +process data as fast as it is being received. In other words, batches of data should be processed +as fast as they are being generated. Whether this is true for an application can be found by +[monitoring](#monitoring) the processing times in the streaming web UI, where the batch +processing time should be less than the batch interval. + +Depending on the nature of the streaming +computation, the batch interval used may have significant impact on the data rates that can be +sustained by the application on a fixed set of cluster resources. For example, let us consider the earlier WordCountNetwork example. For a particular data rate, the system may be able -to keep up with reporting word counts every 2 seconds (i.e., batch size of 2 seconds), but not -every 500 milliseconds. +to keep up with reporting word counts every 2 seconds (i.e., batch interval of 2 seconds), but not +every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in +production can be sustained. A good approach to figure out the right batch size for your application is to test it with a -conservative batch size (say, 5-10 seconds) and a low data rate. To verify whether the system +conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system is able to keep up with data rate, you can check the value of the end-to-end delay experienced by each processed batch (either look for "Total delay" in Spark driver log4j logs, or use the [StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) @@ -942,29 +1017,6 @@ data rate and/or reducing the batch size. Note that momentary increase in the de temporary data rate increases maybe fine as long as the delay reduces back to a low value (i.e., less than batch size). -## 24/7 Operation -By default, Spark does not forget any of the metadata (RDDs generated, stages processed, etc.). -But for a Spark Streaming application to operate 24/7, it is necessary for Spark to do periodic -cleanup of it metadata. This can be enabled by setting the -[configuration property](configuration.html#spark-properties) `spark.cleaner.ttl` to the number of -seconds you want any metadata to persist. For example, setting `spark.cleaner.ttl` to 600 would -cause Spark periodically cleanup all metadata and persisted RDDs that are older than 10 minutes. -Note, that this property needs to be set before the SparkContext is created. - -This value is closely tied with any window operation that is being used. Any window operation -would require the input data to be persisted in memory for at least the duration of the window. -Hence it is necessary to set the delay to at least the value of the largest window operation used -in the Spark Streaming application. If this delay is set too low, the application will throw an -exception saying so. - -## Monitoring -Besides Spark's in-built [monitoring capabilities](monitoring.html), -the progress of a Spark Streaming program can also be monitored using the [StreamingListener] -(api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface, -which allows you to get statistics of batch processing times, queueing delays, -and total end-to-end delays. Note that this is still an experimental API and it is likely to be -improved upon (i.e., more information reported) in the future. - ## Memory Tuning Tuning the memory usage and GC behavior of Spark applications have been discussed in great detail in the [Tuning Guide](tuning.html). It is recommended that you read that. In this section, @@ -1249,18 +1301,80 @@ in the file. This is what the sequence of outputs would be with and without a dr If the driver had crashed in the middle of the processing of time 3, then it will process time 3 and output 30 after recovery. +*************************************************************************************************** + +# Migration Guide from 0.9.1 or below to 1.x +Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. +This section elaborates the steps required to migrate your existing code to 1.0. + +**Input DStreams**: All operations that create an input stream (e.g., `StreamingContext.socketStream`, +`FlumeUtils.createStream`, etc.) now returns +[InputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream) / +[ReceiverInputDStream](api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream) +(instead of DStream) for Scala, and [JavaInputDStream](api/java/org/apache/spark/streaming/api/java/JavaInputDStream.html) / +[JavaPairInputDStream](api/java/org/apache/spark/streaming/api/java/JavaPairInputDStream.html) / +[JavaReceiverInputDStream](api/java/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.html) / +[JavaPairReceiverInputDStream](api/java/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html) +(instead of JavaDStream) for Java. This ensures that functionality specific to input streams can +be added to these classes in the future without breaking binary compatibility. +Note that your existing Spark Streaming applications should not require any change +(as these new classes are subclasses of DStream/JavaDStream) but may require recompilation with Spark 1.0. + +**Custom Network Receivers**: Since the release to Spark Streaming, custom network receivers could be defined +in Scala using the class NetworkReceiver. However, the API was limited in terms of error handling +and reporting, and could not be used from Java. Starting Spark 1.0, this class has been +replaced by [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) which has +the following advantages. + +* Methods like `stop` and `restart` have been added to for better control of the lifecycle of a receiver. See +the [custom receiver guide](streaming-custom-receiver.html) for more details. +* Custom receivers can be implemented using both Scala and Java. + +To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have +to do the following. + +* Make your custom receiver class extend +[`org.apache.spark.streaming.receiver.Receiver`](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) +instead of `org.apache.spark.streaming.dstream.NetworkReceiver`. +* Earlier, a BlockGenerator object had to be created by the custom receiver, to which received data was +added for being stored in Spark. It had to be explicitly started and stopped from `onStart()` and `onStop()` +methods. The new Receiver class makes this unnecessary as it adds a set of methods named `store()` +that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any +BlockGenerator object (does not exist any more in Spark 1.0 anyway), and use `store(...)` methods on +received data. + +**Actor-based Receivers**: Data could have been received using any Akka Actors by extending the actor class with +`org.apache.spark.streaming.receivers.Receiver` trait. This has been renamed to +[`org.apache.spark.streaming.receiver.ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) +and the `pushBlock(...)` methods to store received data has been renamed to `store(...)`. Other helper classes in +the `org.apache.spark.streaming.receivers` package were also moved +to [`org.apache.spark.streaming.receiver`](api/scala/index.html#org.apache.spark.streaming.receiver.package) +package and renamed for better clarity. + +*************************************************************************************************** + # Where to Go from Here * API documentation - - Main docs of StreamingContext and DStreams in [Scala](api/scala/index.html#org.apache.spark.streaming.package) - and [Java](api/scala/index.html#org.apache.spark.streaming.api.java.package) - - Additional docs for - [Kafka](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$), - [Flume](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$), - [Twitter](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), - [ZeroMQ](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and - [MQTT](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$) + - Scala docs + * [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) and + [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream) + * [KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$), + [FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$), + [TwitterUtils](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), + [ZeroMQUtils](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and + [MQTTUtils](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$) + - Java docs + * [JavaStreamingContext](api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html), + [JavaDStream](api/java/org/apache/spark/streaming/api/java/JavaDStream.html) and + [PairJavaDStream](api/java/org/apache/spark/streaming/api/java/PairJavaDStream.html) + * [KafkaUtils](api/java/org/apache/spark/streaming/kafka/KafkaUtils.html), + [FlumeUtils](api/java/org/apache/spark/streaming/flume/FlumeUtils.html), + [TwitterUtils](api/java/org/apache/spark/streaming/twitter/TwitterUtils.html), + [ZeroMQUtils](api/java/org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and + [MQTTUtils](api/java/org/apache/spark/streaming/mqtt/MQTTUtils.html) * More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/streaming/examples) and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/streaming/examples) -* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) describing Spark Streaming. +* [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and +[video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming. -- cgit v1.2.3