aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala29
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala12
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala111
3 files changed, 152 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..cd7379da14 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,15 @@
package spark.streaming
+import akka.actor.Props
+
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.Settings
+
import scala.collection.mutable.Queue
@@ -135,6 +140,30 @@ class StreamingContext private (
protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
+ * Create an input stream with any arbitrary user implemented network receiver.
+ * @param receiver Custom implementation of NetworkReceiver
+ */
+ def pluggableNetworkStream[T: ClassManifest](
+ receiver: NetworkReceiver[T]): DStream[T] = {
+ val inputStream = new PluggableInputDStream[T](this,
+ receiver)
+ graph.addInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented akka actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def pluggableActorStream[T: ClassManifest](
+ props: Props, name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
+ pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+ }
+
+ /**
* Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname.
* @param port Zookeper port.
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
new file mode 100644
index 0000000000..674f1059fe
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -0,0 +1,12 @@
+package spark.streaming.dstream
+
+import spark.streaming.StreamingContext
+
+class PluggableInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ receiver
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
new file mode 100644
index 0000000000..f24c99ad70
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -0,0 +1,111 @@
+package spark.streaming.receivers
+
+import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
+import akka.actor.{ actorRef2Scala, ActorRef }
+import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+
+import spark.storage.StorageLevel
+import spark.streaming.dstream.NetworkReceiver
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** A helper with set of defaults for supervisor strategy **/
+object ReceiverSupervisorStrategy {
+
+ import akka.util.duration._
+ import akka.actor.SupervisorStrategy._
+
+ val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException ⇒ Restart
+ case _: Exception ⇒ Escalate
+ }
+}
+
+/**
+ * Settings for configuring the actor creation or defining supervisor strategy
+ */
+case class Settings(props: Props,
+ name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
+
+/**
+ * Statistcs for querying the supervisor about state of workers
+ */
+case class Statistcs(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Case class to receive data sent by child actors **/
+case class Data[T: ClassManifest](data: T)
+
+/**
+ * Provides Actors as receivers for receiving stream.
+ *
+ * As Actors can also be used to receive data from almost any stream source.
+ * A nice set of abstraction(s) for actors as receivers is already provided for
+ * a few general cases. It is thus exposed as an API where user may come with
+ * his own Actor to run as receiver for Spark Streaming input source.
+ */
+class ActorReceiver[T: ClassManifest](settings: Settings)
+ extends NetworkReceiver[T] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(settings.storageLevel)
+
+ protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ "Supervisor" + streamId)
+
+ private class Supervisor extends Actor {
+
+ override val supervisorStrategy = settings.supervisorStrategy
+ val worker = context.actorOf(settings.props, settings.name)
+ logInfo("Started receiver worker at:" + worker.path)
+
+ val n: AtomicInteger = new AtomicInteger(0)
+ val hiccups: AtomicInteger = new AtomicInteger(0)
+
+ def receive = {
+
+ case props: Props =>
+ val worker = context.actorOf(props)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case (props: Props, name: String) =>
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+ case _: Statistcs =>
+ val workers = context.children
+ sender ! Statistcs(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+ case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])
+
+ case Data(msg) =>
+ blocksGenerator += msg.asInstanceOf[T]
+ n.incrementAndGet
+ }
+ }
+
+ protected def push(iter: Iterator[T]) {
+ pushBlock("block-" + streamId + "-" + System.nanoTime(),
+ iter, null, settings.storageLevel)
+ }
+
+ protected def onStart() = {
+ blocksGenerator.start()
+ supervisor
+ logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+ }
+
+ protected def onStop() = {
+ supervisor ! PoisonPill
+ }
+
+}