aboutsummaryrefslogtreecommitdiff
path: root/docs/streaming-custom-receivers.md
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-07-15 17:25:51 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-08-23 09:38:50 +0530
commit39a1d58da484165790c61a924550b58837997f0d (patch)
treeacb2df42dc328787d1bc4aac4eaf1a778d08ca71 /docs/streaming-custom-receivers.md
parent4698a0d6886905ef21cbd52e108d0dcab3df12df (diff)
downloadspark-39a1d58da484165790c61a924550b58837997f0d.tar.gz
spark-39a1d58da484165790c61a924550b58837997f0d.tar.bz2
spark-39a1d58da484165790c61a924550b58837997f0d.zip
Improved documentation for spark custom receiver
Diffstat (limited to 'docs/streaming-custom-receivers.md')
-rw-r--r--docs/streaming-custom-receivers.md51
1 files changed, 48 insertions, 3 deletions
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 5476c00d02..dfa343bf94 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -7,10 +7,45 @@ A "Spark Streaming" receiver can be a simple network stream, streams of messages
This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application.
+### Write a simple receiver
-## A quick and naive walk-through
+This starts with implementing [NetworkReceiver](#References)
-### Write a simple receiver
+Following is a simple socket text-stream receiver.
+
+{% highlight scala %}
+
+ class SocketTextStreamReceiver(host: String,
+ port: Int
+ ) extends NetworkReceiver[String] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
+
+ protected def onStart() = {
+ blocksGenerator.start()
+ val socket = new Socket(host, port)
+ val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+ var data: String = dataInputStream.readLine()
+ while (data != null) {
+ blocksGenerator += data
+ data = dataInputStream.readLine()
+ }
+ }
+
+ protected def onStop() {
+ blocksGenerator.stop()
+ }
+
+ }
+
+{% endhighlight %}
+
+
+All we did here is extended NetworkReceiver and called blockGenerator's API method (i.e. +=) to push our blocks of data. Please refer to scala-docs of NetworkReceiver for more details.
+
+
+### An Actor as Receiver.
This starts with implementing [Actor](#References)
@@ -46,7 +81,16 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu
{% endhighlight %}
-* Plug-in the actor configuration into the spark streaming context and create a DStream.
+* Plug-in the custom receiver into the spark streaming context and create a DStream.
+
+{% highlight scala %}
+
+ val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
+ "localhost", 8445))
+
+{% endhighlight %}
+
+* OR Plug-in the actor as receiver into the spark streaming context and create a DStream.
{% highlight scala %}
@@ -99,3 +143,4 @@ _A more comprehensive example is provided in the spark streaming examples_
## References
1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
+2.[NetworkReceiver](http://spark-project.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver)