author     Matei Zaharia <matei@databricks.com>      2013-12-31 18:23:14 -0500
committer  Matei Zaharia <matei@databricks.com>      2013-12-31 18:23:14 -0500
commit     ba9338f104ccc71d4f342a3f96624a9b36895f48
tree       5a5b2bf6b34565a594a68fff59b9dd2af234be7f  /streaming
parent     0fa5809768cf60ec62b4277f04e23a44dc1582e2
parent     63b411dd8664c27ac55586d8345733afad80961f
Merge remote-tracking branch 'apache/master' into conf2
Conflicts:
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
Diffstat (limited to 'streaming')
7 files changed, 221 insertions(+), 141 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 35e23c1355..af443279a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,12 +21,13 @@
 import java.io._
 import java.util.concurrent.Executors
 import java.util.concurrent.RejectedExecutionException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 private[streaming]
@@ -54,36 +55,36 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 
 /**
- * Convenience class to speed up the writing of graph checkpoint to file
+ * Convenience class to handle the writing of graph checkpoint to file
  */
 private[streaming]
-class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
+class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
+  extends Logging
+{
   val file = new Path(checkpointDir, "graph")
-  // The file to which we actually write - and then "move" to file.
-  private val writeFile = new Path(file.getParent, file.getName + ".next")
-  private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
-  private var stopped = false
-
-  val hadoopConf = new Configuration()
-  var fs = file.getFileSystem(hadoopConf)
-  val maxAttempts = 3
+  val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
+  val compressionCodec = CompressionCodec.createCodec()
+  // The file to which we actually write - and then "move" to file
+  val writeFile = new Path(file.getParent, file.getName + ".next")
+  // The file to which existing checkpoint is backed up (i.e. "moved")
+  val bakFile = new Path(file.getParent, file.getName + ".bk")
 
-  private val compressionCodec = CompressionCodec.createCodec(conf)
+  private var stopped = false
+  private var fs_ : FileSystem = _
 
   // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
   // I did not notice any errors - reintroduce it ?
-
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
-      while (attempts < maxAttempts) {
+      while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
           logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+          // This is inherently thread unsafe, so alleviating it by writing to '.new' and
+          // then moving it to the final file
           val fos = fs.create(writeFile)
           fos.write(bytes)
           fos.close()
@@ -101,6 +102,7 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
         } catch {
           case ioe: IOException =>
             logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+            reset()
         }
       }
       logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -133,7 +135,17 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
     val startTime = System.currentTimeMillis()
     val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
     val endTime = System.currentTimeMillis()
-    logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
+    logInfo("CheckpointWriter executor terminated ? " + terminated +
+      ", waited for " + (endTime - startTime) + " ms.")
+  }
+
+  private def fs = synchronized {
+    if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+    fs_
+  }
+
+  private def reset() = synchronized {
+    fs_ = null
   }
 }
 
@@ -143,8 +155,8 @@ object CheckpointReader extends Logging {
 
   def read(conf: SparkConf, path: String): Checkpoint = {
     val fs = new Path(path).getFileSystem(new Configuration())
-    val attempts = Seq(
-      new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
+      new Path(path), new Path(path + ".bk"))
 
     val compressionCodec = CompressionCodec.createCodec(conf)
 
@@ -159,7 +171,8 @@ object CheckpointReader extends Logging {
       // loader to find and load classes. This is a well know Java issue and has popped up
       // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
       val zis = compressionCodec.compressedInputStream(fis)
-      val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
+      val ois = new ObjectInputStreamWithLoader(zis,
+        Thread.currentThread().getContextClassLoader)
       val cp = ois.readObject.asInstanceOf[Checkpoint]
       ois.close()
      fs.close()
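The interesting part of the change above is the write-then-"move" protocol plus the lazily rebuilt FileSystem handle. Below is a minimal, self-contained sketch of that pattern under stated assumptions: it targets any Hadoop-compatible file system, and `AtomicCheckpointWriter`, `write`, and `reset` are illustrative names, not Spark's actual API.

    import java.io.IOException
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    class AtomicCheckpointWriter(checkpointDir: String, hadoopConf: Configuration) {
      private val file = new Path(checkpointDir, "graph")
      private val writeFile = new Path(file.getParent, file.getName + ".next") // staging file
      private val bakFile = new Path(file.getParent, file.getName + ".bk")     // previous checkpoint
      private var fs_ : FileSystem = _

      // Lazily (re)create the FileSystem handle, as the patched CheckpointWriter does
      private def fs: FileSystem = synchronized {
        if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
        fs_
      }

      // Drop the cached handle so the next attempt starts from a fresh one
      private def reset(): Unit = synchronized { fs_ = null }

      def write(bytes: Array[Byte], maxAttempts: Int = 3): Boolean = {
        var attempts = 0
        while (attempts < maxAttempts) {
          attempts += 1
          try {
            val out = fs.create(writeFile)     // write the whole payload to '.next' first...
            out.write(bytes)
            out.close()
            if (fs.exists(file)) {             // ...back up any existing checkpoint...
              if (fs.exists(bakFile)) fs.delete(bakFile, true)
              fs.rename(file, bakFile)
            }
            fs.rename(writeFile, file)         // ...then "move" the new one into place
            return true
          } catch {
            case ioe: IOException => reset()   // invalidate the handle and retry
          }
        }
        false
      }
    }

Renames are cheap metadata operations on HDFS-like file systems, which is why staging plus move is preferred over rewriting the checkpoint file in place.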
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 339f6e64a2..0566704c94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -20,17 +20,19 @@ package org.apache.spark.streaming
 
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
 import akka.zeromq.Subscribe
+import akka.util.ByteString
 
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ActorReceiver
 import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
 import org.apache.spark.streaming.receivers.ZeroMQReceiver
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming.receivers.ActorReceiver
+import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker}
 
 import scala.collection.mutable.Queue
 import scala.collection.Map
@@ -38,17 +40,15 @@ import scala.reflect.ClassTag
 
 import java.io.InputStream
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.UUID
 
 import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
+
 import twitter4j.Status
 import twitter4j.auth.Authorization
-import org.apache.spark.streaming.scheduler._
-import akka.util.ByteString
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -153,7 +153,7 @@ class StreamingContext private (
 
   protected[streaming] var checkpointDir: String = {
     if (isCheckpointPresent) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+      sc.setCheckpointDir(cp_.checkpointDir)
       cp_.checkpointDir
     } else {
       null
@@ -188,8 +188,12 @@ class StreamingContext private (
    */
   def checkpoint(directory: String) {
     if (directory != null) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
-      checkpointDir = directory
+      val path = new Path(directory)
+      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+      fs.mkdirs(path)
+      val fullPath = fs.getFileStatus(path).getPath().toString
+      sc.setCheckpointDir(fullPath)
+      checkpointDir = fullPath
     } else {
       checkpointDir = null
     }
@@ -380,7 +384,8 @@ class StreamingContext private (
   /**
    * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
-   * File names starting with . are ignored.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
@@ -399,6 +404,8 @@ class StreamingContext private (
   /**
    * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system.
    * @param directory HDFS directory to monitor for new file
    * @param filter Function to filter paths to process
   * @param newFilesOnly Should process only new files and ignore existing files in the directory
@@ -419,7 +426,9 @@ class StreamingContext private (
   /**
    * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
-   * as Text and input format as TextInputFormat). File names starting with . are ignored.
+   * as Text and input format as TextInputFormat). Files must be written to the
+   * monitored directory by "moving" them from another location within the same
+   * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
   def textFileStream(directory: String): DStream[String] = {
@@ -623,8 +632,4 @@ object StreamingContext {
     prefix + "-" + time.milliseconds + "." + suffix
   }
 }
-
-  protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
-    new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
-  }
 }
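One behavioral change worth isolating from the diff above: `checkpoint(directory)` no longer appends a random UUID, and it now normalizes the user-supplied string into a fully qualified path on whatever file system backs it before handing it to `SparkContext`. A hedged sketch of that normalization, where `resolveCheckpointDir` is a hypothetical helper rather than a Spark method:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Turns e.g. "ckpt" into "file:/home/user/ckpt" or "hdfs://namenode/user/ckpt"
    def resolveCheckpointDir(directory: String, hadoopConf: Configuration): String = {
      val path = new Path(directory)
      val fs = path.getFileSystem(hadoopConf)   // the FS that actually backs this path
      fs.mkdirs(path)                           // create the directory if it is missing
      fs.getFileStatus(path).getPath.toString   // fully qualified form of the same path
    }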
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 29f673d8ae..b79173c6aa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -265,9 +265,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them as text files (using key as LongWritable, value
-   * as Text and input format as TextInputFormat). File names starting with . are ignored.
+   * as Text and input format as TextInputFormat). Files must be written to the
+   * monitored directory by "moving" them from another location within the same
+   * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
   def textFileStream(directory: String): JavaDStream[String] = {
@@ -309,9 +311,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates a input stream that monitors a Hadoop-compatible filesystem
+   * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
-   * File names starting with . are ignored.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
@@ -340,7 +343,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
-   * Creates a input stream from a Flume source.
+   * Create a input stream from a Flume source.
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port     Port of the slave machine to which the flume data will be sent
    */
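All of the scaladoc edits in this file and the previous one state the same operational contract: a file must appear in the monitored directory atomically, via a rename within one file system, never via an in-place write that the stream could observe half-finished. A sketch of a producer honoring that contract; `publishAtomically` and the directory layout are illustrative assumptions:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Write the file somewhere the stream cannot see, then rename it into the watched dir.
    def publishAtomically(fs: FileSystem, stagingDir: Path, watchedDir: Path,
        name: String, data: Array[Byte]): Unit = {
      val staged = new Path(stagingDir, name)
      val out = fs.create(staged)
      out.write(data)
      out.close()                                    // file is complete before it becomes visible
      fs.rename(staged, new Path(watchedDir, name))  // atomic "move" within the same file system
    }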
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 39e25239bf..fb9eda8996 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-
+import java.io.{ObjectInputStream, IOException}
+import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
 
-import scala.collection.mutable.{HashSet, HashMap}
-import scala.reflect.ClassTag
-
-import java.io.{ObjectInputStream, IOException}
 
 private[streaming]
 class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@@ -41,8 +40,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Latest file mod time seen till any point of time
-  private val lastModTimeFiles = new HashSet[String]()
-  private var lastModTime = 0L
+  private val prevModTimeFiles = new HashSet[String]()
+  private var prevModTime = 0L
 
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
@@ -50,11 +49,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   override def start() {
     if (newFilesOnly) {
-      lastModTime = graph.zeroTime.milliseconds
+      prevModTime = graph.zeroTime.milliseconds
     } else {
-      lastModTime = 0
+      prevModTime = 0
     }
-    logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
   }
 
   override def stop() { }
 
@@ -69,55 +68,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+    assert(validTime.milliseconds >= prevModTime,
+      "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
 
-    // Create the filter for selecting new files
-    val newFilter = new PathFilter() {
-      // Latest file mod time seen in this round of fetching files and its corresponding files
-      var latestModTime = 0L
-      val latestModTimeFiles = new HashSet[String]()
-
-      def accept(path: Path): Boolean = {
-        if (!filter(path)) {  // Reject file if it does not satisfy filter
-          logDebug("Rejected by filter " + path)
-          return false
-        } else {              // Accept file only if
-          val modTime = fs.getFileStatus(path).getModificationTime()
-          logDebug("Mod time for " + path + " is " + modTime)
-          if (modTime < lastModTime) {
-            logDebug("Mod time less than last mod time")
-            return false  // If the file was created before the last time it was called
-          } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
-            logDebug("Mod time equal to last mod time, but file considered already")
-            return false  // If the file was created exactly as lastModTime but not reported yet
-          } else if (modTime > validTime.milliseconds) {
-            logDebug("Mod time more than valid time")
-            return false  // If the file was created after the time this function call requires
-          }
-          if (modTime > latestModTime) {
-            latestModTime = modTime
-            latestModTimeFiles.clear()
-            logDebug("Latest mod time updated to " + latestModTime)
-          }
-          latestModTimeFiles += path.toString
-          logDebug("Accepted " + path)
-          return true
-        }
-      }
-    }
-    logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
-    val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+    // Find new files
+    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
     if (newFiles.length > 0) {
       // Update the modification time and the files processed for that modification time
-      if (lastModTime != newFilter.latestModTime) {
-        lastModTime = newFilter.latestModTime
-        lastModTimeFiles.clear()
+      if (prevModTime < latestModTime) {
+        prevModTime = latestModTime
+        prevModTimeFiles.clear()
       }
-      lastModTimeFiles ++= newFilter.latestModTimeFiles
-      logDebug("Last mod time updated to " + lastModTime)
+      prevModTimeFiles ++= latestModTimeFiles
+      logDebug("Last mod time updated to " + prevModTime)
     }
-    files += ((validTime, newFiles))
+    files += ((validTime, newFiles.toArray))
     Some(filesToRDD(newFiles))
   }
 
@@ -132,12 +98,28 @@
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
   }
 
+  /**
+   * Find files which have modification timestamp <= current time and return a 3-tuple of
+   * (new files found, latest modification time among them, files with latest modification time)
+   */
+  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+    logDebug("Trying to get new files for time " + currentTime)
+    val filter = new CustomPathFilter(currentTime)
+    val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+    (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+  }
+
   /** Generate one RDD from an array of files */
-  protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    new UnionRDD(
-      context.sparkContext,
-      files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
-    )
+  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+    files.zip(fileRDDs).foreach { case (file, rdd) => {
+      if (rdd.partitions.size == 0) {
+        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
+          "files that have been \"moved\" to the directory assigned to the file stream. " +
+          "Refer to the streaming programming guide for more details.")
+      }
+    }}
+    new UnionRDD(context.sparkContext, fileRDDs)
   }
 
   private def path: Path = {
@@ -150,6 +132,10 @@
     fs_
   }
 
+  private def reset() {
+    fs_ = null
+  }
+
   @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
     logDebug(this.getClass().getSimpleName + ".readObject used")
@@ -191,6 +177,51 @@
       hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }
   }
+
+  /**
+   * Custom PathFilter class to find new files that have modification timestamps <= current time,
+   * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
+   */
+  private[streaming]
+  class CustomPathFilter(maxModTime: Long) extends PathFilter {
+    // Latest file mod time seen in this round of fetching files and its corresponding files
+    var latestModTime = 0L
+    val latestModTimeFiles = new HashSet[String]()
+
+    def accept(path: Path): Boolean = {
+      try {
+        if (!filter(path)) {  // Reject file if it does not satisfy filter
+          logDebug("Rejected by filter " + path)
+          return false
+        }
+        val modTime = fs.getFileStatus(path).getModificationTime()
+        logDebug("Mod time for " + path + " is " + modTime)
+        if (modTime < prevModTime) {
+          logDebug("Mod time less than last mod time")
+          return false  // If the file was created before the last time it was called
+        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
+          logDebug("Mod time equal to last mod time, but file considered already")
+          return false  // If the file was created exactly as lastModTime but not reported yet
+        } else if (modTime > maxModTime) {
+          logDebug("Mod time more than ")
+          return false  // If the file is too new that considering it may give errors
+        }
+        if (modTime > latestModTime) {
+          latestModTime = modTime
+          latestModTimeFiles.clear()
+          logDebug("Latest mod time updated to " + latestModTime)
+        }
+        latestModTimeFiles += path.toString
+        logDebug("Accepted " + path)
+      } catch {
+        case fnfe: java.io.FileNotFoundException =>
+          logWarning("Error finding new files", fnfe)
+          reset()
+          return false
+      }
+      return true
+    }
+  }
 }
 
 private[streaming]
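To read the selection logic of the new `CustomPathFilter` outside of diff context, here is a condensed standalone sketch of the same mod-time bookkeeping. It drops the user-supplied filter and the FileNotFoundException recovery, and `NewFilePathFilter` is an illustrative name, not the class from the patch:

    import scala.collection.mutable.HashSet
    import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

    class NewFilePathFilter(fs: FileSystem, prevModTime: Long,
        prevModTimeFiles: HashSet[String], maxModTime: Long) extends PathFilter {
      var latestModTime = 0L                       // newest mod time seen in this round
      val latestModTimeFiles = new HashSet[String]()

      def accept(path: Path): Boolean = {
        val modTime = fs.getFileStatus(path).getModificationTime()
        if (modTime < prevModTime) return false    // handled in an earlier batch
        if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
          return false                             // tied with the last batch but already seen
        }
        if (modTime > maxModTime) return false     // too new; defer to a later batch
        if (modTime > latestModTime) {             // a strictly newer file resets the tracking set
          latestModTime = modTime
          latestModTimeFiles.clear()
        }
        latestModTimeFiles += path.toString
        true
      }
    }

Tracking the set of files that share the latest mod time is what lets the stream accept a file whose timestamp ties `prevModTime` exactly once.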
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index dbd08415a1..e448211732 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,11 +17,18 @@
 
 package org.apache.spark.streaming.scheduler
 
+import akka.actor.{Props, Actor}
 import org.apache.spark.SparkEnv
 import org.apache.spark.Logging
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 
+/** Event classes for JobGenerator */
+private[scheduler] sealed trait JobGeneratorEvent
+private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
+private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
+private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
+
 /**
  * This class generates jobs from DStreams as well as drives checkpointing and cleaning
  * up DStream metadata.
@@ -32,43 +39,67 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   initLogging()
 
   val ssc = jobScheduler.ssc
-  val clockClass = ssc.sc.conf.getOrElse(
-    "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
-  val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
-    longTime => generateJobs(new Time(longTime)))
   val graph = ssc.graph
+  val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+    def receive = {
+      case event: JobGeneratorEvent =>
+        logDebug("Got event of type " + event.getClass.getName)
+        processEvent(event)
+    }
+  }))
+  val clock = {
+    val clockClass = ssc.sc.conf.getOrElse(
+      "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+    val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+  }
+  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+    longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
   lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(ssc.conf, ssc.checkpointDir)
+    new CheckpointWriter(ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
   } else {
     null
   }
 
-  var latestTime: Time = null
-
   def start() = synchronized {
     if (ssc.isCheckpointPresent) {
       restart()
     } else {
       startFirstTime()
     }
-    logInfo("JobGenerator started")
   }
 
-  def stop() = synchronized {
+  def stop() {
     timer.stop()
     if (checkpointWriter != null) checkpointWriter.stop()
     ssc.graph.stop()
     logInfo("JobGenerator stopped")
   }
 
+  /**
+   * On batch completion, clear old metadata and checkpoint computation.
+   */
+  private[scheduler] def onBatchCompletion(time: Time) {
+    eventProcessorActor ! ClearOldMetadata(time)
+  }
+
+  /** Processes all events */
+  private def processEvent(event: JobGeneratorEvent) {
+    event match {
+      case GenerateJobs(time) => generateJobs(time)
+      case ClearOldMetadata(time) => clearOldMetadata(time)
+      case DoCheckpoint(time) => doCheckpoint(time)
+    }
+  }
+
+  /** Starts the generator for the first time */
   private def startFirstTime() {
     val startTime = new Time(timer.getStartTime())
     graph.start(startTime - graph.batchDuration)
     timer.start(startTime.milliseconds)
-    logInfo("JobGenerator's timer started at " + startTime)
+    logInfo("JobGenerator started at " + startTime)
   }
 
+  /** Restarts the generator based on the information in checkpoint */
   private def restart() {
     // If manual clock is being used for testing, then
     // either set the manual clock to the last checkpointed time,
@@ -100,7 +131,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
     // Restart the timer
     timer.start(restartTime.milliseconds)
-    logInfo("JobGenerator's timer restarted at " + restartTime)
+    logInfo("JobGenerator restarted at " + restartTime)
   }
 
   /** Generate jobs and perform checkpoint for the given `time`. */
@@ -108,16 +139,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     SparkEnv.set(ssc.env)
     logInfo("\n-----------------------------------------------------\n")
     jobScheduler.runJobs(time, graph.generateJobs(time))
-    latestTime = time
-    doCheckpoint(time)
+    eventProcessorActor ! DoCheckpoint(time)
   }
 
-  /**
-   * On batch completion, clear old metadata and checkpoint computation.
-   */
-  private[streaming] def onBatchCompletion(time: Time) {
+  /** Clear DStream metadata for the given `time`. */
+  private def clearOldMetadata(time: Time) {
     ssc.graph.clearOldMetadata(time)
-    doCheckpoint(time)
+    eventProcessorActor ! DoCheckpoint(time)
   }
 
   /** Perform checkpoint for the give `time`. */
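The structural shift in JobGenerator is that job generation, metadata cleanup, and checkpointing now all flow through a single actor, so they run serially instead of racing on shared state. A minimal standalone sketch of the pattern with the classic Akka API of that era; the `ActorSystem` wiring and the println bodies are stand-ins for Spark's internals:

    import akka.actor.{Actor, ActorSystem, Props}

    sealed trait GeneratorEvent
    case class GenerateJobs(timeMs: Long) extends GeneratorEvent
    case class ClearOldMetadata(timeMs: Long) extends GeneratorEvent
    case class DoCheckpoint(timeMs: Long) extends GeneratorEvent

    class EventProcessor extends Actor {
      def receive = {
        case GenerateJobs(t) =>
          println(s"generate jobs for batch at $t")
          self ! DoCheckpoint(t)        // checkpointing is queued as just another event
        case ClearOldMetadata(t) =>
          println(s"clear metadata up to $t")
          self ! DoCheckpoint(t)
        case DoCheckpoint(t) =>
          println(s"checkpoint state at $t")
      }
    }

    object Demo extends App {
      val system = ActorSystem("generator")
      val processor = system.actorOf(Props[EventProcessor], "eventProcessor")
      processor ! GenerateJobs(System.currentTimeMillis())  // a timer drives this in Spark
    }

Because an actor processes one message at a time, a checkpoint can never interleave with job generation, which is the property the removed `latestTime` bookkeeping only approximated.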
deleted")) + checkpointFiles.foreach(file => + assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() // Restart stream computation using the checkpoint file and check whether @@ -110,19 +106,20 @@ class CheckpointSuite extends TestSuiteBase { ssc = new StreamingContext(checkpointDir) stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") - assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure") + assert(!stateStream.generatedRDDs.isEmpty, + "No restored RDDs in state stream after recovery from first failure") // Run one batch to generate a new checkpoint file and check whether some RDD // is present in the checkpoint data or not ssc.start() advanceTimeWithRealDelay(ssc, 1) - assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, + "No checkpointed RDDs in state stream before second failure") stateStream.checkpointData.checkpointFiles.foreach { - case (time, data) => { - val file = new File(data.toString) - assert(file.exists(), - "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist") + case (time, file) => { + assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time + + " for state stream before seconds failure does not exist") } } ssc.stop() @@ -132,7 +129,8 @@ class CheckpointSuite extends TestSuiteBase { ssc = new StreamingContext(checkpointDir) stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]") - assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") + assert(!stateStream.generatedRDDs.isEmpty, + "No restored RDDs in state stream after recovery from second failure") // Adjust manual clock time as if it is being restarted after a delay; this is a hack because // we modify the conf object, but it works for this one property @@ -144,6 +142,7 @@ class CheckpointSuite extends TestSuiteBase { ssc = null } + // This tests whether the systm can recover from a master failure with simple // non-stateful operations. This assumes as reliable, replayable input // source - TestInputDStream. @@ -192,6 +191,7 @@ class CheckpointSuite extends TestSuiteBase { testCheckpointedOperation(input, operation, output, 7) } + // This tests whether file input stream remembers what files were seen before // the master failure and uses them again to process a large window operation. 
// It also tests whether batches, whose processing was incomplete due to the diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 52381c10b0..5185954521 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} |
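The CheckpointSuite portion above swaps `java.io.File` existence checks for Hadoop `FileSystem` ones, so the identical assertion works whether checkpoint files land on the local disk or on HDFS. A sketch of that idiom; `assertCheckpointExists` is a hypothetical helper, not part of the suite:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.getLocal(new Configuration())  // swap in any FileSystem implementation

    def assertCheckpointExists(file: String, time: Long): Unit = {
      assert(fs.exists(new Path(file)),
        "Checkpoint file '" + file + "' for time " + time + " does not exist")
    }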