diff options
Diffstat (limited to 'docs')
-rw-r--r-- | docs/plugin-custom-receiver.md | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md index 41e6a17e2c..0eb4246158 100644 --- a/docs/plugin-custom-receiver.md +++ b/docs/plugin-custom-receiver.md @@ -20,20 +20,20 @@ Following is a simple socket text-stream receiver, which is appearently overly s class SocketTextStreamReceiver (host:String, port:Int, - bytesToString: ByteString => String) extends Actor { + bytesToString: ByteString => String) extends Actor with Receiver { override def preStart = IOManager(context.system).connect(host, port) def receive = { - case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes)) + case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes)) } + } {% endhighlight %} - -_Please see implementations of NetworkReceiver for more generic NetworkReceivers._ +All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details. ### A sample spark application @@ -50,7 +50,7 @@ _Please see implementations of NetworkReceiver for more generic NetworkReceivers {% highlight scala %} - val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver( + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( "localhost",8445, z => z.utf8String)),"SocketReceiver") {% endhighlight %} @@ -81,11 +81,11 @@ A DStream union operation is provided for taking union on multiple input streams {% highlight scala %} - val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver( + val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver( "localhost",8445, z => z.utf8String)),"SocketReceiver") // Another socket stream receiver - val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver( + val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver( "localhost",8446, z => z.utf8String)),"SocketReceiver") val union = lines.union(lines2) |