author     Tathagata Das <tathagata.das1565@gmail.com>    2013-01-22 18:10:00 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2013-01-22 18:10:00 -0800
commit     fad2b82fc8fb49f2171af10cf7e408d8b8dd7349 (patch)
tree       55a804861cd866c7583ae5074968571b76cd29ec /streaming
parent     364cdb679cf2b0d5e6ed7ab89628f15594d7947f (diff)
Added support for saving the input files of FileInputDStream to graph checkpoints. Modified the 'file input stream with checkpoint' test case to test recovery of pre-master-failure input files.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                  | 29
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala    | 27
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala             |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala         |  7
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala | 96
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala        | 64
6 files changed, 159 insertions(+), 66 deletions(-)
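
Before the per-file diffs, a rough sketch of what the patch adds conceptually: FileInputDStream now records which input files made up each batch and saves that time-to-files mapping with the graph checkpoint, so the corresponding RDDs can be rebuilt from file names after the driver recovers. The standalone class below only illustrates that bookkeeping; FileBatchTracker and its methods are hypothetical and do not appear in the patch.

import scala.collection.mutable.HashMap

// Illustrative only: a time -> files mapping that would be saved with the
// graph checkpoint and replayed on recovery.
class FileBatchTracker {
  // batch time in milliseconds -> files that produced that batch
  private val files = new HashMap[Long, Array[String]]()

  // called when a batch is generated from newly detected files
  def record(time: Long, newFiles: Array[String]): Unit =
    files += ((time, newFiles))

  // what would be written into the graph checkpoint
  def snapshot(): Map[Long, Array[String]] = files.toMap

  // on restart, rebuild each pre-failure batch from its recorded file names
  def restore(saved: Map[Long, Array[String]], rebuild: Array[String] => Unit): Unit =
    saved.foreach { case (_, fileNames) => rebuild(fileNames) }
}
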
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3c1861a840..07ecb018ee 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -86,7 +86,7 @@ abstract class DStream[T: ClassManifest] (
protected[streaming] def parentRememberDuration = rememberDuration
/** Return the StreamingContext associated with this DStream */
- def context() = ssc
+ def context = ssc
/** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
@@ -159,7 +159,7 @@ abstract class DStream[T: ClassManifest] (
)
assert(
- checkpointDuration == null || ssc.sc.checkpointDir.isDefined,
+ checkpointDuration == null || context.sparkContext.checkpointDir.isDefined,
"The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" +
" or SparkContext.checkpoint() to set the checkpoint directory."
)
@@ -298,8 +298,8 @@ abstract class DStream[T: ClassManifest] (
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
- val emptyFunc = { (iterator: Iterator[T]) => {} }
- ssc.sc.runJob(rdd, emptyFunc)
+ val emptyFunc = { (iterator: Iterator[T]) => {} }
+ context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
@@ -310,10 +310,9 @@ abstract class DStream[T: ClassManifest] (
/**
* Dereference RDDs that are older than rememberDuration.
*/
- protected[streaming] def forgetOldRDDs(time: Time) {
- val keys = generatedRDDs.keys
+ protected[streaming] def forgetOldMetadata(time: Time) {
var numForgotten = 0
- keys.foreach(t => {
+ generatedRDDs.keys.foreach(t => {
if (t <= (time - rememberDuration)) {
generatedRDDs.remove(t)
numForgotten += 1
@@ -321,7 +320,7 @@ abstract class DStream[T: ClassManifest] (
}
})
logInfo("Forgot " + numForgotten + " RDDs from " + this)
- dependencies.foreach(_.forgetOldRDDs(time))
+ dependencies.foreach(_.forgetOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
@@ -356,7 +355,7 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def restoreCheckpointData() {
// Create RDDs from the checkpoint data
- logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs")
+ logInfo("Restoring checkpoint data")
checkpointData.restore()
dependencies.foreach(_.restoreCheckpointData())
logInfo("Restored checkpoint data")
@@ -397,7 +396,7 @@ abstract class DStream[T: ClassManifest] (
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
- new MappedDStream(this, ssc.sc.clean(mapFunc))
+ new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
/**
@@ -405,7 +404,7 @@ abstract class DStream[T: ClassManifest] (
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
- new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
+ new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
/** Return a new DStream containing only the elements that satisfy a predicate. */
@@ -427,7 +426,7 @@ abstract class DStream[T: ClassManifest] (
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = {
- new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc), preservePartitioning)
+ new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
}
/**
@@ -456,7 +455,7 @@ abstract class DStream[T: ClassManifest] (
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
newStream
}
@@ -474,7 +473,7 @@ abstract class DStream[T: ClassManifest] (
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, ssc.sc.clean(transformFunc))
+ new TransformedDStream(this, context.sparkContext.clean(transformFunc))
}
/**
@@ -491,7 +490,7 @@ abstract class DStream[T: ClassManifest] (
if (first11.size > 10) println("...")
println()
}
- val newStream = new ForEachDStream(this, ssc.sc.clean(foreachFunc))
+ val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
ssc.registerOutputStream(newStream)
}
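
The `def context() = ssc` to `def context = ssc` change above follows the usual Scala accessor convention: a side-effect-free getter is declared without a parameter list so call sites read like field access (for example, `dstream.context.sparkContext`). A minimal, hypothetical illustration of the convention; the classes below are not from Spark.

// Hypothetical classes, only to show the parameterless-accessor convention.
class Engine { def start(): Unit = println("started") }

class Car(engine: Engine) {
  def context: Engine = engine        // pure accessor: no parentheses, no side effects
  def honk(): Unit = println("beep")  // action with side effects keeps its parentheses
}

val car = new Car(new Engine)
car.context.start()                   // reads like field access, as dstream.context does now
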
diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
index abf903293f..a375980b84 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
@@ -11,14 +11,17 @@ import spark.Logging
private[streaming]
class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
extends Serializable with Logging {
- private[streaming] val checkpointFiles = new HashMap[Time, String]()
- @transient private lazy val fileSystem =
- new Path(dstream.context.checkpointDir).getFileSystem(new Configuration())
+ protected val data = new HashMap[Time, AnyRef]()
+
+ @transient private var fileSystem : FileSystem = null
@transient private var lastCheckpointFiles: HashMap[Time, String] = null
+ protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]]
+
/**
- * Update the checkpoint data of the DStream. Default implementation records the checkpoint files to
- * which the generate RDDs of the DStream has been saved.
+ * Updates the checkpoint data of the DStream. This gets called every time
+ * the graph checkpoint is initiated. Default implementation records the
+ * checkpoint files to which the generated RDDs of the DStream have been saved.
*/
def update() {
@@ -42,7 +45,9 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
}
/**
- * Cleanup old checkpoint data. Default implementation, cleans up old checkpoint files.
+ * Clean up old checkpoint data. This gets called every time the graph
+ * checkpoint is initiated, but after `update` is called. Default
+ * implementation cleans up old checkpoint files.
*/
def cleanup() {
// If there is at least one checkpoint file in the current checkpoint files,
@@ -52,6 +57,9 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
case (time, file) => {
try {
val path = new Path(file)
+ if (fileSystem == null) {
+ fileSystem = path.getFileSystem(new Configuration())
+ }
fileSystem.delete(path, true)
logInfo("Deleted checkpoint file '" + file + "' for time " + time)
} catch {
@@ -64,15 +72,16 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
}
/**
- * Restore the checkpoint data. Default implementation restores the RDDs from their
- * checkpoint files.
+ * Restore the checkpoint data. This gets called once when the DStream graph
+ * (along with its DStreams) is being restored from a graph checkpoint file.
+ * Default implementation restores the RDDs from their checkpoint files.
*/
def restore() {
// Create RDDs from the checkpoint data
checkpointFiles.foreach {
case(time, file) => {
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
- dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file)))
+ dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
}
}
}
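
The doc-comment changes above pin down the lifecycle of these hooks: update() runs at every graph checkpoint, cleanup() runs right after it, and restore() runs once when a graph is rebuilt from a checkpoint file, while the new `data: HashMap[Time, AnyRef]` lets subclasses store whatever per-batch metadata they need. A minimal sketch of that call order, using a hypothetical trait rather than the real classes:

// Hypothetical trait mirroring the three hooks and when they are invoked.
trait CheckpointHooks {
  def update(): Unit   // refresh the metadata to be written with the graph checkpoint
  def cleanup(): Unit  // drop metadata that the previous checkpoint made obsolete
  def restore(): Unit  // rebuild state from the saved metadata after recovery
}

// At every graph checkpoint: update first, then cleanup.
def onGraphCheckpoint(hooks: Seq[CheckpointHooks]): Unit =
  hooks.foreach { h => h.update(); h.cleanup() }

// Once, when a streaming context is re-created from a checkpoint file.
def onRecovery(hooks: Seq[CheckpointHooks]): Unit =
  hooks.foreach(_.restore())
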
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index bc4a40d7bc..d5a5496839 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -87,7 +87,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private[streaming] def forgetOldRDDs(time: Time) {
this.synchronized {
- outputStreams.foreach(_.forgetOldRDDs(time))
+ outputStreams.foreach(_.forgetOldMetadata(time))
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 14500bdcb1..2cf00e3baa 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -61,7 +61,7 @@ class StreamingContext private (
protected[streaming] val isCheckpointPresent = (cp_ != null)
- val sc: SparkContext = {
+ protected[streaming] val sc: SparkContext = {
if (isCheckpointPresent) {
new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars)
} else {
@@ -101,6 +101,11 @@ class StreamingContext private (
protected[streaming] var scheduler: Scheduler = null
/**
+ * Returns the associated Spark context
+ */
+ def sparkContext = sc
+
+ /**
* Sets each DStream in this context to remember RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
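
Since `sc` is now protected[streaming], user code that needs the underlying SparkContext should go through the new sparkContext accessor. A minimal usage sketch, assuming the constructors used elsewhere in this patch; Seconds(1) stands in for whatever batch duration the application actually uses.

import spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: reach the SparkContext through the new accessor.
val ssc = new StreamingContext("local[2]", "example", Seconds(1))
val sc  = ssc.sparkContext                 // was: ssc.sc, now restricted to the streaming package
val sideData = sc.parallelize(1 to 100)    // ordinary RDD created from the same context
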
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 1e6ad84b44..c6ffb252ce 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -2,13 +2,14 @@ package spark.streaming.dstream
import spark.RDD
import spark.rdd.UnionRDD
-import spark.streaming.{StreamingContext, Time}
+import spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@@ -18,21 +19,14 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
- @transient private var path_ : Path = null
- @transient private var fs_ : FileSystem = null
-
- var lastModTime = 0L
- val lastModTimeFiles = new HashSet[String]()
+ protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
- def path(): Path = {
- if (path_ == null) path_ = new Path(directory)
- path_
- }
+ private val lastModTimeFiles = new HashSet[String]()
+ private var lastModTime = 0L
- def fs(): FileSystem = {
- if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
- fs_
- }
+ @transient private var path_ : Path = null
+ @transient private var fs_ : FileSystem = null
+ @transient private var files = new HashMap[Time, Array[String]]
override def start() {
if (newFilesOnly) {
@@ -79,8 +73,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
- val newFiles = fs.listStatus(path, newFilter)
- logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
+ val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+ logInfo("New files: " + newFiles.mkString(", "))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
@@ -89,9 +83,70 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
}
- val newRDD = new UnionRDD(ssc.sc, newFiles.map(
- file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))
- Some(newRDD)
+ files += ((validTime, newFiles))
+ Some(filesToRDD(newFiles))
+ }
+
+ /** Forget the old time-to-files mappings along with old RDDs */
+ protected[streaming] override def forgetOldMetadata(time: Time) {
+ super.forgetOldMetadata(time)
+ val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration))
+ files --= filesToBeRemoved.keys
+ logInfo("Forgot " + filesToBeRemoved.size + " files from " + this)
+ }
+
+ /** Generate one RDD from an array of files */
+ protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+ new UnionRDD(
+ context.sparkContext,
+ files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+ )
+ }
+
+ private def path: Path = {
+ if (path_ == null) path_ = new Path(directory)
+ path_
+ }
+
+ private def fs: FileSystem = {
+ if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
+ fs_
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ logDebug(this.getClass().getSimpleName + ".readObject used")
+ ois.defaultReadObject()
+ generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
+ files = new HashMap[Time, Array[String]]
+ }
+
+ /**
+ * A custom version of the DStreamCheckpointData that stores names of
+ * Hadoop files as checkpoint data.
+ */
+ private[streaming]
+ class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+
+ def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]
+
+ override def update() {
+ hadoopFiles.clear()
+ hadoopFiles ++= files
+ }
+
+ override def cleanup() { }
+
+ override def restore() {
+ hadoopFiles.foreach {
+ case (time, files) => {
+ logInfo("Restoring Hadoop RDD for time " + time + " from files " +
+ files.mkString("[", ",", "]") )
+ generatedRDDs += ((time, filesToRDD(files)))
+ }
+ }
+ }
}
}
@@ -100,3 +155,4 @@ object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
+
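
The new readObject above exists because @transient fields come back null after Java deserialization, which is how a DStream graph is restored from a checkpoint, so generatedRDDs and the files map must be re-created empty and then refilled from checkpoint data. A self-contained sketch of that pattern, with a hypothetical TrackedState class standing in for FileInputDStream:

import java.io.ObjectInputStream
import scala.collection.mutable.HashMap

// Hypothetical class showing the pattern: transient members are skipped by Java
// serialization, so readObject re-creates them (empty) after deserialization;
// their contents are then refilled from checkpoint data, as FileInputDStream does.
class TrackedState extends Serializable {
  @transient private var cache = new HashMap[Long, String]()

  def put(time: Long, value: String): Unit = cache += ((time, value))
  def size: Int = cache.size

  private def readObject(ois: ObjectInputStream): Unit = {
    ois.defaultReadObject()               // restores the non-transient fields
    cache = new HashMap[Long, String]()   // transient field is null here; rebuild it empty
  }
}
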
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index d7ba7a5d17..4f6204f205 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -214,10 +214,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
//Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
- Thread.sleep(100)
- }*/
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
@@ -226,11 +222,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
+ logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
+ logInfo("expected output, size = " + expectedOutput.size)
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
@@ -256,8 +250,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
var ssc = new StreamingContext(master, framework, batchDuration)
ssc.checkpoint(checkpointDir, checkpointInterval)
- val filestream = ssc.textFileStream(testDir.toString)
- var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
+ val fileStream = ssc.textFileStream(testDir.toString)
+ val outputBuffer = new ArrayBuffer[Seq[Int]]
+ // Reduced over a large window to ensure that recovery from master failure
+ // requires reprocessing of all the files seen before the failure
+ val reducedStream = fileStream.map(_.toInt)
+ .reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -266,31 +265,56 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
+ // wait to make sure that the file is fully written and shows up in the file listing
+ Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
+ // wait to make sure that FileInputDStream picks up this file only and not any other file
+ Thread.sleep(500)
}
- Thread.sleep(500)
logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
+ assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Thread.sleep(1000)
+ }
+
// Restart stream computation from checkpoint and create more files to see whether
// they are being processed
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
ssc.start()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(500)
- for (i <- Seq(4, 5, 6)) {
+ for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
+ Thread.sleep(500)
clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(500)
}
- Thread.sleep(500)
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
+ Thread.sleep(1000)
+ assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
+
+ // Append the new output to the old buffer
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+ outputBuffer ++= outputStream.output
+
+ // Verify whether data received by Spark Streaming was as expected
+ val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
+ logInfo("--------------------------------")
+ logInfo("output, size = " + outputBuffer.size)
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output, size = " + expectedOutput.size)
+ expectedOutput.foreach(x => logInfo("[" + x + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ assert(outputBuffer.size === expectedOutput.size)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ assert(outputBuffer(i).head === expectedOutput(i))
+ }
}
}
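
For reference, a quick way to sanity-check the hard-coded expectedOutput in the rewritten test: with a window wide enough to cover every file seen so far, each emitted value is the running sum of the integers written, including files 4-6 created while the context was down. A minimal sketch, assuming that interpretation:

// Sanity check of Seq(1, 3, 6, 28, 36, 45), assuming the 30-batch window
// covers every file seen so far: running sums 1..3 before the restart, then
// sums that also include files 4-6 (written during downtime) and 7-9.
val beforeRestart = (1 to 3).map(n => (1 to n).sum)   // 1, 3, 6
val afterRestart  = (7 to 9).map(n => (1 to n).sum)   // 28, 36, 45
assert((beforeRestart ++ afterRestart) == Seq(1, 3, 6, 28, 36, 45))
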