aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-07 15:26:55 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-01-07 15:26:55 -0800
commitc0c397509bc909b9bf2d5186182f461155b021ab (patch)
tree7e223edb4ee97eb0ac9bbd0e20a1fccac32e10be /streaming
parent34dbc8af21da63702bc0694d471fbfee4cd08dda (diff)
downloadspark-c0c397509bc909b9bf2d5186182f461155b021ab.tar.gz
spark-c0c397509bc909b9bf2d5186182f461155b021ab.tar.bz2
spark-c0c397509bc909b9bf2d5186182f461155b021ab.zip
[SPARK-12510][STREAMING] Refactor ActorReceiver to support Java
This PR includes the following changes: 1. Rename `ActorReceiver` to `ActorReceiverSupervisor` 2. Remove `ActorHelper` 3. Add a new `ActorReceiver` for Scala and `JavaActorReceiver` for Java 4. Add `JavaActorWordCount` example Author: Shixiong Zhu <shixiong@databricks.com> Closes #10457 from zsxwing/java-actor-stream.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala64
2 files changed, 56 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ca0a21fbb7..ba509a1030 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
+import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -312,7 +312,7 @@ class StreamingContext private[streaming] (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T] = withNamedScope("actor stream") {
- receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+ receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy))
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index 7ec74016a1..0eabf3d260 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -47,13 +47,12 @@ object ActorSupervisorStrategy {
/**
* :: DeveloperApi ::
- * A receiver trait to be mixed in with your Actor to gain access to
- * the API for pushing received data into Spark Streaming for being processed.
+ * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
*
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
- * class MyActor extends Actor with ActorHelper{
+ * class MyActor extends ActorReceiver {
* def receive {
* case anything: String => store(anything)
* }
@@ -69,13 +68,60 @@ object ActorSupervisorStrategy {
* should be same.
*/
@DeveloperApi
-trait ActorHelper extends Logging{
+abstract class ActorReceiver extends Actor {
- self: Actor => // to ensure that this can be added to Actor classes only
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store[T](iter: Iterator[T]) {
+ context.parent ! IteratorData(iter)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
+ */
+ def store(bytes: ByteBuffer) {
+ context.parent ! ByteBufferData(bytes)
+ }
+
+ /**
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ */
+ def store[T](item: T) {
+ context.parent ! SingleItemData(item)
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
+ * processing.
+ *
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ *
+ * @example {{{
+ * class MyActor extends JavaActorReceiver {
+ * def receive {
+ * case anything: String => store(anything)
+ * }
+ * }
+ *
+ * // Can be used with an actorStream as follows
+ * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * should be same.
+ */
+@DeveloperApi
+abstract class JavaActorReceiver extends UntypedActor {
/** Store an iterator of received data as a data block into Spark's memory. */
def store[T](iter: Iterator[T]) {
- logDebug("Storing iterator")
context.parent ! IteratorData(iter)
}
@@ -85,7 +131,6 @@ trait ActorHelper extends Logging{
* that Spark is configured to use.
*/
def store(bytes: ByteBuffer) {
- logDebug("Storing Bytes")
context.parent ! ByteBufferData(bytes)
}
@@ -95,7 +140,6 @@ trait ActorHelper extends Logging{
* being pushed into Spark's memory.
*/
def store[T](item: T) {
- logDebug("Storing item")
context.parent ! SingleItemData(item)
}
}
@@ -104,7 +148,7 @@ trait ActorHelper extends Logging{
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receiver.ActorHelper]].
+ * [[org.apache.spark.streaming.receiver.ActorReceiver]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
@@ -137,7 +181,7 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
* context.parent ! Props(new Worker, "Worker")
* }}}
*/
-private[streaming] class ActorReceiver[T: ClassTag](
+private[streaming] class ActorReceiverSupervisor[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel,