author    Tathagata Das <tathagata.das1565@gmail.com>    2013-01-22 18:10:00 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2013-01-22 18:10:00 -0800
commit    fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 (patch)
tree      55a804861cd866c7583ae5074968571b76cd29ec /streaming/src/test
parent    364cdb679cf2b0d5e6ed7ab89628f15594d7947f (diff)
Added support for saving the input files of FileInputDStream to graph checkpoints. Modified the 'file input stream with checkpoint' test case to verify that input files seen before a master failure are recovered and reprocessed.
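The idea behind the change: a FileInputDStream has to remember which files made up each batch and persist that map as part of the graph checkpoint, so a context restarted from the checkpoint can re-read those files instead of losing the pre-failure batches. A minimal standalone sketch of that bookkeeping (illustrative only; FileBatchCheckpoint and its methods are hypothetical stand-ins, not Spark's actual classes):

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical sketch: persist the batch-time -> input-files map alongside
// the graph checkpoint so a recovered context knows which files to reprocess.
class FileBatchCheckpoint(path: String) {
  private val filesSeen = mutable.Map[Long, Seq[String]]()

  // Called when a batch is generated from a set of newly listed files.
  def record(batchTimeMs: Long, files: Seq[String]): Unit =
    filesSeen(batchTimeMs) = files

  // Written out together with the rest of the checkpoint state.
  def save(): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(filesSeen.toMap) finally out.close()
  }

  // On restart, the restored map drives regeneration of pre-failure batches.
  def restore(): Map[Long, Seq[String]] = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject().asInstanceOf[Map[Long, Seq[String]]] finally in.close()
  }
}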
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala  64
1 file changed, 44 insertions(+), 20 deletions(-)
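Throughout the test below, batch boundaries are driven by clock.addToTime(batchDuration.milliseconds) on a manual clock rather than by wall-clock time: write a file, give the directory listing a moment to see it, then advance the clock by one batch. A stand-in sketch of such a clock (assuming nothing about Spark's internal ManualClock beyond the addToTime call the test uses):

// Stand-in for a manually advanced clock; not Spark's actual ManualClock.
class FakeManualClock {
  private var now = 0L
  def currentTime(): Long = synchronized { now }
  def addToTime(ms: Long): Unit = synchronized {
    now += ms
    notifyAll() // wake anyone blocked in waitUntil
  }
  // A scheduler built on this clock would block here until time is advanced,
  // which is what makes each addToTime call trigger exactly one batch.
  def waitUntil(target: Long): Unit = synchronized {
    while (now < target) wait()
  }
}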
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index d7ba7a5d17..4f6204f205 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -214,10 +214,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
//Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
- Thread.sleep(100)
- }*/
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
@@ -226,11 +222,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
+ logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
+ logInfo("expected output, size = " + expectedOutput.size)
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
@@ -256,8 +250,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
- val filestream = ssc.textFileStream(testDir.toString)
- var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
+ val fileStream = ssc.textFileStream(testDir.toString)
+ val outputBuffer = new ArrayBuffer[Seq[Int]]
+ // Reduced over a large window to ensure that recovery from master failure
+ // requires reprocessing of all the files seen before the failure
+ val reducedStream = fileStream.map(_.toInt)
+ .reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -266,31 +265,56 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
+ // wait to make sure that the file is written such that it gets shown in the file listings
+ Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
+ // wait to make sure that FileInputDStream picks up this file only and not any other file
+ Thread.sleep(500)
}
- Thread.sleep(500)
logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
+ assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Thread.sleep(1000)
+ }
+
// Restart stream computation from checkpoint and create more files to see whether
// they are being processed
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(500)
- for (i <- Seq(4, 5, 6)) {
+ for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
+ Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(500)
}
- Thread.sleep(500)
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
+ Thread.sleep(1000)
+ assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
+
+ // Append the new output to the old buffer
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+ outputBuffer ++= outputStream.output
+
+ // Verify whether data received by Spark Streaming was as expected
+ val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
+ logInfo("--------------------------------")
+ logInfo("output, size = " + outputBuffer.size)
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output, size = " + expectedOutput.size)
+ expectedOutput.foreach(x => logInfo("[" + x + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ assert(outputBuffer.size === expectedOutput.size)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ assert(outputBuffer(i).head === expectedOutput(i))
+ }
}
}
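For reference, the expected output Seq(1, 3, 6, 28, 36, 45) is just the running windowed sum: the 30-batch window never slides anything out during this short test, so each batch emits the sum of every file value seen so far. Files 1-3 give 1, 3, 6 before the stop; files 4-6 are written while the context is down and must be recovered from the checkpoint, so the first post-restart output (file 7's batch) is 1+2+...+7 = 28, followed by 36 and 45. A quick arithmetic check (illustrative only, not part of the patch):

object ExpectedWindowSums extends App {
  // Running sums for the files processed before the simulated failure.
  val beforeFailure = Seq(1, 2, 3).scanLeft(0)(_ + _).tail            // 1, 3, 6
  // Files 4, 5, 6 arrive while the context is stopped; recovery folds them
  // into the window, so file 7's batch sums 1 through 7.
  val afterRestart = Seq(7, 8, 9).scanLeft((1 to 6).sum)(_ + _).tail  // 28, 36, 45
  println(beforeFailure ++ afterRestart)  // List(1, 3, 6, 28, 36, 45)
}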