Diffstat (limited to 'docs/streaming-custom-receivers.md')
-rw-r--r--  docs/streaming-custom-receivers.md  90
1 file changed, 76 insertions(+), 14 deletions(-)
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 27cd085782..6a2048121f 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -7,25 +7,30 @@
 Spark Streaming can receive streaming data from any arbitrary data source beyond
 the ones for which it has built-in support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
-and using it in a Spark Streaming application.
+and using it in a Spark Streaming application. Note that custom receivers can be implemented
+in Scala or Java.
 
-### Implementing a Custom Receiver
+## Implementing a Custom Receiver
 
-This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+This starts with implementing a **Receiver**
+([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
+[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)).
 A custom receiver must extend this abstract class by implementing two methods:
+
 - `onStart()`: Things to do to start receiving data.
 - `onStop()`: Things to do to stop receiving data.
 
-Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads
+Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
 that are responsible for receiving the data, and `onStop()` would ensure that receiving by those
 threads is stopped. The receiving threads can also use `isStopped()`, a `Receiver` method, to check
 whether they should stop receiving data.
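A minimal sketch of the `onStart()`/`onStop()` contract described in this hunk, assuming a hypothetical `readRecord()` call in place of a real data source (the guide's full socket-based example appears further down in the file):

{% highlight scala %}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch only: readRecord() stands in for reading from an actual source.
class SketchReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start the thread that receives data; onStart() itself must return quickly.
    new Thread("Sketch Receiver Thread") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here: the receiving thread checks isStopped() and exits on its own.
  }

  private def receive() {
    while (!isStopped) {
      store(readRecord())  // store a single received record in Spark
    }
  }

  private def readRecord(): String = "..."  // hypothetical source read
}
{% endhighlight %}

Such a receiver would then be plugged into an application with `ssc.receiverStream(new SketchReceiver())`, which creates an input DStream backed by the receiver.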
 Once the data is received, that data can be stored inside Spark
-by calling `store(data)`, which is a method provided by the
-[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+by calling `store(data)`, which is a method provided by the Receiver class.
 There are a number of flavours of `store()` which allow you to store the received data
-record-at-a-time or as a whole collection of objects / serialized bytes.
+record-at-a-time or as a whole collection of objects / serialized bytes. Note that the flavour of
+`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
+This is discussed [later](#receiver-reliability) in more detail.
 
 Any exception in the receiving threads should be caught and handled properly to avoid silent
 failures of the receiver. `restart(<exception>)` will restart the receiver by
@@ -158,7 +163,7 @@ public class JavaCustomReceiver extends Receiver<String> {
 
 </div>
 
-### Using the custom receiver in a Spark Streaming application
+## Using the custom receiver in a Spark Streaming application
 
 The custom receiver can be used in a Spark Streaming application by using
 `streamingContext.receiverStream(<instance of custom receiver>)`. This will create
@@ -191,9 +196,68 @@ The full source code is in the example [JavaCustomReceiver.java](https://github.
 </div>
 </div>
 
-
-
-### Implementing and Using a Custom Actor-based Receiver
+## Receiver Reliability
+
+As discussed in brief in the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability),
+there are two kinds of receivers based on their reliability and fault-tolerance semantics.
+
+1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a
+   *reliable receiver* correctly acknowledges to the source that the data has been received
+   and stored in Spark reliably (that is, replicated successfully).
+   Usually, implementing this receiver involves careful consideration of the semantics of
+   source acknowledgements.
+1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support
+   acknowledgement. Even for reliable sources, one may implement an unreliable receiver that
+   does not take on the complexity of acknowledging correctly.
+
+To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
+This flavour of `store` is a blocking call which returns only after all the given records have
+been stored inside Spark. If the receiver's configured storage level uses replication
+(enabled by default), then this call returns after replication has completed.
+Thus it ensures that the data is reliably stored, and the receiver can then acknowledge the
+source appropriately. This ensures that no data is lost when the receiver fails in the middle
+of replicating data -- the buffered data will not be acknowledged and hence will later be resent
+by the source.
+
+An *unreliable receiver* does not have to implement any of this logic. It can simply receive
+records from the source and insert them one-at-a-time using `store(single-record)`. While it does
+not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
+
+- The system takes care of chunking the data into appropriately sized blocks (look for block
+  interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
+- The system takes care of controlling the receiving rates if rate limits have been specified.
+- Because of these two, unreliable receivers are simpler to implement than reliable receivers.
+
+The following table summarizes the characteristics of both types of receivers.
+
+<table class="table">
+<tr>
+  <th>Receiver Type</th>
+  <th>Characteristics</th>
+</tr>
+<tr>
+  <td><b>Unreliable Receivers</b></td>
+  <td>
+    Simple to implement.<br>
+    System takes care of block generation and rate control.<br>
+    No fault-tolerance guarantees, can lose data on receiver failure.
+  </td>
+</tr>
+<tr>
+  <td><b>Reliable Receivers</b></td>
+  <td>
+    Strong fault-tolerance guarantees, can ensure zero data loss.<br/>
+    Block generation and rate control to be handled by the receiver implementation.<br/>
+    Implementation complexity depends on the acknowledgement mechanisms of the source.
+  </td>
+</tr>
+</table>
+
+## Implementing and Using a Custom Actor-based Receiver
 
 Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be
 used to receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
@@ -203,7 +267,7 @@ trait can be applied on any Akka actor, which allows received data to be stored
 {% highlight scala %}
 class CustomActor extends Actor with ActorHelper {
   def receive = {
-    case data: String => store(data)
+    case data: String => store(data)
   }
 }
 {% endhighlight %}
@@ -217,5 +281,3 @@ val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
 
 See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
 for an end-to-end example.
-
-
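The reliable-receiver pattern this change documents (buffer records, call the blocking multiple-record `store`, then acknowledge the source) can be sketched as follows. Here `pullRecords()` and `ackSource()` are hypothetical stand-ins for the source's API, not part of Spark:

{% highlight scala %}
import scala.collection.mutable.ArrayBuffer

// Inside a Receiver[String] implementation's receiving thread (sketch only).
private def receiveReliably() {
  while (!isStopped) {
    // Pull a batch of records from the source (hypothetical API).
    val buffer = new ArrayBuffer[String]
    buffer ++= pullRecords()

    // Blocking call: returns only after the whole buffer has been stored
    // in Spark (and replicated, if the storage level uses replication).
    store(buffer)

    // Safe to acknowledge now. If the receiver had failed before this
    // point, the source would resend the unacknowledged records.
    ackSource(buffer)
  }
}
{% endhighlight %}

Because `store(buffer)` does not return until storage (and any replication) completes, acknowledging only after it returns is what guarantees the source will resend anything lost to a mid-flight failure.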