diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-03-18 18:30:14 +0800 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-03-18 18:30:14 +0800 |
commit | ba9d00c44a42fea41e136384d884242cce93056e (patch) | |
tree | fc725976a8ade33b55c7c991a2691683fa2023cc /docs/streaming-custom-receivers.md | |
parent | 19d3b059e3622e5e20b56b4bf1e46cea22b6bee7 (diff) | |
parent | c1e9cdc49f89222b366a14a20ffd937ca0fb9adc (diff) | |
download | spark-ba9d00c44a42fea41e136384d884242cce93056e.tar.gz spark-ba9d00c44a42fea41e136384d884242cce93056e.tar.bz2 spark-ba9d00c44a42fea41e136384d884242cce93056e.zip |
Merge branch 'master' into graph
Conflicts:
run2.cmd
Diffstat (limited to 'docs/streaming-custom-receivers.md')
-rw-r--r-- | docs/streaming-custom-receivers.md | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md new file mode 100644 index 0000000000..5476c00d02 --- /dev/null +++ b/docs/streaming-custom-receivers.md @@ -0,0 +1,101 @@ +--- +layout: global +title: Tutorial - Spark Streaming, Plugging in a custom receiver. +--- + +A "Spark Streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need. + +This guide shows the programming model and features by walking through a simple sample receiver and corresponding Spark Streaming application. + + +## A quick and naive walk-through + +### Write a simple receiver + +This starts with implementing [Actor](#References) + +Following is a simple socket text-stream receiver, which is appearently overly simplified using Akka's socket.io api. + +{% highlight scala %} + + class SocketTextStreamReceiver (host:String, + port:Int, + bytesToString: ByteString => String) extends Actor with Receiver { + + override def preStart = IOManager(context.system).connect(host, port) + + def receive = { + case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) + } + + } + + +{% endhighlight %} + +All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details. + +### A sample spark application + +* First create a Spark streaming context with master url and batchduration. + +{% highlight scala %} + + val ssc = new StreamingContext(master, "WordCountCustomStreamSource", + Seconds(batchDuration)) + +{% endhighlight %} + +* Plug-in the actor configuration into the spark streaming context and create a DStream. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + +{% endhighlight %} + +* Process it. + +{% highlight scala %} + + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + + wordCounts.print() + ssc.start() + + +{% endhighlight %} + +* After processing it, stream can be tested using the netcat utility. + + $ nc -l localhost 8445 + hello world + hello hello + + +## Multiple homogeneous/heterogeneous receivers. + +A DStream union operation is provided for taking union on multiple input streams. + +{% highlight scala %} + + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8445, z => z.utf8String)),"SocketReceiver") + + // Another socket stream receiver + val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( + "localhost",8446, z => z.utf8String)),"SocketReceiver") + + val union = lines.union(lines2) + +{% endhighlight %} + +Above stream can be easily process as described earlier. + +_A more comprehensive example is provided in the spark streaming examples_ + +## References + +1.[Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html) |