aboutsummaryrefslogtreecommitdiff
path: root/docs
diff options
context:
space:
mode:
Diffstat (limited to 'docs')
-rw-r--r--docs/plugin-custom-receiver.md14
1 files changed, 7 insertions, 7 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md
index 41e6a17e2c..0eb4246158 100644
--- a/docs/plugin-custom-receiver.md
+++ b/docs/plugin-custom-receiver.md
@@ -20,20 +20,20 @@ Following is a simple socket text-stream receiver, which is appearently overly s
class SocketTextStreamReceiver (host:String,
port:Int,
- bytesToString: ByteString => String) extends Actor {
+ bytesToString: ByteString => String) extends Actor with Receiver {
override def preStart = IOManager(context.system).connect(host, port)
def receive = {
- case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes))
+ case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
}
+
}
{% endhighlight %}
-
-_Please see implementations of NetworkReceiver for more generic NetworkReceivers._
+All we did here is mixed in trait Receiver and called pushBlock api method to push our blocks of data. Please refer to scala-docs of Receiver for more details.
### A sample spark application
@@ -50,7 +50,7 @@ _Please see implementations of NetworkReceiver for more generic NetworkReceivers
{% highlight scala %}
- val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
{% endhighlight %}
@@ -81,11 +81,11 @@ A DStream union operation is provided for taking union on multiple input streams
{% highlight scala %}
- val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
// Another socket stream receiver
- val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)