author     Tathagata Das <tathagata.das1565@gmail.com>   2014-05-05 15:28:19 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2014-05-05 15:28:19 -0700
commit     a975a19f21e71f448b3fdb2ed4461e28ef439900 (patch)
tree       3305b05ddec8ccebaa1f3a88dfef516afb6d0ad2 /docs/streaming-custom-receivers.md
parent     3292e2a71bfb5df5ba156cf7557747d164d12291 (diff)
[SPARK-1504], [SPARK-1505], [SPARK-1558] Updated Spark Streaming guide
- SPARK-1558: Updated custom receiver guide to match it with the new API
- SPARK-1504: Added deployment and monitoring subsection to streaming
- SPARK-1505: Added migration guide for migrating from 0.9.x and below to Spark 1.0
- Updated various Java streaming examples to use JavaReceiverInputDStream to highlight the API change.
- Removed the requirement for cleaner ttl from streaming guide

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #652 from tdas/doc-fix and squashes the following commits:

cb4f4b7 [Tathagata Das] Possible fix for flaky graceful shutdown test.
ab71f7f [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into doc-fix
8d6ff9b [Tathagata Das] Added migration guide to Spark Streaming.
7d171df [Tathagata Das] Added reference to JavaReceiverInputStream in examples and streaming guide.
49edd7c [Tathagata Das] Change java doc links to use Java docs.
11528d7 [Tathagata Das] Updated links on index page.
ff80970 [Tathagata Das] More updates to streaming guide.
4dc42e9 [Tathagata Das] Added monitoring and other documentation in the streaming guide.
14c6564 [Tathagata Das] Updated custom receiver guide.
Diffstat (limited to 'docs/streaming-custom-receivers.md')
-rw-r--r--  docs/streaming-custom-receivers.md  273
1 file changed, 183 insertions(+), 90 deletions(-)
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 3cfa4516cc..a2dc3a8961 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -3,126 +3,219 @@ layout: global
title: Spark Streaming Custom Receivers
---
-A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
+Spark Streaming can receive streaming data from any data source beyond
+the ones for which it has built-in support (that is, beyond Flume, Kafka, files, sockets, etc.).
+This requires the developer to implement a *receiver* that is customized for receiving data from
+the data source in question. This guide walks through the process of implementing a custom receiver
+and using it in a Spark Streaming application.
+
+### Implementing a Custom Receiver
+
+This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+A custom receiver must extend this abstract class by implementing two methods:
+- `onStart()`: Things to do to start receiving data.
+- `onStop()`: Things to do to stop receiving data.
+
+Note that `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
+that are responsible for receiving the data, and `onStop()` would ensure that these receiving threads
+are stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
+should stop receiving data.
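+
+As a minimal sketch of this contract (the `SketchReceiver` class, its storage level, and its
+placeholder loop are illustrative, not part of the API), a receiver could look like:
+
+{% highlight scala %}
+class SketchReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
+
+  def onStart() {
+    // onStart() must return quickly, so do the actual receiving on a new thread
+    new Thread("Sketch Receiver") {
+      override def run() {
+        while (!isStopped()) {
+          store("received record") // placeholder for real received data
+          Thread.sleep(100)        // placeholder for blocking I/O
+        }
+      }
+    }.start()
+  }
+
+  def onStop() {
+    // Nothing to do: the thread above checks isStopped() and exits on its own
+  }
+}
+{% endhighlight %}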
+
+Once the data is received, it can be stored inside Spark
+by calling `store(data)`, which is a method provided by the
+[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+There are a number of flavours of `store()` which allow you to store the received data
+record-at-a-time or as a whole collection of objects / serialized bytes.
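+
+For instance, inside a `Receiver[String]` subclass, records can be pushed as shown in the
+sketch below (a sketch only; see the `Receiver` API docs for the full set of overloads):
+
+{% highlight scala %}
+// One record at a time
+store("a single record")
+
+// A whole collection of records at once
+store(scala.collection.mutable.ArrayBuffer("several", "records", "at", "once"))
+
+// An iterator over many records
+store(Iterator("records", "from", "an", "iterator"))
+{% endhighlight %}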
+
+Any exception in the receiving threads should be caught and handled properly to avoid silent
+failures of the receiver. `restart(<exception>)` will restart the receiver by
+asynchronously calling `onStop()` and then calling `onStart()` after a delay.
+`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, `reportError(<error>)`
+reports an error message to the driver (visible in the logs and UI) without stopping / restarting
+the receiver.
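+
+The following sketch shows how a receiving thread might choose between these methods;
+the split between recoverable and fatal errors here is illustrative, not prescribed by the API:
+
+{% highlight scala %}
+try {
+  // ... receive data and store() it ...
+} catch {
+  case e: java.net.SocketTimeoutException =>
+    // Surface the problem in the driver logs and UI, but keep the receiver alive
+    reportError("Timed out while reading, still trying", e)
+  case e: java.net.ConnectException =>
+    // Asynchronously calls onStop() and, after a delay, onStart()
+    restart("Could not connect, restarting receiver", e)
+  case t: Throwable =>
+    // Calls onStop() and terminates the receiver for good
+    stop("Unrecoverable error in receiver", t)
+}
+{% endhighlight %}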
+
+The following is a custom receiver that receives a stream of text over a socket. It treats
+'\n'-delimited lines in the text stream as records and stores them with Spark. If the receiving thread
+encounters any error while connecting or receiving, the receiver is restarted to make another attempt to connect.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
-This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
+{% highlight scala %}
-### Writing a Simple Receiver
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  def onStop() {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself if isStopped() returns false
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive() {
+    var socket: Socket = null
+    var userInput: String = null
+    try {
+      // Connect to host:port
+      socket = new Socket(host, port)
+
+      // Until stopped or connection broken continue reading
+      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+      userInput = reader.readLine()
+      while(!isStopped && userInput != null) {
+        store(userInput)
+        userInput = reader.readLine()
+      }
+      reader.close()
+      socket.close()
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again")
+    } catch {
+      case e: java.net.ConnectException =>
+        // restart if could not connect to server
+        restart("Error connecting to " + host + ":" + port, e)
+      case t: Throwable =>
+        // restart if there is any other error
+        restart("Error receiving data", t)
+    }
+  }
+}
-This starts with implementing [NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver).
+{% endhighlight %}
-The following is a simple socket text-stream receiver.
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class JavaCustomReceiver extends Receiver<String> {
+
+  String host = null;
+  int port = -1;
+
+  public JavaCustomReceiver(String host_, int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread() {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  public void onStop() {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself if isStopped() returns false
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+      // Until stopped or connection broken continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again");
+    } catch(ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      // restart if there is any other error
+      restart("Error receiving data", t);
+    }
+  }
+}
-{% highlight scala %}
- class SocketTextStreamReceiver(host: String, port: Int)
- extends NetworkReceiver[String]
- {
- protected lazy val blocksGenerator: BlockGenerator =
- new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
-
- protected def onStart() = {
- blocksGenerator.start()
- val socket = new Socket(host, port)
- val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
- var data: String = dataInputStream.readLine()
- while (data != null) {
- blocksGenerator += data
- data = dataInputStream.readLine()
- }
- }
-
- protected def onStop() {
- blocksGenerator.stop()
- }
- }
{% endhighlight %}
+</div>
+</div>
-All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details.
+### Using the Custom Receiver in a Spark Streaming Application
-### An Actor as Receiver
+The custom receiver can be used in a Spark Streaming application by using
+`streamingContext.receiverStream(<instance of custom receiver>)`. This will create
+an input DStream using data received by the instance of the custom receiver, as shown below:
-This starts with implementing [Actor](#References)
-
-Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api.
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
{% highlight scala %}
- class SocketTextStreamReceiver (host:String,
- port:Int,
- bytesToString: ByteString => String) extends Actor with Receiver {
-
- override def preStart = IOManager(context.system).connect(host, port)
-
- def receive = {
- case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
- }
-
- }
+// Assuming ssc is the StreamingContext
+val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
+val words = customReceiverStream.flatMap(_.split(" "))
+...
{% endhighlight %}
-All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details.
-
-### A Sample Spark Application
+The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
-* First create a Spark streaming context with master url and batchduration.
+</div>
+<div data-lang="java" markdown="1">
-{% highlight scala %}
- val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
- Seconds(batchDuration))
+{% highlight java %}
+// Assuming ssc is the JavaStreamingContext
+JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
+JavaDStream<String> words = customReceiverStream.flatMap(new FlatMapFunction<String, String>() { ... });
+...
{% endhighlight %}
-* Plug-in the custom receiver into the spark streaming context and create a DStream.
+The full source code is in the example [JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java).
-{% highlight scala %}
- val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
- "localhost", 8445))
-{% endhighlight %}
+</div>
+</div>
-* OR Plug-in the actor as receiver into the spark streaming context and create a DStream.
-{% highlight scala %}
- val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
- "localhost",8445, z => z.utf8String)),"SocketReceiver")
-{% endhighlight %}
-* Process it.
+### Implementing and Using a Custom Actor-based Receiver
-{% highlight scala %}
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
+receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
+trait can be mixed into any Akka actor, which allows received data to be stored in Spark using
+`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
- wordCounts.print()
- ssc.start()
+{% highlight scala %}
+class CustomActor extends Actor with ActorHelper {
+  def receive = {
+    case data: String => store(data)
+  }
+}
{% endhighlight %}
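+
+For instance, if the actor delegates the actual receiving to child workers, a supervisor
+strategy for those children could be configured with plain Akka supervision. The sketch
+below is illustrative (the `SupervisedCustomActor` name and the Restart/Escalate choices
+are assumptions, not prescribed by Spark):
+
+{% highlight scala %}
+import akka.actor.{Actor, OneForOneStrategy}
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import scala.concurrent.duration._
+
+class SupervisedCustomActor extends Actor with ActorHelper {
+  // Governs failures in any child actors this actor creates
+  override val supervisorStrategy =
+    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+      case _: java.io.IOException => Restart  // restart a flaky worker
+      case _: Throwable           => Escalate // let our own supervisor decide
+    }
+
+  def receive = {
+    case data: String => store(data)
+  }
+}
+{% endhighlight %}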
-* After processing it, stream can be tested using the netcat utility.
-
- $ nc -l localhost 8445
- hello world
- hello hello
-
-
-## Multiple Homogeneous/Heterogeneous Receivers.
-
-A DStream union operation is provided for taking union on multiple input streams.
+And a new input stream can be created with this custom actor as follows:
{% highlight scala %}
- val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
- "localhost",8445, z => z.utf8String)),"SocketReceiver")
-
- // Another socket stream receiver
- val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
- "localhost",8446, z => z.utf8String)),"SocketReceiver")
-
- val union = lines.union(lines2)
+// Assuming ssc is the StreamingContext
+val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
{% endhighlight %}
-Above stream can be easily process as described earlier.
-
-_A more comprehensive example is provided in the spark streaming examples_
+See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala)
+for an end-to-end example.
-## References
-1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
-2.[NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver)