diff options
author | Chris Fregly <chris@fregly.com> | 2014-08-02 13:35:35 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-08-02 13:35:35 -0700 |
commit | 91f9504e6086fac05b40545099f9818949c24bca (patch) | |
tree | c79c63f0b3f82c4c9b632072f384b85bc7f646f1 /docs | |
parent | 67bd8e3c217a80c3117a6e3853aa60fe13d08c91 (diff) | |
download | spark-91f9504e6086fac05b40545099f9818949c24bca.tar.gz spark-91f9504e6086fac05b40545099f9818949c24bca.tar.bz2 spark-91f9504e6086fac05b40545099f9818949c24bca.zip |
[SPARK-1981] Add AWS Kinesis streaming support
Author: Chris Fregly <chris@fregly.com>
Closes #1434 from cfregly/master and squashes the following commits:
4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback. simplified examples, updated docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback: moved the kinesis examples into the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback: updated docs, simplified the KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support
Diffstat (limited to 'docs')
-rw-r--r-- | docs/streaming-custom-receivers.md | 4 | ||||
-rw-r--r-- | docs/streaming-kinesis.md | 58 | ||||
-rw-r--r-- | docs/streaming-programming-guide.md | 12 |
3 files changed, 68 insertions, 6 deletions
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index a2dc3a8961..1e045a3dd0 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers --- Spark Streaming can receive streaming data from any arbitrary data source beyond -the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.). +the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.). This requires the developer to implement a *receiver* that is customized for receiving data from the concerned data source. This guide walks through the process of implementing a custom receiver and using it in a Spark Streaming application. @@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" ")) ... {% endhighlight %} -The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala). +The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala). </div> <div data-lang="java" markdown="1"> diff --git a/docs/streaming-kinesis.md b/docs/streaming-kinesis.md new file mode 100644 index 0000000000..801c905c88 --- /dev/null +++ b/docs/streaming-kinesis.md @@ -0,0 +1,58 @@ +--- +layout: global +title: Spark Streaming Kinesis Receiver +--- + +### Kinesis +Build notes: +<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li> +<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li> +<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li> +<li>To build with Kinesis, you must run the maven or sbt builds with -Pkinesis-asl`.</li> +<li>Applications will need to link to the 'spark-streaming-kinesis-asl` artifact.</li> + +Kinesis examples notes: +<li>To build the Kinesis examples, you must run the maven or sbt builds with -Pkinesis-asl`.</li> +<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li> +<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li> +<li>Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.</li> + +Deployment and runtime notes: +<li>A single KinesisReceiver can process many shards of a stream.</li> +<li>Each shard of a stream is processed by one or more KinesisReceiver's managed by the Kinesis Client Library (KCL) Worker.</li> +<li>You never need more KinesisReceivers than the number of shards in your stream.</li> +<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream)</li> +<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li> +<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/> + 1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/> + 2) Java System Properties - aws.accessKeyId and aws.secretKey<br/> + 3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/> + 4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/> +</li> +<li>You need to setup a Kinesis stream with 1 or more shards per the following:<br/> + http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li> +<li>Valid Kinesis endpoint urls can be found here: Valid endpoint urls: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li> +<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service, +retrieve any checkpoint data, and negotiate with other KCL's reading from the same stream.</li> +<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization). +Changing the app name could lead to Kinesis errors as only 1 logical application can process a stream. In order to start fresh, +it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li> + +Failure recovery notes: +<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/> + 1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/> + 2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/> + 3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/> +</li> +<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling</li> +<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li> +<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) +or from the tip/latest (InitialPostitionInStream.LATEST). This is configurable.</li> +<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li> +<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li> +<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data +depending on the checkpoint frequency.</li> +<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li> +<li>Record processing should be idempotent when possible.</li> +<li>Failed or latent KinesisReceivers will be detected and automatically shutdown/load-balanced by the KCL.</li> +<li>If possible, explicitly shutdown the worker if a failure occurs in order to trigger the final checkpoint.</li> diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 7b8b793343..9f331ed50d 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -9,7 +9,7 @@ title: Spark Streaming Programming Guide # Overview Spark Streaming is an extension of the core Spark API that allows enables high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources -like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets and be processed using complex +like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be processed using complex algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's in-built @@ -38,7 +38,7 @@ stream of results in batches. Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*, which represents a continuous stream of data. DStreams can be created either from input data -stream from sources such as Kafka and Flume, or by applying high-level +stream from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD). @@ -313,7 +313,7 @@ To write your own Spark Streaming program, you will have to add the following de artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}} version = {{site.SPARK_VERSION}} -For ingesting data from sources like Kafka and Flume that are not present in the Spark +For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example, @@ -327,6 +327,7 @@ some of the common ones are as follows. <tr><td> Twitter </td><td> spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr> <tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} </td></tr> <tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} </td></tr> +<tr><td> Kinesis<br/>(built separately)</td><td> kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr> <tr><td> </td><td></td></tr> </table> @@ -442,7 +443,7 @@ see the API documentations of the relevant functions in Scala and [JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext) for Java. -Additional functionality for creating DStreams from sources such as Kafka, Flume, and Twitter +Additional functionality for creating DStreams from sources such as Kafka, Flume, Kinesis, and Twitter can be imported by adding the right dependencies as explained in an [earlier](#linking) section. To take the case of Kafka, after adding the artifact `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` to the @@ -467,6 +468,9 @@ For more details on these additional sources, see the corresponding [API documen Furthermore, you can also implement your own custom receiver for your sources. See the [Custom Receiver Guide](streaming-custom-receivers.html). +### Kinesis +[Kinesis](streaming-kinesis.html) + ## Operations There are two kinds of DStream operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams |