-rw-r--r--  docs/plugin-custom-receiver.md                                              101
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala     80
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala   63
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               29
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala  12
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala       111
6 files changed, 396 insertions, 0 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md
new file mode 100644
index 0000000000..41e6a17e2c
--- /dev/null
+++ b/docs/plugin-custom-receiver.md
@@ -0,0 +1,101 @@
+---
+layout: global
+title: Tutorial - Spark Streaming, Plugging in a Custom Receiver
+---
+
+A "Spark streaming" receiver can be a simple network stream, streams of messages from a message queue, files etc. A receiver can also assume roles more than just receiving data like filtering, preprocessing, to name a few of the possibilities. The api to plug-in any user defined custom receiver is thus provided to encourage development of receivers which may be well suited to ones specific need.
+
+This guide walks through the programming model and its features using a simple sample receiver and a corresponding Spark Streaming application.
+
+
+## A quick and naive walk-through
+
+### Write a simple receiver
+
+This starts with implementing an [Actor](#References).
+
+Following is a simple socket text-stream receiver, deliberately over-simplified, built on Akka's socket IO API.
+
+{% highlight scala %}
+
+  // Imports assumed for this snippet: Akka 2.0's IO layer and the Data
+  // message type introduced by this patch.
+  import akka.actor.{ Actor, IO, IOManager }
+  import akka.util.ByteString
+  import spark.streaming.receivers.Data
+
+  class SocketTextStreamReceiver(host: String,
+      port: Int,
+      bytesToString: ByteString => String) extends Actor {
+
+    // open a socket connection to the source when the actor starts
+    override def preStart = IOManager(context.system).connect(host, port)
+
+    def receive = {
+      // forward each chunk read from the socket to the supervising
+      // ActorReceiver as a Data message
+      case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes))
+    }
+  }
+
+
+{% endhighlight %}
+
+
+_Please see the existing implementations of NetworkReceiver for more general-purpose receivers._
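+
+For the non-actor route, here is a minimal sketch of a receiver built directly on NetworkReceiver. It assumes only the onStart/onStop hooks and the pushBlock method that the ActorReceiver in this patch uses; the queue source and the pollQueue helper are hypothetical, and imports are omitted as in the other snippets.
+
+{% highlight scala %}
+
+  class QueuePollingReceiver(queueUrl: String) extends NetworkReceiver[String] {
+
+    protected def onStart() = {
+      // poll the (hypothetical) queue and push each batch of lines as one block
+      val batch: Iterator[String] = pollQueue(queueUrl)
+      pushBlock("block-" + streamId + "-" + System.nanoTime(), batch, null,
+        StorageLevel.MEMORY_ONLY_SER_2)
+    }
+
+    protected def onStop() = {
+      // a real receiver would close its connection to the source here
+    }
+
+    // placeholder for the source-specific polling logic
+    private def pollQueue(url: String): Iterator[String] = Iterator.empty
+  }
+
+{% endhighlight %}
+
+Such a receiver is plugged in through the `pluggableNetworkStream` method this patch adds to StreamingContext.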
+
+### A sample spark application
+
+* First, create a Spark streaming context with the master URL and batch duration.
+
+{% highlight scala %}
+
+ val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
+ Seconds(batchDuration))
+
+{% endhighlight %}
+
+* Plug the actor configuration into the Spark streaming context to create a DStream.
+
+{% highlight scala %}
+
+ val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ "localhost",8445, z => z.utf8String)),"SocketReceiver")
+
+{% endhighlight %}
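+
+The storage level for received data defaults to `StorageLevel.MEMORY_ONLY_SER_2` and can be overridden through the optional third parameter of `pluggableActorStream`; a sketch based on the signature added in this patch:
+
+{% highlight scala %}
+
+  val durableLines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+    "localhost", 8445, z => z.utf8String)), "SocketReceiver",
+    StorageLevel.MEMORY_AND_DISK_SER_2)
+
+{% endhighlight %}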
+
+* Process it.
+
+{% highlight scala %}
+
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+
+ wordCounts.print()
+ ssc.start()
+
+
+{% endhighlight %}
+
+* After wiring up the processing, the stream can be tested by feeding it text with the netcat utility.
+
+ $ nc -l localhost 8445
+ hello world
+ hello hello
+
+
+## Multiple homogeneous/heterogeneous receivers
+
+A DStream union operation is provided for combining multiple input streams into a single stream.
+
+{% highlight scala %}
+
+ val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ "localhost",8445, z => z.utf8String)),"SocketReceiver")
+
+ // Another socket stream receiver
+ val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ "localhost",8446, z => z.utf8String)),"SocketReceiver")
+
+ val union = lines.union(lines2)
+
+{% endhighlight %}
+
+The union stream can then be processed as described earlier.
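+
+For example, the same word count applied to the union:
+
+{% highlight scala %}
+
+  union.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+{% endhighlight %}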
+
+_A more comprehensive example is provided in the Spark Streaming examples._
+
+## References
+
+1. [Akka Actor documentation](http://doc.akka.io/docs/akka/2.0.5/scala/actors.html)
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
new file mode 100644
index 0000000000..ff05842c71
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
@@ -0,0 +1,80 @@
+package spark.streaming.examples
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Data
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * A sample actor receiver, kept as simple as possible. This receiver actor
+ * subscribes to a typical publisher/feeder actor and receives data from it,
+ * so it is important to have the feeder running before this example can be
+ * run. Please see FileTextStreamFeeder (sample) for the feeder matching this
+ * receiver.
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+ extends Actor {
+
+ lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+ override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+ def receive = {
+ case msg => context.parent ! Data(msg.asInstanceOf[T])
+ }
+
+ override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample word count program demonstrating the use of plugging in
+ * AkkaActor as Receiver
+ */
+object AkkaActorWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println(
+ "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
+ " <remoteAkkaHost> <remoteAkkaPort>" +
+        "\nIn local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "AkkaActorWordCount",
+ Seconds(batchDuration.toLong))
+
+    /*
+     * Following is the use of pluggableActorStream to plug in a custom actor
+     * as a receiver.
+     *
+     * An important point to note:
+     * Since the actor may exist outside the Spark framework, it is the user's
+     * responsibility to ensure type safety, i.e. the type of the data received
+     * and the type parameter of the PluggableInputDStream should be the same.
+     *
+     * For example: both pluggableActorStream and SampleActorReceiver below are
+     * parameterized to the same type to ensure type safety.
+     */
+
+ val lines = ssc.pluggableActorStream[String](
+ Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+ remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
+
+    // compute the word count
+ lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+ ssc.start()
+
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
new file mode 100644
index 0000000000..f4c1b87f0e
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
@@ -0,0 +1,63 @@
+package spark.streaming.examples
+
+import java.util.concurrent.CountDownLatch
+
+import scala.collection.mutable.LinkedList
+import scala.io.Source
+
+import akka.actor.{ Actor, ActorRef, actorRef2Scala }
+import akka.actor.Props
+
+import spark.util.AkkaUtils
+
+/**
+ * A feeder to which multiple message-receiver actors (their number given by
+ * "noOfReceivers") subscribe, and which streams the text of the given file(s)
+ * to them as messages. This is provided as a demonstration application for
+ * trying out the actor-as-receiver feature. Please see SampleActorReceiver or
+ * the AkkaActorWordCount example for details about the receiver side of this
+ * feeder.
+ */
+
+object FileTextStreamFeeder {
+
+ var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+ var countdownLatch: CountDownLatch = _
+ def main(args: Array[String]) = args.toList match {
+
+ case host :: port :: noOfReceivers :: fileNames =>
+ val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
+ countdownLatch = new CountDownLatch(noOfReceivers.toInt)
+ val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
+      countdownLatch.await() // wait for all the receivers to subscribe
+      for (fileName <- fileNames; line <- Source.fromFile(fileName).getLines) {
+ actor ! line
+ }
+      acs.awaitTermination()
+
+ case _ =>
+ System.err.println("Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
+ System.exit(1)
+ }
+
+ /**
+ * Sends the content to every receiver subscribed
+ */
+ class FeederActor extends Actor {
+
+ def receive: Receive = {
+
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ println("received subscribe from %s".format(receiverActor.toString))
+ receivers = LinkedList(receiverActor) ++ receivers
+ countdownLatch.countDown()
+
+ case UnsubscribeReceiver(receiverActor: ActorRef) =>
+ println("received unsubscribe from %s".format(receiverActor.toString))
+        receivers = receivers.filterNot(_ eq receiverActor) // dropWhile would only remove a matching prefix
+
+ case textMessage: String =>
+ receivers.foreach(_ ! textMessage)
+
+ }
+ }
+} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..cd7379da14 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,15 @@
package spark.streaming
+import akka.actor.Props
+
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.Settings
+
import scala.collection.mutable.Queue
@@ -135,6 +140,30 @@ class StreamingContext private (
protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
+   * Create an input stream with an arbitrary user-implemented network receiver.
+ * @param receiver Custom implementation of NetworkReceiver
+ */
+ def pluggableNetworkStream[T: ClassManifest](
+ receiver: NetworkReceiver[T]): DStream[T] = {
+ val inputStream = new PluggableInputDStream[T](this,
+ receiver)
+ graph.addInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+   * Create an input stream with an arbitrary user-implemented Akka actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def pluggableActorStream[T: ClassManifest](
+ props: Props, name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
+ pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+ }
+
+ /**
   * Create an input stream that pulls messages from a Kafka Broker.
   * @param hostname ZooKeeper hostname.
   * @param port ZooKeeper port.
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
new file mode 100644
index 0000000000..674f1059fe
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -0,0 +1,12 @@
+package spark.streaming.dstream
+
+import spark.streaming.StreamingContext
+
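+/**
+ * An input stream backed by a user-supplied NetworkReceiver: getReceiver()
+ * simply hands the given receiver to the network input machinery.
+ */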
+class PluggableInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ receiver
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
new file mode 100644
index 0000000000..f24c99ad70
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -0,0 +1,111 @@
+package spark.streaming.receivers
+
+import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
+import akka.actor.{ actorRef2Scala, ActorRef }
+import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+
+import spark.storage.StorageLevel
+import spark.streaming.dstream.NetworkReceiver
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** A helper with a set of defaults for the supervisor strategy **/
+object ReceiverSupervisorStrategy {
+
+ import akka.util.duration._
+ import akka.actor.SupervisorStrategy._
+
+ val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException ⇒ Restart
+ case _: Exception ⇒ Escalate
+ }
+}
+
+/**
+ * Settings for configuring the actor creation or defining supervisor strategy
+ */
+case class Settings(props: Props,
+ name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
+
+/**
+ * Statistcs for querying the supervisor about state of workers
+ */
+case class Statistcs(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Case class to receive data sent by child actors **/
+case class Data[T: ClassManifest](data: T)
+
+/**
+ * Provides Actors as receivers for receiving a stream.
+ *
+ * Actors can be used to receive data from almost any stream source. A set of
+ * abstractions for actors as receivers is already provided for a few general
+ * cases. This is thus exposed as an API where the user may supply their own
+ * Actor to run as a receiver for a Spark Streaming input source.
+ */
+class ActorReceiver[T: ClassManifest](settings: Settings)
+ extends NetworkReceiver[T] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(settings.storageLevel)
+
+ protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ "Supervisor" + streamId)
+
+ private class Supervisor extends Actor {
+
+ override val supervisorStrategy = settings.supervisorStrategy
+ val worker = context.actorOf(settings.props, settings.name)
+ logInfo("Started receiver worker at:" + worker.path)
+
+ val n: AtomicInteger = new AtomicInteger(0)
+ val hiccups: AtomicInteger = new AtomicInteger(0)
+
+ def receive = {
+
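+      // a plain Props message asks the supervisor to spawn an extra, unnamed worker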
+ case props: Props =>
+ val worker = context.actorOf(props)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
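+      // a (Props, name) pair spawns an extra worker with the given name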
+ case (props: Props, name: String) =>
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
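+      // possibly-harmful control messages are only counted as hiccups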
+ case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+      case _: Statistics =>
+        val workers = context.children
+        sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
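+      // an iterator is pushed directly as one block, while single messages
+      // are buffered through the block generator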
+ case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])
+
+ case Data(msg) =>
+ blocksGenerator += msg.asInstanceOf[T]
+ n.incrementAndGet
+ }
+ }
+
+ protected def push(iter: Iterator[T]) {
+ pushBlock("block-" + streamId + "-" + System.nanoTime(),
+ iter, null, settings.storageLevel)
+ }
+
+ protected def onStart() = {
+ blocksGenerator.start()
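+    // referencing the lazy val here forces creation of the supervisor actor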
+ supervisor
+ logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+ }
+
+ protected def onStop() = {
+ supervisor ! PoisonPill
+ }
+
+}