8 files changed, 319 insertions, 182 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md
index 41e6a17e2c..0eb4246158 100644
--- a/docs/plugin-custom-receiver.md
+++ b/docs/plugin-custom-receiver.md
@@ -20,20 +20,20 @@ Following is a simple socket text-stream receiver, which is appearently overly s
 class SocketTextStreamReceiver (host:String,
   port:Int,
-  bytesToString: ByteString => String) extends Actor {
+  bytesToString: ByteString => String) extends Actor with Receiver {
 
   override def preStart = IOManager(context.system).connect(host, port)
 
   def receive = {
-   case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes))
+   case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
   }
+
 }
 
 {% endhighlight %}
 
-
-_Please see implementations of NetworkReceiver for more generic NetworkReceivers._
+All we did here is mix in the trait Receiver and call the pushBlock API method to push our blocks of data. Please refer to the scaladocs of Receiver for more details.
 
 ### A sample spark application
 
@@ -50,7 +50,7 @@
 
 {% highlight scala %}
 
-  val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+  val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
     "localhost",8445, z => z.utf8String)),"SocketReceiver")
 
 {% endhighlight %}
 
@@ -81,11 +81,11 @@ A DStream union operation is provided for taking union on multiple input streams
 
 {% highlight scala %}
 
-  val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+  val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
     "localhost",8445, z => z.utf8String)),"SocketReceiver")
 
   // Another socket stream receiver
-  val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+  val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
     "localhost",8446, z => z.utf8String)),"SocketReceiver")
 
   val union = lines.union(lines2)
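The documentation above shows the single-record form of `pushBlock`; the `Receiver` trait this patch introduces (see the ActorReceiver.scala hunk further down) also has an `Iterator[T]` overload that ships a whole block of records at once. Below is a minimal sketch of a receiver that batches before pushing; the class name, buffer size, and flush policy are illustrative, not part of this patch.

{% highlight scala %}
import akka.actor.Actor

import scala.collection.mutable.ArrayBuffer

import spark.streaming.receivers.Receiver

class BatchingTextReceiver extends Actor with Receiver {

  private val buffer = new ArrayBuffer[String]

  def receive = {
    case line: String =>
      buffer += line
      if (buffer.size >= 100) {      // illustrative flush threshold
        // Copy before clearing: pushBlock hands the iterator to the
        // supervisor asynchronously, so it must not observe the cleared buffer.
        pushBlock(buffer.toList.iterator)
        buffer.clear()
      }
  }
}
{% endhighlight %}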
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
new file mode 100644
index 0000000000..71b4e5bf1a
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -0,0 +1,158 @@
+package spark.streaming.examples
+
+import scala.collection.mutable.LinkedList
+import scala.util.Random
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Receiver
+import spark.util.AkkaUtils
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Sends random content to every subscribed receiver with a 1/2
+ * second delay.
+ */
+class FeederActor extends Actor {
+
+  val rand = new Random()
+  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+
+  val strings: Array[String] = Array("words ", "may ", "count ")
+
+  def makeMessage(): String = {
+    val x = rand.nextInt(3)
+    strings(x) + strings(2 - x)
+  }
+
+  /*
+   * A thread to generate random messages
+   */
+  new Thread() {
+    override def run() {
+      while (true) {
+        Thread.sleep(500)
+        receivers.foreach(_ ! makeMessage)
+      }
+    }
+  }.start()
+
+  def receive: Receive = {
+
+    case SubscribeReceiver(receiverActor: ActorRef) =>
+      println("received subscribe from %s".format(receiverActor.toString))
+      receivers = LinkedList(receiverActor) ++ receivers
+
+    case UnsubscribeReceiver(receiverActor: ActorRef) =>
+      println("received unsubscribe from %s".format(receiverActor.toString))
+      receivers = receivers.dropWhile(x => x eq receiverActor)
+
+  }
+}
+
+/**
+ * A sample receiver actor, also the simplest possible. It
+ * subscribes to a typical publisher/feeder actor and receives
+ * data.
+ *
+ * @see [[spark.streaming.examples.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+extends Actor with Receiver {
+
+  lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+  override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+  def receive = {
+    case msg ⇒ pushBlock(msg.asInstanceOf[T])
+  }
+
+  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the AkkaSystem that the Spark sample feeder starts on.
+ */
+object FeederActor {
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FeederActor <hostname> <port>\n"
+      )
+      System.exit(1)
+    }
+    val Seq(host, port) = args.toSeq
+
+    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+    println("Feeder started as:" + feeder)
+
+    actorSystem.awaitTermination()
+  }
+}
+
+/**
+ * A sample word count program demonstrating the use of plugging in
+ * an Actor as a Receiver.
+ *
+ * Usage: ActorWordCount <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the AkkaSystem that the Spark sample feeder is running on.
+ *
+ * To run this example locally, you may run the feeder actor as
+ *    `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * and then run the example
+ *    `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ */
+object ActorWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println(
+        "Usage: ActorWordCount <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    val Seq(master, host, port) = args.toSeq
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "ActorWordCount",
+      Seconds(10))
+
+    /*
+     * Following is the use of actorStream to plug in a custom actor as receiver.
+     *
+     * An important point to note:
+     * Since the actor may exist outside the Spark framework, it is the user's
+     * responsibility to ensure type safety, i.e. the type of the data received
+     * and of the InputDStream must match.
+     *
+     * For example: both actorStream and SampleActorReceiver are parameterized
+     * to the same type to ensure type safety.
+     */
+
+    val lines = ssc.actorStream[String](
+      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+        host, port.toInt))), "SampleReceiver")
+
+    // compute wordcount
+    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+    ssc.start()
+  }
+}
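The type-safety comment in ActorWordCount above deserves a concrete illustration: nothing forces the type a receiver pushes to match the DStream's type parameter, and a mismatch is invisible to the compiler. A sketch of the failure mode follows; the class and app names are illustrative, and the stream is deliberately mis-typed.

{% highlight scala %}
import akka.actor.{Actor, Props}

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.receivers.Receiver

// Deliberately pushes Int while the stream below is typed String.
class IntPushingReceiver extends Actor with Receiver {
  override def preStart = self ! "go" // kick off a single push
  def receive = {
    case _ => pushBlock(42)
  }
}

object TypeSafetyDemo {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "TypeSafetyDemo", Seconds(1))

    // Compiles fine: the mismatch only surfaces at runtime, when flatMap
    // calls split on a record that is really an Int.
    val lines = ssc.actorStream[String](Props(new IntPushingReceiver), "BadReceiver")
    lines.flatMap(_.split(" ")).print()

    ssc.start()
  }
}
{% endhighlight %}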
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
deleted file mode 100644
index 553afc2024..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-package spark.streaming.examples
-
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
-
-import spark.streaming.Seconds
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext.toPairDStreamFunctions
-import spark.streaming.receivers.Data
-
-case class SubscribeReceiver(receiverActor: ActorRef)
-case class UnsubscribeReceiver(receiverActor: ActorRef)
-
-/**
- * A sample actor as receiver is also simplest. This receiver actor
- * goes and subscribe to a typical publisher/feeder actor and receives
- * data, thus it is important to have feeder running before this example
- * can be run. Please see FileTextStreamFeeder(sample) for feeder of this
- * receiver.
- */
-class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
-  extends Actor {
-
-  lazy private val remotePublisher = context.actorFor(urlOfPublisher)
-
-  override def preStart = remotePublisher ! SubscribeReceiver(context.self)
-
-  def receive = {
-    case msg => context.parent ! Data(msg.asInstanceOf[T])
-  }
-
-  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
-
-}
-
-/**
- * A sample word count program demonstrating the use of Akka actor stream.
- *
- */
-object AkkaActorWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println(
-        "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
-          " <remoteAkkaHost> <remoteAkkaPort>" +
-          "In local mode, <master> should be 'local[n]' with n > 1")
-      System.exit(1)
-    }
-
-    val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq
-
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(master, "AkkaActorWordCount",
-      Seconds(batchDuration.toLong))
-
-    /*
-     * Following is the use of actorStream to plug in custom actor as receiver
-     *
-     * An important point to note:
-     * Since Actor may exist outside the spark framework, It is thus user's responsibility
-     * to ensure the type safety, i.e type of data received and actorStream
-     * should be same.
-     *
-     * For example: Both actorStream and SampleActorReceiver are parameterized
-     * to same type to ensure type safety.
-     */
-
-    val lines = ssc.actorStream[String](
-      Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
-        remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
-
-    //compute wordcount
-    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
-
-    ssc.start()
-
-  }
-}
diff --git a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
deleted file mode 100644
index f4c1b87f0e..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-package spark.streaming.examples
-
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.mutable.LinkedList
-import scala.io.Source
-
-import akka.actor.{ Actor, ActorRef, actorRef2Scala }
-import akka.actor.Props
-
-import spark.util.AkkaUtils
-
-/**
- * A feeder to which multiple message receiver (specified by "noOfReceivers")actors
- * subscribe and receive file(s)'s text as stream of messages. This is provided
- * as a demonstration application for trying out Actor as receiver feature. Please see
- * SampleActorReceiver or AkkaActorWordCount example for details about the
- * receiver of this feeder.
- */
-
-object FileTextStreamFeeder {
-
-  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
-  var countdownLatch: CountDownLatch = _
-  def main(args: Array[String]) = args.toList match {
-
-    case host :: port :: noOfReceivers :: fileNames =>
-      val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
-      countdownLatch = new CountDownLatch(noOfReceivers.toInt)
-      val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
-      countdownLatch.await() //wait for all the receivers to subscribe
-      for (fileName <- fileNames;line <- Source.fromFile(fileName).getLines) {
-        actor ! line
-      }
-      acs.awaitTermination();
-
-    case _ =>
-      System.err.println("Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
-      System.exit(1)
-  }
-
-  /**
-   * Sends the content to every receiver subscribed
-   */
-  class FeederActor extends Actor {
-
-    def receive: Receive = {
-
-      case SubscribeReceiver(receiverActor: ActorRef) =>
-        println("received subscribe from %s".format(receiverActor.toString))
-        receivers = LinkedList(receiverActor) ++ receivers
-        countdownLatch.countDown()
-
-      case UnsubscribeReceiver(receiverActor: ActorRef) =>
-        println("received unsubscribe from %s".format(receiverActor.toString))
-        receivers = receivers.dropWhile(x => x eq receiverActor)
-
-      case textMessage: String =>
-        receivers.foreach(_ ! textMessage)
-
-    }
-  }
-}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index a426649726..a9684c5772 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,14 +1,16 @@
 package spark.streaming
 
 import akka.actor.Props
+import akka.actor.SupervisorStrategy
 
 import spark.streaming.dstream._
 
 import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.ReceiverSupervisorStrategy
 import spark.storage.StorageLevel
 import spark.util.MetadataCleaner
-import spark.streaming.receivers.ActorReceiver
-import spark.streaming.receivers.Settings
 
 import scala.collection.mutable.Queue
 
@@ -155,17 +157,22 @@ class StreamingContext private (
   }
 
   /**
-   * Create an input stream with any arbitrary user implemented Akka actor receiver.
+   * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   *
+   * @note Since the actor may exist outside the Spark framework, it is the
+   *       user's responsibility to ensure type safety, i.e. the parametrized
+   *       type of the data received and of the actorStream must match.
    */
   def actorStream[T: ClassManifest](
-    props: Props, name: String,
-    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
-    networkStream(new ActorReceiver(Settings(props, name, storageLevel)))
-  }
+    props: Props, name: String,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+    supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+    networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+  }
 
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
index 674f1059fe..3c2a81947b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
 
 import spark.streaming.StreamingContext
 
+private[streaming]
 class PluggableInputDStream[T: ClassManifest](
   @transient ssc_ : StreamingContext,
   receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
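The new `supervisorStrategy` parameter on `actorStream` above lets a caller replace `ReceiverSupervisorStrategy.defaultStrategy` with their own fault-handling policy. A sketch of passing one in follows; the receiver class, app name, and retry numbers are illustrative, and the imports assume the Akka 2.0.x APIs this patch builds against.

{% highlight scala %}
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import akka.util.duration._

import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.receivers.Receiver

// Minimal receiver used only to have something to supervise.
class FlakyReceiver extends Actor with Receiver {
  def receive = {
    case s: String => pushBlock(s)
  }
}

object CustomStrategyDemo {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "CustomStrategy", Seconds(1))

    // Restart a crashing receiver up to 10 times within a minute, then give up.
    val strategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: RuntimeException => Restart
      case _: Exception => Escalate
    }

    val lines = ssc.actorStream[String](Props(new FlakyReceiver), "FlakyReceiver",
      StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy = strategy)

    lines.print()
    ssc.start()
  }
}
{% endhighlight %}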
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index f24c99ad70..b3201d0b28 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -23,23 +23,47 @@ object ReceiverSupervisorStrategy {
 }
 
 /**
- * Settings for configuring the actor creation or defining supervisor strategy
+ * A receiver trait to be mixed in with your Actor to gain access to
+ * the pushBlock API.
+ *
+ * @example {{{
+ *  class MyActor extends Actor with Receiver {
+ *      def receive = {
+ *          case anything: String ⇒ pushBlock(anything)
+ *      }
+ *  }
+ *  // Can be plugged into actorStream as follows:
+ *  ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the
+ *       user's responsibility to ensure type safety, i.e. the parametrized
+ *       type of the pushed block and of the InputDStream must match.
 */
-case class Settings(props: Props,
-  name: String,
-  storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
-  supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
+trait Receiver { self: Actor ⇒
+  def pushBlock[T: ClassManifest](iter: Iterator[T]) {
+    context.parent ! Data(iter)
+  }
+
+  def pushBlock[T: ClassManifest](data: T) {
+    context.parent ! Data(data)
+  }
+
+}
 
 /**
- * Statistcs for querying the supervisor about state of workers
+ * Statistics for querying the supervisor about state of workers
 */
-case class Statistcs(numberOfMsgs: Int,
+case class Statistics(numberOfMsgs: Int,
   numberOfWorkers: Int,
   numberOfHiccups: Int,
   otherInfo: String)
 
 /** Case class to receive data sent by child actors **/
-case class Data[T: ClassManifest](data: T)
+private[streaming] case class Data[T: ClassManifest](data: T)
 
 /**
  * Provides Actors as receivers for receiving stream.
@@ -48,20 +72,37 @@ case class Data[T: ClassManifest](data: T)
  * A nice set of abstraction(s) for actors as receivers is already provided for
  * a few general cases. It is thus exposed as an API where user may come with
  * his own Actor to run as receiver for Spark Streaming input source.
+ *
+ * This starts a supervisor actor which starts workers and also provides
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
+ * Here is how to start more supervisors/workers as its children:
+ *
+ * @example {{{
+ *  context.parent ! Props(new Supervisor)
+ * }}} OR {{{
+ *  context.parent ! (Props(new Worker), "Worker")
+ * }}}
 */
-class ActorReceiver[T: ClassManifest](settings: Settings)
+private[streaming] class ActorReceiver[T: ClassManifest](
+  props: Props,
+  name: String,
+  storageLevel: StorageLevel,
+  receiverSupervisorStrategy: SupervisorStrategy)
   extends NetworkReceiver[T] {
 
   protected lazy val blocksGenerator: BlockGenerator =
-    new BlockGenerator(settings.storageLevel)
+    new BlockGenerator(storageLevel)
 
   protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
     "Supervisor" + streamId)
 
   private class Supervisor extends Actor {
 
-    override val supervisorStrategy = settings.supervisorStrategy
-    val worker = context.actorOf(settings.props, settings.name)
+    override val supervisorStrategy = receiverSupervisorStrategy
+    val worker = context.actorOf(props, name)
     logInfo("Started receiver worker at:" + worker.path)
 
     val n: AtomicInteger = new AtomicInteger(0)
@@ -69,33 +110,34 @@
 
     def receive = {
 
-      case props: Props =>
+      case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]])
+
+      case Data(msg) ⇒
+        blocksGenerator += msg.asInstanceOf[T]
+        n.incrementAndGet
+
+      case props: Props ⇒
         val worker = context.actorOf(props)
         logInfo("Started receiver worker at:" + worker.path)
         sender ! worker
 
-      case (props: Props, name: String) =>
+      case (props: Props, name: String) ⇒
        val worker = context.actorOf(props, name)
        logInfo("Started receiver worker at:" + worker.path)
        sender ! worker
 
       case _: PossiblyHarmful => hiccups.incrementAndGet()
 
-      case _: Statistcs =>
+      case _: Statistics ⇒
        val workers = context.children
-       sender ! Statistcs(n.get, workers.size, hiccups.get, workers.mkString("\n"))
-
-      case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])
+       sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
 
-      case Data(msg) =>
-        blocksGenerator += msg.asInstanceOf[T]
-        n.incrementAndGet
     }
   }
 
-  protected def push(iter: Iterator[T]) {
+  protected def pushBlock(iter: Iterator[T]) {
     pushBlock("block-" + streamId + "-" + System.nanoTime(),
-      iter, null, settings.storageLevel)
+      iter, null, storageLevel)
   }
 
   protected def onStart() = {
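The supervisor's receive block above accepts a bare `Props` and a `(Props, name)` tuple, spawning an extra worker for each and replying with the new worker's `ActorRef`; this is what the updated scaladoc's "start more workers" example refers to. A sketch of a receiver using that protocol follows; both actor classes are illustrative, not part of the patch.

{% highlight scala %}
import akka.actor.{Actor, ActorRef, Props}

import spark.streaming.receivers.Receiver

// Illustrative extra worker: spawned by the supervisor, so its
// context.parent is the supervisor and pushBlock works as usual.
class HelperWorker extends Actor with Receiver {
  def receive = {
    case s: String => pushBlock(s)
  }
}

class ScalingReceiver extends Actor with Receiver {

  override def preStart = {
    // Matches `case (props: Props, name: String)` in the supervisor above;
    // the supervisor replies with the spawned worker's ActorRef.
    val spawnRequest = (Props(new HelperWorker), "helper-1")
    context.parent ! spawnRequest
  }

  def receive = {
    case worker: ActorRef => // reply from the supervisor; keep it if needed
    case s: String => pushBlock(s)
  }
}
{% endhighlight %}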
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index e6aecfbb76..c9f941c5b8 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,11 @@
 package spark.streaming
 
+import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
+import akka.actor.Props
+import akka.util.ByteString
+
 import dstream.SparkFlumeEvent
 import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
@@ -7,6 +13,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
 import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import util.ManualClock
 import spark.storage.StorageLevel
+import spark.streaming.receivers.Receiver
 import spark.Logging
 import scala.util.Random
 import org.apache.commons.io.FileUtils
@@ -25,6 +32,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
 
+  val testPort = 9999
+
   override def checkpointDir = "checkpoint"
 
   after {
@@ -35,7 +44,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
   test("network input stream") {
     // Start the server
-    val testPort = 9999
     val testServer = new TestServer(testPort)
     testServer.start()
 
@@ -181,8 +189,60 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Enable manual clock back again for other tests
     System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
   }
+
+
+  test("actor input stream") {
+    // Start the server
+    val port = testPort
+    val testServer = new TestServer(port)
+    testServer.start()
+
+    // Set up the streaming context and input streams
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
+      StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope
+    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+    val outputStream = new TestOutputStream(networkStream, outputBuffer)
+    def output = outputBuffer.flatMap(x => x)
+    ssc.registerOutputStream(outputStream)
+    ssc.start()
+
+    // Feed data to the server to send to the network receiver
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    val input = 1 to 9
+    val expectedOutput = input.map(x => x.toString)
+    Thread.sleep(1000)
+    for (i <- 0 until input.size) {
+      testServer.send(input(i).toString)
+      Thread.sleep(500)
+      clock.addToTime(batchDuration.milliseconds)
+    }
+    Thread.sleep(1000)
+    logInfo("Stopping server")
+    testServer.stop()
+    logInfo("Stopping context")
+    ssc.stop()
+
+    // Verify whether data received was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
+    // Verify whether all the elements received are as expected
+    // (whether the elements were received one in each interval is not verified)
+    assert(output.size === expectedOutput.size)
+    for (i <- 0 until output.size) {
+      assert(output(i) === expectedOutput(i))
+    }
+  }
 }
 
+
 /** This is server to test the network input stream */
 class TestServer(port: Int) extends Logging {
 
@@ -242,3 +302,15 @@ object TestServer {
     }
   }
 }
+
+class TestActor(port: Int) extends Actor with Receiver {
+
+  def bytesToString(byteString: ByteString) = byteString.utf8String
+
+  override def preStart = IOManager(context.system).connect(new InetSocketAddress(port))
+
+  def receive = {
+    case IO.Read(socket, bytes) =>
+      pushBlock(bytesToString(bytes))
+  }
+}
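One last point on the `Statistics` rename in the ActorReceiver hunk: the supervisor answers any `Statistics` message with a current snapshot, so a child actor can poll its own supervisor. A hedged sketch using the Akka 2.0.x ask pattern; the timeout, the trigger message, and the field printed are all illustrative.

{% highlight scala %}
import akka.actor.Actor
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._

import spark.streaming.receivers.{Receiver, Statistics}

class IntrospectingReceiver extends Actor with Receiver {

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case "stats" =>
      // Any Statistics message triggers a reply with a fresh snapshot.
      (context.parent ? Statistics(0, 0, 0, "")).onSuccess {
        case s: Statistics => println("workers alive: " + s.numberOfWorkers)
      }

    case msg: String =>
      pushBlock(msg)
  }
}
{% endhighlight %}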