summaryrefslogblamecommitdiff
path: root/site/docs/1.2.2/streaming-kafka-integration.html
blob: 7616596b77e414855e9d5c07a45efb94a3274349 (plain) (tree)







































































































































































































































                                                                                                                                                                                                                                                                                                                                                                                                                                                            
<!DOCTYPE html>
<!--[if lt IE 7]>      <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]>         <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]>         <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
        <title>Spark Streaming + Kafka Integration Guide - Spark 1.2.2 Documentation</title>
        <meta name="description" content="">

        

        <link rel="stylesheet" href="css/bootstrap.min.css">
        <style>
            body {
                padding-top: 60px;
                padding-bottom: 40px;
            }
        </style>
        <meta name="viewport" content="width=device-width">
        <link rel="stylesheet" href="css/bootstrap-responsive.min.css">
        <link rel="stylesheet" href="css/main.css">

        <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>

        <link rel="stylesheet" href="css/pygments-default.css">

        
        <!-- Google analytics script -->
        <script type="text/javascript">
          var _gaq = _gaq || [];
          _gaq.push(['_setAccount', 'UA-32518208-2']);
          _gaq.push(['_trackPageview']);

          (function() {
            var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
            ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
            var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
          })();
        </script>
        

    </head>
    <body>
        <!--[if lt IE 7]>
            <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
        <![endif]-->

        <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->

        <div class="navbar navbar-fixed-top" id="topbar">
            <div class="navbar-inner">
                <div class="container">
                    <div class="brand"><a href="index.html">
                      <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.2.2</span>
                    </div>
                    <ul class="nav">
                        <!--TODO(andyk): Add class="active" attribute to li some how.-->
                        <li><a href="index.html">Overview</a></li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="quick-start.html">Quick Start</a></li>
                                <li><a href="programming-guide.html">Spark Programming Guide</a></li>
                                <li class="divider"></li>
                                <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
                                <li><a href="sql-programming-guide.html">Spark SQL</a></li>
                                <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
                                <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
                                <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
                                <li><a href="api/java/index.html">Java</a></li>
                                <li><a href="api/python/index.html">Python</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="cluster-overview.html">Overview</a></li>
                                <li><a href="submitting-applications.html">Submitting Applications</a></li>
                                <li class="divider"></li>
                                <li><a href="spark-standalone.html">Spark Standalone</a></li>
                                <li><a href="running-on-mesos.html">Mesos</a></li>
                                <li><a href="running-on-yarn.html">YARN</a></li>
                                <li class="divider"></li>
                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="configuration.html">Configuration</a></li>
                                <li><a href="monitoring.html">Monitoring</a></li>
                                <li><a href="tuning.html">Tuning Guide</a></li>
                                <li><a href="job-scheduling.html">Job Scheduling</a></li>
                                <li><a href="security.html">Security</a></li>
                                <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
                                <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
                                <li class="divider"></li>
                                <li><a href="building-spark.html">Building Spark</a></li>
                                <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
                                <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects">Supplemental Projects</a></li>
                            </ul>
                        </li>
                    </ul>
                    <!--<p class="navbar-text pull-right"><span class="version-text">v1.2.2</span></p>-->
                </div>
            </div>
        </div>

        <div class="container" id="content">
          
            <h1 class="title">Spark Streaming + Kafka Integration Guide</h1>
          

          <p><a href="http://kafka.apache.org/">Apache Kafka</a> is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service.  Here we explain how to configure Spark Streaming to receive data from Kafka.</p>

<ol>
  <li>
    <p><strong>Linking:</strong> In your SBT/Maven project definition, link your streaming application against the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>

    <pre><code> groupId = org.apache.spark
 artifactId = spark-streaming-kafka_2.10
 version = 1.2.2
</code></pre>
  </li>
  <li>
    <p><strong>Programming:</strong> In the streaming application code, import <code>KafkaUtils</code> and create input DStream as follows.</p>

    <div class="codetabs">
 <div data-lang="scala">
        <pre><code> import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(
 	streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume])
</code></pre>

        <p>See the <a href="api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$">API docs</a>
 and the <a href="https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala">example</a>.</p>
      </div>
 <div data-lang="java">
        <pre><code> import org.apache.spark.streaming.kafka.*;

 JavaPairReceiverInputDStream&lt;String, String&gt; kafkaStream = KafkaUtils.createStream(
 	streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]);
</code></pre>

        <p>See the <a href="api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html">API docs</a>
 and the <a href="https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java">example</a>.</p>
      </div>
 </div>

    <p><em>Points to remember:</em></p>

    <ul>
      <li>
        <p>Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the <code>KafkaUtils.createStream()</code> only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.</p>
      </li>
      <li>
        <p>Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.</p>
      </li>
    </ul>
  </li>
  <li>
    <p><strong>Deploying:</strong> Package <code>spark-streaming-kafka_2.10</code> and its dependencies (except <code>spark-core_2.10</code> and <code>spark-streaming_2.10</code> which are provided by <code>spark-submit</code>) into the application JAR. Then use <code>spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide).</p>
  </li>
</ol>

<p>Note that the Kafka receiver used by default is an
<a href="streaming-programming-guide.html#receiver-reliability"><em>unreliable</em> receiver</a> section in the
programming guide). In Spark 1.2, we have added an experimental <em>reliable</em> Kafka receiver that
provides stronger
<a href="streaming-programming-guide.html#fault-tolerance-semantics">fault-tolerance guarantees</a> of zero
data loss on failures. This receiver is automatically used when the write ahead log
(also introduced in Spark 1.2) is enabled
(see <a href="#deploying-applications.html">Deployment</a> section in the programming guide). This
may reduce the receiving throughput of individual Kafka receivers compared to the unreliable
receivers, but this can be corrected by running
<a href="streaming-programming-guide.html#level-of-parallelism-in-data-receiving">more receivers in parallel</a>
to increase aggregate throughput. Additionally, it is recommended that the replication of the
received data within Spark be disabled when the write ahead log is enabled as the log is already stored
in a replicated storage system. This can be done by setting the storage level for the input
stream to <code>StorageLevel.MEMORY_AND_DISK_SER</code> (that is, use
<code>KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)</code>).</p>


        </div> <!-- /container -->

        <script src="js/vendor/jquery-1.8.0.min.js"></script>
        <script src="js/vendor/bootstrap.min.js"></script>
        <script src="js/main.js"></script>

        <!-- MathJax Section -->
        <script type="text/x-mathjax-config">
            MathJax.Hub.Config({
                TeX: { equationNumbers: { autoNumber: "AMS" } }
            });
        </script>
        <script>
            // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
            // We could use "//cdn.mathjax...", but that won't support "file://".
            (function(d, script) {
                script = d.createElement('script');
                script.type = 'text/javascript';
                script.async = true;
                script.onload = function(){
                    MathJax.Hub.Config({
                        tex2jax: {
                            inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
                            displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
                            processEscapes: true,
                            skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
                        }
                    });
                };
                script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
                    'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
                d.getElementsByTagName('head')[0].appendChild(script);
            }(document));
        </script>
    </body>
</html>