aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index a22e64ca3c..eb44768b9c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
* @see [[org.apache.spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends Actor with Receiver {
+extends Actor with ActorHelper {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
def receive = {
- case msg => pushBlock(msg.asInstanceOf[T])
+ case msg => store(msg.asInstanceOf[T])
}
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)