diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-17 15:06:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-17 15:06:41 -0800 |
commit | f98c7da23ef66812b8b4888230ee98c07f09af23 (patch) | |
tree | 28aa7c6757dcdfe0ee72e95f93634edd77c89265 /streaming/src/main | |
parent | ddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (diff) | |
download | spark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.gz spark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.bz2 spark-f98c7da23ef66812b8b4888230ee98c07f09af23.zip |
Many changes to ensure better 2nd recovery if 2nd failure happens while
recovering from 1st failure
- Made the scheduler to checkpoint after clearing old metadata which
ensures that a new checkpoint is written as soon as at least one batch
gets computed while recovering from a failure. This ensures that if
there is a 2nd failure while recovering from 1st failure, the system
start 2nd recovery from a newer checkpoint.
- Modified Checkpoint writer to write checkpoint in a different thread.
- Added a check to make sure that compute for InputDStreams gets called
only for strictly increasing times.
- Changed implementation of slice to call getOrCompute on parent DStream
in time-increasing order.
- Added testcase to test slice.
- Fixed testGroupByKeyAndWindow testcase in JavaAPISuite to verify
results with expected output in an order-independent manner.
Diffstat (limited to 'streaming/src/main')
11 files changed, 141 insertions, 68 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index b9eb7f8ec4..7405c8b22e 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -6,6 +6,8 @@ import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.conf.Configuration import java.io._ +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} +import java.util.concurrent.Executors private[streaming] @@ -38,32 +40,50 @@ class CheckpointWriter(checkpointDir: String) extends Logging { val conf = new Configuration() var fs = file.getFileSystem(conf) val maxAttempts = 3 + val executor = Executors.newFixedThreadPool(1) - def write(checkpoint: Checkpoint) { - // TODO: maybe do this in a different thread from the main stream execution thread - var attempts = 0 - while (attempts < maxAttempts) { - attempts += 1 - try { - logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") - if (fs.exists(file)) { - val bkFile = new Path(file.getParent, file.getName + ".bk") - FileUtil.copy(fs, file, fs, bkFile, true, true, conf) - logDebug("Moved existing checkpoint file to " + bkFile) + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + def run() { + var attempts = 0 + val startTime = System.currentTimeMillis() + while (attempts < maxAttempts) { + attempts += 1 + try { + logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") + if (fs.exists(file)) { + val bkFile = new Path(file.getParent, file.getName + ".bk") + FileUtil.copy(fs, file, fs, bkFile, true, true, conf) + logDebug("Moved existing checkpoint file to " + bkFile) + } + val fos = fs.create(file) + fos.write(bytes) + fos.close() + fos.close() + val finishTime = System.currentTimeMillis(); + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") + return + } catch { + case ioe: IOException => + logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) } - val fos = fs.create(file) - val oos = new ObjectOutputStream(fos) - oos.writeObject(checkpoint) - oos.close() - logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'") - fos.close() - return - } catch { - case ioe: IOException => - logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) } + logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") } - logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'") + } + + def write(checkpoint: Checkpoint) { + val bos = new ByteArrayOutputStream() + val zos = new LZFOutputStream(bos) + val oos = new ObjectOutputStream(zos) + oos.writeObject(checkpoint) + oos.close() + bos.close() + executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + } + + def stop() { + executor.shutdown() } } @@ -85,7 +105,8 @@ object CheckpointReader extends Logging { // of ObjectInputStream is used to explicitly use the current thread's default class // loader to find and load classes. This is a well know Java issue and has popped up // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader) + val zis = new LZFInputStream(fis) + val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) val cp = ois.readObject.asInstanceOf[Checkpoint] ois.close() fs.close() diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index ce42b742d7..84e4b5bedb 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -238,13 +238,15 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.remember(parentRememberDuration)) } - /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */ + /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ protected def isTimeValid(time: Time): Boolean = { if (!isInitialized) { throw new Exception (this + " has not been initialized") } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) false } else { + logInfo("Time " + time + " is valid") true } } @@ -627,16 +629,21 @@ abstract class DStream[T: ClassManifest] ( * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { - val rdds = new ArrayBuffer[RDD[T]]() - var time = toTime.floor(slideDuration) - while (time >= zeroTime && time >= fromTime) { - getOrCompute(time) match { - case Some(rdd) => rdds += rdd - case None => //throw new Exception("Could not get RDD for time " + time) - } - time -= slideDuration + if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } - rdds.toSeq + val alignedToTime = toTime.floor(slideDuration) + val alignedFromTime = fromTime.floor(slideDuration) + + logInfo("Slicing from " + fromTime + " to " + toTime + + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + + alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + if (time >= zeroTime) getOrCompute(time) else None + }) } /** diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index 22d9e24f05..adb7f3a24d 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -86,10 +86,12 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def getOutputStreams() = this.synchronized { outputStreams.toArray } - def generateRDDs(time: Time): Seq[Job] = { + def generateJobs(time: Time): Seq[Job] = { this.synchronized { - logInfo("Generating RDDs for time " + time) - outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + logInfo("Generating jobs for time " + time) + val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + logInfo("Generated " + jobs.length + " jobs for time " + time) + jobs } } @@ -97,18 +99,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { this.synchronized { logInfo("Clearing old metadata for time " + time) outputStreams.foreach(_.clearOldMetadata(time)) + logInfo("Cleared old metadata for time " + time) } } def updateCheckpointData(time: Time) { this.synchronized { + logInfo("Updating checkpoint data for time " + time) outputStreams.foreach(_.updateCheckpointData(time)) + logInfo("Updated checkpoint data for time " + time) } } def restoreCheckpointData() { this.synchronized { + logInfo("Restoring checkpoint data") outputStreams.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") } } diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 649494ff4a..7696c4a592 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -43,20 +43,24 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { } private def clearJob(job: Job) { + var timeCleared = false + val time = job.time jobs.synchronized { - val time = job.time val jobsOfTime = jobs.get(time) if (jobsOfTime.isDefined) { jobsOfTime.get -= job if (jobsOfTime.get.isEmpty) { - ssc.scheduler.clearOldMetadata(time) jobs -= time + timeCleared = true } } else { throw new Exception("Job finished for time " + job.time + " but time does not exist in jobs") } } + if (timeCleared) { + ssc.scheduler.clearOldMetadata(time) + } } def getPendingTimes(): Array[Time] = { diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 57d494da83..1c4b22a898 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -20,8 +20,9 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => generateRDDs(new Time(longTime))) + longTime => generateJobs(new Time(longTime))) val graph = ssc.graph + var latestTime: Time = null def start() = synchronized { if (ssc.isCheckpointPresent) { @@ -35,6 +36,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { def stop() = synchronized { timer.stop() jobManager.stop() + if (checkpointWriter != null) checkpointWriter.stop() ssc.graph.stop() logInfo("Scheduler stopped") } @@ -73,35 +75,38 @@ class Scheduler(ssc: StreamingContext) extends Logging { val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - graph.generateRDDs(time).foreach(jobManager.runJob) + graph.generateJobs(time).foreach(jobManager.runJob) ) // Restart the timer timer.start(restartTime.milliseconds) - logInfo("Scheduler's timer restarted") + logInfo("Scheduler's timer restarted at " + restartTime) } - /** Generates the RDDs, clears old metadata and does checkpoint for the given time */ - def generateRDDs(time: Time) { + /** Generate jobs and perform checkpoint for the given `time`. */ + def generateJobs(time: Time) { SparkEnv.set(ssc.env) logInfo("\n-----------------------------------------------------\n") - graph.generateRDDs(time).foreach(jobManager.runJob) + graph.generateJobs(time).foreach(jobManager.runJob) + latestTime = time doCheckpoint(time) } - + /** + * Clear old metadata assuming jobs of `time` have finished processing. + * And also perform checkpoint. + */ def clearOldMetadata(time: Time) { ssc.graph.clearOldMetadata(time) + doCheckpoint(time) } - def doCheckpoint(time: Time) { + /** Perform checkpoint for the give `time`. */ + def doCheckpoint(time: Time) = synchronized { if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) - val startTime = System.currentTimeMillis() ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) - val stopTime = System.currentTimeMillis() - logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") } } } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 9be9d884be..d1407b7869 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -119,18 +119,15 @@ class StreamingContext private ( /** * Set the context to periodically checkpoint the DStream operations for master - * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored - * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Duration = null) { + def checkpoint(directory: String) { if (directory != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) checkpointDir = directory - checkpointDuration = interval } else { checkpointDir = null - checkpointDuration = null } } diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 8201e84a20..f14decf08b 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -38,15 +38,14 @@ case class Time(private val millis: Long) { def max(that: Time): Time = if (this > that) this else that def until(that: Time, interval: Duration): Seq[Time] = { - assert(that > this, "Cannot create sequence as " + that + " not more than " + this) - assert( - (that - this).isMultipleOf(interval), - "Cannot create sequence as gap between " + that + " and " + - this + " is not multiple of " + interval - ) (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_)) } + def to(that: Time, interval: Duration): Seq[Time] = { + (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + + override def toString: String = (millis.toString + " ms") } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 5bbf2b084f..03933aae93 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -314,12 +314,11 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Sets the context to periodically checkpoint the DStream operations for master - * fault-tolerance. By default, the graph will be checkpointed every batch interval. + * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored - * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Duration = null) { - ssc.checkpoint(directory, interval) + def checkpoint(directory: String) { + ssc.checkpoint(directory) } /** diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala index 980ca5177e..a4db44a608 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -1,10 +1,42 @@ package spark.streaming.dstream -import spark.streaming.{Duration, StreamingContext, DStream} +import spark.streaming.{Time, Duration, StreamingContext, DStream} +/** + * This is the abstract base class for all input streams. This class provides to methods + * start() and stop() which called by the scheduler to start and stop receiving data/ + * Input streams that can generated RDDs from new data just by running a service on + * the driver node (that is, without running a receiver onworker nodes) can be + * implemented by directly subclassing this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for + * new files and generates RDDs on the new files. For implementing input streams + * that requires running a receiver on the worker nodes, use NetworkInputDStream + * as the parent class. + */ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { + var lastValidTime: Time = null + + /** + * Checks whether the 'time' is valid wrt slideDuration for generating RDD. + * Additionally it also ensures valid times are in strictly increasing order. + * This ensures that InputDStream.compute() is called strictly on increasing + * times. + */ + override protected def isTimeValid(time: Time): Boolean = { + if (!super.isTimeValid(time)) { + false // Time not valid + } else { + // Time is valid, but check it it is more than lastValidTime + if (lastValidTime == null || lastValidTime <= time) { + logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) + } + lastValidTime = time + true + } + } + override def dependencies = List() override def slideDuration: Duration = { @@ -13,7 +45,9 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex ssc.graph.batchDuration } + /** Method called to start receiving data. Subclasses must implement this method. */ def start() + /** Method called to stop receiving data. Subclasses must implement this method. */ def stop() } diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala index d733254ddb..e70822e5c3 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala @@ -2,8 +2,8 @@ package spark.streaming.dstream import spark._ import spark.streaming._ -import dstream.{NetworkReceiver, NetworkInputDStream} import storage.StorageLevel + import twitter4j._ import twitter4j.auth.BasicAuthorization @@ -19,7 +19,7 @@ class TwitterInputDStream( password: String, filters: Seq[String], storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { + ) extends NetworkInputDStream[Status](ssc_) { override def createReceiver(): NetworkReceiver[Status] = { new TwitterReceiver(username, password, filters, storageLevel) diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala index 776e676063..bdd9f4d753 100644 --- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala @@ -315,7 +315,7 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread override def run() { try { // If it is the first killing, then allow the first checkpoint to be created - var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000 + var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000 val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) logInfo("Kill wait time = " + killWaitTime) Thread.sleep(killWaitTime) |