diff options
author | Denny <dennybritz@gmail.com> | 2012-11-12 19:39:29 -0800 |
---|---|---|
committer | Denny <dennybritz@gmail.com> | 2012-11-12 19:39:29 -0800 |
commit | 255b3e44c18e64a55afb184f39746780b391a496 (patch) | |
tree | 479fedcfdbf7e68bc6dc73188421a49fbc356a82 /streaming/src/test/scala | |
parent | 0fd4c93f1c349f052f633fea64f975d53976bd9c (diff) | |
parent | 564dd8c3f415746a68f05bde6ea2a0e7a7760b4c (diff) | |
download | spark-255b3e44c18e64a55afb184f39746780b391a496.tar.gz spark-255b3e44c18e64a55afb184f39746780b391a496.tar.bz2 spark-255b3e44c18e64a55afb184f39746780b391a496.zip |
Merge branch 'dev' into kafka
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r-- | streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 26 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 136 |
2 files changed, 134 insertions, 28 deletions
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 0450120061..0bcf207082 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -15,12 +15,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } after { + + if (ssc != null) ssc.stop() FileUtils.deleteDirectory(new File(checkpointDir)) } + var ssc: StreamingContext = null + override def framework = "CheckpointSuite" - override def batchDuration = Milliseconds(500) + override def batchDuration = Milliseconds(200) override def checkpointDir = "checkpoint" @@ -30,12 +34,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { test("basic stream+rdd recovery") { - assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") + assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 1 second") assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration") System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - val stateStreamCheckpointInterval = Seconds(2) + val stateStreamCheckpointInterval = Seconds(1) // this ensure checkpointing occurs at least once val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2 @@ -110,6 +114,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { runStreamsWithRealDelay(ssc, 4) ssc.stop() System.clearProperty("spark.streaming.manualClock.jump") + ssc = null } test("map and reduceByKey") { @@ -131,9 +136,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) .checkpoint(Seconds(2)) } - for (i <- Seq(2, 3, 4)) { - testCheckpointedOperation(input, operation, output, i) - } + testCheckpointedOperation(input, operation, output, 3) } test("updateStateByKey") { @@ -148,9 +151,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { .checkpoint(Seconds(2)) .map(t => (t._1, t._2.self)) } - for (i <- Seq(2, 3, 4)) { - testCheckpointedOperation(input, operation, output, i) - } + testCheckpointedOperation(input, operation, output, 3) } @@ -171,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Do half the computation (half the number of batches), create checkpoint file and quit - val ssc = setupStreams[U, V](input, operation) + ssc = setupStreams[U, V](input, operation) val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs) verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) Thread.sleep(1000) @@ -182,9 +183,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { " Restarting stream computation " + "\n-------------------------------------------\n" ) - val sscNew = new StreamingContext(checkpointDir) - val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs) + ssc = new StreamingContext(checkpointDir) + val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) + ssc = null } /** diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 8f892baab1..0957748603 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -9,12 +9,19 @@ import spark.storage.StorageLevel import spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfter -class InputStreamsSuite extends TestSuiteBase { +class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def checkpointDir = "checkpoint" + + after { + FileUtils.deleteDirectory(new File(checkpointDir)) + } + test("network input stream") { // Start the server val serverPort = 9999 @@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase { ssc.registerOutputStream(outputStream) ssc.start() - // Feed data to the server to send to the Spark Streaming network receiver + // Feed data to the server to send to the network receiver val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) val expectedOutput = input.map(_.toString) @@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase { logInfo("Stopping context") ssc.stop() - // Verify whether data received by Spark Streaming was as expected + // Verify whether data received was as expected logInfo("--------------------------------") logInfo("output.size = " + outputBuffer.size) logInfo("output") @@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase { } } + test("network input stream with checkpoint") { + // Start the server + val serverPort = 9999 + val server = new TestServer(9999) + server.start() + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK) + var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(1, 2, 3)) { + server.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and feed more data to see whether + // they are being received and processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(4, 5, 6)) { + server.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + assert(outputStream.output.size > 0) + ssc.stop() + } + test("file input stream") { // Create a temporary directory val dir = { @@ -76,7 +126,7 @@ class InputStreamsSuite extends TestSuiteBase { temp.delete() temp.mkdirs() temp.deleteOnExit() - println("Created temp dir " + temp) + logInfo("Created temp dir " + temp) temp } @@ -84,7 +134,9 @@ class InputStreamsSuite extends TestSuiteBase { val ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) val filestream = ssc.textFileStream(dir.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(filestream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -96,36 +148,88 @@ class InputStreamsSuite extends TestSuiteBase { Thread.sleep(1000) for (i <- 0 until input.size) { FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n") - Thread.sleep(500) + Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) - Thread.sleep(500) + Thread.sleep(100) } val startTime = System.currentTimeMillis() - while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size) + while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) Thread.sleep(100) } Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - println("Stopping context") + logInfo("Stopping context") ssc.stop() // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + output.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i).size === 1) + assert(output(i).head.toString === expectedOutput(i)) + } + } + + test("file input stream with checkpoint") { + // Create a temporary directory + val dir = { + var temp = File.createTempFile(".temp.", Random.nextInt().toString) + temp.delete() + temp.mkdirs() + temp.deleteOnExit() + println("Created temp dir " + temp) + temp } + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val filestream = ssc.textFileStream(dir.toString) + var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files and advance manual clock to process them + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(1000) + for (i <- Seq(1, 2, 3)) { + FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and create more files to see whether + // they are being processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(500) + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() } } |