diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-07 15:26:55 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-01-07 15:26:55 -0800 |
commit | c0c397509bc909b9bf2d5186182f461155b021ab (patch) | |
tree | 7e223edb4ee97eb0ac9bbd0e20a1fccac32e10be /streaming/src/main | |
parent | 34dbc8af21da63702bc0694d471fbfee4cd08dda (diff) | |
download | spark-c0c397509bc909b9bf2d5186182f461155b021ab.tar.gz spark-c0c397509bc909b9bf2d5186182f461155b021ab.tar.bz2 spark-c0c397509bc909b9bf2d5186182f461155b021ab.zip |
[SPARK-12510][STREAMING] Refactor ActorReceiver to support Java
This PR includes the following changes:
1. Rename `ActorReceiver` to `ActorReceiverSupervisor`
2. Remove `ActorHelper`
3. Add a new `ActorReceiver` for Scala and `JavaActorReceiver` for Java
4. Add `JavaActorWordCount` example
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10457 from zsxwing/java-actor-stream.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala | 64 |
2 files changed, 56 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ca0a21fbb7..ba509a1030 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -41,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver} +import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver} import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils} @@ -312,7 +312,7 @@ class StreamingContext private[streaming] ( storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy ): ReceiverInputDStream[T] = withNamedScope("actor stream") { - receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) + receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy)) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala index 7ec74016a1..0eabf3d260 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -47,13 +47,12 @@ object ActorSupervisorStrategy { /** * :: DeveloperApi :: - * A receiver trait to be mixed in with your Actor to gain access to - * the API for pushing received data into Spark Streaming for being processed. + * A base Actor that provides APIs for pushing received data into Spark Streaming for processing. * * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html * * @example {{{ - * class MyActor extends Actor with ActorHelper{ + * class MyActor extends ActorReceiver { * def receive { * case anything: String => store(anything) * } @@ -69,13 +68,60 @@ object ActorSupervisorStrategy { * should be same. */ @DeveloperApi -trait ActorHelper extends Logging{ +abstract class ActorReceiver extends Actor { - self: Actor => // to ensure that this can be added to Actor classes only + /** Store an iterator of received data as a data block into Spark's memory. */ + def store[T](iter: Iterator[T]) { + context.parent ! IteratorData(iter) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + context.parent ! ByteBufferData(bytes) + } + + /** + * Store a single item of received data to Spark's memory. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + */ + def store[T](item: T) { + context.parent ! SingleItemData(item) + } +} + +/** + * :: DeveloperApi :: + * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for + * processing. + * + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends JavaActorReceiver { + * def receive { + * case anything: String => store(anything) + * } + * } + * + * // Can be used with an actorStream as follows + * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * + * }}} + * + * @note Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + */ +@DeveloperApi +abstract class JavaActorReceiver extends UntypedActor { /** Store an iterator of received data as a data block into Spark's memory. */ def store[T](iter: Iterator[T]) { - logDebug("Storing iterator") context.parent ! IteratorData(iter) } @@ -85,7 +131,6 @@ trait ActorHelper extends Logging{ * that Spark is configured to use. */ def store(bytes: ByteBuffer) { - logDebug("Storing Bytes") context.parent ! ByteBufferData(bytes) } @@ -95,7 +140,6 @@ trait ActorHelper extends Logging{ * being pushed into Spark's memory. */ def store[T](item: T) { - logDebug("Storing item") context.parent ! SingleItemData(item) } } @@ -104,7 +148,7 @@ trait ActorHelper extends Logging{ * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in * conjunction with `StreamingContext.actorStream` and - * [[org.apache.spark.streaming.receiver.ActorHelper]]. + * [[org.apache.spark.streaming.receiver.ActorReceiver]]. */ @DeveloperApi case class Statistics(numberOfMsgs: Int, @@ -137,7 +181,7 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec * context.parent ! Props(new Worker, "Worker") * }}} */ -private[streaming] class ActorReceiver[T: ClassTag]( +private[streaming] class ActorReceiverSupervisor[T: ClassTag]( props: Props, name: String, storageLevel: StorageLevel, |