Diffstat (limited to 'streaming')
3 files changed, 78 insertions, 35 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 05760bfcd4..957c227996 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,7 +21,7 @@
 import java.io._
 import java.util.concurrent.Executors
 import java.util.concurrent.RejectedExecutionException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.Logging
@@ -55,31 +55,29 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 
 /**
- * Convenience class to speed up the writing of graph checkpoint to file
+ * Convenience class to handle the writing of graph checkpoint to file
  */
 private[streaming]
 class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
   val file = new Path(checkpointDir, "graph")
-  // The file to which we actually write - and then "move" to file.
-  private val writeFile = new Path(file.getParent, file.getName + ".next")
-  private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
-  private var stopped = false
-
-  var fs = file.getFileSystem(hadoopConf)
-  val maxAttempts = 3
+  val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
+  val compressionCodec = CompressionCodec.createCodec()
+  // The file to which we actually write - and then "move" to file
+  val writeFile = new Path(file.getParent, file.getName + ".next")
+  // The file to which existing checkpoint is backed up (i.e. "moved")
+  val bakFile = new Path(file.getParent, file.getName + ".bk")
 
-  private val compressionCodec = CompressionCodec.createCodec()
+  private var stopped = false
+  private var fs_ : FileSystem = _
 
-  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
+  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
   // I did not notice any errors - reintroduce it ?
-
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
       val startTime = System.currentTimeMillis()
-      while (attempts < maxAttempts) {
+      while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
         try {
           logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -101,6 +99,7 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
         } catch {
           case ioe: IOException =>
             logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+            reset()
         }
       }
       logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -135,6 +134,15 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
     val endTime = System.currentTimeMillis()
     logInfo("CheckpointWriter executor terminated ? " + terminated +
       ", waited for " + (endTime - startTime) + " ms.")
   }
+
+  private def fs = synchronized {
+    if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+    fs_
+  }
+
+  private def reset() = synchronized {
+    fs_ = null
+  }
 }
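
Note: the change above swaps the eagerly created `var fs` for a lazily initialized `fs_` handle. `fs` becomes a synchronized getter that re-creates the Hadoop `FileSystem` on demand, and `reset()` nulls it out whenever a write attempt fails with an `IOException`, so the retry loop re-resolves the handle instead of retrying against a possibly stale one. Below is a minimal, self-contained sketch of that get/reset idiom; `RetryingWriter` and its `java.io`-based handle are illustrative stand-ins, not part of the patch.

// Sketch of the lazy get/reset idiom used by CheckpointWriter (illustrative names).
object LazyHandleSketch {
  final class RetryingWriter(maxAttempts: Int) {
    // The handle is created on first use and dropped on failure, so one bad
    // handle cannot poison every subsequent attempt.
    private var handle: java.io.Writer = _

    private def channel: java.io.Writer = synchronized {
      if (handle == null) handle = new java.io.PrintWriter("checkpoint.tmp")
      handle
    }

    private def reset(): Unit = synchronized { handle = null }

    def write(data: String): Boolean = {
      var attempts = 0
      while (attempts < maxAttempts) {
        attempts += 1
        try {
          channel.write(data)
          channel.flush()
          return true
        } catch {
          case _: java.io.IOException => reset() // force a fresh handle on the next attempt
        }
      }
      false
    }
  }

  def main(args: Array[String]): Unit =
    println(new RetryingWriter(3).write("graph-checkpoint-bytes"))
}
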
" + terminated + ", waited for " + (endTime - startTime) + " ms.") } + + private def fs = synchronized { + if (fs_ == null) fs_ = file.getFileSystem(hadoopConf) + fs_ + } + + private def reset() = synchronized { + fs_ = null + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 79513548d2..a618a709a7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -40,6 +40,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData + // Max attempts to try if listing files fail + val MAX_ATTEMPTS = 10 + // Latest file mod time seen till any point of time private val prevModTimeFiles = new HashSet[String]() private var prevModTime = 0L @@ -108,14 +111,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = { logDebug("Trying to get new files for time " + currentTime) var attempts = 0 - while (attempts < FileInputDStream.MAX_ATTEMPTS) { + while (attempts < MAX_ATTEMPTS) { attempts += 1 try { val filter = new CustomPathFilter(currentTime) val newFiles = fs.listStatus(path, filter) return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq) } catch { - case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe) + case ioe: IOException => + logWarning("Attempt " + attempts + " to get new files failed", ioe) + reset() } } (Seq(), -1, Seq()) @@ -139,6 +144,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas fs_ } + private def reset() { + fs_ = null + } + @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream) { logDebug(this.getClass().getSimpleName + ".readObject used") @@ -224,6 +233,5 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas private[streaming] object FileInputDStream { - val MAX_ATTEMPTS = 10 def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index cf3fc82a2a..2552d51654 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -21,6 +21,12 @@ import org.apache.spark.SparkEnv import org.apache.spark.Logging import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} +import akka.actor.{Props, Actor} + +sealed trait JobGeneratorEvent +case class GenerateJobs(time: Time) extends JobGeneratorEvent +case class ClearOldMetadata(time: Time) extends JobGeneratorEvent +case class DoCheckpoint(time: Time) extends JobGeneratorEvent /** * This class generates jobs from DStreams as well as drives checkpointing and cleaning @@ -32,43 +38,67 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { initLogging() val ssc = jobScheduler.ssc - val clockClass = System.getProperty( - "spark.streaming.clock", 
"org.apache.spark.streaming.util.SystemClock") - val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] - val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => generateJobs(new Time(longTime))) val graph = ssc.graph + val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor { + def receive = { + case event: JobGeneratorEvent => + logDebug("Got event of type " + event.getClass.getName) + processEvent(event) + } + })) + val clock = { + val clockClass = System.getProperty( + "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + Class.forName(clockClass).newInstance().asInstanceOf[Clock] + } + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, + longTime => eventProcessorActor ! GenerateJobs(new Time(longTime))) lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } - var latestTime: Time = null - def start() = synchronized { if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } - logInfo("JobGenerator started") } - def stop() = synchronized { + def stop() { timer.stop() if (checkpointWriter != null) checkpointWriter.stop() ssc.graph.stop() logInfo("JobGenerator stopped") } + /** + * On batch completion, clear old metadata and checkpoint computation. + */ + private[scheduler] def onBatchCompletion(time: Time) { + eventProcessorActor ! ClearOldMetadata(time) + } + + /** Processes all events */ + private def processEvent(event: JobGeneratorEvent) { + event match { + case GenerateJobs(time) => generateJobs(time) + case ClearOldMetadata(time) => clearOldMetadata(time) + case DoCheckpoint(time) => doCheckpoint(time) + } + } + + /** Starts the generator for the first time */ private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) - logInfo("JobGenerator's timer started at " + startTime) + logInfo("JobGenerator started at " + startTime) } + /** Restarts the generator based on the information in checkpoint */ private def restart() { // If manual clock is being used for testing, then // either set the manual clock to the last checkpointed time, @@ -100,7 +130,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // Restart the timer timer.start(restartTime.milliseconds) - logInfo("JobGenerator's timer restarted at " + restartTime) + logInfo("JobGenerator restarted at " + restartTime) } /** Generate jobs and perform checkpoint for the given `time`. */ @@ -108,16 +138,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") jobScheduler.runJobs(time, graph.generateJobs(time)) - latestTime = time - doCheckpoint(time) + eventProcessorActor ! DoCheckpoint(time) } - /** - * On batch completion, clear old metadata and checkpoint computation. - */ - private[streaming] def onBatchCompletion(time: Time) { + /** Clear DStream metadata for the given `time`. */ + private def clearOldMetadata(time: Time) { ssc.graph.clearOldMetadata(time) - doCheckpoint(time) + eventProcessorActor ! DoCheckpoint(time) } /** Perform checkpoint for the give `time`. */ |