-rw-r--r--  docs/plugin-custom-receiver.md                                                 14
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala         158
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala      80
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala    63
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala                23
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala    1
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala         88
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala               74
8 files changed, 319 insertions, 182 deletions
diff --git a/docs/plugin-custom-receiver.md b/docs/plugin-custom-receiver.md
index 41e6a17e2c..0eb4246158 100644
--- a/docs/plugin-custom-receiver.md
+++ b/docs/plugin-custom-receiver.md
@@ -20,20 +20,20 @@ Following is a simple socket text-stream receiver, which is apparently overly s
class SocketTextStreamReceiver (host:String,
port:Int,
- bytesToString: ByteString => String) extends Actor {
+ bytesToString: ByteString => String) extends Actor with Receiver {
override def preStart = IOManager(context.system).connect(host, port)
def receive = {
- case IO.Read(socket, bytes) => context.parent ! Data(bytesToString(bytes))
+ case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
}
+
}
{% endhighlight %}
-
-_Please see implementations of NetworkReceiver for more generic NetworkReceivers._
+All we did here is mix in the Receiver trait and call the pushBlock API method to push our blocks of data. Please refer to the scala-docs of Receiver for more details.
### A sample spark application
@@ -50,7 +50,7 @@ _Please see implementations of NetworkReceiver for more generic NetworkReceivers
{% highlight scala %}
- val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
{% endhighlight %}
@@ -81,11 +81,11 @@ A DStream union operation is provided for taking union on multiple input streams
{% highlight scala %}
- val lines = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
// Another socket stream receiver
- val lines2 = ssc.pluggableActorStream[String](Props(new SocketTextStreamReceiver(
+ val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)
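Taken together, the doc changes above reduce to the following end-to-end pattern. A minimal sketch of the updated receiver, assuming Akka 2.0.x IO (the same imports the test suite below uses) and a StreamingContext named ssc; only the trait mix-in and the pushBlock call are new:

    import akka.actor.{Actor, IO, IOManager, Props}
    import akka.util.ByteString

    import spark.streaming.receivers.Receiver

    // Mix in Receiver and push each socket read to Spark as a block.
    class SocketTextStreamReceiver(host: String, port: Int,
        bytesToString: ByteString => String) extends Actor with Receiver {

      override def preStart = IOManager(context.system).connect(host, port)

      def receive = {
        case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
      }
    }

    // Plugged in through the renamed actorStream API:
    // val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
    //   "localhost", 8445, z => z.utf8String)), "SocketReceiver")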
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
new file mode 100644
index 0000000000..71b4e5bf1a
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -0,0 +1,158 @@
+package spark.streaming.examples
+
+import scala.collection.mutable.LinkedList
+import scala.util.Random
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Receiver
+import spark.util.AkkaUtils
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Sends random content to every subscribed receiver, with a 1/2-second
+ * delay between messages.
+ */
+class FeederActor extends Actor {
+
+ val rand = new Random()
+ var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+
+ val strings: Array[String] = Array("words ", "may ", "count ")
+
+ def makeMessage(): String = {
+ val x = rand.nextInt(3)
+ strings(x) + strings(2 - x)
+ }
+
+ /*
+ * A thread to generate random messages
+ */
+ new Thread() {
+ override def run() {
+ while (true) {
+ Thread.sleep(500)
+ receivers.foreach(_ ! makeMessage)
+ }
+ }
+ }.start()
+
+ def receive: Receive = {
+
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ println("received subscribe from %s".format(receiverActor.toString))
+ receivers = LinkedList(receiverActor) ++ receivers
+
+ case UnsubscribeReceiver(receiverActor: ActorRef) =>
+ println("received unsubscribe from %s".format(receiverActor.toString))
+      receivers = receivers.filterNot(_ eq receiverActor)
+
+ }
+}
+
+/**
+ * A sample actor receiver, kept as simple as possible. This receiver actor
+ * subscribes to a typical publisher/feeder actor and receives
+ * data.
+ *
+ * @see [[spark.streaming.examples.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+extends Actor with Receiver {
+
+  private lazy val remotePublisher = context.actorFor(urlOfPublisher)
+
+ override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+ def receive = {
+    case msg => pushBlock(msg.asInstanceOf[T])
+ }
+
+ override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the Akka system that the Spark sample feeder will start on.
+ */
+object FeederActor {
+
+ def main(args: Array[String]) {
+    if (args.length < 2) {
+ System.err.println(
+ "Usage: FeederActor <hostname> <port>\n"
+ )
+ System.exit(1)
+ }
+ val Seq(host, port) = args.toSeq
+
+
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+ println("Feeder started as:" + feeder)
+
+ actorSystem.awaitTermination();
+ }
+}
+
+/**
+ * A sample word count program demonstrating the use of a plugged-in
+ * actor as the receiver.
+ * Usage: ActorWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the Akka system that the Spark sample feeder is running on.
+ *
+ * To run this example locally, first run the feeder actor:
+ * `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ */
+object ActorWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ActorWordCount <master> <hostname> <port>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, host, port) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ActorWordCount",
+ Seconds(10))
+
+ /*
+     * Following is the use of actorStream to plug in a custom actor as the receiver.
+     *
+     * An important point to note:
+     * Since the actor may exist outside the Spark framework, it is the user's
+     * responsibility to ensure type safety, i.e. that the type of the data
+     * received and of the InputDStream are the same.
+     *
+     * For example: both actorStream and SampleActorReceiver are parameterized
+     * with the same type to ensure type safety.
+ */
+
+ val lines = ssc.actorStream[String](
+ Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+ host, port.toInt))), "SampleReceiver")
+
+    // compute word count
+ lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+ ssc.start()
+ }
+}
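The type-safety caveat in the comment above is worth one concrete illustration. A hypothetical variant of the call above: Props erases the actor's type, so nothing forces the two type parameters to agree, and the compiler will not catch a mismatch.

    // Hypothetical mismatch, for illustration only: this compiles, but the
    // receiver casts feeder messages with asInstanceOf[Int] while the DStream
    // hands them out as String; the error surfaces only at runtime, typically
    // as a ClassCastException somewhere downstream.
    val badLines = ssc.actorStream[String](
      Props(new SampleActorReceiver[Int]("akka://spark@%s:%s/user/FeederActor".format(
        host, port.toInt))), "MismatchedReceiver")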
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
deleted file mode 100644
index 553afc2024..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-package spark.streaming.examples
-
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
-
-import spark.streaming.Seconds
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext.toPairDStreamFunctions
-import spark.streaming.receivers.Data
-
-case class SubscribeReceiver(receiverActor: ActorRef)
-case class UnsubscribeReceiver(receiverActor: ActorRef)
-
-/**
- * A sample actor as receiver is also simplest. This receiver actor
- * goes and subscribe to a typical publisher/feeder actor and receives
- * data, thus it is important to have feeder running before this example
- * can be run. Please see FileTextStreamFeeder(sample) for feeder of this
- * receiver.
- */
-class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
- extends Actor {
-
- lazy private val remotePublisher = context.actorFor(urlOfPublisher)
-
- override def preStart = remotePublisher ! SubscribeReceiver(context.self)
-
- def receive = {
- case msg => context.parent ! Data(msg.asInstanceOf[T])
- }
-
- override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
-
-}
-
-/**
- * A sample word count program demonstrating the use of Akka actor stream.
- *
- */
-object AkkaActorWordCount {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println(
- "Usage: AkkaActorWordCount <master> <batch-duration in seconds>" +
- " <remoteAkkaHost> <remoteAkkaPort>" +
- "In local mode, <master> should be 'local[n]' with n > 1")
- System.exit(1)
- }
-
- val Seq(master, batchDuration, remoteAkkaHost, remoteAkkaPort) = args.toSeq
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "AkkaActorWordCount",
- Seconds(batchDuration.toLong))
-
- /*
- * Following is the use of actorStream to plug in custom actor as receiver
- *
- * An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and actorStream
- * should be same.
- *
- * For example: Both actorStream and SampleActorReceiver are parameterized
- * to same type to ensure type safety.
- */
-
- val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
- remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
-
- //compute wordcount
- lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
-
- ssc.start()
-
- }
-}
diff --git a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala b/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
deleted file mode 100644
index f4c1b87f0e..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/FileTextStreamFeeder.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-package spark.streaming.examples
-
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.mutable.LinkedList
-import scala.io.Source
-
-import akka.actor.{ Actor, ActorRef, actorRef2Scala }
-import akka.actor.Props
-
-import spark.util.AkkaUtils
-
-/**
- * A feeder to which multiple message receiver (specified by "noOfReceivers")actors
- * subscribe and receive file(s)'s text as stream of messages. This is provided
- * as a demonstration application for trying out Actor as receiver feature. Please see
- * SampleActorReceiver or AkkaActorWordCount example for details about the
- * receiver of this feeder.
- */
-
-object FileTextStreamFeeder {
-
- var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
- var countdownLatch: CountDownLatch = _
- def main(args: Array[String]) = args.toList match {
-
- case host :: port :: noOfReceivers :: fileNames =>
- val acs = AkkaUtils.createActorSystem("spark", host, port.toInt)._1
- countdownLatch = new CountDownLatch(noOfReceivers.toInt)
- val actor = acs.actorOf(Props(new FeederActor), "FeederActor")
- countdownLatch.await() //wait for all the receivers to subscribe
- for (fileName <- fileNames;line <- Source.fromFile(fileName).getLines) {
- actor ! line
- }
- acs.awaitTermination();
-
- case _ =>
- System.err.println("Usage: FileTextStreamFeeder <hostname> <port> <no_of_receivers> <filenames>")
- System.exit(1)
- }
-
- /**
- * Sends the content to every receiver subscribed
- */
- class FeederActor extends Actor {
-
- def receive: Receive = {
-
- case SubscribeReceiver(receiverActor: ActorRef) =>
- println("received subscribe from %s".format(receiverActor.toString))
- receivers = LinkedList(receiverActor) ++ receivers
- countdownLatch.countDown()
-
- case UnsubscribeReceiver(receiverActor: ActorRef) =>
- println("received unsubscribe from %s".format(receiverActor.toString))
- receivers = receivers.dropWhile(x => x eq receiverActor)
-
- case textMessage: String =>
- receivers.foreach(_ ! textMessage)
-
- }
- }
-}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index a426649726..a9684c5772 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,14 +1,16 @@
package spark.streaming
import akka.actor.Props
+import akka.actor.SupervisorStrategy
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
-import spark.streaming.receivers.Settings
import scala.collection.mutable.Queue
@@ -155,17 +157,22 @@ class StreamingContext private (
}
/**
- * Create an input stream with any arbitrary user implemented Akka actor receiver.
+ * Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_ONLY_SER_2)
+   * @param supervisorStrategy strategy for supervising the receiver actor
+   *
+   * @note Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of
+   *       the data received and of the actorStream are the same.
*/
def actorStream[T: ClassManifest](
- props: Props, name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
- networkStream(new ActorReceiver(Settings(props, name, storageLevel)))
- }
+ props: Props, name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+ networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+ }
/**
   * Create an input stream that pulls messages from a Kafka Broker.
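The new supervisorStrategy parameter defaults to ReceiverSupervisorStrategy.defaultStrategy, but callers can supply their own fault handling. A sketch against the stock Akka 2.0.x supervision API, reusing the SocketTextStreamReceiver from the docs above; the particular policy is illustrative, not a recommendation:

    import akka.actor.{OneForOneStrategy, Props, SupervisorStrategy}
    import akka.actor.SupervisorStrategy.{Escalate, Restart}
    import akka.util.duration._

    // Restart a crashing receiver actor up to 10 times a minute; escalate anything else.
    val restarting: SupervisorStrategy = OneForOneStrategy(
        maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: RuntimeException => Restart
      case _: Exception => Escalate
    }

    val lines = ssc.actorStream[String](
      Props(new SocketTextStreamReceiver("localhost", 8445, z => z.utf8String)),
      "SocketReceiver", supervisorStrategy = restarting)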
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
index 674f1059fe..3c2a81947b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
+private[streaming]
class PluggableInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index f24c99ad70..b3201d0b28 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -23,23 +23,47 @@ object ReceiverSupervisorStrategy {
}
/**
- * Settings for configuring the actor creation or defining supervisor strategy
+ * A receiver trait to be mixed in with your Actor to gain access to
+ * the pushBlock API.
+ *
+ * @example {{{
+ *  class MyActor extends Actor with Receiver {
+ *    def receive = {
+ *      case anything: String => pushBlock(anything)
+ *    }
+ *  }
+ *  // Can then be plugged into actorStream as follows:
+ *  ssc.actorStream[String](Props(new MyActor), "MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. that the parametrized type of
+ *       the pushed block and of the InputDStream are the same.
+ *
*/
-case class Settings(props: Props,
- name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
+trait Receiver { self: Actor =>
+ def pushBlock[T: ClassManifest](iter: Iterator[T]) {
+ context.parent ! Data(iter)
+ }
+
+ def pushBlock[T: ClassManifest](data: T) {
+ context.parent ! Data(data)
+ }
+
+}
/**
- * Statistcs for querying the supervisor about state of workers
+ * Statistics for querying the supervisor about state of workers
*/
-case class Statistcs(numberOfMsgs: Int,
+case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
numberOfHiccups: Int,
otherInfo: String)
/** Case class to receive data sent by child actors **/
-case class Data[T: ClassManifest](data: T)
+private[streaming] case class Data[T: ClassManifest](data: T)
/**
* Provides Actors as receivers for receiving stream.
@@ -48,20 +72,37 @@ case class Data[T: ClassManifest](data: T)
* A nice set of abstraction(s) for actors as receivers is already provided for
* a few general cases. It is thus exposed as an API where user may come with
* his own Actor to run as receiver for Spark Streaming input source.
+ *
+ * This starts a supervisor actor which starts workers and also provides
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
+ * Here is a way to start more supervisors/workers as its children:
+ *
+ * @example {{{
+ * context.parent ! Props(new Supervisor)
+ * }}} OR {{{
+ * context.parent ! Props(new Worker,"Worker")
+ * }}}
+ *
+ *
*/
-class ActorReceiver[T: ClassManifest](settings: Settings)
+private[streaming] class ActorReceiver[T: ClassManifest](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ receiverSupervisorStrategy: SupervisorStrategy)
extends NetworkReceiver[T] {
protected lazy val blocksGenerator: BlockGenerator =
- new BlockGenerator(settings.storageLevel)
+ new BlockGenerator(storageLevel)
protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
private class Supervisor extends Actor {
- override val supervisorStrategy = settings.supervisorStrategy
- val worker = context.actorOf(settings.props, settings.name)
+ override val supervisorStrategy = receiverSupervisorStrategy
+ val worker = context.actorOf(props, name)
logInfo("Started receiver worker at:" + worker.path)
val n: AtomicInteger = new AtomicInteger(0)
@@ -69,33 +110,34 @@ class ActorReceiver[T: ClassManifest](settings: Settings)
def receive = {
- case props: Props =>
+      case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]])
+
+      case Data(msg) =>
+        blocksGenerator += msg.asInstanceOf[T]
+        n.incrementAndGet
+
+      case props: Props =>
val worker = context.actorOf(props)
logInfo("Started receiver worker at:" + worker.path)
sender ! worker
- case (props: Props, name: String) =>
+      case (props: Props, name: String) =>
val worker = context.actorOf(props, name)
logInfo("Started receiver worker at:" + worker.path)
sender ! worker
case _: PossiblyHarmful => hiccups.incrementAndGet()
- case _: Statistcs =>
+      case _: Statistics =>
val workers = context.children
- sender ! Statistcs(n.get, workers.size, hiccups.get, workers.mkString("\n"))
-
- case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])
+ sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
- case Data(msg) =>
- blocksGenerator += msg.asInstanceOf[T]
- n.incrementAndGet
}
}
- protected def push(iter: Iterator[T]) {
+ protected def pushBlock(iter: Iterator[T]) {
pushBlock("block-" + streamId + "-" + System.nanoTime(),
- iter, null, settings.storageLevel)
+ iter, null, storageLevel)
}
protected def onStart() = {
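The Supervisor's receive loop above doubles as a small management protocol: a worker can ask its parent to spawn siblings by sending Props (optionally paired with a name), and anyone holding the supervisor's ref can poll its counters with a Statistics message. A sketch from inside a hypothetical worker actor, using the Akka 2.0.x ask pattern; MyActor is the receiver from the Receiver scaladoc example above:

    import akka.actor.{Actor, ActorRef, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import akka.util.duration._

    import spark.streaming.receivers.Statistics

    class ManagingWorker extends Actor {
      import context.dispatcher
      implicit val timeout = Timeout(5 seconds)

      override def preStart() {
        // Ask the supervisor to spawn a named sibling; it replies with the new ref.
        val sibling = (context.parent ? (Props(new MyActor), "Worker2")).mapTo[ActorRef]
        // Any Statistics message makes the supervisor reply with fresh counters.
        val stats = (context.parent ? Statistics(0, 0, 0, "")).mapTo[Statistics]
        stats.foreach(s => println("workers: " + s.numberOfWorkers +
          ", hiccups: " + s.numberOfHiccups))
      }

      def receive = { case _ => }
    }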
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index e6aecfbb76..c9f941c5b8 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,5 +1,11 @@
package spark.streaming
+import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
+import akka.actor.Props
+import akka.util.ByteString
+
import dstream.SparkFlumeEvent
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
@@ -7,6 +13,7 @@ import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import spark.storage.StorageLevel
+import spark.streaming.receivers.Receiver
import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
@@ -25,6 +32,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ val testPort = 9999
+
override def checkpointDir = "checkpoint"
after {
@@ -35,7 +44,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("network input stream") {
// Start the server
- val testPort = 9999
val testServer = new TestServer(testPort)
testServer.start()
@@ -181,8 +189,60 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Enable manual clock back again for other tests
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
+
+
+ test("actor input stream") {
+ // Start the server
+ val port = testPort
+ val testServer = new TestServer(port)
+ testServer.start()
+
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
+      StorageLevel.MEMORY_AND_DISK) // pass the local value of port so the closure does not capture the enclosing test scope
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ def output = outputBuffer.flatMap(x => x)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = 1 to 9
+ val expectedOutput = input.map(x => x.toString)
+ Thread.sleep(1000)
+ for (i <- 0 until input.size) {
+ testServer.send(input(i).toString)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
+ logInfo("Stopping server")
+ testServer.stop()
+ logInfo("Stopping context")
+ ssc.stop()
+
+ // Verify whether data received was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i) === expectedOutput(i))
+ }
+ }
}
+
/** This is server to test the network input stream */
class TestServer(port: Int) extends Logging {
@@ -242,3 +302,15 @@ object TestServer {
}
}
}
+
+class TestActor(port: Int) extends Actor with Receiver {
+
+ def bytesToString(byteString: ByteString) = byteString.utf8String
+
+ override def preStart = IOManager(context.system).connect(new InetSocketAddress(port))
+
+ def receive = {
+ case IO.Read(socket, bytes) =>
+ pushBlock(bytesToString(bytes))
+ }
+}