diff options
Diffstat (limited to 'docs/streaming-custom-receivers.md')
-rw-r--r-- | docs/streaming-custom-receivers.md | 273 |
1 files changed, 183 insertions, 90 deletions
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 3cfa4516cc..a2dc3a8961 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -3,126 +3,219 @@ layout: global title: Spark Streaming Custom Receivers --- -A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need. +Spark Streaming can receive streaming data from any arbitrary data source beyond +the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.). +This requires the developer to implement a *receiver* that is customized for receiving data from +the concerned data source. This guide walks through the process of implementing a custom receiver +and using it in a Spark Streaming application. + +### Implementing a Custom Receiver + +This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver). +A custom receiver must extend this abstract class by implementing two methods +- `onStart()`: Things to do to start receiving data. +- `onStop()`: Things to do to stop receiving data. + +Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads +that responsible for receiving the data and `onStop()` would ensure that the receiving by those threads +are stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they +should stop receiving data. + +Once the data is received, that data can be stored inside Spark +by calling `store(data)`, which is a method provided by the +[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class. +There are number of flavours of `store()` which allow you store the received data +record-at-a-time or as whole collection of objects / serialized bytes. + +Any exception in the receiving threads should be caught and handled properly to avoid silent +failures of the receiver. `restart(<exception>)` will restart the receiver by +asynchronously calling `onStop()` and then calling `onStart()` after a delay. +`stop(<exception>)` will call `onStop()` and terminate the receiver. Also, `reportError(<error>)` +reports a error message to the driver (visible in the logs and UI) without stopping / restarting +the receiver. + +The following is a custom receiver that receives a stream of text over a socket. It treats +'\n' delimited lines in the text stream as records and stores them with Spark. If the receiving thread +has any error connecting or receiving, the receiver is restarted to make another attempt to connect. + +<div class="codetabs"> +<div data-lang="scala" markdown="1" > -This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. +{% highlight scala %} -### Writing a Simple Receiver +class CustomReceiver(host: String, port: Int) + extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { + + def onStart() { + // Start the thread that receives data over a connection + new Thread("Socket Receiver") { + override def run() { receive() } + }.start() + } + + def onStop() { + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false + } + + /** Create a socket connection and receive data until receiver is stopped */ + private def receive() { + var socket: Socket = null + var userInput: String = null + try { + // Connect to host:port + socket = new Socket(host, port) + + // Until stopped or connection broken continue reading + val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + userInput = reader.readLine() + while(!isStopped && userInput != null) { + store(userInput) + userInput = reader.readLine() + } + reader.close() + socket.close() + + // Restart in an attempt to connect again when server is active again + restart("Trying to connect again") + } catch { + case e: java.net.ConnectException => + // restart if could not connect to server + restart("Error connecting to " + host + ":" + port, e) + case t: Throwable => + // restart if there is any other error + restart("Error receiving data", t) + } + } +} -This starts with implementing [NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver). +{% endhighlight %} -The following is a simple socket text-stream receiver. +</div> +<div data-lang="java" markdown="1"> + +{% highlight java %} + +public class JavaCustomReceiver extends Receiver<String> { + + String host = null; + int port = -1; + + public JavaCustomReceiver(String host_ , int port_) { + super(StorageLevel.MEMORY_AND_DISK_2()); + host = host_; + port = port_; + } + + public void onStart() { + // Start the thread that receives data over a connection + new Thread() { + @Override public void run() { + receive(); + } + }.start(); + } + + public void onStop() { + // There is nothing much to do as the thread calling receive() + // is designed to stop by itself isStopped() returns false + } + + /** Create a socket connection and receive data until receiver is stopped */ + private void receive() { + Socket socket = null; + String userInput = null; + + try { + // connect to the server + socket = new Socket(host, port); + + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + + // Until stopped or connection broken continue reading + while (!isStopped() && (userInput = reader.readLine()) != null) { + System.out.println("Received data '" + userInput + "'"); + store(userInput); + } + reader.close(); + socket.close(); + + // Restart in an attempt to connect again when server is active again + restart("Trying to connect again"); + } catch(ConnectException ce) { + // restart if could not connect to server + restart("Could not connect", ce); + } catch(Throwable t) { + // restart if there is any other error + restart("Error receiving data", t); + } + } +} -{% highlight scala %} - class SocketTextStreamReceiver(host: String, port: Int) - extends NetworkReceiver[String] - { - protected lazy val blocksGenerator: BlockGenerator = - new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2) - - protected def onStart() = { - blocksGenerator.start() - val socket = new Socket(host, port) - val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) - var data: String = dataInputStream.readLine() - while (data != null) { - blocksGenerator += data - data = dataInputStream.readLine() - } - } - - protected def onStop() { - blocksGenerator.stop() - } - } {% endhighlight %} +</div> +</div> -All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details. +### Using the custom receiver in a Spark Streaming application -### An Actor as Receiver +The custom receiver can be used in a Spark Streaming application by using +`streamingContext.receiverStream(<instance of custom receiver>)`. This will create +input DStream using data received by the instance of custom receiver, as shown below -This starts with implementing [Actor](#References) - -Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api. +<div class="codetabs"> +<div data-lang="scala" markdown="1" > {% highlight scala %} - class SocketTextStreamReceiver (host:String, - port:Int, - bytesToString: ByteString => String) extends Actor with Receiver { - - override def preStart = IOManager(context.system).connect(host, port) - - def receive = { - case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) - } - - } +// Assuming ssc is the StreamingContext +val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) +val words = lines.flatMap(_.split(" ")) +... {% endhighlight %} -All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details. - -### A Sample Spark Application +The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala). -* First create a Spark streaming context with master url and batchduration. +</div> +<div data-lang="java" markdown="1"> -{% highlight scala %} - val ssc = new StreamingContext(master, "WordCountCustomStreamSource", - Seconds(batchDuration)) +{% highlight java %} +// Assuming ssc is the JavaStreamingContext +JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port)); +JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... }); +... {% endhighlight %} -* Plug-in the custom receiver into the spark streaming context and create a DStream. +The full source code is in the example [JavaCustomReceiver.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java). -{% highlight scala %} - val lines = ssc.networkStream[String](new SocketTextStreamReceiver( - "localhost", 8445)) -{% endhighlight %} +</div> +</div> -* OR Plug-in the actor as receiver into the spark streaming context and create a DStream. -{% highlight scala %} - val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( - "localhost",8445, z => z.utf8String)),"SocketReceiver") -{% endhighlight %} -* Process it. +### Implementing and Using a Custom Actor-based Receiver -{% highlight scala %} - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) +Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to +receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) +trait can be applied on any Akka actor, which allows received data to be stored in Spark using + `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. - wordCounts.print() - ssc.start() +{% highlight scala %} +class CustomActor extends Actor with ActorHelper { + def receive = { + case data: String => store(data) + } +} {% endhighlight %} -* After processing it, stream can be tested using the netcat utility. - - $ nc -l localhost 8445 - hello world - hello hello - - -## Multiple Homogeneous/Heterogeneous Receivers. - -A DStream union operation is provided for taking union on multiple input streams. +And a new input stream can be created with this custom actor as {% highlight scala %} - val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( - "localhost",8445, z => z.utf8String)),"SocketReceiver") - - // Another socket stream receiver - val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( - "localhost",8446, z => z.utf8String)),"SocketReceiver") - - val union = lines.union(lines2) +// Assuming ssc is the StreamingContext +val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver") {% endhighlight %} -Above stream can be easily process as described earlier. - -_A more comprehensive example is provided in the spark streaming examples_ +See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala) +for an end-to-end example. -## References -1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) -2.[NetworkReceiver](api/scala/index.html#org.apache.spark.streaming.dstream.NetworkReceiver) |