aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-10-09 14:50:36 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-10-09 14:50:36 -0700
commit1faa1135a3fc0acd89f934f01a4a2edefcb93d33 (patch)
treebbfa20287a5178f975f70e6bfa6c74a65c720d44 /streaming
parentec4d40e48186af18e25517e0474020720645f583 (diff)
downloadspark-1faa1135a3fc0acd89f934f01a4a2edefcb93d33.tar.gz
spark-1faa1135a3fc0acd89f934f01a4a2edefcb93d33.tar.bz2
spark-1faa1135a3fc0acd89f934f01a4a2edefcb93d33.zip
Revert "[SPARK-2805] Upgrade to akka 2.3.4"
This reverts commit b9df8af62e8d7b263a668dfb6e9668ab4294ea37.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala71
1 files changed, 71 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6107fcdc44..952a74fd5f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.streaming
import akka.actor.Actor
+import akka.actor.IO
+import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
@@ -142,6 +144,59 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
+ // TODO: This test works in IntelliJ but not through SBT
+ ignore("actor input stream") {
+ // Start the server
+ val testServer = new TestServer()
+ val port = testServer.port
+ testServer.start()
+
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
+ // Had to pass the local value of port to prevent from closing over entire scope
+ StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ def output = outputBuffer.flatMap(x => x)
+ outputStream.register()
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = 1 to 9
+ val expectedOutput = input.map(x => x.toString)
+ Thread.sleep(1000)
+ for (i <- 0 until input.size) {
+ testServer.send(input(i).toString)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
+ logInfo("Stopping server")
+ testServer.stop()
+ logInfo("Stopping context")
+ ssc.stop()
+
+ // Verify whether data received was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i) === expectedOutput(i))
+ }
+ }
+
+
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
@@ -323,6 +378,22 @@ class TestServer(portToBind: Int = 0) extends Logging {
def port = serverSocket.getLocalPort
}
+/** This is an actor for testing actor input stream */
+class TestActor(port: Int) extends Actor with ActorHelper {
+
+ def bytesToString(byteString: ByteString) = byteString.utf8String
+
+ override def preStart(): Unit = {
+ @deprecated("suppress compile time deprecation warning", "1.0.0")
+ val unit = IOManager(context.system).connect(new InetSocketAddress(port))
+ }
+
+ def receive = {
+ case IO.Read(socket, bytes) =>
+ store(bytesToString(bytes))
+ }
+}
+
/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {