diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-13 12:17:45 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-13 12:17:45 -0800 |
commit | 39addd380363c0371e935fae50983fe87158c1ac (patch) | |
tree | 0b428024e5ed5faac707ccb5b5573b0a2400d3b2 /streaming/src | |
parent | fd90daf850a922fe33c3638b18304d827953e2cb (diff) | |
download | spark-39addd380363c0371e935fae50983fe87158c1ac.tar.gz spark-39addd380363c0371e935fae50983fe87158c1ac.tar.bz2 spark-39addd380363c0371e935fae50983fe87158c1ac.zip |
Changed scheduler and file input stream to fix bugs in the driver fault tolerance. Added MasterFailureTest to rigorously test master fault tolerance with file input stream.
Diffstat (limited to 'streaming/src')
18 files changed, 693 insertions, 452 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 0eb6aad187..0c1b667c0a 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -292,7 +292,7 @@ abstract class DStream[T: ClassManifest] ( * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this - * (eg. ForEachDStream). + * to generate their own jobs. */ protected[streaming] def generateJob(time: Time): Option[Job] = { getOrCompute(time) match { @@ -308,19 +308,18 @@ abstract class DStream[T: ClassManifest] ( } /** - * Dereference RDDs that are older than rememberDuration. + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This default + * implementation clears the old generated RDDs. Subclasses of DStream may override + * this to clear their own metadata along with the generated RDDs. */ - protected[streaming] def forgetOldMetadata(time: Time) { + protected[streaming] def clearOldMetadata(time: Time) { var numForgotten = 0 - generatedRDDs.keys.foreach(t => { - if (t <= (time - rememberDuration)) { - generatedRDDs.remove(t) - numForgotten += 1 - logInfo("Forgot RDD of time " + t + " from " + this) - } - }) - logInfo("Forgot " + numForgotten + " RDDs from " + this) - dependencies.foreach(_.forgetOldMetadata(time)) + val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) + generatedRDDs --= oldRDDs.keys + logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) + dependencies.foreach(_.clearOldMetadata(time)) } /* Adds metadata to the Stream while it is running. diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala index a375980b84..6b0fade7c6 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala @@ -87,7 +87,7 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) } override def toString() = { - "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]" + "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]" } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index 7aa9d20004..22d9e24f05 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() - private[streaming] var zeroTime: Time = null - private[streaming] var batchDuration: Duration = null - private[streaming] var rememberDuration: Duration = null - private[streaming] var checkpointInProgress = false + var rememberDuration: Duration = null + var checkpointInProgress = false - private[streaming] def start(time: Time) { + var zeroTime: Time = null + var startTime: Time = null + var batchDuration: Duration = null + + def start(time: Time) { this.synchronized { if (zeroTime != null) { throw new Exception("DStream graph computation already started") } zeroTime = time + startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validate) @@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } } - private[streaming] def stop() { + def restart(time: Time) { + this.synchronized { startTime = time } + } + + def stop() { this.synchronized { inputStreams.par.foreach(_.stop()) } } - private[streaming] def setContext(ssc: StreamingContext) { + def setContext(ssc: StreamingContext) { this.synchronized { outputStreams.foreach(_.setContext(ssc)) } } - private[streaming] def setBatchDuration(duration: Duration) { + def setBatchDuration(duration: Duration) { this.synchronized { if (batchDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + @@ -51,61 +58,61 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def remember(duration: Duration) { + def remember(duration: Duration) { this.synchronized { if (rememberDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + ". cannot set it again.") } + rememberDuration = duration } - rememberDuration = duration } - private[streaming] def addInputStream(inputStream: InputDStream[_]) { + def addInputStream(inputStream: InputDStream[_]) { this.synchronized { inputStream.setGraph(this) inputStreams += inputStream } } - private[streaming] def addOutputStream(outputStream: DStream[_]) { + def addOutputStream(outputStream: DStream[_]) { this.synchronized { outputStream.setGraph(this) outputStreams += outputStream } } - private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray } + def getInputStreams() = this.synchronized { inputStreams.toArray } - private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray } + def getOutputStreams() = this.synchronized { outputStreams.toArray } - private[streaming] def generateRDDs(time: Time): Seq[Job] = { + def generateRDDs(time: Time): Seq[Job] = { this.synchronized { logInfo("Generating RDDs for time " + time) outputStreams.flatMap(outputStream => outputStream.generateJob(time)) } } - private[streaming] def forgetOldRDDs(time: Time) { + def clearOldMetadata(time: Time) { this.synchronized { - logInfo("Forgetting old RDDs for time " + time) - outputStreams.foreach(_.forgetOldMetadata(time)) + logInfo("Clearing old metadata for time " + time) + outputStreams.foreach(_.clearOldMetadata(time)) } } - private[streaming] def updateCheckpointData(time: Time) { + def updateCheckpointData(time: Time) { this.synchronized { outputStreams.foreach(_.updateCheckpointData(time)) } } - private[streaming] def restoreCheckpointData() { + def restoreCheckpointData() { this.synchronized { outputStreams.foreach(_.restoreCheckpointData()) } } - private[streaming] def validate() { + def validate() { this.synchronized { assert(batchDuration != null, "Batch duration has not been set") //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 8b18c7bc6a..649494ff4a 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -38,13 +38,19 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { logInfo("Added " + job + " to queue") } + def stop() { + jobExecutor.shutdown() + } + private def clearJob(job: Job) { jobs.synchronized { - val jobsOfTime = jobs.get(job.time) + val time = job.time + val jobsOfTime = jobs.get(time) if (jobsOfTime.isDefined) { jobsOfTime.get -= job if (jobsOfTime.get.isEmpty) { - jobs -= job.time + ssc.scheduler.clearOldMetadata(time) + jobs -= time } } else { throw new Exception("Job finished for time " + job.time + diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 23a0f0974d..57d494da83 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging { initLogging() - val graph = ssc.graph - val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) - val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { new CheckpointWriter(ssc.checkpointDir) } else { @@ -24,53 +21,80 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => generateRDDs(new Time(longTime))) + val graph = ssc.graph - def start() { - // If context was started from checkpoint, then restart timer such that - // this timer's triggers occur at the same time as the original timer. - // Otherwise just start the timer from scratch, and initialize graph based - // on this first trigger time of the timer. + def start() = synchronized { if (ssc.isCheckpointPresent) { - // If manual clock is being used for testing, then - // either set the manual clock to the last checkpointed time, - // or if the property is defined set it to that time - if (clock.isInstanceOf[ManualClock]) { - val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds - val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong - clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) - } - // Reschedule the batches that were received but not processed before failure - //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time)) - val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering) - println(pendingTimes.mkString(", ")) - pendingTimes.foreach(time => - graph.generateRDDs(time).foreach(jobManager.runJob)) - // Restart the timer - timer.restart(graph.zeroTime.milliseconds) - logInfo("Scheduler's timer restarted") + restart() } else { - val firstTime = new Time(timer.start()) - graph.start(firstTime - ssc.graph.batchDuration) - logInfo("Scheduler's timer started") + startFirstTime() } logInfo("Scheduler started") } - def stop() { + def stop() = synchronized { timer.stop() - graph.stop() + jobManager.stop() + ssc.graph.stop() logInfo("Scheduler stopped") } - - private def generateRDDs(time: Time) { + + private def startFirstTime() { + val startTime = new Time(timer.getStartTime()) + graph.start(startTime - graph.batchDuration) + timer.start(startTime.milliseconds) + logInfo("Scheduler's timer started at " + startTime) + } + + private def restart() { + + // If manual clock is being used for testing, then + // either set the manual clock to the last checkpointed time, + // or if the property is defined set it to that time + if (clock.isInstanceOf[ManualClock]) { + val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds + val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) + } + + val batchDuration = ssc.graph.batchDuration + + // Batches when the master was down, that is, + // between the checkpoint and current restart time + val checkpointTime = ssc.initialCheckpoint.checkpointTime + val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) + val downTimes = checkpointTime.until(restartTime, batchDuration) + logInfo("Batches during down time: " + downTimes.mkString(", ")) + + // Batches that were unprocessed before failure + val pendingTimes = ssc.initialCheckpoint.pendingTimes + logInfo("Batches pending processing: " + pendingTimes.mkString(", ")) + // Reschedule jobs for these times + val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) + logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) + timesToReschedule.foreach(time => + graph.generateRDDs(time).foreach(jobManager.runJob) + ) + + // Restart the timer + timer.start(restartTime.milliseconds) + logInfo("Scheduler's timer restarted") + } + + /** Generates the RDDs, clears old metadata and does checkpoint for the given time */ + def generateRDDs(time: Time) { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") graph.generateRDDs(time).foreach(jobManager.runJob) - graph.forgetOldRDDs(time) doCheckpoint(time) } - private def doCheckpoint(time: Time) { + + def clearOldMetadata(time: Time) { + ssc.graph.clearOldMetadata(time) + } + + def doCheckpoint(time: Time) { if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) val startTime = System.currentTimeMillis() diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 8a6c9a5cb5..8201e84a20 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -37,6 +37,16 @@ case class Time(private val millis: Long) { def max(that: Time): Time = if (this > that) this else that + def until(that: Time, interval: Duration): Seq[Time] = { + assert(that > this, "Cannot create sequence as " + that + " not more than " + this) + assert( + (that - this).isMultipleOf(interval), + "Cannot create sequence as gap between " + that + " and " + + this + " is not multiple of " + interval + ) + (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + override def toString: String = (millis.toString + " ms") } diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index 10ccb4318d..41b9bd9461 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -21,19 +21,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData + // Latest file mod time seen till any point of time private val lastModTimeFiles = new HashSet[String]() private var lastModTime = 0L @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null - @transient private var files = new HashMap[Time, Array[String]] + @transient private[streaming] var files = new HashMap[Time, Array[String]] override def start() { if (newFilesOnly) { - lastModTime = System.currentTimeMillis() + lastModTime = graph.zeroTime.milliseconds } else { lastModTime = 0 } + logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly) } override def stop() { } @@ -43,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K * a union RDD out of them. Note that this maintains the list of files that were processed * in the latest modification time in the previous call to this method. This is because the * modification time returned by the FileStatus API seems to return times only at the - * granularity of seconds. Hence, new files may have the same modification time as the - * latest modification time in the previous call to this method and the list of files - * maintained is used to filter the one that have been processed. + * granularity of seconds. And new files may have the same modification time as the + * latest modification time in the previous call to this method yet was not reported in + * the previous call. */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { + assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime) + // Create the filter for selecting new files val newFilter = new PathFilter() { + // Latest file mod time seen in this round of fetching files and its corresponding files var latestModTime = 0L val latestModTimeFiles = new HashSet[String]() def accept(path: Path): Boolean = { - if (!filter(path)) { + if (!filter(path)) { // Reject file if it does not satisfy filter + logDebug("Rejected by filter " + path) return false - } else { + } else { // Accept file only if val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime < lastModTime){ - return false + logDebug("Mod time for " + path + " is " + modTime) + if (modTime < lastModTime) { + logDebug("Mod time less than last mod time") + return false // If the file was created before the last time it was called } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { - return false + logDebug("Mod time equal to last mod time, but file considered already") + return false // If the file was created exactly as lastModTime but not reported yet + } else if (modTime > validTime.milliseconds) { + logDebug("Mod time more than valid time") + return false // If the file was created after the time this function call requires } if (modTime > latestModTime) { latestModTime = modTime latestModTimeFiles.clear() + logDebug("Latest mod time updated to " + latestModTime) } latestModTimeFiles += path.toString + logDebug("Accepted " + path) return true } } } - + logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime) val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString) - logInfo("New files: " + newFiles.mkString(", ")) + logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) if (newFiles.length > 0) { // Update the modification time and the files processed for that modification time if (lastModTime != newFilter.latestModTime) { @@ -82,17 +96,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K lastModTimeFiles.clear() } lastModTimeFiles ++= newFilter.latestModTimeFiles + logDebug("Last mod time updated to " + lastModTime) } files += ((validTime, newFiles)) Some(filesToRDD(newFiles)) } - /** Forget the old time-to-files mappings along with old RDDs */ - protected[streaming] override def forgetOldMetadata(time: Time) { - super.forgetOldMetadata(time) - val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration)) - files --= filesToBeRemoved.keys - logInfo("Forgot " + filesToBeRemoved.size + " files from " + this) + /** Clear the old time-to-files mappings along with old RDDs */ + protected[streaming] override def clearOldMetadata(time: Time) { + super.clearOldMetadata(time) + val oldFiles = files.filter(_._1 <= (time - rememberDuration)) + files --= oldFiles.keys + logInfo("Cleared " + oldFiles.size + " old files that were older than " + + (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) + logDebug("Cleared files are:\n" + + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) } /** Generate one RDD from an array of files */ @@ -148,6 +166,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } } } + + override def toString() = { + "[\n" + hadoopFiles.size + " file sets\n" + + hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]" + } } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 8c322dd698..ecc75ec913 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming def stop() {} override def compute(validTime: Time): Option[RDD[T]] = { - val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) - Some(new BlockRDD[T](ssc.sc, blockIds)) + // If this is called for any time before the start time of the context, + // then this returns an empty RDD. This may happen when recovering from a + // master failure forces + if (validTime >= graph.startTime) { + val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } else { + Some(new BlockRDD[T](ssc.sc, Array[String]())) + } } } diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala new file mode 100644 index 0000000000..3ffe4b64d0 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -0,0 +1,375 @@ +package spark.streaming.util + +import spark.{Logging, RDD} +import spark.streaming._ +import spark.streaming.dstream.ForEachDStream +import StreamingContext._ + +import scala.util.Random +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} + +import java.io.{File, ObjectInputStream, IOException} +import java.util.UUID + +import com.google.common.io.Files + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.{FileUtil, FileSystem, Path} +import org.apache.hadoop.conf.Configuration + + +private[streaming] +object MasterFailureTest extends Logging { + initLogging() + + @volatile var killed = false + @volatile var killCount = 0 + + def main(args: Array[String]) { + if (args.size < 2) { + println( + "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]") + System.exit(1) + } + val directory = args(0) + val numBatches = args(1).toInt + val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1) + + println("\n\n========================= MAP TEST =========================\n\n") + testMap(directory, numBatches, batchDuration) + + println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n") + testUpdateStateByKey(directory, numBatches, batchDuration) + } + + def testMap(directory: String, numBatches: Int, batchDuration: Duration) { + // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ... + val input = (1 to numBatches).map(_.toString).toSeq + // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ... + val expectedOutput = (1 to numBatches) + + val operation = (st: DStream[String]) => st.map(_.toInt) + + // Run streaming operation with multiple master failures + val output = testOperation(directory, batchDuration, input, operation, expectedOutput) + + logInfo("Expected output, size = " + expectedOutput.size) + logInfo(expectedOutput.mkString("[", ",", "]")) + logInfo("Output, size = " + output.size) + logInfo(output.mkString("[", ",", "]")) + + // Verify whether all the values of the expected output is present + // in the output + assert(output.distinct.toSet == expectedOutput.toSet) + } + + + def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) { + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq + // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... + val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j)) + + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Long], state: Option[Long]) => { + Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) + } + st.flatMap(_.split(" ")) + .map(x => (x, 1L)) + .updateStateByKey[Long](updateFunc) + .checkpoint(batchDuration * 5) + } + + // Run streaming operation with multiple master failures + val output = testOperation(directory, batchDuration, input, operation, expectedOutput) + + logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput) + logInfo("Output, size = " + output.size + "\n" + output) + + // Verify whether all the values in the output are among the expected output values + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + + // Verify whether the last expected output value has been generated, there by + // confirming that none of the inputs have been missed + assert(output.last == expectedOutput.last) + } + + /** + * Tests stream operation with multiple master failures, and verifies whether the + * final set of output values is as expected or not. + */ + def testOperation[T: ClassManifest]( + directory: String, + batchDuration: Duration, + input: Seq[String], + operation: DStream[String] => DStream[T], + expectedOutput: Seq[T] + ): Seq[T] = { + + // Just making sure that the expected output does not have duplicates + assert(expectedOutput.distinct.toSet == expectedOutput.toSet) + + // Setup the stream computation with the given operation + val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation) + + // Start generating files in the a different thread + val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds) + fileGeneratingThread.start() + + // Run the streams and repeatedly kill it until the last expected output + // has been generated, or until it has run for twice the expected time + val lastExpectedOutput = expectedOutput.last + val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2 + val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) + + // Delete directories + fileGeneratingThread.join() + val fs = checkpointDir.getFileSystem(new Configuration()) + fs.delete(checkpointDir, true) + fs.delete(testDir, true) + logInfo("Finished test after " + killCount + " failures") + mergedOutput + } + + /** + * Sets up the stream computation with the given operation, directory (local or HDFS), + * and batch duration. Returns the streaming context and the directory to which + * files should be written for testing. + */ + private def setupStreams[T: ClassManifest]( + directory: String, + batchDuration: Duration, + operation: DStream[String] => DStream[T] + ): (StreamingContext, Path, Path) = { + // Reset all state + reset() + + // Create the directories for this test + val uuid = UUID.randomUUID().toString + val rootDir = new Path(directory, uuid) + val fs = rootDir.getFileSystem(new Configuration()) + val checkpointDir = new Path(rootDir, "checkpoint") + val testDir = new Path(rootDir, "test") + fs.mkdirs(checkpointDir) + fs.mkdirs(testDir) + + // Setup the streaming computation with the given operation + System.clearProperty("spark.driver.port") + var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration) + ssc.checkpoint(checkpointDir.toString) + val inputStream = ssc.textFileStream(testDir.toString) + val operatedStream = operation(inputStream) + val outputStream = new TestOutputStream(operatedStream) + ssc.registerOutputStream(outputStream) + (ssc, checkpointDir, testDir) + } + + + /** + * Repeatedly starts and kills the streaming context until timed out or + * the last expected output is generated. Finally, return + */ + private def runStreams[T: ClassManifest]( + ssc_ : StreamingContext, + lastExpectedOutput: T, + maxTimeToRun: Long + ): Seq[T] = { + + var ssc = ssc_ + var totalTimeRan = 0L + var isLastOutputGenerated = false + var isTimedOut = false + val mergedOutput = new ArrayBuffer[T]() + val checkpointDir = ssc.checkpointDir + var batchDuration = ssc.graph.batchDuration + + while(!isLastOutputGenerated && !isTimedOut) { + // Get the output buffer + val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output + def output = outputBuffer.flatMap(x => x) + + // Start the thread to kill the streaming after some time + killed = false + val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10) + killingThread.start() + + var timeRan = 0L + try { + // Start the streaming computation and let it run while ... + // (i) StreamingContext has not been shut down yet + // (ii) The last expected output has not been generated yet + // (iii) Its not timed out yet + System.clearProperty("spark.streaming.clock") + System.clearProperty("spark.driver.port") + ssc.start() + val startTime = System.currentTimeMillis() + while (!killed && !isLastOutputGenerated && !isTimedOut) { + Thread.sleep(100) + timeRan = System.currentTimeMillis() - startTime + isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput) + isTimedOut = (timeRan + totalTimeRan > maxTimeToRun) + } + } catch { + case e: Exception => logError("Error running streaming context", e) + } + if (killingThread.isAlive) killingThread.interrupt() + ssc.stop() + + logInfo("Has been killed = " + killed) + logInfo("Is last output generated = " + isLastOutputGenerated) + logInfo("Is timed out = " + isTimedOut) + + // Verify whether the output of each batch has only one element or no element + // and then merge the new output with all the earlier output + mergedOutput ++= output + totalTimeRan += timeRan + logInfo("New output = " + output) + logInfo("Merged output = " + mergedOutput) + logInfo("Time ran = " + timeRan) + logInfo("Total time ran = " + totalTimeRan) + + if (!isLastOutputGenerated && !isTimedOut) { + val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10) + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation in " + sleepTime + " ms " + + "\n-------------------------------------------\n" + ) + Thread.sleep(sleepTime) + // Recreate the streaming context from checkpoint + ssc = new StreamingContext(checkpointDir) + } + } + mergedOutput + } + + /** + * Verifies the output value are the same as expected. Since failures can lead to + * a batch being processed twice, a batches output may appear more than once + * consecutively. To avoid getting confused with those, we eliminate consecutive + * duplicate batch outputs of values from the `output`. As a result, the + * expected output should not have consecutive batches with the same values as output. + */ + private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) { + // Verify whether expected outputs do not consecutive batches with same output + for (i <- 0 until expectedOutput.size - 1) { + assert(expectedOutput(i) != expectedOutput(i+1), + "Expected output has consecutive duplicate sequence of values") + } + + // Log the output + println("Expected output, size = " + expectedOutput.size) + println(expectedOutput.mkString("[", ",", "]")) + println("Output, size = " + output.size) + println(output.mkString("[", ",", "]")) + + // Match the output with the expected output + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + } + + /** Resets counter to prepare for the test */ + private def reset() { + killed = false + killCount = 0 + } +} + +/** + * This is a output stream just for testing. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + */ +private[streaming] +class TestOutputStream[T: ClassManifest]( + parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + ) extends ForEachDStream[T]( + parent, + (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output += collected + println(t + ": " + collected.mkString("[", ",", "]")) + } + ) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } +} + + +/** + * Thread to kill streaming context after a random period of time. + */ +private[streaming] +class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { + initLogging() + + override def run() { + try { + // If it is the first killing, then allow the first checkpoint to be created + var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000 + val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) + logInfo("Kill wait time = " + killWaitTime) + Thread.sleep(killWaitTime) + logInfo( + "\n---------------------------------------\n" + + "Killing streaming context after " + killWaitTime + " ms" + + "\n---------------------------------------\n" + ) + if (ssc != null) { + ssc.stop() + MasterFailureTest.killed = true + MasterFailureTest.killCount += 1 + } + logInfo("Killing thread finished normally") + } catch { + case ie: InterruptedException => logInfo("Killing thread interrupted") + case e: Exception => logWarning("Exception in killing thread", e) + } + + } +} + + +/** + * Thread to generate input files periodically with the desired text. + */ +private[streaming] +class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) + extends Thread with Logging { + initLogging() + + override def run() { + val localTestDir = Files.createTempDir() + val fs = testDir.getFileSystem(new Configuration()) + try { + Thread.sleep(5000) // To make sure that all the streaming context has been set up + for (i <- 0 until input.size) { + // Write the data to a local file and then move it to the target test directory + val localFile = new File(localTestDir, (i+1).toString) + val hadoopFile = new Path(testDir, (i+1).toString) + FileUtils.writeStringToFile(localFile, input(i).toString + "\n") + //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString)) + fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + Thread.sleep(interval) + localFile.delete() + } + logInfo("File generating thread finished normally") + } catch { + case ie: InterruptedException => logInfo("File generating thread interrupted") + case e: Exception => logWarning("File generating in killing thread", e) + } finally { + fs.close() + } + } +} + + diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala index db715cc295..8e10276deb 100644 --- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala @@ -3,9 +3,9 @@ package spark.streaming.util private[streaming] class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { - val minPollTime = 25L + private val minPollTime = 25L - val pollTime = { + private val pollTime = { if (period / 10.0 > minPollTime) { (period / 10.0).toLong } else { @@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => } } - val thread = new Thread() { + private val thread = new Thread() { override def run() { loop } } - var nextTime = 0L + private var nextTime = 0L + + def getStartTime(): Long = { + (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + } + + def getRestartTime(originalStartTime: Long): Long = { + val gap = clock.currentTime - originalStartTime + (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime + } def start(startTime: Long): Long = { nextTime = startTime @@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => } def start(): Long = { - val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period - start(startTime) + start(getStartTime()) } - def restart(originalStartTime: Long): Long = { - val gap = clock.currentTime - originalStartTime - val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime - start(newStartTime) - } - - def stop() { + def stop() { thread.interrupt() } - def loop() { + private def loop() { try { while (true) { clock.waitTillTime(nextTime) diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index fbe4af4597..783a393a8f 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -33,7 +33,8 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint", new Duration(1000)); } @@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port"); } - /* + @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -434,7 +435,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } - */ + /* * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. @@ -450,7 +451,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, actual); } - /* + // PairDStream Functions @Test public void testPairFilter() { @@ -897,7 +898,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } - */ + @Test public void testCheckpointMasterRecovery() throws InterruptedException { List<List<String>> inputData = Arrays.asList( @@ -964,7 +965,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result1); } */ - /* + // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @@ -972,9 +973,9 @@ public class JavaAPISuite implements Serializable { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets, StorageLevel.MEMORY_AND_DISK()); } @@ -1026,5 +1027,5 @@ public class JavaAPISuite implements Serializable { public void testFileStream() { JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); - }*/ + } } diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index edfa1243fa..5652596e1e 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -1,6 +1,7 @@ # Set everything to be logged to the file streaming/target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender +log4j.rootCategory=WARN, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.ConsoleAppender log4j.appender.file.append=false log4j.appender.file.file=streaming/target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout @@ -8,4 +9,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.eclipse.jetty=WARN +log4j.logger.spark.streaming=INFO +log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index c031949dd1..12388b8887 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -6,6 +6,8 @@ import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def framework() = "BasicOperationsSuite" after { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 7126af62d9..c89c4a8d43 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -1,5 +1,6 @@ package spark.streaming +import dstream.FileInputDStream import spark.streaming.StreamingContext._ import java.io.File import runtime.RichInt @@ -10,8 +11,16 @@ import util.{Clock, ManualClock} import scala.util.Random import com.google.common.io.Files + +/** + * This test suites tests the checkpointing functionality of DStreams - + * the checkpointing of a DStream's RDDs as well as the checkpointing of + * the whole DStream graph. + */ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + before { FileUtils.deleteDirectory(new File(checkpointDir)) } @@ -64,7 +73,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a time such that at least one RDD in the stream should have been checkpointed, // then check whether some RDD has been checkpointed or not ssc.start() - runStreamsWithRealDelay(ssc, firstNumBatches) + advanceTimeWithRealDelay(ssc, firstNumBatches) logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") stateStream.checkpointData.checkpointFiles.foreach { @@ -77,7 +86,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) - runStreamsWithRealDelay(ssc, secondNumBatches) + advanceTimeWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() @@ -92,7 +101,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run one batch to generate a new checkpoint file and check whether some RDD // is present in the checkpoint data or not ssc.start() - runStreamsWithRealDelay(ssc, 1) + advanceTimeWithRealDelay(ssc, 1) assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { @@ -113,7 +122,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Adjust manual clock time as if it is being restarted after a delay System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc.start() - runStreamsWithRealDelay(ssc, 4) + advanceTimeWithRealDelay(ssc, 4) ssc.stop() System.clearProperty("spark.streaming.manualClock.jump") ssc = null @@ -168,74 +177,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } // This tests whether file input stream remembers what files were seen before - // the master failure and uses them again to process a large window operatoin. + // the master failure and uses them again to process a large window operation. // It also tests whether batches, whose processing was incomplete due to the // failure, are re-processed or not. test("recovery with file input stream") { + // Disable manual clock as FileInputDStream does not work with manual clock + val clockProperty = System.getProperty("spark.streaming.clock") + System.clearProperty("spark.streaming.clock") + // Set up the streaming context and input streams val testDir = Files.createTempDir() - var ssc = new StreamingContext(master, framework, batchDuration) + var ssc = new StreamingContext(master, framework, Seconds(1)) ssc.checkpoint(checkpointDir, checkpointInterval) val fileStream = ssc.textFileStream(testDir.toString) // Making value 3 take large time to process, to ensure that the master // shuts down in the middle of processing the 3rd batch val mappedStream = fileStream.map(s => { val i = s.toInt - if (i == 3) Thread.sleep(1000) + if (i == 3) Thread.sleep(2000) i }) + // Reducing over a large window to ensure that recovery from master failure // requires reprocessing of all the files seen before the failure - val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration) + val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1)) val outputBuffer = new ArrayBuffer[Seq[Int]] var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() // Create files and advance manual clock to process them - var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") // wait to make sure that the file is written such that it gets shown in the file listings - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - // wait to make sure that FileInputDStream picks up this file only and not any other file - Thread.sleep(500) + Thread.sleep(1000) } logInfo("Output = " + outputStream.output.mkString(",")) assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + // Verify whether files created have been recorded correctly or not + var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] + def recordedFiles = fileInputDStream.files.values.flatMap(x => x) + assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) + // Create files while the master is down for (i <- Seq(4, 5, 6)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") Thread.sleep(1000) } - // Restart stream computation from checkpoint and create more files to see whether - // they are being processed + // Recover context from checkpoint file and verify whether the files that were + // recorded before failure were saved and successfully recovered logInfo("*********** RESTARTING ************") ssc = new StreamingContext(checkpointDir) + fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]] + assert(!recordedFiles.filter(_.endsWith("1")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("2")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("3")).isEmpty) + + // Restart stream computation ssc.start() - clock = ssc.scheduler.clock.asInstanceOf[ManualClock] for (i <- Seq(7, 8, 9)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - Thread.sleep(500) + Thread.sleep(1000) } Thread.sleep(1000) - logInfo("Output = " + outputStream.output.mkString(",")) + logInfo("Output = " + outputStream.output.mkString("[", ", ", "]")) assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() + // Verify whether files created while the driver was down have been recorded or not + assert(!recordedFiles.filter(_.endsWith("4")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("5")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("6")).isEmpty) + + // Verify whether new files created after recover have been recorded or not + assert(!recordedFiles.filter(_.endsWith("7")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("8")).isEmpty) + assert(!recordedFiles.filter(_.endsWith("9")).isEmpty) + // Append the new output to the old buffer outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] outputBuffer ++= outputStream.output - // Verify whether data received by Spark Streaming was as expected - val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45) logInfo("--------------------------------") logInfo("output, size = " + outputBuffer.size) outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) @@ -244,11 +274,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { logInfo("--------------------------------") // Verify whether all the elements received are as expected - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) - } + val output = outputBuffer.flatMap(x => x) + assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed + output.foreach(o => // To ensure all the inputs are correctly added cumulatively + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + // To ensure that all the inputs were received correctly + assert(expectedOutput.last === output.last) + + // Enable manual clock back again for other tests + if (clockProperty != null) + System.setProperty("spark.streaming.clock", clockProperty) } @@ -278,7 +314,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Do the computation for initial number of batches, create checkpoint file and quit ssc = setupStreams[U, V](input, operation) - val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs) + ssc.start() + val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches) + ssc.stop() verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) Thread.sleep(1000) @@ -289,17 +327,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { "\n-------------------------------------------\n" ) ssc = new StreamingContext(checkpointDir) - val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) + System.clearProperty("spark.driver.port") + ssc.start() + val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) + ssc.stop() ssc = null } /** * Advances the manual clock on the streaming scheduler by given number of batches. - * It also wait for the expected amount of time for each batch. + * It also waits for the expected amount of time for each batch. */ - def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) { + def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.time) for (i <- 1 to numBatches.toInt) { @@ -308,6 +349,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) - } + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + outputStream.output + } }
\ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index efaa098d2e..a5fa7ab92d 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -1,14 +1,15 @@ package spark.streaming -import org.scalatest.{FunSuite, BeforeAndAfter} -import org.apache.commons.io.FileUtils -import java.io.File -import scala.runtime.RichInt -import scala.util.Random -import spark.streaming.StreamingContext._ -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import spark.Logging +import spark.streaming.util.MasterFailureTest +import StreamingContext._ + +import org.scalatest.{FunSuite, BeforeAndAfter} import com.google.common.io.Files +import java.io.File +import org.apache.commons.io.FileUtils +import collection.mutable.ArrayBuffer + /** * This testsuite tests master failures at random times while the stream is running using @@ -16,295 +17,24 @@ import com.google.common.io.Files */ class FailureSuite extends FunSuite with BeforeAndAfter with Logging { - var testDir: File = null - var checkpointDir: File = null - val batchDuration = Milliseconds(500) + var directory = "FailureSuite" + val numBatches = 30 + val batchDuration = Milliseconds(1000) before { - testDir = Files.createTempDir() - checkpointDir = Files.createTempDir() + FileUtils.deleteDirectory(new File(directory)) } after { - FailureSuite.reset() - FileUtils.deleteDirectory(checkpointDir) - FileUtils.deleteDirectory(testDir) - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - } - - test("multiple failures with updateStateByKey") { - val n = 30 - // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... - val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq - // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... - val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j)) - - val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) - } - st.flatMap(_.split(" ")) - .map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) - .checkpoint(Seconds(2)) - .map(t => (t._1, t._2.self)) - } - - testOperationWithMultipleFailures(input, operation, expectedOutput) - } - - test("multiple failures with reduceByKeyAndWindow") { - val n = 30 - val w = 100 - assert(w > n, "Window should be much larger than the number of input sets in this test") - // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... - val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq - // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... - val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j)) - - val operation = (st: DStream[String]) => { - st.flatMap(_.split(" ")) - .map(x => (x, 1)) - .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) - .checkpoint(Seconds(2)) - } - - testOperationWithMultipleFailures(input, operation, expectedOutput) + FileUtils.deleteDirectory(new File(directory)) } - - /** - * Tests stream operation with multiple master failures, and verifies whether the - * final set of output values is as expected or not. Checking the final value is - * proof that no intermediate data was lost due to master failures. - */ - def testOperationWithMultipleFailures( - input: Seq[String], - operation: DStream[String] => DStream[(String, Int)], - expectedOutput: Seq[(String, Int)] - ) { - var ssc = setupStreamsWithFileStream(operation) - - val mergedOutput = new ArrayBuffer[(String, Int)]() - val lastExpectedOutput = expectedOutput.last - - val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2 - var totalTimeRan = 0L - - // Start generating files in the a different thread - val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds) - fileGeneratingThread.start() - - // Repeatedly start and kill the streaming context until timed out or - // all expected output is generated - while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) { - - // Start the thread to kill the streaming after some time - FailureSuite.failed = false - val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10) - killingThread.start() - - // Run the streams with real clock until last expected output is seen or timed out - val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan) - if (killingThread.isAlive) killingThread.interrupt() - - // Merge output and time ran and see whether already timed out or not - mergedOutput ++= output - totalTimeRan += timeRan - logInfo("New output = " + output) - logInfo("Merged output = " + mergedOutput) - logInfo("Total time spent = " + totalTimeRan) - if (totalTimeRan > maxTimeToRun) { - FailureSuite.timedOut = true - } - - if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) { - val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2) - logInfo( - "\n-------------------------------------------\n" + - " Restarting stream computation in " + sleepTime + " ms " + - "\n-------------------------------------------\n" - ) - Thread.sleep(sleepTime) - } - - // Recreate the streaming context from checkpoint - ssc = new StreamingContext(checkpointDir.getPath) - } - ssc.stop() - ssc = null - logInfo("Finished test after " + FailureSuite.failureCount + " failures") - - if (FailureSuite.timedOut) { - logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " + - expectedOutput.size + " batches of " + batchDuration) - } - - // Verify whether the output is as expected - verifyOutput(mergedOutput, expectedOutput) - if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt() + test("multiple failures with map") { + MasterFailureTest.testMap(directory, numBatches, batchDuration) } - /** Sets up the stream operations with file input stream */ - def setupStreamsWithFileStream( - operation: DStream[String] => DStream[(String, Int)] - ): StreamingContext = { - val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration) - ssc.checkpoint(checkpointDir.getPath) - val inputStream = ssc.textFileStream(testDir.getPath) - val operatedStream = operation(inputStream) - val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]] - val outputStream = new TestOutputStream(operatedStream, outputBuffer) - ssc.registerOutputStream(outputStream) - ssc - } - - /** - * Runs the streams set up in `ssc` on real clock. - */ - def runStreamsWithRealClock( - ssc: StreamingContext, - lastExpectedOutput: (String, Int), - timeout: Long - ): (Seq[(String, Int)], Long) = { - - System.clearProperty("spark.streaming.clock") - - // Get the output buffer - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]] - val output = outputStream.output - val startTime = System.currentTimeMillis() - - // Functions to detect various conditions - def hasFailed = FailureSuite.failed - def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput - def isTimedOut = System.currentTimeMillis() - startTime > timeout - - // Start the streaming computation and let it run while ... - // (i) StreamingContext has not been shut down yet - // (ii) The last expected output has not been generated yet - // (iii) Its not timed out yet - try { - ssc.start() - while (!hasFailed && !isLastOutputGenerated && !isTimedOut) { - Thread.sleep(100) - } - logInfo("Has failed = " + hasFailed) - logInfo("Is last output generated = " + isLastOutputGenerated) - logInfo("Is timed out = " + isTimedOut) - } catch { - case e: Exception => logInfo("Exception while running streams: " + e) - } finally { - ssc.stop() - } - - // Verify whether the output of each batch has only one element - assert(output.forall(_.size <= 1), "output of each batch should have only one element") - - // Set appropriate flags is timed out or output has been generated - if (isTimedOut) FailureSuite.timedOut = true - if (isLastOutputGenerated) FailureSuite.outputGenerated = true - - val timeTaken = System.currentTimeMillis() - startTime - logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms") - (output.flatMap(_.headOption), timeTaken) - } - - /** - * Verifies the output value are the same as expected. Since failures can lead to - * a batch being processed twice, a batches output may appear more than once - * consecutively. To avoid getting confused with those, we eliminate consecutive - * duplicate batch outputs of values from the `output`. As a result, the - * expected output should not have consecutive batches with the same values as output. - */ - def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) { - // Verify whether expected outputs do not consecutive batches with same output - for (i <- 0 until expectedOutput.size - 1) { - assert(expectedOutput(i) != expectedOutput(i+1), - "Expected output has consecutive duplicate sequence of values") - } - - // Match the output with the expected output - logInfo( - "\n-------------------------------------------\n" + - " Verifying output " + - "\n-------------------------------------------\n" - ) - logInfo("Expected output, size = " + expectedOutput.size) - logInfo(expectedOutput.mkString("[", ",", "]")) - logInfo("Output, size = " + output.size) - logInfo(output.mkString("[", ",", "]")) - output.foreach(o => - assert(expectedOutput.contains(o), "Expected value " + o + " not found") - ) - } -} - -object FailureSuite { - var failed = false - var outputGenerated = false - var timedOut = false - var failureCount = 0 - - def reset() { - failed = false - outputGenerated = false - timedOut = false - failureCount = 0 - } -} - -/** - * Thread to kill streaming context after some time. - */ -class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { - initLogging() - - override def run() { - try { - var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint - val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) - logInfo("Kill wait time = " + killWaitTime) - Thread.sleep(killWaitTime) - logInfo( - "\n---------------------------------------\n" + - "Killing streaming context after " + killWaitTime + " ms" + - "\n---------------------------------------\n" - ) - if (ssc != null) { - ssc.stop() - FailureSuite.failed = true - FailureSuite.failureCount += 1 - } - logInfo("Killing thread exited") - } catch { - case ie: InterruptedException => logInfo("Killing thread interrupted") - case e: Exception => logWarning("Exception in killing thread", e) - } - } -} - -/** - * Thread to generate input files periodically with the desired text - */ -class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long) - extends Thread with Logging { - initLogging() - - override def run() { - try { - Thread.sleep(5000) // To make sure that all the streaming context has been set up - for (i <- 0 until input.size) { - FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n") - Thread.sleep(interval) - } - logInfo("File generating thread exited") - } catch { - case ie: InterruptedException => logInfo("File generating thread interrupted") - case e: Exception => logWarning("File generating in killing thread", e) - } + test("multiple failures with updateStateByKey") { + MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration) } } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 0eb9c7b81e..7c1c2e1040 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -133,26 +133,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { test("file input stream") { + // Disable manual clock as FileInputDStream does not work with manual clock + System.clearProperty("spark.streaming.clock") + // Set up the streaming context and input streams val testDir = Files.createTempDir() val ssc = new StreamingContext(master, framework, batchDuration) - val filestream = ssc.textFileStream(testDir.toString) + val fileStream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) - val outputStream = new TestOutputStream(filestream, outputBuffer) + val outputStream = new TestOutputStream(fileStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() // Create files in the temporary directory so that Spark Streaming can read data from it - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) val expectedOutput = input.map(_.toString) Thread.sleep(1000) for (i <- 0 until input.size) { - FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n") - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - //Thread.sleep(100) + val file = new File(testDir, i.toString) + FileUtils.writeStringToFile(file, input(i).toString + "\n") + logInfo("Created file " + file) + Thread.sleep(batchDuration.milliseconds) + Thread.sleep(1000) } val startTime = System.currentTimeMillis() Thread.sleep(1000) @@ -171,16 +174,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether all the elements received are as expected // (whether the elements were received one in each interval is not verified) - assert(output.size === expectedOutput.size) - for (i <- 0 until output.size) { - assert(output(i).size === 1) - assert(output(i).head.toString === expectedOutput(i)) - } + assert(output.toList === expectedOutput.toList) + FileUtils.deleteDirectory(testDir) + + // Enable manual clock back again for other tests + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") } } - +/** This is server to test the network input stream */ class TestServer(port: Int) extends Logging { val queue = new ArrayBlockingQueue[String](100) diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index c2733831b2..2cc31d6137 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -63,20 +63,28 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu */ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { + // Name of the framework for Spark context def framework = "TestSuiteBase" + // Master for Spark context def master = "local[2]" + // Batch duration def batchDuration = Seconds(1) + // Directory where the checkpoint data will be saved def checkpointDir = "checkpoint" + // Duration after which the graph is checkpointed def checkpointInterval = batchDuration + // Number of partitions of the input parallel collections created for testing def numInputPartitions = 2 + // Maximum time to wait before the test times out def maxWaitTimeMillis = 10000 + // Whether to actually wait in real time before changing manual clock def actuallyWait = false /** @@ -140,9 +148,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { numBatches: Int, numExpectedOutput: Int ): Seq[Seq[V]] = { - - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - assert(numBatches > 0, "Number of batches to run stream computation is zero") assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) @@ -186,7 +191,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { } finally { ssc.stop() } - output } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index cd9608df53..1080790147 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def framework = "WindowOperationsSuite" override def maxWaitTimeMillis = 20000 |