diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 18:10:00 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 18:10:00 -0800 |
commit | fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 (patch) | |
tree | 55a804861cd866c7583ae5074968571b76cd29ec /streaming/src/test | |
parent | 364cdb679cf2b0d5e6ed7ab89628f15594d7947f (diff) | |
download | spark-fad2b82fc8fb49f2171af10cf7e408d8b8dd7349.tar.gz spark-fad2b82fc8fb49f2171af10cf7e408d8b8dd7349.tar.bz2 spark-fad2b82fc8fb49f2171af10cf7e408d8b8dd7349.zip |
Added support for saving input files of FileInputDStream to graph checkpoints. Modified 'file input stream with checkpoint' testcase to test recovery of pre-master-failure input files.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 64 |
1 files changed, 44 insertions, 20 deletions
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index d7ba7a5d17..4f6204f205 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -214,10 +214,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { //Thread.sleep(100) } val startTime = System.currentTimeMillis() - /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) - Thread.sleep(100) - }*/ Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") @@ -226,11 +222,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) - logInfo("output") + logInfo("output, size = " + outputBuffer.size) outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output.size = " + expectedOutput.size) - logInfo("expected output") + logInfo("expected output, size = " + expectedOutput.size) expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") @@ -256,8 +250,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) - val filestream = ssc.textFileStream(testDir.toString) - var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + val fileStream = ssc.textFileStream(testDir.toString) + val outputBuffer = new ArrayBuffer[Seq[Int]] + // Reduced over a large window to ensure that recovery from master failure + // requires reprocessing of all the files seen before the failure + val reducedStream = fileStream.map(_.toInt) + .reduceByWindow(_ + _, batchDuration * 30, batchDuration) + var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -266,31 +265,56 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) + // wait to make sure that the file is written such that it gets shown in the file listings + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) + // wait to make sure that FileInputDStream picks up this file only and not any other file + Thread.sleep(500) } - Thread.sleep(500) logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) + assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(1000) + } + // Restart stream computation from checkpoint and create more files to see whether // they are being processed logInfo("*********** RESTARTING ************") ssc = new StreamingContext(checkpointDir) ssc.start() clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(500) - for (i <- Seq(4, 5, 6)) { + for (i <- Seq(7, 8, 9)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) + Thread.sleep(500) } - Thread.sleep(500) - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) + Thread.sleep(1000) + assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() + + // Append the new output to the old buffer + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + outputBuffer ++= outputStream.output + + // Verify whether data received by Spark Streaming was as expected + val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(outputBuffer.size === expectedOutput.size) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + assert(outputBuffer(i).head === expectedOutput(i)) + } } } |