From 39a1d58da484165790c61a924550b58837997f0d Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 15 Jul 2013 17:25:51 +0530 Subject: Improved documentation for spark custom receiver --- docs/streaming-custom-receivers.md | 51 +++++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) (limited to 'docs/streaming-custom-receivers.md') diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 5476c00d02..dfa343bf94 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -7,10 +7,45 @@ A "Spark Streaming" receiver can be a simple network stream, streams of messages This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. +### Write a simple receiver -## A quick and naive walk-through +This starts with implementing [NetworkReceiver](#References). -### Write a simple receiver +The following is a simple socket text-stream receiver. + +{% highlight scala %} + + class SocketTextStreamReceiver(host: String, + port: Int + ) extends NetworkReceiver[String] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2) + + protected def onStart() = { + blocksGenerator.start() + val socket = new Socket(host, port) + val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + var data: String = dataInputStream.readLine() + while (data != null) { + blocksGenerator += data + data = dataInputStream.readLine() + } + } + + protected def onStop() { + blocksGenerator.stop() + } + + } + +{% endhighlight %} + + +All we did here was extend NetworkReceiver and call blocksGenerator's API method (i.e. +=) to push our blocks of data. Please refer to the scaladocs of NetworkReceiver for more details. + + +### An Actor as Receiver. 
This starts with implementing [Actor](#References) @@ -46,7 +81,16 @@ All we did here is mixed in trait Receiver and called pushBlock api method to pu {% endhighlight %} -* Plug-in the actor configuration into the spark streaming context and create a DStream. +* Plug the custom receiver into the Spark Streaming context and create a DStream. + +{% highlight scala %} + + val lines = ssc.networkStream[String](new SocketTextStreamReceiver( + "localhost", 8445)) + +{% endhighlight %} + +* OR plug the actor, as a receiver, into the Spark Streaming context and create a DStream. {% highlight scala %} @@ -99,3 +143,4 @@ _A more comprehensive example is provided in the spark streaming examples_ ## References 1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) +2.[NetworkReceiver](http://spark-project.org/docs/latest/api/streaming/index.html#spark.streaming.dstream.NetworkReceiver) -- cgit v1.2.3