diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index a22e64ca3c..eb44768b9c 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} import org.apache.spark.{SparkConf, SecurityManager} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -import org.apache.spark.streaming.receivers.Receiver import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming.receiver.ActorHelper case class SubscribeReceiver(receiverActor: ActorRef) case class UnsubscribeReceiver(receiverActor: ActorRef) @@ -81,14 +81,14 @@ class FeederActor extends Actor { * @see [[org.apache.spark.streaming.examples.FeederActor]] */ class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends Actor with Receiver { +extends Actor with ActorHelper { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) override def preStart = remotePublisher ! SubscribeReceiver(context.self) def receive = { - case msg => pushBlock(msg.asInstanceOf[T]) + case msg => store(msg.asInstanceOf[T]) } override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self) |