author     Tathagata Das <tathagata.das1565@gmail.com>   2014-09-03 17:38:01 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-09-03 17:38:01 -0700
commit     a5224079286d1777864cf9fa77330aadae10cd7b (patch)
tree       b44c8672b86a6b38769b62484772c6f237c39480 /docs
parent     996b7434ee0d0c7c26987eb9cf050c139fdd2db2 (diff)
[SPARK-2419][Streaming][Docs] Updates to the streaming programming guide
Updated the main streaming programming guide, and also added source-specific guides for Kafka, Flume, Kinesis.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Jacek Laskowski <jacek@japila.pl>

Closes #2254 from tdas/streaming-doc-fix and squashes the following commits:

e45c6d7 [Jacek Laskowski] More fixes from an old PR
5125316 [Tathagata Das] Fixed links
dc02f26 [Tathagata Das] Refactored streaming kinesis guide and made many other changes.
acbc3e3 [Tathagata Das] Fixed links between streaming guides.
cb7007f [Tathagata Das] Added Streaming + Flume integration guide.
9bd9407 [Tathagata Das] Updated streaming programming guide with additional information from SPARK-2419.
Diffstat (limited to 'docs')
-rw-r--r--  docs/streaming-flume-integration.md     132
-rw-r--r--  docs/streaming-kafka-integration.md      42
-rw-r--r--  docs/streaming-kinesis-integration.md   110
-rw-r--r--  docs/streaming-kinesis.md                59
-rw-r--r--  docs/streaming-programming-guide.md     518
5 files changed, 622 insertions, 239 deletions
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
new file mode 100644
index 0000000000..d57c3e0ef9
--- /dev/null
+++ b/docs/streaming-flume-integration.md
@@ -0,0 +1,132 @@
+---
+layout: global
+title: Spark Streaming + Flume Integration Guide
+---
+
+[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.
+
+## Approach 1: Flume-style Push-based Approach
+Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.
+
+#### General Requirements
+Choose a machine in your cluster such that
+
+- When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine.
+
+- Flume can be configured to push data to a port on that machine.
+
+Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able to push data.
+
+#### Configuring Flume
+Configure the Flume agent to send data to an Avro sink by having the following in the configuration file.
+
+ agent.sinks = avroSink
+ agent.sinks.avroSink.type = avro
+ agent.sinks.avroSink.channel = memoryChannel
+ agent.sinks.avroSink.hostname = <chosen machine's hostname>
+ agent.sinks.avroSink.port = <chosen port on the machine>
+
+See [Flume's documentation](https://flume.apache.org/documentation.html) for more information about
+configuring Flume agents.
+
+#### Configuring Spark Streaming Application
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ import org.apache.spark.streaming.flume._
+
+ val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
+
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala).
+ </div>
+ <div data-lang="java" markdown="1">
+ import org.apache.spark.streaming.flume.*;
+
+ JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
+ FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
+
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
+ </div>
+ </div>
+
+ Note that the hostname should be the same as the one used by the resource manager in the
+ cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch
+    the receiver on the right machine.
+
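+    As a quick sanity check (a sketch, not part of this guide), you can count the events received in each batch:
+
+        flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
+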
+3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+## Approach 2 (Experimental): Pull-based Approach using a Custom Sink
+Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.
+
+- Flume pushes data into the sink, and the data stays buffered.
+- Spark Streaming uses transactions to pull data from the sink. Transactions succeed only after data is received and replicated by Spark Streaming.
+
+This ensures better reliability and fault-tolerance than the previous approach. However, it requires configuring Flume to run a custom sink. Here are the configuration steps.
+
+#### General Requirements
+Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink.
+
+#### Configuring Flume
+Configuring Flume on the chosen machine requires the following two steps.
+
+1. **Sink JARs**: Add the following JARs to Flume's classpath (see [Flume's documentation](https://flume.apache.org/documentation.html) to see how) on the machine designated to run the custom sink.
+
+ (i) *Custom sink JAR*: Download the JAR corresponding to the following artifact (or [direct link](http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}/{{site.SPARK_VERSION_SHORT}}/spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}-{{site.SPARK_VERSION_SHORT}}.jar)).
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming-flume-sink_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+ (ii) *Scala library JAR*: Download the Scala library JAR for Scala {{site.SCALA_VERSION}}. It can be found with the following artifact detail (or, [direct link](http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/{{site.SCALA_VERSION}}/scala-library-{{site.SCALA_VERSION}}.jar)).
+
+ groupId = org.scala-lang
+ artifactId = scala-library
+ version = {{site.SCALA_VERSION}}
+
+2. **Configuration file**: On that machine, configure the Flume agent to send data to the custom sink by having the following in the configuration file.
+
+ agent.sinks = spark
+ agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
+ agent.sinks.spark.hostname = <hostname of the local machine>
+ agent.sinks.spark.port = <port to listen on for connection from Spark>
+ agent.sinks.spark.channel = memoryChannel
+
+ Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink.
+
+See [Flume's documentation](https://flume.apache.org/documentation.html) for more information about
+configuring Flume agents.
+
+#### Configuring Spark Streaming Application
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide).
+
+2. **Programming:** In the streaming application code, import `FlumeUtils` and create input DStream as follows.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ import org.apache.spark.streaming.flume._
+
+ val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
+ </div>
+ <div data-lang="java" markdown="1">
+ import org.apache.spark.streaming.flume.*;
+
+        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
+ FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
+ </div>
+ </div>
+
+ See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
+
+ Note that each input DStream can be configured to receive data from multiple sinks.
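+
+    For example, here is a rough sketch of receiving from multiple sinks with a single DStream (assuming the `Seq[InetSocketAddress]` overload of `createPollingStream`; the host names, port, and storage level below are placeholders, not values from this guide):
+
+        import java.net.InetSocketAddress
+        import org.apache.spark.storage.StorageLevel
+
+        val sinkAddresses = Seq(
+          new InetSocketAddress("sink-machine-1", 9999),
+          new InetSocketAddress("sink-machine-2", 9999))
+        val flumeStream = FlumeUtils.createPollingStream(
+          streamingContext, sinkAddresses, StorageLevel.MEMORY_AND_DISK_SER_2)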
+
+3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+
+
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
new file mode 100644
index 0000000000..a3b705d4c3
--- /dev/null
+++ b/docs/streaming-kafka-integration.md
@@ -0,0 +1,42 @@
+---
+layout: global
+title: Spark Streaming + Kafka Integration Guide
+---
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka.
+
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and create input DStream as follows.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ import org.apache.spark.streaming.kafka._
+
+ val kafkaStream = KafkaUtils.createStream(
+ streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume])
+
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
+ </div>
+ <div data-lang="java" markdown="1">
+ import org.apache.spark.streaming.kafka.*;
+
+ JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
+ streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]);
+
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+ </div>
+ </div>
+
+ *Points to remember:*
+
+ - Topic partitions in Kafka do not correlate to partitions of the RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in `KafkaUtils.createStream()` only increases the number of threads used to consume topics within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.
+
+ - Multiple Kafka input DStreams can be created with different groups and topics, so that data is received in parallel using multiple receivers (a sketch follows these points).
+
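+    For example, here is a rough sketch of creating and unioning several Kafka input DStreams (the number of streams, ZooKeeper quorum, group id, and topic map below are illustrative placeholders):
+
+        val numStreams = 3
+        val kafkaStreams = (1 to numStreams).map { _ =>
+          KafkaUtils.createStream(streamingContext, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))
+        }
+        // Union the streams so that subsequent transformations see a single DStream
+        val unifiedStream = streamingContext.union(kafkaStreams)
+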
+3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
new file mode 100644
index 0000000000..079d4c5550
--- /dev/null
+++ b/docs/streaming-kinesis-integration.md
@@ -0,0 +1,110 @@
+---
+layout: global
+title: Spark Streaming + Kinesis Integration
+---
+[Amazon Kinesis](http://aws.amazon.com/kinesis/) is a fully managed service for real-time processing of streaming data at massive scale.
+The Kinesis input DStream and receiver use the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
+The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
+Here we explain how to configure Spark Streaming to receive data from Kinesis.
+
+#### Configuring Kinesis
+
+A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards by following this
+[guide](http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html).
+
+
+#### Configuring Spark Streaming Application
+
+1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
+
+ groupId = org.apache.spark
+ artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
+
+ **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
+
+2. **Programming:** In the streaming application code, import `KinesisUtils` and create input DStream as follows.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ import org.apache.spark.streaming.kinesis._
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+ val kinesisStream = KinesisUtils.createStream(
+ streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
+
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the next subsection for instructions to run the example.
+
+ </div>
+ <div data-lang="java" markdown="1">
+        import org.apache.spark.streaming.kinesis.*;
+        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+ JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
+ streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
+
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
+ and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
+
+ </div>
+ </div>
+
+    `[endpoint URL]`: Valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
+
+ `[checkpoint interval]`: The interval at which the Kinesis client library is going to save its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
+
+ `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see later section and Amazon Kinesis API documentation for more details).
+
+ *Points to remember:*
+
+ - The application name used in the streaming context must be unique for a given account and region. Changing the app name or stream name could lead to Kinesis errors, as only a single logical application can process a single stream.
+ - A single Kinesis input DStream can receive data from many Kinesis shards by spinning up multiple KinesisRecordProcessor threads. Note that there is no correlation between the number of shards in Kinesis and the number of partitions in the generated RDDs that are used for processing the data.
+ - You never need more KinesisReceivers than the number of shards in your stream, as each will spin up at least one KinesisRecordProcessor thread.
+ - Horizontal scaling is achieved by adding more Kinesis input DStreams (in separate processes), up to the current number of shards of the given stream.
+
+3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
+
+ - A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL. It is used to store KCL's checkpoint information.
+
+ - If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch.
+
+#### Running the Example
+To run the example,
+- Download Spark source and follow the [instructions](building-with-maven.html) to build Spark with profile *-Pkinesis-asl*.
+
+ mvn -Pkinesis-asl -DskipTests clean package
+
+- Set up Kinesis stream (see earlier section). Note the name of the Kinesis stream, and the endpoint URL corresponding to the region the stream is based on.
+
+- Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials.
+
+- In the Spark root directory, run the example as
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+
+ bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
+
+ </div>
+ <div data-lang="java" markdown="1">
+
+ bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
+
+ </div>
+ </div>
+
+ This will wait for data to be received from Kinesis.
+
+- To generate random string data, in another terminal, run the associated Kinesis data producer.
+
+ bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+
+ This will push random words to the Kinesis stream, which should then be received and processed by the running example.
+
+#### Kinesis Checkpointing
+The Kinesis receiver periodically checkpoints the position in the stream that it has read, so that the system can recover from failures and continue processing where it left off. Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.
+
+- If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPositionInStream.LATEST). This is configurable.
+
+- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored). In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data.
+
+- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency.
diff --git a/docs/streaming-kinesis.md b/docs/streaming-kinesis.md
deleted file mode 100644
index 16ad322210..0000000000
--- a/docs/streaming-kinesis.md
+++ /dev/null
@@ -1,59 +0,0 @@
----
-layout: global
-title: Spark Streaming Kinesis Receiver
----
-
-## Kinesis
-###Design
-<li>The KinesisReceiver uses the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License.</li>
-<li>The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concept of Workers, Checkpoints, and Shard Leases.</li>
-<li>The KCL uses DynamoDB to maintain all state. A DynamoDB table is created in the us-east-1 region (regardless of Kinesis stream region) during KCL initialization for each Kinesis application name.</li>
-<li>A single KinesisReceiver can process many shards of a stream by spinning up multiple KinesisRecordProcessor threads.</li>
-<li>You never need more KinesisReceivers than the number of shards in your stream as each will spin up at least one KinesisRecordProcessor thread.</li>
-<li>Horizontal scaling is achieved by autoscaling additional KinesisReceiver (separate processes) or spinning up new KinesisRecordProcessor threads within each KinesisReceiver - up to the number of current shards for a given stream, of course. Don't forget to autoscale back down!</li>
-
-### Build
-<li>Spark supports a Streaming KinesisReceiver, but it is not included in the default build due to Amazon Software Licensing (ASL) restrictions.</li>
-<li>To build with the Kinesis Streaming Receiver and supporting ASL-licensed code, you must run the maven or sbt builds with the **-Pkinesis-asl** profile.</li>
-<li>All KinesisReceiver-related code, examples, tests, and artifacts live in **$SPARK_HOME/extras/kinesis-asl/**.</li>
-<li>Kinesis-based Spark Applications will need to link to the **spark-streaming-kinesis-asl** artifact that is built when **-Pkinesis-asl** is specified.</li>
-<li>_**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
-
-###Example
-<li>To build the Kinesis example, you must run the maven or sbt builds with the **-Pkinesis-asl** profile.</li>
-<li>You need to setup a Kinesis stream at one of the valid Kinesis endpoints with 1 or more shards per the following: http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
-<li>Valid Kinesis endpoints can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
-<li>When running **locally**, the example automatically determines the number of threads and KinesisReceivers to spin up based on the number of shards configured for the stream. Therefore, **local[n]** is not needed when starting the example as with other streaming examples.</li>
-<li>While this example could use a single KinesisReceiver which spins up multiple KinesisRecordProcessor threads to process multiple shards, I wanted to demonstrate unioning multiple KinesisReceivers as a single DStream. (It's a bit confusing in local mode.)</li>
-<li>**KinesisWordCountProducerASL** is provided to generate random records into the Kinesis stream for testing.</li>
-<li>The example has been configured to immediately replicate incoming stream data to another node by using (StorageLevel.MEMORY_AND_DISK_2)
-<li>Spark checkpointing is disabled because the example does not use any stateful or window-based DStream operations such as updateStateByKey and reduceByWindow. If those operations are introduced, you would need to enable checkpointing or risk losing data in the case of a failure.</li>
-<li>Kinesis checkpointing is enabled. This means that the example will recover from a Kinesis failure.</li>
-<li>The example uses InitialPositionInStream.LATEST strategy to pull from the latest tip of the stream if no Kinesis checkpoint info exists.</li>
-<li>In our example, **KinesisWordCount** is the Kinesis application name for both the Scala and Java versions. The use of this application name is described next.</li>
-
-###Deployment and Runtime
-<li>A Kinesis application name must be unique for a given account and region.</li>
-<li>A DynamoDB table and CloudWatch namespace are created during KCL initialization using this Kinesis application name. http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization</li>
-<li>This DynamoDB table lives in the us-east-1 region regardless of the Kinesis endpoint URL.</li>
-<li>Changing the app name or stream name could lead to Kinesis errors as only a single logical application can process a single stream.</li>
-<li>If you are seeing errors after changing the app name or stream name, it may be necessary to manually delete the DynamoDB table and start from scratch.</li>
-<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the KCL.</li>
-<li>The KinesisReceiver uses the DefaultAWSCredentialsProviderChain for AWS credentials which searches for credentials in the following order of precedence:</br>
-1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
-2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
-3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
-4) Instance profile credentials - delivered through the Amazon EC2 metadata service
-</li>
-
-###Fault-Tolerance
-<li>The combination of Spark Streaming and Kinesis creates 2 different checkpoints that may occur at different intervals.</li>
-<li>Checkpointing too frequently against Kinesis will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random backoff retry strategy.</li>
-<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last Kinesis checkpoint sequence number recorded per shard (stored in the DynamoDB table).</li>
-<li>If no Kinesis checkpoint info exists, the KinesisReceiver will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable.</li>
-<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running (and no checkpoint info is being stored.)</li>
-<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data.</li>
-<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency.</li>
-<li>Record processing should be idempotent when possible.</li>
-<li>A failed or latent KinesisRecordProcessor within the KinesisReceiver will be detected and automatically restarted by the KCL.</li>
-<li>If possible, the KinesisReceiver should be shutdown cleanly in order to trigger a final checkpoint of all KinesisRecordProcessors to avoid duplicate record processing.</li> \ No newline at end of file
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 9f331ed50d..3d4bce4966 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -7,12 +7,12 @@ title: Spark Streaming Programming Guide
{:toc}
# Overview
-Spark Streaming is an extension of the core Spark API that allows enables high-throughput,
+Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput,
fault-tolerant stream processing of live data streams. Data can be ingested from many sources
like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be processed using complex
algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.
Finally, processed data can be pushed out to filesystems, databases,
-and live dashboards. In fact, you can apply Spark's in-built
+and live dashboards. In fact, you can apply Spark's
[machine learning](mllib-guide.html) algorithms, and
[graph processing](graphx-programming-guide.html) algorithms on data streams.
@@ -60,35 +60,24 @@ do is as follows.
<div data-lang="scala" markdown="1" >
First, we import the names of the Spark Streaming classes, and some implicit
conversions from StreamingContext into our environment, to add useful methods to
-other classes we need (like DStream).
-
-[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) is the
-main entry point for all streaming functionality.
+other classes we need (like DStream). [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) is the
+main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
{% highlight scala %}
+import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-{% endhighlight %}
-
-Then we create a
-[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) object.
-Besides Spark's configuration, we specify that any DStream will be processed
-in 1 second batches.
-{% highlight scala %}
-import org.apache.spark.api.java.function._
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.api._
-// Create a StreamingContext with a local master
-// Spark Streaming needs at least two working thread
-val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
+// Create a local StreamingContext with two working threads and a batch interval of 1 second
+val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
+val ssc = new StreamingContext(conf, Seconds(1))
{% endhighlight %}
-Using this context, we then create a new DStream
-by specifying the IP address and port of the data server.
+Using this context, we can create a DStream that represents streaming data from a TCP
+source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
{% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}
@@ -112,7 +101,7 @@ import org.apache.spark.streaming.StreamingContext._
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
-// Print a few of the counts to the console
+// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
{% endhighlight %}
@@ -139,23 +128,25 @@ The complete code can be found in the Spark Streaming example
First, we create a
[JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) object,
which is the main entry point for all streaming
-functionality. Besides Spark's configuration, we specify that any DStream would be processed
-in 1 second batches.
+functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
{% highlight java %}
+import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
-// Create a StreamingContext with a local master
-JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "JavaNetworkWordCount", new Duration(1000))
+
+// Create a local StreamingContext with two working threads and a batch interval of 1 second
+SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
+JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
{% endhighlight %}
-Using this context, we then create a new DStream
-by specifying the IP address and port of the data server.
+Using this context, we can create a DStream that represents streaming data from a TCP
+source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
{% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
{% endhighlight %}
@@ -197,7 +188,9 @@ JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
return i1 + i2;
}
});
-wordCounts.print(); // Print a few of the counts to the console
+
+// Print the first ten elements of each RDD generated in this DStream to the console
+wordCounts.print();
{% endhighlight %}
The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
@@ -207,8 +200,8 @@ using a [Function2](api/scala/index.html#org.apache.spark.api.java.function.Func
Finally, `wordCounts.print()` will print a few of the counts generated every second.
Note that when these lines are executed, Spark Streaming only sets up the computation it
-will perform when it is started, and no real processing has started yet. To start the processing
-after all the transformations have been setup, we finally call
+will perform after it is started, and no real processing has started yet. To start the processing
+after all the transformations have been set up, we finally call the `start` method.
{% highlight java %}
jssc.start(); // Start the computation
@@ -235,12 +228,12 @@ Then, in a different terminal, you can start the example by using
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight bash %}
-$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
+$ ./bin/run-example streaming.NetworkWordCount localhost 9999
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight bash %}
-$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
+$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
{% endhighlight %}
</div>
</div>
@@ -281,25 +274,11 @@ Time: 1357008430000 ms
</td>
</table>
-You can also use Spark Streaming directly from the Spark shell:
-
-{% highlight bash %}
-$ bin/spark-shell
-{% endhighlight %}
-
-... and create your StreamingContext by wrapping the existing interactive shell
-SparkContext object, `sc`:
-
-{% highlight scala %}
-val ssc = new StreamingContext(sc, Seconds(1))
-{% endhighlight %}
-When working with the shell, you may also need to send a `^D` to your netcat session
-to force the pipeline to print the word counts to the console at the sink.
-
-***************************************************************************************************
+***************************************************************************************************
+***************************************************************************************************
-# Basics
+# Basic Concepts
Next, we move beyond the simple example and elaborate on the basics of Spark Streaming that you
need to know to write your streaming applications.
@@ -319,68 +298,120 @@ Streaming core
artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example,
some of the common ones are as follows.
-
<table class="table">
<tr><th>Source</th><th>Artifact</th></tr>
<tr><td> Kafka </td><td> spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} </td></tr>
+<tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr>
<tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> Kinesis<br/>(built separately)</td><td> kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
-<tr><td> </td><td></td></tr>
+<tr><td></td><td></td></tr>
</table>
For an up-to-date list, please refer to the
-[Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION}}%22)
+[Apache repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
for the full list of supported sources and artifacts.
-## Initializing
+***
+
+## Initializing StreamingContext
+
+To initialize a Spark Streaming program, a **StreamingContext** object has to be created, which is the main entry point of all Spark Streaming functionality.
<div class="codetabs">
<div data-lang="scala" markdown="1">
-To initialize a Spark Streaming program in Scala, a
-[`StreamingContext`](api/scala/index.html#org.apache.spark.streaming.StreamingContext)
-object has to be created, which is the main entry point of all Spark Streaming functionality.
-A `StreamingContext` object can be created by using
+A [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) object can be created from a [SparkConf](api/scala/index.html#org.apache.spark.SparkConf) object.
{% highlight scala %}
-new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
+import org.apache.spark._
+import org.apache.spark.streaming._
+
+val conf = new SparkConf().setAppName(appName).setMaster(master)
+val ssc = new StreamingContext(conf, Seconds(1))
{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-To initialize a Spark Streaming program in Java, a
-[`JavaStreamingContext`](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
-object has to be created, which is the main entry point of all Spark Streaming functionality.
-A `JavaStreamingContext` object can be created by using
+The `appName` parameter is a name for your application to show on the cluster UI.
+`master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
+or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
+you will not want to hardcode `master` in the program,
+but rather [launch the application with `spark-submit`](submitting-applications.html) and
+receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
+in-process (detects the number of cores in the local system). Note that this internally creates a [SparkContext](api/scala/index.html#org.apache.spark.SparkContext) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`.
+
+The batch interval must be set based on the latency requirements of your application
+and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
+section for more details.
+
+A `StreamingContext` object can also be created from an existing `SparkContext` object.
{% highlight scala %}
-new JavaStreamingContext(master, appName, batchInterval, [sparkHome], [jars])
+import org.apache.spark.streaming._
+
+val sc = ... // existing SparkContext
+val ssc = new StreamingContext(sc, Seconds(1))
{% endhighlight %}
+
+
</div>
-</div>
+<div data-lang="java" markdown="1">
-The `master` parameter is a standard [Spark cluster URL](programming-guide.html#master-urls)
-and can be "local" for local testing. The `appName` is a name of your program,
-which will be shown on your cluster's web UI. The `batchInterval` is the size of the batches,
-as explained earlier. Finally, the last two parameters are needed to deploy your code to a cluster
- if running in distributed mode, as described in the
- [Spark programming guide](programming-guide.html#deploying-code-on-a-cluster).
- Additionally, the underlying SparkContext can be accessed as
-`ssc.sparkContext`.
+A [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) object can be created from a [SparkConf](api/java/index.html?org/apache/spark/SparkConf.html) object.
+
+{% highlight java %}
+import org.apache.spark.*;
+import org.apache.spark.streaming.api.java.*;
+
+SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
+JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
+{% endhighlight %}
+
+The `appName` parameter is a name for your application to show on the cluster UI.
+`master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
+or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
+you will not want to hardcode `master` in the program,
+but rather [launch the application with `spark-submit`](submitting-applications.html) and
+receive it there. However, for local testing and unit tests, you can pass "local[*]" to run Spark Streaming
+in-process. Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`.
The batch interval must be set based on the latency requirements of your application
and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
section for more details.
-## DStreams
-*Discretized Stream* or *DStream* is the basic abstraction provided by Spark Streaming.
+A `JavaStreamingContext` object can also be created from an existing `JavaSparkContext`.
+
+{% highlight java %}
+import org.apache.spark.streaming.api.java.*;
+
+JavaSparkContext sc = ... //existing JavaSparkContext
+JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
+{% endhighlight %}
+</div>
+</div>
+
+After a context is defined, you have to do the following steps.
+
+1. Define the input sources.
+1. Set up the streaming computations.
+1. Start receiving and processing data using `streamingContext.start()`.
+1. The processing will continue until `streamingContext.stop()` is called.
+
+##### Points to remember:
+{:.no_toc}
+- Once a context has been started, no new streaming computations can be set up or added to it.
+- Once a context has been stopped, it cannot be started (that is, re-used) again.
+- Only one StreamingContext can be active in a JVM at the same time.
+- `stop()` on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of `stop()` called `stopSparkContext` to false (see the sketch below).
+- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
+
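+For example, a minimal sketch of stopping only the StreamingContext (assuming `ssc` is the context created earlier):
+
+{% highlight scala %}
+// Stop the streaming computation but keep the underlying SparkContext alive for reuse
+ssc.stop(stopSparkContext = false)
+{% endhighlight %}
+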
+***
+
+## Discretized Streams (DStreams)
+**Discretized Stream** or **DStream** is the basic abstraction provided by Spark Streaming.
It represents a continuous stream of data, either the input data stream received from source,
or the processed data stream generated by transforming the input stream. Internally,
-it is represented by a continuous sequence of RDDs, which is Spark's abstraction of an immutable,
-distributed dataset. Each RDD in a DStream contains data from a certain interval,
+a DStream is represented by a continuous series of RDDs, which is Spark's abstraction of an immutable,
+distributed dataset (see [Spark Programming Guide](programming-guide.html#resilient-distributed-datasets-rdds) for more details). Each RDD in a DStream contains data from a certain interval,
as shown in the following figure.
<p style="text-align: center;">
@@ -392,8 +423,8 @@ as shown in the following figure.
Any operation applied on a DStream translates to operations on the underlying RDDs. For example,
in the [earlier example](#a-quick-example) of converting a stream of lines to words,
-the `flatmap` operation is applied on each RDD in the `lines` DStream to generate the RDDs of the
- `words` DStream. This is shown the following figure.
+the `flatMap` operation is applied on each RDD in the `lines` DStream to generate the RDDs of the
+ `words` DStream. This is shown in the following figure.
<p style="text-align: center;">
<img src="img/streaming-dstream-ops.png"
@@ -404,83 +435,117 @@ the `flatmap` operation is applied on each RDD in the `lines` DStream to generat
These underlying RDD transformations are computed by the Spark engine. The DStream operations
-hide most of these details and provides the developer with higher-level API for convenience.
+hide most of these details and provide the developer with a higher-level API for convenience.
These operations are discussed in detail in later sections.
-## Input Sources
+***
+
+## Input DStreams
+Input DStreams are DStreams representing the stream of raw data received from streaming sources.
+Spark Streaming has two categories of streaming sources.
+
+- *Basic sources*: Sources directly available in the StreamingContext API. Example: file systems, socket connections, and Akka actors.
+- *Advanced sources*: Sources like Kafka, Flume, Kinesis, Twitter, etc. are available through extra utility classes. These require linking against extra dependencies as discussed in the [linking](#linking) section.
+
+Every input DStream (except file stream) is associated with a single [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) object which receives the data from a source and stores it in Spark's memory for processing. A receiver runs within a Spark worker/executor as a long-running task, so it occupies one of the cores allocated to the Spark Streaming application. It is therefore important to remember that a Spark Streaming application needs to be allocated enough cores to process the received data as well as to run the receiver(s). A few important points to remember:
+
+##### Points to remember:
+{:.no_toc}
+
+- If the number of cores allocated to the application is less than or equal to the number of input DStreams / receivers, then the system will receive data, but not be able to process it.
+- When running locally, if your master URL is set to "local", then there is only one core to run tasks. That is insufficient for programs with even one input DStream (file streams are okay) as the receiver will occupy that core and there will be no core left to process the data.
+
+### Basic Sources
+{:.no_toc}
We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
example](#a-quick-example) which creates a DStream from text
-data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
+data received over a TCP socket connection. Besides sockets, the StreamingContext API provides
methods for creating DStreams from files and Akka actors as input sources.
-Specifically, for files, the DStream can be created as
+- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
-<div class="codetabs">
-<div data-lang="scala">
-{% highlight scala %}
-ssc.fileStream(dataDirectory)
-{% endhighlight %}
-</div>
-<div data-lang="java">
-{% highlight java %}
-jssc.fileStream(dataDirectory);
-{% endhighlight %}
-</div>
-</div>
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ streamingContext.fileStream[keyClass, valueClass, inputFormatClass](dataDirectory)
+ </div>
+ <div data-lang="java" markdown="1">
+ streamingContext.fileStream<keyClass, valueClass, inputFormatClass>(dataDirectory);
+ </div>
+ </div>
-Spark Streaming will monitor the directory `dataDirectory` for any Hadoop-compatible filesystem
-and process any files created in that directory. Note that
+    Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories are not supported). Note that
- * The files must have the same data format.
- * The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
- the data directory.
- * Once moved the files must not be changed.
+ + The files must have the same data format.
+ + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
+ the data directory.
+ + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
-For more details on streams from files, Akka actors and sockets,
+    For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. Also, file streams do not require running a receiver, and hence do not require allocating cores.
+
+- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver Guide](#implementing-and-using-a-custom-actor-based-receiver) for more details.
+
+- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream (a sketch follows this list).
+
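+For example, a minimal sketch of a queue-based test stream (the queue contents and variable names below are illustrative):
+
+{% highlight scala %}
+import scala.collection.mutable.Queue
+import org.apache.spark.rdd.RDD
+
+val rddQueue = new Queue[RDD[Int]]()
+// Each RDD pushed into the queue is treated as one batch of data in the DStream
+val inputStream = ssc.queueStream(rddQueue)
+rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
+{% endhighlight %}
+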
+For more details on streams from sockets, files, and actors,
see the API documentations of the relevant functions in
[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for
-Scala and [JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
- for Java.
+Scala and [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) for Java.
+
+### Advanced Sources
+{:.no_toc}
+This category of sources requires interfacing with external non-Spark libraries, some of them with complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues related to version conflicts of dependencies, the functionality to create DStreams from these sources has been moved to separate libraries that can be [linked to](#linking) explicitly as necessary. For example, if you want to create a DStream using data from Twitter's stream of tweets, you have to do the following.
-Additional functionality for creating DStreams from sources such as Kafka, Flume, Kinesis, and Twitter
-can be imported by adding the right dependencies as explained in an
-[earlier](#linking) section. To take the
-case of Kafka, after adding the artifact `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` to the
-project dependencies, you can create a DStream from Kafka as
+1. *Linking*: Add the artifact `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` to the SBT/Maven project dependencies.
+1. *Programming*: Import the `TwitterUtils` class and create a DStream with `TwitterUtils.createStream` as shown below.
+1. *Deploying*: Generate an uber JAR with all the dependencies (including the dependency `spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and its transitive dependencies) and then deploy the application. This is further explained in the [Deploying section](#deploying-applications).
<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
-import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(ssc, kafkaParams, ...)
+import org.apache.spark.streaming.twitter._
+
+TwitterUtils.createStream(ssc)
{% endhighlight %}
</div>
<div data-lang="java">
{% highlight java %}
-import org.apache.spark.streaming.kafka.*;
-KafkaUtils.createStream(jssc, kafkaParams, ...);
+import org.apache.spark.streaming.twitter.*;
+
+TwitterUtils.createStream(jssc);
{% endhighlight %}
</div>
</div>
-For more details on these additional sources, see the corresponding [API documentation](#where-to-go-from-here).
-Furthermore, you can also implement your own custom receiver for your sources. See the
-[Custom Receiver Guide](streaming-custom-receivers.html).
+Note that these advanced sources are not available in the `spark-shell`, hence applications based on these
+advanced sources cannot be tested in the shell.
+
+Some of these advanced sources are as follows.
+
+- **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the public stream of tweets using
+ [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
+ can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by
+    Twitter4J library. You can either get the public stream, or get the filtered stream based on
+ keywords. See the API documentation ([Scala](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$), [Java](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html)) and examples ([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala) and
+ [TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
+
+- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data from Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
-### Kinesis
-[Kinesis](streaming-kinesis.html)
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data from Kafka 0.8.0. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
-## Operations
-There are two kinds of DStream operations - _transformations_ and _output operations_. Similar to
-RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams
-with transformed data. After applying a sequence of transformations to the input streams, output
-operations need to called, which write data out to an external data sink, such as a filesystem or a
-database.
+- **Kinesis:** See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
-### Transformations
-DStreams support many of the transformations available on normal Spark RDD's. Some of the
-common ones are as follows.
+### Custom Sources
+{:.no_toc}
+Input DStreams can also be created out of custom data sources. All you have to do is implement a user-defined **receiver** (see the next section to understand what that is) that can receive data from the custom sources and push it into Spark. See the
+[Custom Receiver Guide](streaming-custom-receivers.html) for details.
+
+***
+
+## Transformations on DStreams
+Similar to RDDs, transformations allow the data from the input DStream to be modified.
+DStreams support many of the transformations available on normal Spark RDDs.
+Some of the common ones are as follows.
<table class="table">
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
@@ -557,8 +622,8 @@ common ones are as follows.
The last two transformations are worth highlighting again.
-<h4>UpdateStateByKey Operation</h4>
-
+#### UpdateStateByKey Operation
+{:.no_toc}
The `updateStateByKey` operation allows you to maintain arbitrary state while continuously updating
it with new information. To use this, you will have to do two steps.
@@ -616,8 +681,8 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count. For the
Scala code, take a look at the example
[StatefulNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).
-<h4>Transform Operation</h4>
-
+#### Transform Operation
+{:.no_toc}
The `transform` operation (along with its variations like `transformWith`) allows
arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD
operation that is not exposed in the DStream API.
@@ -662,8 +727,8 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
In fact, you can also use [machine learning](mllib-guide.html) and
[graph computation](graphx-programming-guide.html) algorithms in the `transform` method.
-<h4>Window Operations</h4>
-
+#### Window Operations
+{:.no_toc}
Finally, Spark Streaming also provides *windowed computations*, which allow you to apply
transformations over a sliding window of data. This following figure illustrates this sliding
window.
@@ -678,11 +743,11 @@ window.
As shown in the figure, every time the window *slides* over a source DStream,
the source RDDs that fall within the window are combined and operated upon to produce the
RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time
-units of data, and slides by 2 time units. This shows that any window-based operation needs to
+units of data, and slides by 2 time units. This shows that any window operation needs to
specify two parameters.
* <i>window length</i> - The duration of the window (3 in the figure)
- * <i>slide interval</i> - The interval at which the window-based operation is performed (2 in
+ * <i>sliding interval</i> - The interval at which the window operation is performed (2 in
the figure).
These two parameters must be multiples of the batch interval of the source DStream (1 in the
@@ -720,7 +785,7 @@ JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow
</div>
</div>
-Some of the common window-based operations are as follows. All of these operations take the
+Some of the common window operations are as follows. All of these operations take the
said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
<table class="table">
@@ -778,21 +843,27 @@ said two parameters - <i>windowLength</i> and <i>slideInterval</i>.
<tr><td></td><td></td></tr>
</table>
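+
+For example, a minimal sketch (assuming `pairs` is a DStream of `(word, 1)` pairs and a 10 second batch interval) that counts words over the last 30 seconds of data, recomputed every 10 seconds:
+
+    // Reduce over a 30 second window of data, sliding every 10 seconds
+    val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))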
-### Output Operations
-When an output operator is called, it triggers the computation of a stream. Currently the following
-output operators are defined:
+
+The complete list of DStream transformations is available in the API documentation. For the Scala API,
+see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
+and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
+For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
+and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
+
+***
+
+## Output Operations on DStreams
+Output operations allow a DStream's data to be pushed out to external systems like a database or a file system.
+Since the output operations actually allow the transformed data to be consumed by external systems,
+they trigger the actual execution of all the DStream transformations (similar to actions for RDDs).
+Currently, the following output operations are defined:
<table class="table">
<tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr>
<tr>
<td> <b>print</b>() </td>
- <td> Prints first ten elements of every batch of data in a DStream on the driver. </td>
-</tr>
-<tr>
- <td> <b>foreachRDD</b>(<i>func</i>) </td>
- <td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from
- the stream. This function should have side effects, such as printing output, saving the RDD to
- external files, or writing it over the network to an external system. </td>
+ <td> Prints the first ten elements of every batch of data in a DStream on the driver.
+ This is useful for development and debugging. </td>
</tr>
<tr>
<td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
@@ -811,17 +882,84 @@ output operators are defined:
<td> Save this DStream's contents as a Hadoop file. The file name at each batch interval is
generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
</tr>
+<tr>
+ <td> <b>foreachRDD</b>(<i>func</i>) </td>
+ <td> The most generic output operator that applies a function, <i>func</i>, to each RDD generated from
+ the stream. This function should push the data in each RDD to an external system, like saving the RDD to
+ files, or writing it over the network to a database. Note that the function <i>func</i> is executed
+ at the driver, and will usually have RDD actions in it that will force the computation of the streaming RDDs.</td>
+</tr>
<tr><td></td><td></td></tr>
</table>
+### Design Patterns for using foreachRDD
+{:.no_toc}
+`dstream.foreachRDD` is a powerful primitive that allows data to be sent out to external systems.
+However, it is important to understand how to use this primitive correctly and efficiently.
+Some of the common mistakes to avoid are as follows.
-The complete list of DStream operations is available in the API documentation. For the Scala API,
-see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
-and [PairDStreamFunctions](api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
-For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
-and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
+- Often, writing data to an external system requires creating a connection object
+(e.g. a TCP connection to a remote server) and using it to send data to the remote system.
+For this purpose, a developer may inadvertently try creating a connection object at
+the Spark driver, and then try to use it in a Spark worker to save records in the RDDs.
+For example (in Scala),
+
+ dstream.foreachRDD(rdd => {
+ val connection = createNewConnection() // executed at the driver
+ rdd.foreach(record => {
+ connection.send(record) // executed at the worker
+ })
+ })
+
+ This is incorrect because it requires the connection object to be serialized and sent from the driver to the worker. Such connection objects are rarely transferable across machines. This error may manifest as serialization errors (connection object not serializable), initialization errors (connection object needs to be initialized at the workers), etc. The correct solution is to create the connection object at the worker.
+
+- However, this can lead to another common mistake - creating a new connection for every record. For example,
+
+ dstream.foreachRDD(rdd => {
+ rdd.foreach(record => {
+ val connection = createNewConnection()
+ connection.send(record)
+ connection.close()
+ })
+ })
+
+ Typically, creating a connection object has time and resource overheads. Therefore, creating and destroying a connection object for each record can incur unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use `rdd.foreachPartition` - create a single connection object and send all the records in an RDD partition using that connection.
+
+ dstream.foreachRDD(rdd => {
+ rdd.foreachPartition(partitionOfRecords => {
+ val connection = createNewConnection()
+ partitionOfRecords.foreach(record => connection.send(record))
+ connection.close()
+ })
+ })
+
+ This amortizes the connection creation overheads over many records.
+
+- Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
+ One can maintain a static pool of connection objects that can be reused as
+ RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
-## Persistence
+ dstream.foreachRDD(rdd => {
+ rdd.foreachPartition(partitionOfRecords => {
+ // ConnectionPool is a static, lazily initialized pool of connections
+ val connection = ConnectionPool.getConnection()
+ partitionOfRecords.foreach(record => connection.send(record))
+ ConnectionPool.returnConnection(connection) // return to the pool for future reuse
+ })
+ })
+
+ Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
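+
+ A minimal sketch of such a pool is shown below. It is purely illustrative: `Connection` and `createNewConnection()` are the same hypothetical helpers used above, and a real application would typically use an existing pooling library that handles time-outs and validation.
+
+        object ConnectionPool {
+          // Thread-safe queue of idle connections
+          private val pool = new java.util.concurrent.ConcurrentLinkedQueue[Connection]()
+
+          def getConnection(): Connection = {
+            val conn = pool.poll()
+            // Lazily create a connection on demand if none is available
+            if (conn != null) conn else createNewConnection()
+          }
+
+          def returnConnection(conn: Connection) {
+            pool.offer(conn)
+          }
+        }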
+
+
+##### Other points to remember:
+{:.no_toc}
+- DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like `dstream.foreachRDD()` without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
+
+- By default, output operations are executed one at a time, in the order they are defined in the application.
+
+***
+
+## Caching / Persistence
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is,
using `persist()` method on a DStream would automatically persist every RDD of that DStream in
memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple
@@ -838,7 +976,9 @@ memory. This is further discussed in the [Performance Tuning](#memory-tuning) se
information on different persistence levels can be found in
[Spark Programming Guide](programming-guide.html#rdd-persistence).
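+
+For example, a sketch of explicitly caching a DStream that feeds several different computations in each batch (here `words` is an assumed DStream of strings):
+
+    words.persist()
+    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+    val charCounts = words.map(_.length).reduce(_ + _)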
-## RDD Checkpointing
+***
+
+## Checkpointing
A _stateful operation_ is one which operates over multiple batches of data. This includes all
window-based operations and the `updateStateByKey` operation. Since stateful operations have a
dependency on previous batches of data, they continuously accumulate metadata over time.
@@ -867,10 +1007,19 @@ For DStreams that must be checkpointed (that is, DStreams created by `updateStat
`reduceByKeyAndWindow` with inverse function), the checkpoint interval of the DStream is by
default set to a multiple of the DStream's sliding interval such that it is at least 10 seconds.
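+
+For example (a sketch; the HDFS directory and the stateful DStream `runningCounts` are assumptions), checkpointing is enabled on the StreamingContext, and the interval of a particular DStream can be overridden if needed:
+
+    // Metadata and stateful DStream data will be periodically saved to this directory
+    ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints")
+
+    // Optionally override the checkpoint interval of a specific stateful DStream
+    runningCounts.checkpoint(Seconds(30))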
-## Deployment
+***
+
+## Deploying Applications
A Spark Streaming application is deployed on a cluster in the same way as any other Spark application.
Please refer to the [deployment guide](cluster-overview.html) for more details.
+Note that the applications
+that use [advanced sources](#advanced-sources) (e.g. Kafka, Flume, Twitter) are also required to package the
+extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application.
+For example, an application using `TwitterUtils` will have to include
+`spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}` and all its transitive
+dependencies in the application JAR.
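+
+For example, with SBT the linking part could look like the following sketch (the exact version string depends on your build, and the artifact must then also end up in the assembly/uber JAR that is submitted):
+
+    // Pulls in spark-streaming-twitter and its transitive dependencies;
+    // make sure they are included when building the application JAR
+    libraryDependencies += "org.apache.spark" % "spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}}" % "{{site.SPARK_VERSION_SHORT}}"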
+
If a running Spark Streaming application needs to be upgraded (with new application code), then
there are two possible mechanisms.
@@ -889,7 +1038,9 @@ application left off. Note that this can be done only with input sources that su
(like Kafka, and Flume) as data needs to be buffered while the previous application is down and
the upgraded application is not yet up.
-## Monitoring
+***
+
+## Monitoring Applications
Beyond Spark's [monitoring capabilities](monitoring.html), there are additional capabilities
specific to Spark Streaming. When a StreamingContext is used, the
[Spark web UI](monitoring.html#web-interfaces) shows
@@ -912,22 +1063,18 @@ The progress of a Spark Streaming program can also be monitored using the
which allows you to get receiver status and processing times. Note that this is a developer API
and it is likely to be improved upon (i.e., more information reported) in the future.
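+
+For example, a minimal sketch of such a listener (the class name and the metric printed are illustrative) registers for batch-completion events and prints the processing delay:
+
+    import org.apache.spark.streaming.scheduler._
+
+    class BatchDelayListener extends StreamingListener {
+      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+        // processingDelay is an Option[Long] in milliseconds
+        println("Processing delay: " + batchCompleted.batchInfo.processingDelay.getOrElse(-1L) + " ms")
+      }
+    }
+
+    ssc.addStreamingListener(new BatchDelayListener())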
-***************************************************************************************************
+***************************************************************************************************
+***************************************************************************************************
# Performance Tuning
Getting the best performance of a Spark Streaming application on a cluster requires a bit of
tuning. This section explains a number of the parameters and configurations that can be tuned to
improve the performance of your application. At a high level, you need to consider two things:
-<ol>
-<li>
- Reducing the processing time of each batch of data by efficiently using cluster resources.
-</li>
-<li>
- Setting the right batch size such that the batches of data can be processed as fast as they
- are received (that is, data processing keeps up with the data ingestion).
-</li>
-</ol>
+1. Reducing the processing time of each batch of data by efficiently using cluster resources.
+
+2. Setting the right batch size such that the batches of data can be processed as fast as they
+ are received (that is, data processing keeps up with the data ingestion).
## Reducing the Processing Time of each Batch
There are a number of optimizations that can be done in Spark to minimize the processing time of
@@ -935,6 +1082,7 @@ each batch. These have been discussed in detail in [Tuning Guide](tuning.html).
highlights some of the most important ones.
### Level of Parallelism in Data Receiving
+{:.no_toc}
Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized
and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider
parallelizing the data receiving. Note that each input DStream
@@ -958,6 +1106,7 @@ This distributes the received batches of data across specified number of machine
before further processing.
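+
+As a sketch (using socket streams purely for illustration; the host, port, and counts are placeholders), several receivers can be created and their streams unioned before further processing:
+
+    // Run multiple receivers in parallel, then treat their output as a single DStream
+    val numStreams = 3
+    val rawStreams = (1 to numStreams).map(_ => ssc.socketTextStream("localhost", 9999))
+    val unifiedStream = ssc.union(rawStreams)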
### Level of Parallelism in Data Processing
+{:.no_toc}
Cluster resources may be under-utilized if the number of parallel tasks used in any stage of the
computation is not high enough. For example, for distributed reduce operations like `reduceByKey`
and `reduceByKeyAndWindow`, the default number of parallel tasks is decided by the [config property]
@@ -968,6 +1117,7 @@ documentation), or set the [config property](configuration.html#spark-properties
`spark.default.parallelism` to change the default.
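+
+For example (a sketch, assuming `pairs` is a DStream of key-value pairs and `sparkConf` is your application's SparkConf), the number of reduce tasks can be passed explicitly, or the default can be raised through the configuration:
+
+    // Explicitly request 16 reduce tasks for this operation
+    val wordCounts = pairs.reduceByKey(_ + _, 16)
+
+    // Or raise the default used by shuffle-like operations
+    sparkConf.set("spark.default.parallelism", "16")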
### Data Serialization
+{:.no_toc}
The overhead of data serialization can be significant, especially when sub-second batch sizes are
to be achieved. There are two aspects to it.
@@ -980,6 +1130,7 @@ The overhead of data serialization can be significant, especially when sub-secon
serialization format. Hence, the deserialization overhead of input data may be a bottleneck.
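+
+For example, switching to Kryo serialization (as recommended in the [Tuning Guide](tuning.html)) is a one-line configuration change; the sketch below assumes you are constructing your own `SparkConf`:
+
+    import org.apache.spark.SparkConf
+
+    val sparkConf = new SparkConf()
+      .setAppName("StreamingApp")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")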
### Task Launching Overheads
+{:.no_toc}
If the number of tasks launched per second is high (say, 50 or more per second), then the overhead
of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second
latencies. The overhead can be reduced by the following changes:
@@ -994,6 +1145,8 @@ latencies. The overhead can be reduced by the following changes:
These changes may reduce batch processing time by 100s of milliseconds,
thus allowing sub-second batch size to be viable.
+***
+
## Setting the Right Batch Size
For a Spark Streaming application running on a cluster to be stable, the system should be able to
process data as fast as it is being received. In other words, batches of data should be processed
@@ -1022,6 +1175,8 @@ data rate and/or reducing the batch size. Note that momentary increase in the de
temporary data rate increases may be fine as long as the delay reduces back to a low value
(i.e., less than batch size).
+***
+
## Memory Tuning
Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail
in the [Tuning Guide](tuning.html). It is recommended that you read that. In this section,
@@ -1037,7 +1192,7 @@ Even though keeping the data serialized incurs higher serialization/deserializat
it significantly reduces GC pauses.
* **Clearing persistent RDDs**: By default, all persistent RDDs generated by Spark Streaming will
- be cleared from memory based on Spark's in-built policy (LRU). If `spark.cleaner.ttl` is set,
+ be cleared from memory based on Spark's built-in policy (LRU). If `spark.cleaner.ttl` is set,
then persistent RDDs that are older than that value are periodically cleared. As mentioned
[earlier](#operation), this needs to be carefully set based on operations used in the Spark
Streaming program. However, a smarter unpersisting of RDDs can be enabled by setting the
@@ -1051,7 +1206,8 @@ minimizes the variability of GC pauses. Even though concurrent GC is known to re
overall processing throughput of the system, its use is still recommended to achieve more
consistent batch processing times.
-***************************************************************************************************
+***************************************************************************************************
+***************************************************************************************************
# Fault-tolerance Properties
In this section, we are going to discuss the behavior of a Spark Streaming application in the event
@@ -1124,7 +1280,7 @@ def functionToCreateContext(): StreamingContext = {
ssc
}
-// Get StreaminContext from checkpoint data or create a new one
+// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
@@ -1178,10 +1334,7 @@ context.awaitTermination();
If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
If the directory does not exist (i.e., running for the first time),
then the function `contextFactory` will be called to create a new
-context and set up the DStreams. See the Scala example
-[JavaRecoverableWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaRecoverableWordCount.scala)
-(note that this example is missing in the 0.9 release, so you can test it using the master branch).
-This example appends the word counts of network data into a file.
+context and set up the DStreams.
You can also explicitly create a `JavaStreamingContext` from the checkpoint data and start
the computation by using `new JavaStreamingContext(checkpointDirectory)`.
@@ -1208,7 +1361,8 @@ automatically restarted, and the word counts will cont
For other deployment environments like Mesos and Yarn, you have to restart the driver through other
mechanisms.
-<h4>Recovery Semantics</h4>
+#### Recovery Semantics
+{:.no_toc}
There are two different failure behaviors based on which input sources are used.
@@ -1306,7 +1460,8 @@ in the file. This is what the sequence of outputs would be with and without a dr
If the driver had crashed in the middle of the processing of time 3, then it will process time 3
and output 30 after recovery.
-***************************************************************************************************
+***************************************************************************************************
+***************************************************************************************************
# Migration Guide from 0.9.1 or below to 1.x
Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability.
@@ -1332,7 +1487,7 @@ replaced by [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.
the following advantages.
* Methods like `stop` and `restart` have been added for better control of the lifecycle of a receiver. See
-the [custom receiver guide](streaming-custom-receiver.html) for more details.
+the [custom receiver guide](streaming-custom-receivers.html) for more details.
* Custom receivers can be implemented using both Scala and Java.
To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have
@@ -1357,6 +1512,7 @@ to [`org.apache.spark.streaming.receiver`](api/scala/index.html#org.apache.spark
package and renamed for better clarity.
***************************************************************************************************
+***************************************************************************************************
# Where to Go from Here
@@ -1366,6 +1522,7 @@ package and renamed for better clarity.
[DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
* [KafkaUtils](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$),
[FlumeUtils](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$),
+ [KinesisUtils](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$),
[TwitterUtils](api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$),
[ZeroMQUtils](api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$), and
[MQTTUtils](api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$)
@@ -1375,6 +1532,7 @@ package and renamed for better clarity.
[PairJavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/PairJavaDStream.html)
* [KafkaUtils](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html),
[FlumeUtils](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html),
+ [KinesisUtils](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
[TwitterUtils](api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html),
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html), and
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)