From fd90daf850a922fe33c3638b18304d827953e2cb Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 10 Feb 2013 19:48:42 -0800 Subject: Fixed bugs in FileInputDStream and Scheduler that occasionally failed to reprocess old files after recovering from master failure. Completely modified spark.streaming.FailureTest to test multiple master failures using file input stream. --- .../main/scala/spark/streaming/DStreamGraph.scala | 2 + .../main/scala/spark/streaming/JobManager.scala | 4 +- .../src/main/scala/spark/streaming/Scheduler.scala | 8 +- .../src/main/scala/spark/streaming/Time.scala | 4 + .../spark/streaming/dstream/FileInputDStream.scala | 13 +- .../test/scala/spark/streaming/FailureSuite.scala | 281 +++++++++++++++------ 6 files changed, 221 insertions(+), 91 deletions(-) diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index d5a5496839..7aa9d20004 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -81,12 +81,14 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private[streaming] def generateRDDs(time: Time): Seq[Job] = { this.synchronized { + logInfo("Generating RDDs for time " + time) outputStreams.flatMap(outputStream => outputStream.generateJob(time)) } } private[streaming] def forgetOldRDDs(time: Time) { this.synchronized { + logInfo("Forgetting old RDDs for time " + time) outputStreams.foreach(_.forgetOldMetadata(time)) } } diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 5acdd01e58..8b18c7bc6a 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -15,8 +15,8 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( - (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0)) + logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format( + (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0)) } catch { case e: Exception => logError("Running " + job + " failed", e) diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index b77986a3ba..23a0f0974d 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -40,7 +40,11 @@ class Scheduler(ssc: StreamingContext) extends Logging { clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } // Reschedule the batches that were received but not processed before failure - ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) + //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) + val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) + println(pendingTimes.mkString(", ")) + pendingTimes.foreach(time => + graph.generateRDDs(time).foreach(jobManager.runJob)) // Restart the timer timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") @@ -64,11 +68,11 @@ class Scheduler(ssc: StreamingContext) extends Logging { graph.generateRDDs(time).foreach(jobManager.runJob) graph.forgetOldRDDs(time) doCheckpoint(time) - logInfo("Generated RDDs for time " + time) } private def doCheckpoint(time: Time) { if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { + logInfo("Checkpointing graph for time " + time) val startTime = System.currentTimeMillis() ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 5daeb761dd..8a6c9a5cb5 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -39,4 +39,8 @@ case class Time(private val millis: Long) { override def toString: String = (millis.toString + " ms") +} + +object Time { + val ordering = Ordering.by((time: Time) => time.millis) } \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index c6ffb252ce..10ccb4318d 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -128,7 +128,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K private[streaming] class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] override def update() { hadoopFiles.clear() @@ -139,11 +139,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K override def restore() { hadoopFiles.foreach { - case (time, files) => { - logInfo("Restoring Hadoop RDD for time " + time + " from files " + - files.mkString("[", ",", "]") ) - files - generatedRDDs += ((time, filesToRDD(files))) + case (t, f) => { + // Restore the metadata in both files and generatedRDDs + logInfo("Restoring files for time " + t + " - " + + f.mkString("[", ", ", "]") ) + files += ((t, f)) + generatedRDDs += ((t, filesToRDD(f))) } } } diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index c4cfffbfc1..efaa098d2e 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -1,58 +1,58 @@ package spark.streaming -import org.scalatest.BeforeAndAfter +import org.scalatest.{FunSuite, BeforeAndAfter} import org.apache.commons.io.FileUtils import java.io.File import scala.runtime.RichInt import scala.util.Random import spark.streaming.StreamingContext._ -import collection.mutable.ArrayBuffer +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import spark.Logging +import com.google.common.io.Files /** * This testsuite tests master failures at random times while the stream is running using * the real clock. */ -class FailureSuite extends TestSuiteBase with BeforeAndAfter { +class FailureSuite extends FunSuite with BeforeAndAfter with Logging { + + var testDir: File = null + var checkpointDir: File = null + val batchDuration = Milliseconds(500) before { - FileUtils.deleteDirectory(new File(checkpointDir)) + testDir = Files.createTempDir() + checkpointDir = Files.createTempDir() } after { FailureSuite.reset() - FileUtils.deleteDirectory(new File(checkpointDir)) + FileUtils.deleteDirectory(checkpointDir) + FileUtils.deleteDirectory(testDir) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") } - override def framework = "CheckpointSuite" - - override def batchDuration = Milliseconds(500) - - override def checkpointDir = "checkpoint" - - override def checkpointInterval = batchDuration - test("multiple failures with updateStateByKey") { val n = 30 // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... - val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq - // Last output: [ (a, 465) ] for n=30 - val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) + val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq + // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... + val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j)) val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) } - st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) - .checkpoint(Seconds(2)) - .map(t => (t._1, t._2.self)) + st.flatMap(_.split(" ")) + .map(x => (x, 1)) + .updateStateByKey[RichInt](updateFunc) + .checkpoint(Seconds(2)) + .map(t => (t._1, t._2.self)) } - testOperationWithMultipleFailures(input, operation, lastOutput, n, n) + testOperationWithMultipleFailures(input, operation, expectedOutput) } test("multiple failures with reduceByKeyAndWindow") { @@ -60,17 +60,18 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { val w = 100 assert(w > n, "Window should be much larger than the number of input sets in this test") // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... - val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq - // Last output: [ (a, 465) ] - val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) ) + val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq + // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... + val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j)) val operation = (st: DStream[String]) => { - st.map(x => (x, 1)) + st.flatMap(_.split(" ")) + .map(x => (x, 1)) .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) .checkpoint(Seconds(2)) } - testOperationWithMultipleFailures(input, operation, lastOutput, n, n) + testOperationWithMultipleFailures(input, operation, expectedOutput) } @@ -79,113 +80,231 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { * final set of output values is as expected or not. Checking the final value is * proof that no intermediate data was lost due to master failures. */ - def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest]( - input: Seq[Seq[U]], - operation: DStream[U] => DStream[V], - lastExpectedOutput: Seq[V], - numBatches: Int, - numExpectedOutput: Int + def testOperationWithMultipleFailures( + input: Seq[String], + operation: DStream[String] => DStream[(String, Int)], + expectedOutput: Seq[(String, Int)] ) { - var ssc = setupStreams[U, V](input, operation) - val mergedOutput = new ArrayBuffer[Seq[V]]() + var ssc = setupStreamsWithFileStream(operation) + + val mergedOutput = new ArrayBuffer[(String, Int)]() + val lastExpectedOutput = expectedOutput.last + val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2 var totalTimeRan = 0L - while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) { - new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start() - val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput) + // Start generating files in the a different thread + val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds) + fileGeneratingThread.start() + + // Repeatedly start and kill the streaming context until timed out or + // all expected output is generated + while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) { + + // Start the thread to kill the streaming after some time + FailureSuite.failed = false + val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10) + killingThread.start() + + // Run the streams with real clock until last expected output is seen or timed out + val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan) + if (killingThread.isAlive) killingThread.interrupt() + + // Merge output and time ran and see whether already timed out or not mergedOutput ++= output totalTimeRan += timeRan logInfo("New output = " + output) logInfo("Merged output = " + mergedOutput) logInfo("Total time spent = " + totalTimeRan) - val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8) - logInfo( - "\n-------------------------------------------\n" + - " Restarting stream computation in " + sleepTime + " ms " + - "\n-------------------------------------------\n" - ) - Thread.sleep(sleepTime) - FailureSuite.failed = false - ssc = new StreamingContext(checkpointDir) + if (totalTimeRan > maxTimeToRun) { + FailureSuite.timedOut = true + } + + if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) { + val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2) + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation in " + sleepTime + " ms " + + "\n-------------------------------------------\n" + ) + Thread.sleep(sleepTime) + } + + // Recreate the streaming context from checkpoint + ssc = new StreamingContext(checkpointDir.getPath) } ssc.stop() ssc = null + logInfo("Finished test after " + FailureSuite.failureCount + " failures") + + if (FailureSuite.timedOut) { + logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " + + expectedOutput.size + " batches of " + batchDuration) + } + + // Verify whether the output is as expected + verifyOutput(mergedOutput, expectedOutput) + if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt() + } - // Verify whether the last output is the expected one - val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty)) - assert(lastOutput.toSet === lastExpectedOutput.toSet) - logInfo("Finished computation after " + FailureSuite.failureCount + " failures") + /** Sets up the stream operations with file input stream */ + def setupStreamsWithFileStream( + operation: DStream[String] => DStream[(String, Int)] + ): StreamingContext = { + val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration) + ssc.checkpoint(checkpointDir.getPath) + val inputStream = ssc.textFileStream(testDir.getPath) + val operatedStream = operation(inputStream) + val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]] + val outputStream = new TestOutputStream(operatedStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc } /** - * Runs the streams set up in `ssc` on real clock until the expected max number of + * Runs the streams set up in `ssc` on real clock. */ - def runStreamsWithRealClock[V: ClassManifest]( - ssc: StreamingContext, - numBatches: Int, - maxExpectedOutput: Int - ): (Seq[Seq[V]], Long) = { + def runStreamsWithRealClock( + ssc: StreamingContext, + lastExpectedOutput: (String, Int), + timeout: Long + ): (Seq[(String, Int)], Long) = { System.clearProperty("spark.streaming.clock") - assert(numBatches > 0, "Number of batches to run stream computation is zero") - assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero") - logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput) - // Get the output buffer - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]] val output = outputStream.output - val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong val startTime = System.currentTimeMillis() + // Functions to detect various conditions + def hasFailed = FailureSuite.failed + def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput + def isTimedOut = System.currentTimeMillis() - startTime > timeout + + // Start the streaming computation and let it run while ... + // (i) StreamingContext has not been shut down yet + // (ii) The last expected output has not been generated yet + // (iii) Its not timed out yet try { - // Start computation ssc.start() - - // Wait until expected number of output items have been generated - while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) { - logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput) + while (!hasFailed && !isLastOutputGenerated && !isTimedOut) { Thread.sleep(100) } + logInfo("Has failed = " + hasFailed) + logInfo("Is last output generated = " + isLastOutputGenerated) + logInfo("Is timed out = " + isTimedOut) } catch { case e: Exception => logInfo("Exception while running streams: " + e) } finally { ssc.stop() } + + // Verify whether the output of each batch has only one element + assert(output.forall(_.size <= 1), "output of each batch should have only one element") + + // Set appropriate flags is timed out or output has been generated + if (isTimedOut) FailureSuite.timedOut = true + if (isLastOutputGenerated) FailureSuite.outputGenerated = true + val timeTaken = System.currentTimeMillis() - startTime logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms") - (output, timeTaken) + (output.flatMap(_.headOption), timeTaken) } + /** + * Verifies the output value are the same as expected. Since failures can lead to + * a batch being processed twice, a batches output may appear more than once + * consecutively. To avoid getting confused with those, we eliminate consecutive + * duplicate batch outputs of values from the `output`. As a result, the + * expected output should not have consecutive batches with the same values as output. + */ + def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) { + // Verify whether expected outputs do not consecutive batches with same output + for (i <- 0 until expectedOutput.size - 1) { + assert(expectedOutput(i) != expectedOutput(i+1), + "Expected output has consecutive duplicate sequence of values") + } + // Match the output with the expected output + logInfo( + "\n-------------------------------------------\n" + + " Verifying output " + + "\n-------------------------------------------\n" + ) + logInfo("Expected output, size = " + expectedOutput.size) + logInfo(expectedOutput.mkString("[", ",", "]")) + logInfo("Output, size = " + output.size) + logInfo(output.mkString("[", ",", "]")) + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + } } object FailureSuite { var failed = false + var outputGenerated = false + var timedOut = false var failureCount = 0 def reset() { failed = false + outputGenerated = false + timedOut = false failureCount = 0 } } -class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging { +/** + * Thread to kill streaming context after some time. + */ +class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { initLogging() override def run() { - var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint - val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime) - logInfo("Kill wait time = " + killWaitTime) - Thread.sleep(killWaitTime.toLong) - logInfo( - "\n---------------------------------------\n" + - "Killing streaming context after " + killWaitTime + " ms" + - "\n---------------------------------------\n" - ) - if (ssc != null) ssc.stop() - FailureSuite.failed = true - FailureSuite.failureCount += 1 + try { + var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint + val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) + logInfo("Kill wait time = " + killWaitTime) + Thread.sleep(killWaitTime) + logInfo( + "\n---------------------------------------\n" + + "Killing streaming context after " + killWaitTime + " ms" + + "\n---------------------------------------\n" + ) + if (ssc != null) { + ssc.stop() + FailureSuite.failed = true + FailureSuite.failureCount += 1 + } + logInfo("Killing thread exited") + } catch { + case ie: InterruptedException => logInfo("Killing thread interrupted") + case e: Exception => logWarning("Exception in killing thread", e) + } } } + +/** + * Thread to generate input files periodically with the desired text + */ +class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long) + extends Thread with Logging { + initLogging() + + override def run() { + try { + Thread.sleep(5000) // To make sure that all the streaming context has been set up + for (i <- 0 until input.size) { + FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n") + Thread.sleep(interval) + } + logInfo("File generating thread exited") + } catch { + case ie: InterruptedException => logInfo("File generating thread interrupted") + case e: Exception => logWarning("File generating in killing thread", e) + } + } +} + -- cgit v1.2.3