Diffstat (limited to 'docs/streaming-custom-receivers.md')
 docs/streaming-custom-receivers.md | 90
 1 file changed, 76 insertions(+), 14 deletions(-)
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 27cd085782..6a2048121f 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -7,25 +7,30 @@ Spark Streaming can receive streaming data from any arbitrary data source beyond
the ones for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the concerned data source. This guide walks through the process of implementing a custom receiver
-and using it in a Spark Streaming application.
+and using it in a Spark Streaming application. Note that custom receivers can be implemented
+in Scala or Java.
-### Implementing a Custom Receiver
+## Implementing a Custom Receiver
-This starts with implementing a [Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver).
+This starts with implementing a **Receiver**
+([Scala doc](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver),
+[Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)).
A custom receiver must extend this abstract class by implementing two methods:
+
- `onStart()`: Things to do to start receiving data.
- `onStop()`: Things to do to stop receiving data.
-Note that `onStart()` and `onStop()` must not block indefinitely. Typically, onStart() would start the threads
+Both `onStart()` and `onStop()` must not block indefinitely. Typically, `onStart()` would start the threads
that are responsible for receiving the data, and `onStop()` would ensure that those threads have
stopped receiving data. The receiving threads can also use `isStopped()`, a `Receiver` method, to check whether they
should stop receiving data.
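This lifecycle contract can be sketched with a plain-Java stand-in; `SketchReceiver`, its in-memory `stored` queue, and the iterator-based source are illustrative assumptions here, not Spark's actual `Receiver` API:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java stand-in for the receiver lifecycle: onStart() spawns a worker
// thread and returns immediately, the worker polls isStopped() between
// records, and onStop() signals the worker and waits briefly for it.
public class SketchReceiver {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final Queue<String> stored = new ConcurrentLinkedQueue<>(); // stands in for store(data)
    private Thread worker;

    public boolean isStopped() { return stopped.get(); }
    public Queue<String> stored() { return stored; }

    public void onStart(Iterator<String> source) {
        // Must not block: hand the receive loop to its own thread.
        worker = new Thread(() -> {
            while (!isStopped() && source.hasNext()) {
                stored.add(source.next()); // stands in for store(single-record)
            }
        });
        worker.start();
    }

    public void onStop() throws InterruptedException {
        // Must not block indefinitely: signal the worker, then wait briefly.
        stopped.set(true);
        if (worker != null) worker.join(1000);
    }

    public static void main(String[] args) throws InterruptedException {
        SketchReceiver receiver = new SketchReceiver();
        receiver.onStart(List.of("a", "b", "c").iterator());
        Thread.sleep(100);    // give the worker time to drain the source
        receiver.onStop();
        System.out.println(receiver.stored().size());
    }
}
```

The essential points this illustrates are that `onStart()` only launches the work and returns, and that the worker, not `onStop()`, decides when to exit by checking the stop flag.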
Once the data is received, that data can be stored inside Spark
-by calling `store(data)`, which is a method provided by the
-[Receiver](api/scala/index.html#org.apache.spark.streaming.receiver.Receiver) class.
+by calling `store(data)`, which is a method provided by the Receiver class.
There are a number of flavours of `store()` which allow you to store the received data
-record-at-a-time or as whole collection of objects / serialized bytes.
+record-at-a-time or as a whole collection of objects / serialized bytes. Note that the flavour of
+`store()` used to implement a receiver affects its reliability and fault-tolerance semantics.
+This is discussed [later](#receiver-reliability) in more detail.
Any exception in the receiving threads should be caught and handled properly to avoid silent
failures of the receiver. `restart(<exception>)` will restart the receiver by
@@ -158,7 +163,7 @@ public class JavaCustomReceiver extends Receiver<String> {
</div>
-### Using the custom receiver in a Spark Streaming application
+## Using the custom receiver in a Spark Streaming application
The custom receiver can be used in a Spark Streaming application by using
`streamingContext.receiverStream(<instance of custom receiver>)`. This will create
@@ -191,9 +196,68 @@ The full source code is in the example [JavaCustomReceiver.java](https://github.
</div>
</div>
-
-
-### Implementing and Using a Custom Actor-based Receiver
+## Receiver Reliability
+
+As discussed briefly in the
+[Spark Streaming Programming Guide](streaming-programming-guide.html#receiver-reliability),
+there are two kinds of receivers based on their reliability and fault-tolerance semantics.
+
+1. *Reliable Receiver* - For *reliable sources* that allow sent data to be acknowledged, a
+ *reliable receiver* correctly acknowledges to the source that the data has been received
+ and stored in Spark reliably (that is, replicated successfully). Usually,
+ implementing this receiver involves careful consideration of the semantics of source
+ acknowledgements.
+1. *Unreliable Receiver* - These are receivers for unreliable sources that do not support
+   acknowledgement. Even for reliable sources, one may implement an unreliable receiver that
+   does not go into the complexity of acknowledging correctly.
+
+To implement a *reliable receiver*, you have to use `store(multiple-records)` to store data.
+This flavour of `store` is a blocking call which returns only after all the given records have
+been stored inside Spark. If the receiver's configured storage level uses replication
+(enabled by default), then this call returns after replication has completed.
+Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the
+source appropriately. This ensures that no data is lost when the receiver fails in the middle
+of replicating data -- the buffered data will not be acknowledged and hence will be later resent
+by the source.
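This buffer-then-acknowledge pattern can be sketched as follows; `storeBatch` and `ack` are hypothetical stand-ins for the blocking `store(multiple-records)` call and the source's acknowledgement mechanism, not Spark API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a reliable receiver's inner loop. A batch is acknowledged only
// after storeBatch returns; if storing fails mid-batch, nothing is acked
// and the source keeps the batch buffered for resending.
public class ReliableLoopSketch {
    static void receiveReliably(List<List<String>> batches,
                                Consumer<List<String>> storeBatch,
                                Consumer<List<String>> ack) {
        for (List<String> batch : batches) {
            try {
                storeBatch.accept(batch); // blocks until stored (and replicated)
                ack.accept(batch);        // safe: the data is inside Spark
            } catch (RuntimeException e) {
                // Not acked: the source will later resend this batch.
            }
        }
    }

    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        List<String> acked = new ArrayList<>();
        receiveReliably(
            List.of(List.of("a", "b"), List.of("c")),
            batch -> {
                // Simulate a replication failure on the second batch.
                if (batch.contains("c")) throw new RuntimeException("replication failed");
                stored.addAll(batch);
            },
            acked::addAll);
        System.out.println(acked); // only the successfully stored batch is acked
    }
}
```

The design point is the ordering: acknowledgement happens strictly after the blocking store returns, so a crash between the two steps can only cause re-delivery, never loss.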
+
+An *unreliable receiver* does not have to implement any of this logic. It can simply receive
+records from the source and insert them one-at-a-time using `store(single-record)`. While it does
+not get the reliability guarantees of `store(multiple-records)`, it has the following advantages.
+
+- The system takes care of chunking the data into appropriately sized blocks (look for block
+interval in the [Spark Streaming Programming Guide](streaming-programming-guide.html)).
+- The system takes care of controlling the receiving rates if the rate limits have been specified.
+- Because of these two points, unreliable receivers are simpler to implement than reliable receivers.
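For contrast, an unreliable receiver's loop reduces to the minimal sketch below, again with a hypothetical `store` parameter standing in for `store(single-record)`; there is no buffering and no acknowledgement step:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Sketch of an unreliable receiver's loop: each record is handed over one
// at a time; block generation and rate control happen downstream in Spark.
public class UnreliableLoopSketch {
    static void receiveUnreliably(Iterator<String> records, Consumer<String> store) {
        while (records.hasNext()) {
            store.accept(records.next()); // stands in for store(single-record)
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        receiveUnreliably(List.of("x", "y").iterator(), received::add);
        System.out.println(received);
    }
}
```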
+
+The following table summarizes the characteristics of both types of receivers.
+
+<table class="table">
+<tr>
+ <th>Receiver Type</th>
+ <th>Characteristics</th>
+</tr>
+<tr>
+ <td><b>Unreliable Receivers</b></td>
+ <td>
+ Simple to implement.<br>
+    System takes care of block generation and rate control.<br>
+    No fault-tolerance guarantees; can lose data on receiver failure.
+ </td>
+</tr>
+<tr>
+ <td><b>Reliable Receivers</b></td>
+ <td>
+    Strong fault-tolerance guarantees; can ensure zero data loss.<br/>
+ Block generation and rate control to be handled by the receiver implementation.<br/>
+ Implementation complexity depends on the acknowledgement mechanisms of the source.
+ </td>
+</tr>
+</table>
+
+## Implementing and Using a Custom Actor-based Receiver
Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
@@ -203,7 +267,7 @@ trait can be applied on any Akka actor, which allows received data to be stored
{% highlight scala %}
class CustomActor extends Actor with ActorHelper {
def receive = {
- case data: String => store(data)
+ case data: String => store(data)
}
}
{% endhighlight %}
@@ -217,5 +281,3 @@ val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
for an end-to-end example.
-
-