aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq/src/main
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-07 15:26:55 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-01-07 15:26:55 -0800
commitc0c397509bc909b9bf2d5186182f461155b021ab (patch)
tree7e223edb4ee97eb0ac9bbd0e20a1fccac32e10be /external/zeromq/src/main
parent34dbc8af21da63702bc0694d471fbfee4cd08dda (diff)
downloadspark-c0c397509bc909b9bf2d5186182f461155b021ab.tar.gz
spark-c0c397509bc909b9bf2d5186182f461155b021ab.tar.bz2
spark-c0c397509bc909b9bf2d5186182f461155b021ab.zip
[SPARK-12510][STREAMING] Refactor ActorReceiver to support Java
This PR includes the following changes: 1. Rename `ActorReceiver` to `ActorReceiverSupervisor` 2. Remove `ActorHelper` 3. Add a new `ActorReceiver` for Scala and `JavaActorReceiver` for Java 4. Add `JavaActorWordCount` example Author: Shixiong Zhu <shixiong@databricks.com> Closes #10457 from zsxwing/java-actor-stream.
Diffstat (limited to 'external/zeromq/src/main')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala5
1 files changed, 2 insertions, 3 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 588e6bac7b..506ba8782d 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -19,12 +19,11 @@ package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
-import akka.actor.Actor
import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.ActorHelper
+import org.apache.spark.streaming.receiver.ActorReceiver
/**
* A receiver to subscribe to ZeroMQ stream.
@@ -33,7 +32,7 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T])
- extends Actor with ActorHelper with Logging {
+ extends ActorReceiver with Logging {
override def preStart(): Unit = {
ZeroMQExtension(context.system)