author: Tathagata Das <tathagata.das1565@gmail.com> 2015-06-12 15:22:59 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-06-12 15:22:59 -0700
commit: e9471d3414d327c7d0853e18f1844ab1bd09c8ed
tree: c71dabecf459ff18b1c974c40f6b8f2f8924a9ce /docs/streaming-kinesis-integration.md
parent: 88604051511c788d7abb41a49e3eb3a8330c09a9
[SPARK-7284] [STREAMING] Updated streaming documentation
- Kinesis API updated
- Kafka version updated, and Python API for Direct Kafka added
- Added SQLContext.getOrCreate()
- Added information on how to get partitionId in foreachRDD

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6781 from tdas/SPARK-7284 and squashes the following commits:

aac7be0 [Tathagata Das] Added information on how to get partition id
a66ec22 [Tathagata Das] Complete the line incomplete line,
a92ca39 [Tathagata Das] Updated streaming documentation
Diffstat (limited to 'docs/streaming-kinesis-integration.md')
-rw-r--r-- docs/streaming-kinesis-integration.md | 24
1 files changed, 15 insertions, 9 deletions
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index 379eb513d5..aa9749afbc 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -32,7 +32,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
+ streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+ [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
@@ -44,7 +45,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
+ streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+ [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
@@ -54,19 +56,23 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
- `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
- - `[Kinesis stream name]`: The Kinesis stream that this streaming application receives from
- - The application name used in the streaming context becomes the Kinesis application name
+ - `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
+ sequence numbers in a DynamoDB table.
- The application name must be unique for a given account and region.
- - The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
- - Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.
+ - If the table exists but has incorrect checkpoint information (for a different stream, or
+ old expired sequence numbers), then there may be temporary errors.
+ - `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from.
- `[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
+ - `[region name]`: Valid Kinesis region names can be found [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
+
- `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).
+ In other versions of the API, you can also specify the AWS access key and secret key directly.
3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
@@ -122,12 +128,12 @@ To run the example,
<div class="codetabs">
<div data-lang="scala" markdown="1">
- bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
+ bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
</div>
<div data-lang="java" markdown="1">
- bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
+ bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
</div>
</div>
@@ -136,7 +142,7 @@ To run the example,
- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.
- bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+ bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
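
As a rough illustration of what "1000 lines per second of 10 random numbers per line" means for the data arriving on the stream, the following Java sketch builds records of that shape. It is not the producer's actual code: the class name `RandomLineSketch`, the helper `randomLine`, the space separator, and the choice of single digits (0 to 9) as the random numbers are all assumptions made for illustration, and nothing here talks to Kinesis.

```java
import java.util.Random;
import java.util.StringJoiner;

// Hypothetical sketch only: shows the shape of the generated records,
// not how the real example producer is implemented.
class RandomLineSketch {

    // Builds one line holding `numbersPerLine` random digits (0-9),
    // separated by single spaces. The digit range is an assumption.
    static String randomLine(Random rng, int numbersPerLine) {
        StringJoiner joiner = new StringJoiner(" ");
        for (int i = 0; i < numbersPerLine; i++) {
            joiner.add(Integer.toString(rng.nextInt(10)));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        int linesPerSecond = 1000; // third argument in the command above
        int numbersPerLine = 10;   // fourth argument in the command above
        Random rng = new Random();

        // Print a few sample lines rather than a full second's worth.
        for (int i = 0; i < 3; i++) {
            System.out.println(randomLine(rng, numbersPerLine));
        }

        // Total numbers a word-count job would see per second at this rate.
        System.out.println("numbers per second: " + (linesPerSecond * numbersPerLine));
    }
}
```

With the arguments `1000 10`, a word count over the stream would therefore see on the order of 10,000 tokens per second.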