summaryrefslogtreecommitdiff
path: root/site/docs/1.3.1/streaming-flume-integration.html
blob: 6670052ecae38991300604923b97def82ea770fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
<!DOCTYPE html>
<!--[if lt IE 7]>      <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]>         <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]>         <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
    <head>
        <meta charset="utf-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
        <title>Spark Streaming + Flume Integration Guide - Spark 1.3.1 Documentation</title>
        

        

        <link rel="stylesheet" href="css/bootstrap.min.css">
        <style>
            body {
                padding-top: 60px;
                padding-bottom: 40px;
            }
        </style>
        <meta name="viewport" content="width=device-width">
        <link rel="stylesheet" href="css/bootstrap-responsive.min.css">
        <link rel="stylesheet" href="css/main.css">

        <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>

        <link rel="stylesheet" href="css/pygments-default.css">

        
        <!-- Google analytics script -->
        <script type="text/javascript">
          var _gaq = _gaq || [];
          _gaq.push(['_setAccount', 'UA-32518208-2']);
          _gaq.push(['_trackPageview']);

          (function() {
            var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
            ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
            var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
          })();
        </script>
        

    </head>
    <body>
        <!--[if lt IE 7]>
            <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
        <![endif]-->

        <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->

        <div class="navbar navbar-fixed-top" id="topbar">
            <div class="navbar-inner">
                <div class="container">
                    <div class="brand"><a href="index.html">
                      <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.3.1</span>
                    </div>
                    <ul class="nav">
                        <!--TODO(andyk): Add class="active" attribute to li some how.-->
                        <li><a href="index.html">Overview</a></li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="quick-start.html">Quick Start</a></li>
                                <li><a href="programming-guide.html">Spark Programming Guide</a></li>
                                <li class="divider"></li>
                                <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
                                <li><a href="sql-programming-guide.html">DataFrames and SQL</a></li>
                                <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
                                <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
                                <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
                                <li><a href="api/java/index.html">Java</a></li>
                                <li><a href="api/python/index.html">Python</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="cluster-overview.html">Overview</a></li>
                                <li><a href="submitting-applications.html">Submitting Applications</a></li>
                                <li class="divider"></li>
                                <li><a href="spark-standalone.html">Spark Standalone</a></li>
                                <li><a href="running-on-mesos.html">Mesos</a></li>
                                <li><a href="running-on-yarn.html">YARN</a></li>
                                <li class="divider"></li>
                                <li><a href="ec2-scripts.html">Amazon EC2</a></li>
                            </ul>
                        </li>

                        <li class="dropdown">
                            <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
                            <ul class="dropdown-menu">
                                <li><a href="configuration.html">Configuration</a></li>
                                <li><a href="monitoring.html">Monitoring</a></li>
                                <li><a href="tuning.html">Tuning Guide</a></li>
                                <li><a href="job-scheduling.html">Job Scheduling</a></li>
                                <li><a href="security.html">Security</a></li>
                                <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
                                <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
                                <li class="divider"></li>
                                <li><a href="building-spark.html">Building Spark</a></li>
                                <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
                                <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects">Supplemental Projects</a></li>
                            </ul>
                        </li>
                    </ul>
                    <!--<p class="navbar-text pull-right"><span class="version-text">v1.3.1</span></p>-->
                </div>
            </div>
        </div>

        <div class="container" id="content">
          
            <h1 class="title">Spark Streaming + Flume Integration Guide</h1>
          

          <p><a href="https://flume.apache.org/">Apache Flume</a> is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.</p>

<p><span class="badge" style="background-color: grey">Python API</span> Flume is not yet available in the Python API.</p>

<h2 id="approach-1-flume-style-push-based-approach">Approach 1: Flume-style Push-based Approach</h2>
<p>Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.</p>

<h4 id="general-requirements">General Requirements</h4>
<p>Choose a machine in your cluster such that</p>

<ul>
  <li>
    <p>When your Flume + Spark Streaming application is launched, one of the Spark workers must run on that machine.</p>
  </li>
  <li>
    <p>Flume can be configured to push data to a port on that machine.</p>
  </li>
</ul>

<p>Due to the push model, the streaming application needs to be up, with the receiver scheduled and listening on the chosen port, for Flume to be able push data.</p>

<h4 id="configuring-flume">Configuring Flume</h4>
<p>Configure Flume agent to send data to an Avro sink by having the following in the configuration file.</p>

<pre><code>agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = &lt;chosen machine's hostname&gt;
agent.sinks.avroSink.port = &lt;chosen port on the machine&gt;
</code></pre>

<p>See the <a href="https://flume.apache.org/documentation.html">Flume&#8217;s documentation</a> for more information about
configuring Flume agents.</p>

<h4 id="configuring-spark-streaming-application">Configuring Spark Streaming Application</h4>
<ol>
  <li>
    <p><strong>Linking:</strong> In your SBT/Maven projrect definition, link your streaming application against the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>

    <pre><code> groupId = org.apache.spark
 artifactId = spark-streaming-flume_2.10
 version = 1.3.1
</code></pre>
  </li>
  <li>
    <p><strong>Programming:</strong> In the streaming application code, import <code>FlumeUtils</code> and create input DStream as follows.</p>

    <div class="codetabs">
 <div data-lang="scala">
        <pre><code> import org.apache.spark.streaming.flume._

 val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
</code></pre>

        <p>See the <a href="api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$">API docs</a>
 and the <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala">example</a>.</p>
      </div>
 <div data-lang="java">
        <pre><code> import org.apache.spark.streaming.flume.*;

 JavaReceiverInputDStream&lt;SparkFlumeEvent&gt; flumeStream =
 	FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
</code></pre>

        <p>See the <a href="api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html">API docs</a>
 and the <a href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java">example</a>.</p>
      </div>
 </div>

    <p>Note that the hostname should be the same as the one used by the resource manager in the
 cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch
 the receiver in the right machine.</p>
  </li>
  <li>
    <p><strong>Deploying:</strong> Package <code>spark-streaming-flume_2.10</code> and its dependencies (except <code>spark-core_2.10</code> and <code>spark-streaming_2.10</code> which are provided by <code>spark-submit</code>) into the application JAR. Then use <code>spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide).</p>
  </li>
</ol>

<h2 id="approach-2-pull-based-approach-using-a-custom-sink">Approach 2: Pull-based Approach using a Custom Sink</h2>
<p>Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.</p>

<ul>
  <li>Flume pushes data into the sink, and the data stays buffered.</li>
  <li>Spark Streaming uses a <a href="streaming-programming-guide.html#receiver-reliability">reliable Flume receiver</a>
and transactions to pull data from the sink. Transactions succeed only after data is received and
replicated by Spark Streaming.</li>
</ul>

<p>This ensures stronger reliability and
<a href="streaming-programming-guide.html#fault-tolerance-semantics">fault-tolerance guarantees</a>
than the previous approach. However, this requires configuring Flume to run a custom sink.
Here are the configuration steps.</p>

<h4 id="general-requirements-1">General Requirements</h4>
<p>Choose a machine that will run the custom sink in a Flume agent. The rest of the Flume pipeline is configured to send data to that agent. Machines in the Spark cluster should have access to the chosen machine running the custom sink.</p>

<h4 id="configuring-flume-1">Configuring Flume</h4>
<p>Configuring Flume on the chosen machine requires the following two steps.</p>

<ol>
  <li>
    <p><strong>Sink JARs</strong>: Add the following JARs to Flume&#8217;s classpath (see <a href="https://flume.apache.org/documentation.html">Flume&#8217;s documentation</a> to see how) in the machine designated to run the custom sink .</p>

    <p>(i) <em>Custom sink JAR</em>: Download the JAR corresponding to the following artifact (or <a href="http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-flume-sink_2.10/1.3.1/spark-streaming-flume-sink_2.10-1.3.1.jar">direct link</a>).</p>

    <pre><code> groupId = org.apache.spark
 artifactId = spark-streaming-flume-sink_2.10
 version = 1.3.1
</code></pre>

    <p>(ii) <em>Scala library JAR</em>: Download the Scala library JAR for Scala 2.10.4. It can be found with the following artifact detail (or, <a href="http://search.maven.org/remotecontent?filepath=org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar">direct link</a>).</p>

    <pre><code> groupId = org.scala-lang
 artifactId = scala-library
 version = 2.10.4
</code></pre>
  </li>
  <li>
    <p><strong>Configuration file</strong>: On that machine, configure Flume agent to send data to an Avro sink by having the following in the configuration file.</p>

    <pre><code> agent.sinks = spark
 agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
 agent.sinks.spark.hostname = &lt;hostname of the local machine&gt;
 agent.sinks.spark.port = &lt;port to listen on for connection from Spark&gt;
 agent.sinks.spark.channel = memoryChannel
</code></pre>

    <p>Also make sure that the upstream Flume pipeline is configured to send the data to the Flume agent running this sink.</p>
  </li>
</ol>

<p>See the <a href="https://flume.apache.org/documentation.html">Flume&#8217;s documentation</a> for more information about
configuring Flume agents.</p>

<h4 id="configuring-spark-streaming-application-1">Configuring Spark Streaming Application</h4>
<ol>
  <li>
    <p><strong>Linking:</strong> In your SBT/Maven project definition, link your streaming application against the <code>spark-streaming-flume_2.10</code> (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide).</p>
  </li>
  <li>
    <p><strong>Programming:</strong> In the streaming application code, import <code>FlumeUtils</code> and create input DStream as follows.</p>

    <div class="codetabs">
 <div data-lang="scala">
        <pre><code> import org.apache.spark.streaming.flume._

 val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])
</code></pre>
      </div>
 <div data-lang="java">
        <pre><code> import org.apache.spark.streaming.flume.*;

 JavaReceiverInputDStream&lt;SparkFlumeEvent&gt;flumeStream =
     FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port]);
</code></pre>
      </div>
 </div>

    <p>See the Scala example <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala">FlumePollingEventCount</a>.</p>

    <p>Note that each input DStream can be configured to receive data from multiple sinks.</p>
  </li>
  <li>
    <p><strong>Deploying:</strong> Package <code>spark-streaming-flume_2.10</code> and its dependencies (except <code>spark-core_2.10</code> and <code>spark-streaming_2.10</code> which are provided by <code>spark-submit</code>) into the application JAR. Then use <code>spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide).</p>
  </li>
</ol>



        </div> <!-- /container -->

        <script src="js/vendor/jquery-1.8.0.min.js"></script>
        <script src="js/vendor/bootstrap.min.js"></script>
        <script src="js/main.js"></script>

        <!-- MathJax Section -->
        <script type="text/x-mathjax-config">
            MathJax.Hub.Config({
                TeX: { equationNumbers: { autoNumber: "AMS" } }
            });
        </script>
        <script>
            // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
            // We could use "//cdn.mathjax...", but that won't support "file://".
            (function(d, script) {
                script = d.createElement('script');
                script.type = 'text/javascript';
                script.async = true;
                script.onload = function(){
                    MathJax.Hub.Config({
                        tex2jax: {
                            inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
                            displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
                            processEscapes: true,
                            skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
                        }
                    });
                };
                script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
                    'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
                d.getElementsByTagName('head')[0].appendChild(script);
            }(document));
        </script>
    </body>
</html>