diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-10-09 14:50:36 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-10-09 14:50:36 -0700 |
commit | 1faa1135a3fc0acd89f934f01a4a2edefcb93d33 (patch) | |
tree | bbfa20287a5178f975f70e6bfa6c74a65c720d44 /streaming/src | |
parent | ec4d40e48186af18e25517e0474020720645f583 (diff) | |
download | spark-1faa1135a3fc0acd89f934f01a4a2edefcb93d33.tar.gz spark-1faa1135a3fc0acd89f934f01a4a2edefcb93d33.tar.bz2 spark-1faa1135a3fc0acd89f934f01a4a2edefcb93d33.zip |
Revert "[SPARK-2805] Upgrade to akka 2.3.4"
This reverts commit b9df8af62e8d7b263a668dfb6e9668ab4294ea37.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 6107fcdc44..952a74fd5f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming import akka.actor.Actor +import akka.actor.IO +import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString @@ -142,6 +144,59 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } + // TODO: This test works in IntelliJ but not through SBT + ignore("actor input stream") { + // Start the server + val testServer = new TestServer() + val port = testServer.port + testServer.start() + + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", + // Had to pass the local value of port to prevent from closing over entire scope + StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputStream = new TestOutputStream(networkStream, outputBuffer) + def output = outputBuffer.flatMap(x => x) + outputStream.register() + ssc.start() + + // Feed data to the server to send to the network receiver + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = 1 to 9 + val expectedOutput = input.map(x => x.toString) + Thread.sleep(1000) + for (i <- 0 until input.size) { + testServer.send(input(i).toString) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(1000) + logInfo("Stopping server") + testServer.stop() + logInfo("Stopping context") + ssc.stop() + + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputBuffer.size) + logInfo("output") + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i) === expectedOutput(i)) + } + } + + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 @@ -323,6 +378,22 @@ class TestServer(portToBind: Int = 0) extends Logging { def port = serverSocket.getLocalPort } +/** This is an actor for testing actor input stream */ +class TestActor(port: Int) extends Actor with ActorHelper { + + def bytesToString(byteString: ByteString) = byteString.utf8String + + override def preStart(): Unit = { + @deprecated("suppress compile time deprecation warning", "1.0.0") + val unit = IOManager(context.system).connect(new InetSocketAddress(port)) + } + + def receive = { + case IO.Read(socket, bytes) => + store(bytesToString(bytes)) + } +} + /** This is a receiver to test multiple threads inserting data using block generator */ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging { |