summaryrefslogtreecommitdiff
path: root/site/docs/1.0.1/streaming-custom-receivers.html
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@apache.org>2014-07-11 17:23:23 +0000
committerPatrick Wendell <pwendell@apache.org>2014-07-11 17:23:23 +0000
commit0beac4e243f85e71554fe04093b09eb1745fea82 (patch)
treebc20d10426c5d57e2f189305865dc2bbec447923 /site/docs/1.0.1/streaming-custom-receivers.html
parentddec2123ba6ab95543d1b250d4f20fb811c48f09 (diff)
downloadspark-website-0beac4e243f85e71554fe04093b09eb1745fea82.tar.gz
spark-website-0beac4e243f85e71554fe04093b09eb1745fea82.tar.bz2
spark-website-0beac4e243f85e71554fe04093b09eb1745fea82.zip
Updating docs for 1.0.1 release
Diffstat (limited to 'site/docs/1.0.1/streaming-custom-receivers.html')
-rw-r--r--site/docs/1.0.1/streaming-custom-receivers.html363
1 files changed, 363 insertions, 0 deletions
diff --git a/site/docs/1.0.1/streaming-custom-receivers.html b/site/docs/1.0.1/streaming-custom-receivers.html
new file mode 100644
index 000000000..4b1902a9c
--- /dev/null
+++ b/site/docs/1.0.1/streaming-custom-receivers.html
@@ -0,0 +1,363 @@
+<!DOCTYPE html>
+<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
+<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
+<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
+ <head>
+ <meta charset="utf-8">
+ <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
+ <title>Spark Streaming Custom Receivers - Spark 1.0.1 Documentation</title>
+ <meta name="description" content="">
+
+
+
+ <link rel="stylesheet" href="css/bootstrap.min.css">
+ <style>
+ body {
+ padding-top: 60px;
+ padding-bottom: 40px;
+ }
+ </style>
+ <meta name="viewport" content="width=device-width">
+ <link rel="stylesheet" href="css/bootstrap-responsive.min.css">
+ <link rel="stylesheet" href="css/main.css">
+
+ <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
+
+ <link rel="stylesheet" href="css/pygments-default.css">
+
+
+ <!-- Google analytics script -->
+ <script type="text/javascript">
+ var _gaq = _gaq || [];
+ _gaq.push(['_setAccount', 'UA-32518208-1']);
+ _gaq.push(['_trackPageview']);
+
+ (function() {
+ var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
+ ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
+ var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
+ })();
+ </script>
+
+
+ </head>
+ <body>
+ <!--[if lt IE 7]>
+ <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
+ <![endif]-->
+
+ <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
+
+ <div class="navbar navbar-fixed-top" id="topbar">
+ <div class="navbar-inner">
+ <div class="container">
+ <div class="brand"><a href="index.html">
+ <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.0.1</span>
+ </div>
+ <ul class="nav">
+ <!--TODO(andyk): Add class="active" attribute to li some how.-->
+ <li><a href="index.html">Overview</a></li>
+
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
+ <ul class="dropdown-menu">
+ <li><a href="quick-start.html">Quick Start</a></li>
+ <li><a href="programming-guide.html">Spark Programming Guide</a></li>
+ <li class="divider"></li>
+ <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
+ <li><a href="sql-programming-guide.html">Spark SQL</a></li>
+ <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
+ <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
+ <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
+ </ul>
+ </li>
+
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
+ <ul class="dropdown-menu">
+ <li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li>
+ <li><a href="api/java/index.html">Javadoc</a></li>
+ <li><a href="api/python/index.html">Python API</a></li>
+ </ul>
+ </li>
+
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
+ <ul class="dropdown-menu">
+ <li><a href="cluster-overview.html">Overview</a></li>
+ <li><a href="submitting-applications.html">Submitting Applications</a></li>
+ <li class="divider"></li>
+ <li><a href="ec2-scripts.html">Amazon EC2</a></li>
+ <li><a href="spark-standalone.html">Standalone Mode</a></li>
+ <li><a href="running-on-mesos.html">Mesos</a></li>
+ <li><a href="running-on-yarn.html">YARN</a></li>
+ </ul>
+ </li>
+
+ <li class="dropdown">
+ <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
+ <ul class="dropdown-menu">
+ <li><a href="configuration.html">Configuration</a></li>
+ <li><a href="monitoring.html">Monitoring</a></li>
+ <li><a href="tuning.html">Tuning Guide</a></li>
+ <li><a href="job-scheduling.html">Job Scheduling</a></li>
+ <li><a href="security.html">Security</a></li>
+ <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
+ <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
+ <li class="divider"></li>
+ <li><a href="building-with-maven.html">Building Spark with Maven</a></li>
+ <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
+ </ul>
+ </li>
+ </ul>
+ <!--<p class="navbar-text pull-right"><span class="version-text">v1.0.1</span></p>-->
+ </div>
+ </div>
+ </div>
+
+ <div class="container" id="content">
+
+ <h1 class="title">Spark Streaming Custom Receivers</h1>
+
+
+ <p>Spark Streaming can receive streaming data from any arbitrary data source beyond
+the one&#8217;s for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
+This requires the developer to implement a <em>receiver</em> that is customized for receiving data from
+the concerned data source. This guide walks through the process of implementing a custom receiver
+and using it in a Spark Streaming application.</p>
+
+<h3 id="implementing-a-custom-receiver">Implementing a Custom Receiver</h3>
+
+<p>This starts with implementing a <a href="api/scala/index.html#org.apache.spark.streaming.receiver.Receiver">Receiver</a>.
+A custom receiver must extend this abstract class by implementing two methods
+- <code>onStart()</code>: Things to do to start receiving data.
+- <code>onStop()</code>: Things to do to stop receiving data.</p>
+
+<p>Note that <code>onStart()</code> and <code>onStop()</code> must not block indefinitely. Typically, onStart() would start the threads
+that responsible for receiving the data and <code>onStop()</code> would ensure that the receiving by those threads
+are stopped. The receiving threads can also use <code>isStopped()</code>, a <code>Receiver</code> method, to check whether they
+should stop receiving data.</p>
+
+<p>Once the data is received, that data can be stored inside Spark
+by calling <code>store(data)</code>, which is a method provided by the
+<a href="api/scala/index.html#org.apache.spark.streaming.receiver.Receiver">Receiver</a> class.
+There are number of flavours of <code>store()</code> which allow you store the received data
+record-at-a-time or as whole collection of objects / serialized bytes.</p>
+
+<p>Any exception in the receiving threads should be caught and handled properly to avoid silent
+failures of the receiver. <code>restart(&lt;exception&gt;)</code> will restart the receiver by
+asynchronously calling <code>onStop()</code> and then calling <code>onStart()</code> after a delay.
+<code>stop(&lt;exception&gt;)</code> will call <code>onStop()</code> and terminate the receiver. Also, <code>reportError(&lt;error&gt;)</code>
+reports a error message to the driver (visible in the logs and UI) without stopping / restarting
+the receiver.</p>
+
+<p>The following is a custom receiver that receives a stream of text over a socket. It treats
+&#8216;\n&#8217; delimited lines in the text stream as records and stores them with Spark. If the receiving thread
+has any error connecting or receiving, the receiver is restarted to make another attempt to connect.</p>
+
+<div class="codetabs">
+<div data-lang="scala">
+
+ <div class="highlight"><pre><code class="scala"><span class="k">class</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
+ <span class="k">extends</span> <span class="nc">Receiver</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="nc">StorageLevel</span><span class="o">.</span><span class="nc">MEMORY_AND_DISK_2</span><span class="o">)</span> <span class="k">with</span> <span class="nc">Logging</span> <span class="o">{</span>
+
+ <span class="k">def</span> <span class="n">onStart</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Start the thread that receives data over a connection</span>
+ <span class="k">new</span> <span class="nc">Thread</span><span class="o">(</span><span class="s">&quot;Socket Receiver&quot;</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">override</span> <span class="k">def</span> <span class="n">run</span><span class="o">()</span> <span class="o">{</span> <span class="n">receive</span><span class="o">()</span> <span class="o">}</span>
+ <span class="o">}.</span><span class="n">start</span><span class="o">()</span>
+ <span class="o">}</span>
+
+ <span class="k">def</span> <span class="n">onStop</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// There is nothing much to do as the thread calling receive()</span>
+ <span class="c1">// is designed to stop by itself isStopped() returns false</span>
+ <span class="o">}</span>
+
+ <span class="cm">/** Create a socket connection and receive data until receiver is stopped */</span>
+ <span class="k">private</span> <span class="k">def</span> <span class="n">receive</span><span class="o">()</span> <span class="o">{</span>
+ <span class="k">var</span> <span class="n">socket</span><span class="k">:</span> <span class="kt">Socket</span> <span class="o">=</span> <span class="kc">null</span>
+ <span class="k">var</span> <span class="n">userInput</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span>
+ <span class="k">try</span> <span class="o">{</span>
+ <span class="c1">// Connect to host:port</span>
+ <span class="n">socket</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Socket</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">)</span>
+
+ <span class="c1">// Until stopped or connection broken continue reading</span>
+ <span class="k">val</span> <span class="n">reader</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">BufferedReader</span><span class="o">(</span><span class="k">new</span> <span class="nc">InputStreamReader</span><span class="o">(</span><span class="n">socket</span><span class="o">.</span><span class="n">getInputStream</span><span class="o">(),</span> <span class="s">&quot;UTF-8&quot;</span><span class="o">))</span>
+ <span class="n">userInput</span> <span class="k">=</span> <span class="n">reader</span><span class="o">.</span><span class="n">readLine</span><span class="o">()</span>
+ <span class="k">while</span><span class="o">(!</span><span class="n">isStopped</span> <span class="o">&amp;&amp;</span> <span class="n">userInput</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">store</span><span class="o">(</span><span class="n">userInput</span><span class="o">)</span>
+ <span class="n">userInput</span> <span class="k">=</span> <span class="n">reader</span><span class="o">.</span><span class="n">readLine</span><span class="o">()</span>
+ <span class="o">}</span>
+ <span class="n">reader</span><span class="o">.</span><span class="n">close</span><span class="o">()</span>
+ <span class="n">socket</span><span class="o">.</span><span class="n">close</span><span class="o">()</span>
+
+ <span class="c1">// Restart in an attempt to connect again when server is active again</span>
+ <span class="n">restart</span><span class="o">(</span><span class="s">&quot;Trying to connect again&quot;</span><span class="o">)</span>
+ <span class="o">}</span> <span class="k">catch</span> <span class="o">{</span>
+ <span class="k">case</span> <span class="n">e</span><span class="k">:</span> <span class="kt">java.net.ConnectException</span> <span class="o">=&gt;</span>
+ <span class="c1">// restart if could not connect to server</span>
+ <span class="n">restart</span><span class="o">(</span><span class="s">&quot;Error connecting to &quot;</span> <span class="o">+</span> <span class="n">host</span> <span class="o">+</span> <span class="s">&quot;:&quot;</span> <span class="o">+</span> <span class="n">port</span><span class="o">,</span> <span class="n">e</span><span class="o">)</span>
+ <span class="k">case</span> <span class="n">t</span><span class="k">:</span> <span class="kt">Throwable</span> <span class="o">=&gt;</span>
+ <span class="c1">// restart if there is any other error</span>
+ <span class="n">restart</span><span class="o">(</span><span class="s">&quot;Error receiving data&quot;</span><span class="o">,</span> <span class="n">t</span><span class="o">)</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
+
+ </div>
+<div data-lang="java">
+
+ <div class="highlight"><pre><code class="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">JavaCustomReceiver</span> <span class="kd">extends</span> <span class="n">Receiver</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+
+ <span class="n">String</span> <span class="n">host</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+ <span class="kt">int</span> <span class="n">port</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span>
+
+ <span class="kd">public</span> <span class="nf">JavaCustomReceiver</span><span class="o">(</span><span class="n">String</span> <span class="n">host_</span> <span class="o">,</span> <span class="kt">int</span> <span class="n">port_</span><span class="o">)</span> <span class="o">{</span>
+ <span class="kd">super</span><span class="o">(</span><span class="n">StorageLevel</span><span class="o">.</span><span class="na">MEMORY_AND_DISK_2</span><span class="o">());</span>
+ <span class="n">host</span> <span class="o">=</span> <span class="n">host_</span><span class="o">;</span>
+ <span class="n">port</span> <span class="o">=</span> <span class="n">port_</span><span class="o">;</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStart</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// Start the thread that receives data over a connection</span>
+ <span class="k">new</span> <span class="nf">Thread</span><span class="o">()</span> <span class="o">{</span>
+ <span class="nd">@Override</span> <span class="kd">public</span> <span class="kt">void</span> <span class="n">run</span><span class="o">()</span> <span class="o">{</span>
+ <span class="n">receive</span><span class="o">();</span>
+ <span class="o">}</span>
+ <span class="o">}.</span><span class="na">start</span><span class="o">();</span>
+ <span class="o">}</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStop</span><span class="o">()</span> <span class="o">{</span>
+ <span class="c1">// There is nothing much to do as the thread calling receive()</span>
+ <span class="c1">// is designed to stop by itself isStopped() returns false</span>
+ <span class="o">}</span>
+
+ <span class="cm">/** Create a socket connection and receive data until receiver is stopped */</span>
+ <span class="kd">private</span> <span class="kt">void</span> <span class="nf">receive</span><span class="o">()</span> <span class="o">{</span>
+ <span class="n">Socket</span> <span class="n">socket</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+ <span class="n">String</span> <span class="n">userInput</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+
+ <span class="k">try</span> <span class="o">{</span>
+ <span class="c1">// connect to the server</span>
+ <span class="n">socket</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Socket</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span>
+
+ <span class="n">BufferedReader</span> <span class="n">reader</span> <span class="o">=</span> <span class="k">new</span> <span class="n">BufferedReader</span><span class="o">(</span><span class="k">new</span> <span class="n">InputStreamReader</span><span class="o">(</span><span class="n">socket</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">()));</span>
+
+ <span class="c1">// Until stopped or connection broken continue reading</span>
+ <span class="k">while</span> <span class="o">(!</span><span class="n">isStopped</span><span class="o">()</span> <span class="o">&amp;&amp;</span> <span class="o">(</span><span class="n">userInput</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="na">readLine</span><span class="o">())</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Received data &#39;&quot;</span> <span class="o">+</span> <span class="n">userInput</span> <span class="o">+</span> <span class="s">&quot;&#39;&quot;</span><span class="o">);</span>
+ <span class="n">store</span><span class="o">(</span><span class="n">userInput</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="n">reader</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+ <span class="n">socket</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
+
+ <span class="c1">// Restart in an attempt to connect again when server is active again</span>
+ <span class="n">restart</span><span class="o">(</span><span class="s">&quot;Trying to connect again&quot;</span><span class="o">);</span>
+ <span class="o">}</span> <span class="k">catch</span><span class="o">(</span><span class="n">ConnectException</span> <span class="n">ce</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// restart if could not connect to server</span>
+ <span class="n">restart</span><span class="o">(</span><span class="s">&quot;Could not connect&quot;</span><span class="o">,</span> <span class="n">ce</span><span class="o">);</span>
+ <span class="o">}</span> <span class="k">catch</span><span class="o">(</span><span class="n">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
+ <span class="c1">// restart if there is any other error</span>
+ <span class="n">restart</span><span class="o">(</span><span class="s">&quot;Error receiving data&quot;</span><span class="o">,</span> <span class="n">t</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
+
+ </div>
+</div>
+
+<h3 id="using-the-custom-receiver-in-a-spark-streaming-application">Using the custom receiver in a Spark Streaming application</h3>
+
+<p>The custom receiver can be used in a Spark Streaming application by using
+<code>streamingContext.receiverStream(&lt;instance of custom receiver&gt;)</code>. This will create
+input DStream using data received by the instance of custom receiver, as shown below</p>
+
+<div class="codetabs">
+<div data-lang="scala">
+
+ <div class="highlight"><pre><code class="scala"><span class="c1">// Assuming ssc is the StreamingContext</span>
+<span class="k">val</span> <span class="n">customReceiverStream</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">))</span>
+<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span>
+<span class="o">...</span>
+</code></pre></div>
+
+ <p>The full source code is in the example <a href="https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala">CustomReceiver.scala</a>.</p>
+
+ </div>
+<div data-lang="java">
+
+ <div class="highlight"><pre><code class="java"><span class="c1">// Assuming ssc is the JavaStreamingContext</span>
+<span class="n">JavaDStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">customReceiverStream</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="na">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="n">JavaCustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">));</span>
+<span class="n">JavaDStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">});</span>
+<span class="o">...</span>
+</code></pre></div>
+
+ <p>The full source code is in the example <a href="https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java">JavaCustomReceiver.java</a>.</p>
+
+ </div>
+</div>
+
+<h3 id="implementing-and-using-a-custom-actor-based-receiver">Implementing and Using a Custom Actor-based Receiver</h3>
+
+<p>Custom <a href="http://doc.akka.io/docs/akka/2.2.4/scala/actors.html">Akka Actors</a> can also be used to
+receive data. The <a href="api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper"><code>ActorHelper</code></a>
+trait can be applied on any Akka actor, which allows received data to be stored in Spark using
+ <code>store(...)</code> methods. The supervisor strategy of this actor can be configured to handle failures, etc.</p>
+
+<div class="highlight"><pre><code class="scala"><span class="k">class</span> <span class="nc">CustomActor</span> <span class="k">extends</span> <span class="nc">Actor</span> <span class="k">with</span> <span class="nc">ActorHelper</span> <span class="o">{</span>
+ <span class="k">def</span> <span class="n">receive</span> <span class="k">=</span> <span class="o">{</span>
+ <span class="k">case</span> <span class="n">data</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=&gt;</span> <span class="n">store</span><span class="o">(</span><span class="n">data</span><span class="o">)</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
+
+<p>And a new input stream can be created with this custom actor as</p>
+
+<div class="highlight"><pre><code class="scala"><span class="c1">// Assuming ssc is the StreamingContext</span>
+<span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">actorStream</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="nc">Props</span><span class="o">(</span><span class="k">new</span> <span class="nc">CustomActor</span><span class="o">()),</span> <span class="s">&quot;CustomReceiver&quot;</span><span class="o">)</span>
+</code></pre></div>
+
+<p>See <a href="https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala">ActorWordCount.scala</a>
+for an end-to-end example.</p>
+
+
+
+ </div> <!-- /container -->
+
+ <script src="js/vendor/jquery-1.8.0.min.js"></script>
+ <script src="js/vendor/bootstrap.min.js"></script>
+ <script src="js/main.js"></script>
+
+ <!-- MathJax Section -->
+ <script type="text/x-mathjax-config">
+ MathJax.Hub.Config({
+ TeX: { equationNumbers: { autoNumber: "AMS" } }
+ });
+ </script>
+ <script>
+ // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
+ // We could use "//cdn.mathjax...", but that won't support "file://".
+ (function(d, script) {
+ script = d.createElement('script');
+ script.type = 'text/javascript';
+ script.async = true;
+ script.onload = function(){
+ MathJax.Hub.Config({
+ tex2jax: {
+ inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
+ displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
+ processEscapes: true,
+ skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
+ }
+ });
+ };
+ script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
+ 'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
+ d.getElementsByTagName('head')[0].appendChild(script);
+ }(document));
+ </script>
+ </body>
+</html>