diff options
Diffstat (limited to 'docs/streaming-custom-receivers.md')
-rw-r--r-- | docs/streaming-custom-receivers.md | 49 |
1 files changed, 39 insertions, 10 deletions
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 97db865daa..95b99862ec 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers ## Implementing and Using a Custom Actor-based Receiver +<div class="codetabs"> +<div data-lang="scala" markdown="1" > + Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to -receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) -trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using - `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. +receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. {% highlight scala %} -class CustomActor extends Actor with ActorHelper { + +class CustomActor extends ActorReceiver { def receive = { case data: String => store(data) } } + +// A new input stream can be created with this custom actor as +val ssc: StreamingContext = ... +val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") + {% endhighlight %} -And a new input stream can be created with this custom actor as +See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. +</div> +<div data-lang="java" markdown="1"> + +Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to +receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. + +{% highlight java %} + +class CustomActor extends JavaActorReceiver { + @Override + public void onReceive(Object msg) throws Exception { + store((String) msg); + } +} + +// A new input stream can be created with this custom actor as +JavaStreamingContext jssc = ...; +JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); -{% highlight scala %} -val ssc: StreamingContext = ... -val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver") {% endhighlight %} -See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) -for an end-to-end example. +See [JavaActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaActorWordCount.scala) for an end-to-end example. +</div> +</div> + +<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API. |