diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 18:10:00 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 18:10:00 -0800 |
commit | fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 (patch) | |
tree | 55a804861cd866c7583ae5074968571b76cd29ec /streaming/src | |
parent | 364cdb679cf2b0d5e6ed7ab89628f15594d7947f (diff) | |
download | spark-fad2b82fc8fb49f2171af10cf7e408d8b8dd7349.tar.gz spark-fad2b82fc8fb49f2171af10cf7e408d8b8dd7349.tar.bz2 spark-fad2b82fc8fb49f2171af10cf7e408d8b8dd7349.zip |
Added support for saving input files of FileInputDStream to graph checkpoints. Modified 'file input stream with checkpoint' testcase to test recovery of pre-master-failure input files.
Diffstat (limited to 'streaming/src')
6 files changed, 159 insertions, 66 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 3c1861a840..07ecb018ee 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -86,7 +86,7 @@ abstract class DStream[T: ClassManifest] ( protected[streaming] def parentRememberDuration = rememberDuration /** Return the StreamingContext associated with this DStream */ - def context() = ssc + def context = ssc /** Persist the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { @@ -159,7 +159,7 @@ abstract class DStream[T: ClassManifest] ( ) assert( - checkpointDuration == null || ssc.sc.checkpointDir.isDefined, + checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + " or SparkContext.checkpoint() to set the checkpoint directory." ) @@ -298,8 +298,8 @@ abstract class DStream[T: ClassManifest] ( getOrCompute(time) match { case Some(rdd) => { val jobFunc = () => { - val emptyFunc = { (iterator: Iterator[T]) => {} } - ssc.sc.runJob(rdd, emptyFunc) + val emptyFunc = { (iterator: Iterator[T]) => {} } + context.sparkContext.runJob(rdd, emptyFunc) } Some(new Job(time, jobFunc)) } @@ -310,10 +310,9 @@ abstract class DStream[T: ClassManifest] ( /** * Dereference RDDs that are older than rememberDuration. */ - protected[streaming] def forgetOldRDDs(time: Time) { - val keys = generatedRDDs.keys + protected[streaming] def forgetOldMetadata(time: Time) { var numForgotten = 0 - keys.foreach(t => { + generatedRDDs.keys.foreach(t => { if (t <= (time - rememberDuration)) { generatedRDDs.remove(t) numForgotten += 1 @@ -321,7 +320,7 @@ abstract class DStream[T: ClassManifest] ( } }) logInfo("Forgot " + numForgotten + " RDDs from " + this) - dependencies.foreach(_.forgetOldRDDs(time)) + dependencies.foreach(_.forgetOldMetadata(time)) } /* Adds metadata to the Stream while it is running. @@ -356,7 +355,7 @@ abstract class DStream[T: ClassManifest] ( */ protected[streaming] def restoreCheckpointData() { // Create RDDs from the checkpoint data - logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs") + logInfo("Restoring checkpoint data") checkpointData.restore() dependencies.foreach(_.restoreCheckpointData()) logInfo("Restored checkpoint data") @@ -397,7 +396,7 @@ abstract class DStream[T: ClassManifest] ( /** Return a new DStream by applying a function to all elements of this DStream. */ def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { - new MappedDStream(this, ssc.sc.clean(mapFunc)) + new MappedDStream(this, context.sparkContext.clean(mapFunc)) } /** @@ -405,7 +404,7 @@ abstract class DStream[T: ClassManifest] ( * and then flattening the results */ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { - new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) + new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } /** Return a new DStream containing only the elements that satisfy a predicate. */ @@ -427,7 +426,7 @@ abstract class DStream[T: ClassManifest] ( mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean = false ): DStream[U] = { - new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning) + new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) } /** @@ -456,7 +455,7 @@ abstract class DStream[T: ClassManifest] ( * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { - val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) ssc.registerOutputStream(newStream) newStream } @@ -474,7 +473,7 @@ abstract class DStream[T: ClassManifest] ( * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - new TransformedDStream(this, ssc.sc.clean(transformFunc)) + new TransformedDStream(this, context.sparkContext.clean(transformFunc)) } /** @@ -491,7 +490,7 @@ abstract class DStream[T: ClassManifest] ( if (first11.size > 10) println("...") println() } - val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc)) + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) ssc.registerOutputStream(newStream) } diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala index abf903293f..a375980b84 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala @@ -11,14 +11,17 @@ import spark.Logging private[streaming] class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) extends Serializable with Logging { - private[streaming] val checkpointFiles = new HashMap[Time, String]() - @transient private lazy val fileSystem = - new Path(dstream.context.checkpointDir).getFileSystem(new Configuration()) + protected val data = new HashMap[Time, AnyRef]() + + @transient private var fileSystem : FileSystem = null @transient private var lastCheckpointFiles: HashMap[Time, String] = null + protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]] + /** - * Update the checkpoint data of the DStream. Default implementation records the checkpoint files to - * which the generate RDDs of the DStream has been saved. + * Updates the checkpoint data of the DStream. This gets called every time + * the graph checkpoint is initiated. Default implementation records the + * checkpoint files to which the generate RDDs of the DStream has been saved. */ def update() { @@ -42,7 +45,9 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) } /** - * Cleanup old checkpoint data. Default implementation, cleans up old checkpoint files. + * Cleanup old checkpoint data. This gets called every time the graph + * checkpoint is initiated, but after `update` is called. Default + * implementation, cleans up old checkpoint files. */ def cleanup() { // If there is at least on checkpoint file in the current checkpoint files, @@ -52,6 +57,9 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) case (time, file) => { try { val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(new Configuration()) + } fileSystem.delete(path, true) logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { @@ -64,15 +72,16 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) } /** - * Restore the checkpoint data. Default implementation restores the RDDs from their - * checkpoint files. + * Restore the checkpoint data. This gets called once when the DStream graph + * (along with its DStreams) are being restored from a graph checkpoint file. + * Default implementation restores the RDDs from their checkpoint files. */ def restore() { // Create RDDs from the checkpoint data checkpointFiles.foreach { case(time, file) => { logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") - dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file))) + dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) } } } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index bc4a40d7bc..d5a5496839 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -87,7 +87,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private[streaming] def forgetOldRDDs(time: Time) { this.synchronized { - outputStreams.foreach(_.forgetOldRDDs(time)) + outputStreams.foreach(_.forgetOldMetadata(time)) } } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..2cf00e3baa 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -61,7 +61,7 @@ class StreamingContext private ( protected[streaming] val isCheckpointPresent = (cp_ != null) - val sc: SparkContext = { + protected[streaming] val sc: SparkContext = { if (isCheckpointPresent) { new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars) } else { @@ -101,6 +101,11 @@ class StreamingContext private ( protected[streaming] var scheduler: Scheduler = null /** + * Returns the associated Spark context + */ + def sparkContext = sc + + /** * Sets each DStreams in this context to remember RDDs it generated in the last given duration. * DStreams remember RDDs only for a limited duration of time and releases them for garbage * collection. This method allows the developer to specify how to long to remember the RDDs ( diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index 1e6ad84b44..c6ffb252ce 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -2,13 +2,14 @@ package spark.streaming.dstream import spark.RDD import spark.rdd.UnionRDD -import spark.streaming.{StreamingContext, Time} +import spark.streaming.{DStreamCheckpointData, StreamingContext, Time} import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import scala.collection.mutable.HashSet +import scala.collection.mutable.{HashSet, HashMap} +import java.io.{ObjectInputStream, IOException} private[streaming] class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @@ -18,21 +19,14 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { - @transient private var path_ : Path = null - @transient private var fs_ : FileSystem = null - - var lastModTime = 0L - val lastModTimeFiles = new HashSet[String]() + protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData - def path(): Path = { - if (path_ == null) path_ = new Path(directory) - path_ - } + private val lastModTimeFiles = new HashSet[String]() + private var lastModTime = 0L - def fs(): FileSystem = { - if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) - fs_ - } + @transient private var path_ : Path = null + @transient private var fs_ : FileSystem = null + @transient private var files = new HashMap[Time, Array[String]] override def start() { if (newFilesOnly) { @@ -79,8 +73,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } } - val newFiles = fs.listStatus(path, newFilter) - logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) + val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString) + logInfo("New files: " + newFiles.mkString(", ")) if (newFiles.length > 0) { // Update the modification time and the files processed for that modification time if (lastModTime != newFilter.latestModTime) { @@ -89,9 +83,70 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } lastModTimeFiles ++= newFilter.latestModTimeFiles } - val newRDD = new UnionRDD(ssc.sc, newFiles.map( - file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) - Some(newRDD) + files += ((validTime, newFiles)) + Some(filesToRDD(newFiles)) + } + + /** Forget the old time-to-files mappings along with old RDDs */ + protected[streaming] override def forgetOldMetadata(time: Time) { + super.forgetOldMetadata(time) + val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration)) + files --= filesToBeRemoved.keys + logInfo("Forgot " + filesToBeRemoved.size + " files from " + this) + } + + /** Generate one RDD from an array of files */ + protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = { + new UnionRDD( + context.sparkContext, + files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) + ) + } + + private def path: Path = { + if (path_ == null) path_ = new Path(directory) + path_ + } + + private def fs: FileSystem = { + if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + fs_ + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[(K,V)]] () + files = new HashMap[Time, Array[String]] + } + + /** + * A custom version of the DStreamCheckpointData that stores names of + * Hadoop files as checkpoint data. + */ + private[streaming] + class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { + + def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + + override def update() { + hadoopFiles.clear() + hadoopFiles ++= files + } + + override def cleanup() { } + + override def restore() { + hadoopFiles.foreach { + case (time, files) => { + logInfo("Restoring Hadoop RDD for time " + time + " from files " + + files.mkString("[", ",", "]") ) + files + generatedRDDs += ((time, filesToRDD(files))) + } + } + } } } @@ -100,3 +155,4 @@ object FileInputDStream { def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } + diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index d7ba7a5d17..4f6204f205 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -214,10 +214,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { //Thread.sleep(100) } val startTime = System.currentTimeMillis() - /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) - Thread.sleep(100) - }*/ Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") @@ -226,11 +222,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) - logInfo("output") + logInfo("output, size = " + outputBuffer.size) outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output.size = " + expectedOutput.size) - logInfo("expected output") + logInfo("expected output, size = " + expectedOutput.size) expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") @@ -256,8 +250,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) - val filestream = ssc.textFileStream(testDir.toString) - var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + val fileStream = ssc.textFileStream(testDir.toString) + val outputBuffer = new ArrayBuffer[Seq[Int]] + // Reduced over a large window to ensure that recovery from master failure + // requires reprocessing of all the files seen before the failure + val reducedStream = fileStream.map(_.toInt) + .reduceByWindow(_ + _, batchDuration * 30, batchDuration) + var outputStream = new TestOutputStream(reducedStream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -266,31 +265,56 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) + // wait to make sure that the file is written such that it gets shown in the file listings + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) + // wait to make sure that FileInputDStream picks up this file only and not any other file + Thread.sleep(500) } - Thread.sleep(500) logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) + assert(outputStream.output.size > 0, "No files processed before restart") ssc.stop() + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Thread.sleep(1000) + } + // Restart stream computation from checkpoint and create more files to see whether // they are being processed logInfo("*********** RESTARTING ************") ssc = new StreamingContext(checkpointDir) ssc.start() clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - Thread.sleep(500) - for (i <- Seq(4, 5, 6)) { + for (i <- Seq(7, 8, 9)) { FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") - Thread.sleep(100) + Thread.sleep(500) clock.addToTime(batchDuration.milliseconds) + Thread.sleep(500) } - Thread.sleep(500) - outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] - logInfo("Output = " + outputStream.output.mkString(",")) - assert(outputStream.output.size > 0) + Thread.sleep(1000) + assert(outputStream.output.size > 0, "No files processed after restart") ssc.stop() + + // Append the new output to the old buffer + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]] + outputBuffer ++= outputStream.output + + // Verify whether data received by Spark Streaming was as expected + val expectedOutput = Seq(1, 3, 6, 28, 36, 45) + logInfo("--------------------------------") + logInfo("output, size = " + outputBuffer.size) + outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output, size = " + expectedOutput.size) + expectedOutput.foreach(x => logInfo("[" + x + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + assert(outputBuffer.size === expectedOutput.size) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + assert(outputBuffer(i).head === expectedOutput(i)) + } } } |