From 5ab37be9831e8a70b2502b14aed1c87cb002a189 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 24 Feb 2013 16:24:52 -0800 Subject: Fixed class paths and dependencies based on Matei's comments. --- docs/custom-streaming-receiver.md | 101 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 docs/custom-streaming-receiver.md (limited to 'docs/custom-streaming-receiver.md') diff --git a/docs/custom-streaming-receiver.md b/docs/custom-streaming-receiver.md new file mode 100644 index 0000000000..0eb4246158 --- /dev/null +++ b/docs/custom-streaming-receiver.md @@ -0,0 +1,101 @@ +--- +layout: global +title: Tutorial - Spark streaming, Plugging in a custom receiver. +--- + +A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need. + +This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. + + +## A quick and naive walk-through + +### Write a simple receiver + +This starts with implementing [Actor](#References) + +Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api. + +{% highlight scala %} + + class SocketTextStreamReceiver (host:String, + port:Int, + bytesToString: ByteString => String) extends Actor with Receiver { + + override def preStart = IOManager(context.system).connect(host, port) + + def receive = { + case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) + } + + } + + +{% endhighlight %} + +All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details. + +### A sample spark application + +* First create a Spark streaming context with master url and batchduration. + +{% highlight scala %} + + val ssc = new StreamingContext(master, "WordCountCustomStreamSource", + Seconds(batchDuration)) + +{% endhighlight %} + +* Plug-in the actor configuration into the spark streaming context and create a DStream. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + +{% endhighlight %} + +* Process it. + +{% highlight scala %} + + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + + wordCounts.print() + ssc.start() + + +{% endhighlight %} + +* After processing it, stream can be tested using the netcat utility. + + $ nc -l localhost 8445 + hello world + hello hello + + +## Multiple homogeneous/heterogeneous receivers. + +A DStream union operation is provided for taking union on multiple input streams. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + + // Another socket stream receiver + val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8446, z => z.utf8String)),"SocketReceiver") + + val union = lines.union(lines2) + +{% endhighlight %} + +Above stream can be easily process as described earlier. + +_A more comprehensive example is provided in the spark streaming examples_ + +## References + +1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) -- cgit v1.2.3