diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-20 13:55:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-01-20 13:55:41 -0800 |
commit | b7d74a602f622d8e105b349bd6d17ba42e7668dc (patch) | |
tree | 118deb532942513693e60f851dde638d7fa818cd /docs | |
parent | 944fdadf77523570f6b33544ad0b388031498952 (diff) | |
download | spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2 spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip |
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:
1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'docs')
-rw-r--r-- | docs/streaming-custom-receivers.md | 49 | ||||
-rw-r--r-- | docs/streaming-programming-guide.md | 4 |
2 files changed, 41 insertions, 12 deletions
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 97db865daa..95b99862ec 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers ## Implementing and Using a Custom Actor-based Receiver +<div class="codetabs"> +<div data-lang="scala" markdown="1" > + Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to -receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) -trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using - `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. +receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. {% highlight scala %} -class CustomActor extends Actor with ActorHelper { + +class CustomActor extends ActorReceiver { def receive = { case data: String => store(data) } } + +// A new input stream can be created with this custom actor as +val ssc: StreamingContext = ... +val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") + {% endhighlight %} -And a new input stream can be created with this custom actor as +See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. +</div> +<div data-lang="java" markdown="1"> + +Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to +receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. + +{% highlight java %} + +class CustomActor extends JavaActorReceiver { + @Override + public void onReceive(Object msg) throws Exception { + store((String) msg); + } +} + +// A new input stream can be created with this custom actor as +JavaStreamingContext jssc = ...; +JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); -{% highlight scala %} -val ssc: StreamingContext = ... -val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver") {% endhighlight %} -See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) -for an end-to-end example. +See [JavaActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaActorWordCount.scala) for an end-to-end example. +</div> +</div> + +<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API. diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 8fd075d02b..93c34efb66 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources. <span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available. - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka - actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver + actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. <span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala - libraries, `actorStream` is not available in the Python API. + libraries, `AkkaUtils.createStream` is not available in the Python API. - **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. |