-rw-r--r--  docs/streaming-custom-receivers.md                                                  8
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala   10
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index a75587a92a..97db865daa 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -257,9 +257,9 @@ The following table summarizes the characteristics of both types of receivers
## Implementing and Using a Custom Actor-based Receiver
-Custom [Akka Actors](http://doc.akka.io/docs/akka/2.2.4/scala/actors.html) can also be used to
+Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
-trait can be applied on any Akka actor, which allows received data to be stored in Spark using
+trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using
`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
{% highlight scala %}
@@ -273,8 +273,8 @@ class CustomActor extends Actor with ActorHelper {
And a new input stream can be created with this custom actor as
{% highlight scala %}
-// Assuming ssc is the StreamingContext
-val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
+val ssc: StreamingContext = ...
+val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
{% endhighlight %}
See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
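
For reference, a minimal self-contained sketch of the pattern the updated documentation describes: an actor that mixes in `ActorHelper` and hands each received item to Spark via `store(...)`, plugged into a `StreamingContext` with `actorStream`. The application wiring (app name, batch interval, `local[2]` master) is illustrative and not taken from the Spark sources.

{% highlight scala %}
import akka.actor.{Actor, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

// Any Akka actor can act as a receiver by mixing in ActorHelper and
// calling store(...) for each piece of data it wants to hand to Spark.
class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

object CustomActorExample {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, at least one for processing.
    val conf = new SparkConf().setAppName("CustomActorExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create an input DStream backed by the custom actor.
    val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
{% endhighlight %}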
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index 8b8dae0be6..a47fb7b7d7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -62,15 +62,13 @@ class FeederActor extends Actor {
}.start()
def receive: Receive = {
-
case SubscribeReceiver(receiverActor: ActorRef) =>
println("received subscribe from %s".format(receiverActor.toString))
- receivers = LinkedList(receiverActor) ++ receivers
+ receivers = LinkedList(receiverActor) ++ receivers
case UnsubscribeReceiver(receiverActor: ActorRef) =>
println("received unsubscribe from %s".format(receiverActor.toString))
- receivers = receivers.dropWhile(x => x eq receiverActor)
-
+ receivers = receivers.dropWhile(x => x eq receiverActor)
}
}
@@ -129,9 +127,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
- * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.0.1 9999`
* and then run the example
- * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.0.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
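
For context, a rough sketch of the subscriber side these run instructions drive: a receiver actor that registers itself with the remote `FeederActor` shown above and stores whatever it is sent. The `SubscribeReceiver`/`UnsubscribeReceiver` messages come from the example itself; the actor-system name (`test`), class names, and word-count wiring are assumptions made for illustration rather than the exact example source.

{% highlight scala %}
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)

// Subscribes to the remote FeederActor on start, forwards every message it
// receives into Spark with store(...), and unsubscribes on stop.
class SubscriberActor(feederUrl: String) extends Actor with ActorHelper {
  private lazy val feeder = context.actorSelection(feederUrl)

  override def preStart(): Unit = feeder ! SubscribeReceiver(context.self)

  def receive = {
    case msg: String => store(msg)
  }

  override def postStop(): Unit = feeder ! UnsubscribeReceiver(context.self)
}

object SubscriberWordCount {
  def main(args: Array[String]): Unit = {
    val Array(host, port) = args
    val ssc = new StreamingContext(new SparkConf().setAppName("SubscriberWordCount"), Seconds(2))

    // Actor path of the FeederActor running on <hostname>:<port>;
    // "test" is the assumed name of the feeder's ActorSystem.
    val feederUrl = s"akka.tcp://test@$host:$port/user/FeederActor"
    val lines = ssc.actorStream[String](Props(new SubscriberActor(feederUrl)), "SampleReceiver")

    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
{% endhighlight %}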