diff options
author | Prashant Sharma <prashant.iiith@gmail.com> | 2013-01-19 21:20:49 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.iiith@gmail.com> | 2013-01-19 22:04:07 +0530 |
commit | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (patch) | |
tree | bf62af73fa0272db1ae6b7c01a6d0d0307a85be8 /streaming | |
parent | bb6ab92e31b7aad464cf8262bc3567fdeb4c14c4 (diff) | |
download | spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.tar.gz spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.tar.bz2 spark-56b9bd197c522f33e354c2e9ad7e76440cf817e9.zip |
Plug in actor as stream receiver API
Diffstat (limited to 'streaming')
3 files changed, 152 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..cd7379da14 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -1,10 +1,15 @@ package spark.streaming +import akka.actor.Props + import spark.streaming.dstream._ import spark.{RDD, Logging, SparkEnv, SparkContext} import spark.storage.StorageLevel import spark.util.MetadataCleaner +import spark.streaming.receivers.ActorReceiver +import spark.streaming.receivers.Settings + import scala.collection.mutable.Queue @@ -135,6 +140,30 @@ class StreamingContext private ( protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() /** + * Create an input stream with any arbitrary user implemented network receiver. + * @param receiver Custom implementation of NetworkReceiver + */ + def pluggableNetworkStream[T: ClassManifest]( + receiver: NetworkReceiver[T]): DStream[T] = { + val inputStream = new PluggableInputDStream[T](this, + receiver) + graph.addInputStream(inputStream) + inputStream + } + + /** + * Create an input stream with any arbitrary user implemented akka actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def pluggableActorStream[T: ClassManifest]( + props: Props, name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = { + pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel))) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param hostname Zookeeper hostname. + * @param port Zookeeper port. 
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala new file mode 100644 index 0000000000..674f1059fe --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala @@ -0,0 +1,12 @@ +package spark.streaming.dstream + +import spark.streaming.StreamingContext + +class PluggableInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + receiver + } +} diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala new file mode 100644 index 0000000000..f24c99ad70 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala @@ -0,0 +1,111 @@ +package spark.streaming.receivers + +import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } +import akka.actor.{ actorRef2Scala, ActorRef } +import akka.actor.{ PossiblyHarmful, OneForOneStrategy } + +import spark.storage.StorageLevel +import spark.streaming.dstream.NetworkReceiver + +import java.util.concurrent.atomic.AtomicInteger + +/** A helper with set of defaults for supervisor strategy **/ +object ReceiverSupervisorStrategy { + + import akka.util.duration._ + import akka.actor.SupervisorStrategy._ + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException ⇒ Restart + case _: Exception ⇒ Escalate + } +} + +/** + * Settings for configuring the actor creation or defining supervisor strategy + */ +case class Settings(props: Props, + name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy) + +/** + * Statistcs for querying the supervisor about state 
of workers + */ +case class Statistcs(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors **/ +case class Data[T: ClassManifest](data: T) + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * his own Actor to run as receiver for Spark Streaming input source. + */ +class ActorReceiver[T: ClassManifest](settings: Settings) + extends NetworkReceiver[T] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(settings.storageLevel) + + protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + private class Supervisor extends Actor { + + override val supervisorStrategy = settings.supervisorStrategy + val worker = context.actorOf(settings.props, settings.name) + logInfo("Started receiver worker at:" + worker.path) + + val n: AtomicInteger = new AtomicInteger(0) + val hiccups: AtomicInteger = new AtomicInteger(0) + + def receive = { + + case props: Props => + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) => + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistcs => + val workers = context.children + sender ! 
Statistcs(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]]) + + case Data(msg) => + blocksGenerator += msg.asInstanceOf[T] + n.incrementAndGet + } + } + + protected def push(iter: Iterator[T]) { + pushBlock("block-" + streamId + "-" + System.nanoTime(), + iter, null, settings.storageLevel) + } + + protected def onStart() = { + blocksGenerator.start() + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + } + + protected def onStop() = { + supervisor ! PoisonPill + } + +} |