summaryrefslogtreecommitdiff
path: root/site/docs/1.1.0/streaming-kinesis-integration.html
blob: 3dfb3ab9c4d85d6a9e9a7c78d903798382cf2c7e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
<!DOCTYPE html>
<!--[if lt IE 7]>      <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]>         <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]>         <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
        <title>Spark Streaming + Kinesis Integration - Spark 1.1.0 Documentation</title>
        <meta name="description" content="">

        

        <link rel="stylesheet" href="css/bootstrap.min.css">
        <style>
            body {
                padding-top: 60px;
                padding-bottom: 40px;
            }
        </style>
        <meta name="viewport" content="width=device-width">
        <link rel="stylesheet" href="css/bootstrap-responsive.min.css">
        <link rel="stylesheet" href="css/main.css">

        <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>

        <link rel="stylesheet" href="css/pygments-default.css">

        
        <!-- Google analytics script -->
        <script type="text/javascript">
          var _gaq = _gaq || [];
          _gaq.push(['_setAccount', 'UA-32518208-1']);
          _gaq.push(['_trackPageview']);

          (function() {
            var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
            ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
            var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
          })();
        </script>
        

    </head>
    <body>
        <!--[if lt IE 7]>
            <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
        <![endif]-->

        <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->

        <div class="navbar navbar-fixed-top" id="topbar">
            <div class="navbar-inner">
                <div class="container">
                    <div class="brand"><a href="index.html">
                      <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.1.0</span>
                    </div>
                    <ul class="nav">
                        <!--TODO(andyk): Add class="active" attribute to li some how.-->
                        <li><a href="index.html">Overview</a></li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="quick-start.html">Quick Start</a></li>
                                <li><a href="programming-guide.html">Spark Programming Guide</a></li>
                                <li class="divider"></li>
                                <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
                                <li><a href="sql-programming-guide.html">Spark SQL</a></li>
                                <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
                                <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
                                <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li>
                                <li><a href="api/java/index.html">Javadoc</a></li>
                                <li><a href="api/python/index.html">Python API</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="cluster-overview.html">Overview</a></li>
                                <li><a href="submitting-applications.html">Submitting Applications</a></li>
                                <li class="divider"></li>
                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
                                <li><a href="spark-standalone.html">Standalone Mode</a></li>
                                <li><a href="running-on-mesos.html">Mesos</a></li>
                                <li><a href="running-on-yarn.html">YARN</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="configuration.html">Configuration</a></li>
                                <li><a href="monitoring.html">Monitoring</a></li>
                                <li><a href="tuning.html">Tuning Guide</a></li>
                                <li><a href="job-scheduling.html">Job Scheduling</a></li>
                                <li><a href="security.html">Security</a></li>
                                <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
                                <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
                                <li class="divider"></li>
                                <li><a href="building-with-maven.html">Building Spark with Maven</a></li>
                                <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
                            </ul>
                        </li>
                    </ul>
                    <!--<p class="navbar-text pull-right"><span class="version-text">v1.1.0</span></p>-->
                </div>
            </div>
        </div>

        <div class="container" id="content">
          
            <h1 class="title">Spark Streaming + Kinesis Integration</h1>
          

          <p><a href="http://aws.amazon.com/kinesis/">Amazon Kinesis</a> is a fully managed service for real-time processing of streaming data at massive scale.
The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
Here we explain how to configure Spark Streaming to receive data from Kinesis.</p>

<h4 id="configuring-kinesis">Configuring Kinesis</h4>

<p>A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards per the following
<a href="http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html">guide</a>.</p>

<h4 id="configuring-spark-streaming-application">Configuring Spark Streaming Application</h4>

<ol>
  <li>
    <p><strong>Linking:</strong> In your SBT/Maven project definition, link your streaming application against the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>

    <pre><code> groupId = org.apache.spark
 artifactId = spark-streaming-kinesis-asl_2.10
 version = 1.1.0
</code></pre>

    <p><strong>Note that by linking to this library, you will include <a href="https://aws.amazon.com/asl/">ASL</a>-licensed code in your application.</strong></p>
  </li>
  <li>
    <p><strong>Programming:</strong> In the streaming application code, import <code>KinesisUtils</code> and create the input DStream as follows:</p>

    <div class="codetabs">
 <div data-lang="scala">
        <pre><code> import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.kinesis._
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

 val kinesisStream = KinesisUtils.createStream(
 	streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
</code></pre>

        <p>See the <a href="api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$">API docs</a>
 and the <a href="https://github.com/apache/spark/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala">example</a>. Refer to the Running the Example section for instructions on how to run the example.</p>

      </div>
 <div data-lang="java">
        <pre><code> import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.kinesis.*;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

 JavaReceiverInputDStream&lt;byte[]&gt; kinesisStream = KinesisUtils.createStream(
 	streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
</code></pre>

        <p>See the <a href="api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html">API docs</a>
 and the <a href="https://github.com/apache/spark/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java">example</a>. Refer to the Running the Example section for instructions on how to run the example.</p>

      </div>
 </div>

    <ul>
      <li>
        <p><code>streamingContext</code>: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream</p>
      </li>
      <li><code>[Kinesis stream name]</code>: The Kinesis stream that this streaming application receives from
        <ul>
          <li>The application name used in the streaming context becomes the Kinesis application name</li>
          <li>The application name must be unique for a given account and region.</li>
          <li>The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization. </li>
          <li>Changing the application name or stream name can lead to Kinesis errors in some cases.  If you see errors, you may need to manually delete the DynamoDB table.</li>
        </ul>
      </li>
      <li>
        <p><code>[endpoint URL]</code>: Valid Kinesis endpoint URLs can be found <a href="http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region">here</a>.</p>
      </li>
      <li>
        <p><code>[checkpoint interval]</code>: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream.  For starters, set it to the same as the batch interval of the streaming application.</p>
      </li>
      <li><code>[initial position]</code>: Can be either <code>InitialPositionInStream.TRIM_HORIZON</code> or <code>InitialPositionInStream.LATEST</code> (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).</li>
    </ul>
  </li>
  <li>
    <p><strong>Deploying:</strong> Package <code>spark-streaming-kinesis-asl_2.10</code> and its dependencies (except <code>spark-core_2.10</code> and <code>spark-streaming_2.10</code> which are provided by <code>spark-submit</code>) into the application JAR. Then use <code>spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide).</p>

    <p><em>Points to remember at runtime:</em></p>

    <ul>
      <li>
        <p>Kinesis data processing is ordered per partition and occurs at least once per message.</p>
      </li>
      <li>
        <p>Multiple applications can read from the same Kinesis stream.  Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.</p>
      </li>
      <li>
        <p>A single Kinesis stream shard is processed by one input DStream at a time.</p>
      </li>
    </ul>

    <p style="text-align: center;">
  		<img src="img/streaming-kinesis-arch.png" title="Spark Streaming Kinesis Architecture" alt="Spark Streaming Kinesis Architecture" width="60%" />
   	<!-- Images are downsized intentionally to improve quality on retina displays -->
 </p>

    <ul>
      <li>
        <p>A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.</p>
      </li>
      <li>
        <p>Multiple input DStreams running in separate processes/instances can read from a Kinesis stream.</p>
      </li>
      <li>
        <p>You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard.</p>
      </li>
      <li>
        <p>Horizontal scaling is achieved by adding/removing  Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point.</p>
      </li>
      <li>
        <p>The Kinesis input DStream will balance the load between all DStreams - even across processes/instances.</p>
      </li>
      <li>
        <p>The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.</p>
      </li>
      <li>
        <p>As a best practice, it&#8217;s recommended that you avoid re-shard jitter by over-provisioning when possible.</p>
      </li>
      <li>
        <p>Each Kinesis input DStream maintains its own checkpoint info.  See the Kinesis Checkpointing section for more details.</p>
      </li>
      <li>
        <p>There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing.  These are 2 independent partitioning schemes.</p>
      </li>
    </ul>
  </li>
</ol>

<h4 id="running-the-example">Running the Example</h4>
<p>To run the example,</p>

<ul>
  <li>
    <p>Download Spark source and follow the <a href="building-with-maven.html">instructions</a> to build Spark with profile <em>-Pkinesis-asl</em>.</p>

    <pre><code>  mvn -Pkinesis-asl -DskipTests clean package
</code></pre>
  </li>
  <li>
    <p>Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.</p>
  </li>
  <li>
    <p>Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_KEY with your AWS credentials.</p>
  </li>
  <li>
    <p>In the Spark root directory, run the example as</p>

    <div class="codetabs">
  <div data-lang="scala">

        <pre><code>  bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
</code></pre>

      </div>
  <div data-lang="java">

        <pre><code>  bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
</code></pre>

      </div>
  </div>

    <p>This will wait for data to be received from the Kinesis stream.</p>
  </li>
  <li>
    <p>To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.</p>

    <pre><code>  bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
</code></pre>

    <p>This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream.  This data should then be received and processed by the running example.</p>
  </li>
</ul>

<h4 id="kinesis-checkpointing">Kinesis Checkpointing</h4>
<ul>
  <li>
    <p>Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table.  This allows the system to recover from failures and continue processing where the DStream left off.</p>
  </li>
  <li>
    <p>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.  The provided example handles this throttling with a random-backoff-retry strategy.</p>
  </li>
  <li>If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPositionInStream.LATEST).  This is configurable.</li>
  <li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored). </li>
  <li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.</li>
</ul>


        </div> <!-- /container -->

        <script src="js/vendor/jquery-1.8.0.min.js"></script>
        <script src="js/vendor/bootstrap.min.js"></script>
        <script src="js/main.js"></script>

        <!-- MathJax Section -->
        <script type="text/x-mathjax-config">
            MathJax.Hub.Config({
                TeX: { equationNumbers: { autoNumber: "AMS" } }
            });
        </script>
        <script>
            // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
            // We could use "//cdn.mathjax...", but that won't support "file://".
            (function(d, script) {
                script = d.createElement('script');
                script.type = 'text/javascript';
                script.async = true;
                script.onload = function(){
                    MathJax.Hub.Config({
                        tex2jax: {
                            inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
                            displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], 
                            processEscapes: true,
                            skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
                        }
                    });
                };
                script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
                    'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
                d.getElementsByTagName('head')[0].appendChild(script);
            }(document));
        </script>
    </body>
</html>