summaryrefslogblamecommitdiff
path: root/site/docs/1.5.0/api/python/_modules/pyspark/streaming/flume.html
blob: 9248099abf3f4197b36c964a14057edc6e3cc4a9 (plain) (tree)







































































































































































































































                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">


<html xmlns="http://www.w3.org/1999/xhtml" lang="en" xml:lang="en">
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    
    <title>pyspark.streaming.flume &mdash; PySpark master documentation</title>
    
    <!-- Stylesheets from the shared _static directory, three levels up from this page. -->
    <link rel="stylesheet" href="../../../_static/nature.css" type="text/css" />
    <link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
    
    <!-- Page settings global, declared before the _static scripts below are loaded. -->
    <script type="text/javascript">
      // NOTE(review): presumably read by doctools.js; confirm before renaming any key.
      // URL_ROOT is the same relative prefix used by the _static links on this page.
      var DOCUMENTATION_OPTIONS = {
        URL_ROOT:    '../../../',
        VERSION:     'master',
        COLLAPSE_INDEX: false,
        FILE_SUFFIX: '.html',
        HAS_SOURCE:  true
      };
    </script>
    <!-- Script order is kept as generated: jquery, underscore, then doctools. -->
    <script type="text/javascript" src="../../../_static/jquery.js"></script>
    <script type="text/javascript" src="../../../_static/underscore.js"></script>
    <script type="text/javascript" src="../../../_static/doctools.js"></script>
    <!-- Relation links: "top" is the documentation index, "up" is the Module code index. -->
    <link rel="top" title="PySpark master documentation" href="../../../index.html" />
    <link rel="up" title="Module code" href="../../index.html" /> 
  </head>
  <body role="document">
    <!-- Breadcrumb bar at the top of the page: documentation index, then the Module code index. -->
    <div class="related" role="navigation" aria-label="related navigation">
      <h3>Navigation</h3>
      <ul>
        <li class="nav-item nav-item-0"><a href="../../../index.html">PySpark master documentation</a> &raquo;</li>
        <li class="nav-item nav-item-1"><a href="../../index.html" accesskey="U">Module code</a> &raquo;</li>
      </ul>
    </div>

    <div class="document">
      <div class="documentwrapper">
        <div class="bodywrapper">
          <div class="body" role="main">
            
  <h1>Source code for pyspark.streaming.flume</h1><div class="highlight"><pre>
<span class="c">#</span>
<span class="c"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c"># contributor license agreements.  See the NOTICE file distributed with</span>
<span class="c"># this work for additional information regarding copyright ownership.</span>
<span class="c"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c"># the License.  You may obtain a copy of the License at</span>
<span class="c">#</span>
<span class="c">#    http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c">#</span>
<span class="c"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c"># See the License for the specific language governing permissions and</span>
<span class="c"># limitations under the License.</span>
<span class="c">#</span>

<span class="kn">import</span> <span class="nn">sys</span>
<span class="k">if</span> <span class="n">sys</span><span class="o">.</span><span class="n">version</span> <span class="o">&gt;=</span> <span class="s">&quot;3&quot;</span><span class="p">:</span>
    <span class="kn">from</span> <span class="nn">io</span> <span class="kn">import</span> <span class="n">BytesIO</span>
<span class="k">else</span><span class="p">:</span>
    <span class="kn">from</span> <span class="nn">StringIO</span> <span class="kn">import</span> <span class="n">StringIO</span>
<span class="kn">from</span> <span class="nn">py4j.java_gateway</span> <span class="kn">import</span> <span class="n">Py4JJavaError</span>

<span class="kn">from</span> <span class="nn">pyspark.storagelevel</span> <span class="kn">import</span> <span class="n">StorageLevel</span>
<span class="kn">from</span> <span class="nn">pyspark.serializers</span> <span class="kn">import</span> <span class="n">PairDeserializer</span><span class="p">,</span> <span class="n">NoOpSerializer</span><span class="p">,</span> <span class="n">UTF8Deserializer</span><span class="p">,</span> <span class="n">read_int</span>
<span class="kn">from</span> <span class="nn">pyspark.streaming</span> <span class="kn">import</span> <span class="n">DStream</span>

<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s">&#39;FlumeUtils&#39;</span><span class="p">,</span> <span class="s">&#39;utf8_decoder&#39;</span><span class="p">]</span>


<div class="viewcode-block" id="utf8_decoder"><a class="viewcode-back" href="../../../pyspark.streaming.html#pyspark.streaming.flume.utf8_decoder">[docs]</a><span class="k">def</span> <span class="nf">utf8_decoder</span><span class="p">(</span><span class="n">s</span><span class="p">):</span>
    <span class="sd">&quot;&quot;&quot; Decode the unicode as UTF-8 &quot;&quot;&quot;</span>
    <span class="k">if</span> <span class="n">s</span> <span class="ow">is</span> <span class="bp">None</span><span class="p">:</span>
        <span class="k">return</span> <span class="bp">None</span>
    <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="n">decode</span><span class="p">(</span><span class="s">&#39;utf-8&#39;</span><span class="p">)</span>

</div>
<div class="viewcode-block" id="FlumeUtils"><a class="viewcode-back" href="../../../pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils">[docs]</a><span class="k">class</span> <span class="nc">FlumeUtils</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>

    <span class="nd">@staticmethod</span>
<div class="viewcode-block" id="FlumeUtils.createStream"><a class="viewcode-back" href="../../../pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils.createStream">[docs]</a>    <span class="k">def</span> <span class="nf">createStream</span><span class="p">(</span><span class="n">ssc</span><span class="p">,</span> <span class="n">hostname</span><span class="p">,</span> <span class="n">port</span><span class="p">,</span>
                     <span class="n">storageLevel</span><span class="o">=</span><span class="n">StorageLevel</span><span class="o">.</span><span class="n">MEMORY_AND_DISK_SER_2</span><span class="p">,</span>
                     <span class="n">enableDecompression</span><span class="o">=</span><span class="bp">False</span><span class="p">,</span>
                     <span class="n">bodyDecoder</span><span class="o">=</span><span class="n">utf8_decoder</span><span class="p">):</span>
        <span class="sd">&quot;&quot;&quot;</span>
<span class="sd">        Create an input stream that pulls events from Flume.</span>

<span class="sd">        :param ssc:  StreamingContext object</span>
<span class="sd">        :param hostname:  Hostname of the slave machine to which the flume data will be sent</span>
<span class="sd">        :param port:  Port of the slave machine to which the flume data will be sent</span>
<span class="sd">        :param storageLevel:  Storage level to use for storing the received objects</span>
<span class="sd">        :param enableDecompression:  Should netty server decompress input stream</span>
<span class="sd">        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)</span>
<span class="sd">        :return: A DStream object</span>
<span class="sd">        &quot;&quot;&quot;</span>
        <span class="n">jlevel</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_getJavaStorageLevel</span><span class="p">(</span><span class="n">storageLevel</span><span class="p">)</span>

        <span class="k">try</span><span class="p">:</span>
            <span class="n">helperClass</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">lang</span><span class="o">.</span><span class="n">Thread</span><span class="o">.</span><span class="n">currentThread</span><span class="p">()</span><span class="o">.</span><span class="n">getContextClassLoader</span><span class="p">()</span>\
                <span class="o">.</span><span class="n">loadClass</span><span class="p">(</span><span class="s">&quot;org.apache.spark.streaming.flume.FlumeUtilsPythonHelper&quot;</span><span class="p">)</span>
            <span class="n">helper</span> <span class="o">=</span> <span class="n">helperClass</span><span class="o">.</span><span class="n">newInstance</span><span class="p">()</span>
            <span class="n">jstream</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">createStream</span><span class="p">(</span><span class="n">ssc</span><span class="o">.</span><span class="n">_jssc</span><span class="p">,</span> <span class="n">hostname</span><span class="p">,</span> <span class="n">port</span><span class="p">,</span> <span class="n">jlevel</span><span class="p">,</span> <span class="n">enableDecompression</span><span class="p">)</span>
        <span class="k">except</span> <span class="n">Py4JJavaError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
            <span class="k">if</span> <span class="s">&#39;ClassNotFoundException&#39;</span> <span class="ow">in</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">java_exception</span><span class="p">):</span>
                <span class="n">FlumeUtils</span><span class="o">.</span><span class="n">_printErrorMsg</span><span class="p">(</span><span class="n">ssc</span><span class="o">.</span><span class="n">sparkContext</span><span class="p">)</span>
            <span class="k">raise</span> <span class="n">e</span>

        <span class="k">return</span> <span class="n">FlumeUtils</span><span class="o">.</span><span class="n">_toPythonDStream</span><span class="p">(</span><span class="n">ssc</span><span class="p">,</span> <span class="n">jstream</span><span class="p">,</span> <span class="n">bodyDecoder</span><span class="p">)</span>
</div>
    <span class="nd">@staticmethod</span>
<div class="viewcode-block" id="FlumeUtils.createPollingStream"><a class="viewcode-back" href="../../../pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils.createPollingStream">[docs]</a>    <span class="k">def</span> <span class="nf">createPollingStream</span><span class="p">(</span><span class="n">ssc</span><span class="p">,</span> <span class="n">addresses</span><span class="p">,</span>
                            <span class="n">storageLevel</span><span class="o">=</span><span class="n">StorageLevel</span><span class="o">.</span><span class="n">MEMORY_AND_DISK_SER_2</span><span class="p">,</span>
                            <span class="n">maxBatchSize</span><span class="o">=</span><span class="mi">1000</span><span class="p">,</span>
                            <span class="n">parallelism</span><span class="o">=</span><span class="mi">5</span><span class="p">,</span>
                            <span class="n">bodyDecoder</span><span class="o">=</span><span class="n">utf8_decoder</span><span class="p">):</span>
        <span class="sd">&quot;&quot;&quot;</span>
<span class="sd">        Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.</span>
<span class="sd">        This stream will poll the sink for data and will pull events as they are available.</span>

<span class="sd">        :param ssc:  StreamingContext object</span>
<span class="sd">        :param addresses:  List of (host, port)s on which the Spark Sink is running.</span>
<span class="sd">        :param storageLevel:  Storage level to use for storing the received objects</span>
<span class="sd">        :param maxBatchSize:  The maximum number of events to be pulled from the Spark sink</span>
<span class="sd">                              in a single RPC call</span>
<span class="sd">        :param parallelism:  Number of concurrent requests this stream should send to the sink.</span>
<span class="sd">                             Note that having a higher number of requests concurrently being pulled</span>
<span class="sd">                             will result in this stream using more threads</span>
<span class="sd">        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)</span>
<span class="sd">        :return: A DStream object</span>
<span class="sd">        &quot;&quot;&quot;</span>
        <span class="n">jlevel</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">_sc</span><span class="o">.</span><span class="n">_getJavaStorageLevel</span><span class="p">(</span><span class="n">storageLevel</span><span class="p">)</span>
        <span class="n">hosts</span> <span class="o">=</span> <span class="p">[]</span>
        <span class="n">ports</span> <span class="o">=</span> <span class="p">[]</span>
        <span class="k">for</span> <span class="p">(</span><span class="n">host</span><span class="p">,</span> <span class="n">port</span><span class="p">)</span> <span class="ow">in</span> <span class="n">addresses</span><span class="p">:</span>
            <span class="n">hosts</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">host</span><span class="p">)</span>
            <span class="n">ports</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">port</span><span class="p">)</span>

        <span class="k">try</span><span class="p">:</span>
            <span class="n">helperClass</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">_jvm</span><span class="o">.</span><span class="n">java</span><span class="o">.</span><span class="n">lang</span><span class="o">.</span><span class="n">Thread</span><span class="o">.</span><span class="n">currentThread</span><span class="p">()</span><span class="o">.</span><span class="n">getContextClassLoader</span><span class="p">()</span> \
                <span class="o">.</span><span class="n">loadClass</span><span class="p">(</span><span class="s">&quot;org.apache.spark.streaming.flume.FlumeUtilsPythonHelper&quot;</span><span class="p">)</span>
            <span class="n">helper</span> <span class="o">=</span> <span class="n">helperClass</span><span class="o">.</span><span class="n">newInstance</span><span class="p">()</span>
            <span class="n">jstream</span> <span class="o">=</span> <span class="n">helper</span><span class="o">.</span><span class="n">createPollingStream</span><span class="p">(</span>
                <span class="n">ssc</span><span class="o">.</span><span class="n">_jssc</span><span class="p">,</span> <span class="n">hosts</span><span class="p">,</span> <span class="n">ports</span><span class="p">,</span> <span class="n">jlevel</span><span class="p">,</span> <span class="n">maxBatchSize</span><span class="p">,</span> <span class="n">parallelism</span><span class="p">)</span>
        <span class="k">except</span> <span class="n">Py4JJavaError</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
            <span class="k">if</span> <span class="s">&#39;ClassNotFoundException&#39;</span> <span class="ow">in</span> <span class="nb">str</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">java_exception</span><span class="p">):</span>
                <span class="n">FlumeUtils</span><span class="o">.</span><span class="n">_printErrorMsg</span><span class="p">(</span><span class="n">ssc</span><span class="o">.</span><span class="n">sparkContext</span><span class="p">)</span>
            <span class="k">raise</span> <span class="n">e</span>

        <span class="k">return</span> <span class="n">FlumeUtils</span><span class="o">.</span><span class="n">_toPythonDStream</span><span class="p">(</span><span class="n">ssc</span><span class="p">,</span> <span class="n">jstream</span><span class="p">,</span> <span class="n">bodyDecoder</span><span class="p">)</span>
</div>
    <span class="nd">@staticmethod</span>
    <span class="k">def</span> <span class="nf">_toPythonDStream</span><span class="p">(</span><span class="n">ssc</span><span class="p">,</span> <span class="n">jstream</span><span class="p">,</span> <span class="n">bodyDecoder</span><span class="p">):</span>
        <span class="n">ser</span> <span class="o">=</span> <span class="n">PairDeserializer</span><span class="p">(</span><span class="n">NoOpSerializer</span><span class="p">(),</span> <span class="n">NoOpSerializer</span><span class="p">())</span>
        <span class="n">stream</span> <span class="o">=</span> <span class="n">DStream</span><span class="p">(</span><span class="n">jstream</span><span class="p">,</span> <span class="n">ssc</span><span class="p">,</span> <span class="n">ser</span><span class="p">)</span>

        <span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="n">event</span><span class="p">):</span>
            <span class="n">headersBytes</span> <span class="o">=</span> <span class="n">BytesIO</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span> <span class="k">if</span> <span class="n">sys</span><span class="o">.</span><span class="n">version</span> <span class="o">&gt;=</span> <span class="s">&quot;3&quot;</span> <span class="k">else</span> <span class="n">StringIO</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
            <span class="n">headers</span> <span class="o">=</span> <span class="p">{}</span>
            <span class="n">strSer</span> <span class="o">=</span> <span class="n">UTF8Deserializer</span><span class="p">()</span>
            <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="n">read_int</span><span class="p">(</span><span class="n">headersBytes</span><span class="p">)):</span>
                <span class="n">key</span> <span class="o">=</span> <span class="n">strSer</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">headersBytes</span><span class="p">)</span>
                <span class="n">value</span> <span class="o">=</span> <span class="n">strSer</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">headersBytes</span><span class="p">)</span>
                <span class="n">headers</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span>
            <span class="n">body</span> <span class="o">=</span> <span class="n">bodyDecoder</span><span class="p">(</span><span class="n">event</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
            <span class="k">return</span> <span class="p">(</span><span class="n">headers</span><span class="p">,</span> <span class="n">body</span><span class="p">)</span>
        <span class="k">return</span> <span class="n">stream</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">func</span><span class="p">)</span>

    <span class="nd">@staticmethod</span>
    <span class="k">def</span> <span class="nf">_printErrorMsg</span><span class="p">(</span><span class="n">sc</span><span class="p">):</span>
        <span class="k">print</span><span class="p">(</span><span class="s">&quot;&quot;&quot;</span>
<span class="s">________________________________________________________________________________________________</span>

<span class="s">  Spark Streaming&#39;s Flume libraries not found in class path. Try one of the following.</span>

<span class="s">  1. Include the Flume library and its dependencies with in the</span>
<span class="s">     spark-submit command as</span>

<span class="s">     $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:</span><span class="si">%s</span><span class="s"> ...</span>

<span class="s">  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,</span>
<span class="s">     Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = </span><span class="si">%s</span><span class="s">.</span>
<span class="s">     Then, include the jar in the spark-submit command as</span>

<span class="s">     $ bin/spark-submit --jars &lt;spark-streaming-flume-assembly.jar&gt; ...</span>

<span class="s">________________________________________________________________________________________________</span>

<span class="s">&quot;&quot;&quot;</span> <span class="o">%</span> <span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">version</span><span class="p">,</span> <span class="n">sc</span><span class="o">.</span><span class="n">version</span><span class="p">))</span></div>
</pre></div>

          </div>
        </div>
      </div>
      <div class="sphinxsidebar" role="navigation" aria-label="main navigation">
        <div class="sphinxsidebarwrapper">
            <!-- The image is the link's only content, so its alt text names the link target rather than the picture. -->
            <p class="logo"><a href="../../../index.html">
              <img class="logo" src="../../../_static/spark-logo-hd.png" alt="PySpark master documentation"/>
            </a></p>
<div id="searchbox" style="display: none" role="search">
  <!-- The heading doubles as the accessible name of the query field through aria-labelledby. -->
  <h3 id="searchlabel">Quick search</h3>
    <form class="search" action="../../../search.html" method="get">
      <input type="text" name="q" aria-labelledby="searchlabel" />
      <input type="submit" value="Go" />
      <input type="hidden" name="check_keywords" value="yes" />
      <input type="hidden" name="area" value="default" />
    </form>
    <p class="searchtip" style="font-size: 90%">
    Enter search terms or a module, class or function name.
    </p>
</div>
<!-- The box is hidden by its inline style; this script reveals it, so it only appears when JavaScript runs. -->
<script type="text/javascript">$('#searchbox').show(0);</script>
        </div>
      </div>
      <div class="clearer"></div>
    </div>
    <!-- Breadcrumb bar repeated at the bottom of the page; same two links as the top bar, without the access key. -->
    <div class="related" role="navigation" aria-label="related navigation">
      <h3>Navigation</h3>
      <ul>
        <li class="nav-item nav-item-0"><a href="../../../index.html">PySpark master documentation</a> &raquo;</li>
        <li class="nav-item nav-item-1"><a href="../../index.html">Module code</a> &raquo;</li>
      </ul>
    </div>
    <div class="footer" role="contentinfo">
        <!-- NOTE(review): the copyright holder below is blank; the owner is not stated on this page, so it is left as generated. -->
        &copy; Copyright .
      Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 1.3.1.
    </div>
  </body>
</html>