author    Tathagata Das <tathagata.das1565@gmail.com>  2013-02-17 15:06:41 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-02-17 15:06:41 -0800
commit    f98c7da23ef66812b8b4888230ee98c07f09af23 (patch)
tree      28aa7c6757dcdfe0ee72e95f93634edd77c89265 /streaming/src/main
parent    ddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (diff)
Many changes to ensure better 2nd recovery if a 2nd failure happens while
recovering from the 1st failure:

- Made the scheduler checkpoint after clearing old metadata, which ensures that a new checkpoint is written as soon as at least one batch gets computed while recovering from a failure. This ensures that if there is a 2nd failure while recovering from the 1st failure, the system starts the 2nd recovery from a newer checkpoint.
- Modified CheckpointWriter to write the checkpoint in a different thread.
- Added a check to make sure that compute for InputDStreams gets called only for strictly increasing times.
- Changed the implementation of slice to call getOrCompute on the parent DStream in time-increasing order.
- Added a testcase to test slice.
- Fixed the testGroupByKeyAndWindow testcase in JavaAPISuite to verify results against the expected output in an order-independent manner.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala                     67
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                        27
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala                   13
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala                      8
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                      27
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala                7
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala                           11
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala   7
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala           36
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala     4
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala          2
11 files changed, 141 insertions, 68 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index b9eb7f8ec4..7405c8b22e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -6,6 +6,8 @@ import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import java.io._
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import java.util.concurrent.Executors
private[streaming]
@@ -38,32 +40,50 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val conf = new Configuration()
var fs = file.getFileSystem(conf)
val maxAttempts = 3
+ val executor = Executors.newFixedThreadPool(1)
- def write(checkpoint: Checkpoint) {
- // TODO: maybe do this in a different thread from the main stream execution thread
- var attempts = 0
- while (attempts < maxAttempts) {
- attempts += 1
- try {
- logDebug("Saving checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
- if (fs.exists(file)) {
- val bkFile = new Path(file.getParent, file.getName + ".bk")
- FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
- logDebug("Moved existing checkpoint file to " + bkFile)
+ class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
+ def run() {
+ var attempts = 0
+ val startTime = System.currentTimeMillis()
+ while (attempts < maxAttempts) {
+ attempts += 1
+ try {
+ logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ if (fs.exists(file)) {
+ val bkFile = new Path(file.getParent, file.getName + ".bk")
+ FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
+ logDebug("Moved existing checkpoint file to " + bkFile)
+ }
+ val fos = fs.create(file)
+ fos.write(bytes)
+ fos.close()
+ val finishTime = System.currentTimeMillis();
+ logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
+ "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
+ return
+ } catch {
+ case ioe: IOException =>
+ logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
}
- val fos = fs.create(file)
- val oos = new ObjectOutputStream(fos)
- oos.writeObject(checkpoint)
- oos.close()
- logInfo("Checkpoint for time " + checkpoint.checkpointTime + " saved to file '" + file + "'")
- fos.close()
- return
- } catch {
- case ioe: IOException =>
- logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
}
+ logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
}
- logError("Could not write checkpoint for time " + checkpoint.checkpointTime + " to file '" + file + "'")
+ }
+
+ def write(checkpoint: Checkpoint) {
+ val bos = new ByteArrayOutputStream()
+ val zos = new LZFOutputStream(bos)
+ val oos = new ObjectOutputStream(zos)
+ oos.writeObject(checkpoint)
+ oos.close()
+ bos.close()
+ executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
+ }
+
+ def stop() {
+ executor.shutdown()
}
}
@@ -85,7 +105,8 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well known Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val ois = new ObjectInputStreamWithLoader(fis, Thread.currentThread().getContextClassLoader)
+ val zis = new LZFInputStream(fis)
+ val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
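
The essential shape of the new writer is: serialize the checkpoint eagerly on the calling thread, then let a single-threaded executor do the retried file I/O off the stream-execution thread, keeping writes in submission order. A minimal standalone sketch of that pattern (AsyncWriterSketch and Snapshot are illustrative names, and the HDFS write/retry loop is replaced by a println):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.util.concurrent.Executors

    object AsyncWriterSketch {
      case class Snapshot(time: Long)

      // one thread => checkpoint writes stay ordered but never block the caller
      private val executor = Executors.newFixedThreadPool(1)

      def write(snapshot: Snapshot) {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(snapshot)          // pay the serialization cost here, synchronously
        oos.close()
        val bytes = bos.toByteArray
        executor.execute(new Runnable {
          def run() {
            // the real handler retries fs.create(file) up to maxAttempts times
            println("writing " + bytes.length + " bytes for time " + snapshot.time)
          }
        })
      }

      def stop() { executor.shutdown() }
    }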
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index ce42b742d7..84e4b5bedb 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -238,13 +238,15 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.remember(parentRememberDuration))
}
- /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */
+ /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
protected def isTimeValid(time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this + " has not been initialized")
} else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
+ logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
false
} else {
+ logInfo("Time " + time + " is valid")
true
}
}
@@ -627,16 +629,21 @@ abstract class DStream[T: ClassManifest] (
* Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
- val rdds = new ArrayBuffer[RDD[T]]()
- var time = toTime.floor(slideDuration)
- while (time >= zeroTime && time >= fromTime) {
- getOrCompute(time) match {
- case Some(rdd) => rdds += rdd
- case None => //throw new Exception("Could not get RDD for time " + time)
- }
- time -= slideDuration
+ if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
+ }
+ if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
+ logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
}
- rdds.toSeq
+ val alignedToTime = toTime.floor(slideDuration)
+ val alignedFromTime = fromTime.floor(slideDuration)
+
+ logInfo("Slicing from " + fromTime + " to " + toTime +
+ " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
+
+ alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
+ if (time >= zeroTime) getOrCompute(time) else None
+ })
}
/**
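
A small self-contained sketch of the alignment arithmetic the rewritten slice relies on: floor() snaps a time down to a batch boundary and to() walks the range inclusively in increasing order. Dur and T below are simplified stand-ins for the real Duration and Time classes:

    case class Dur(ms: Long)
    case class T(ms: Long) {
      def floor(d: Dur): T = T((ms / d.ms) * d.ms)              // align down to a boundary
      def to(that: T, d: Dur): Seq[T] = (ms to that.ms by d.ms).map(T(_))  // inclusive, increasing
    }

    object SliceSketch extends App {
      val slide = Dur(1000)
      val from  = T(2500).floor(slide)    // T(2000)
      val upto  = T(5300).floor(slide)    // T(5000)
      println(from.to(upto, slide))       // T(2000), T(3000), T(4000), T(5000)
    }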
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 22d9e24f05..adb7f3a24d 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -86,10 +86,12 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def getOutputStreams() = this.synchronized { outputStreams.toArray }
- def generateRDDs(time: Time): Seq[Job] = {
+ def generateJobs(time: Time): Seq[Job] = {
this.synchronized {
- logInfo("Generating RDDs for time " + time)
- outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generating jobs for time " + time)
+ val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ logInfo("Generated " + jobs.length + " jobs for time " + time)
+ jobs
}
}
@@ -97,18 +99,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
this.synchronized {
logInfo("Clearing old metadata for time " + time)
outputStreams.foreach(_.clearOldMetadata(time))
+ logInfo("Cleared old metadata for time " + time)
}
}
def updateCheckpointData(time: Time) {
this.synchronized {
+ logInfo("Updating checkpoint data for time " + time)
outputStreams.foreach(_.updateCheckpointData(time))
+ logInfo("Updated checkpoint data for time " + time)
}
}
def restoreCheckpointData() {
this.synchronized {
+ logInfo("Restoring checkpoint data")
outputStreams.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
}
}
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 649494ff4a..7696c4a592 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -43,20 +43,24 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
}
private def clearJob(job: Job) {
+ var timeCleared = false
+ val time = job.time
jobs.synchronized {
- val time = job.time
val jobsOfTime = jobs.get(time)
if (jobsOfTime.isDefined) {
jobsOfTime.get -= job
if (jobsOfTime.get.isEmpty) {
- ssc.scheduler.clearOldMetadata(time)
jobs -= time
+ timeCleared = true
}
} else {
throw new Exception("Job finished for time " + job.time +
" but time does not exist in jobs")
}
}
+ if (timeCleared) {
+ ssc.scheduler.clearOldMetadata(time)
+ }
}
def getPendingTimes(): Array[Time] = {
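
The JobManager change above narrows the lock scope: the decision is made under the jobs lock, but the clearOldMetadata call runs after the lock is released. A minimal sketch of that decide-inside/act-outside pattern (illustrative names, not the real JobManager API):

    import scala.collection.mutable

    class JobTableSketch(onTimeCleared: Long => Unit) {
      private val jobs = new mutable.HashMap[Long, mutable.Set[String]]()

      def clearJob(time: Long, job: String) {
        var timeCleared = false
        jobs.synchronized {
          jobs.get(time).foreach { jobsOfTime =>
            jobsOfTime -= job
            if (jobsOfTime.isEmpty) {
              jobs -= time
              timeCleared = true
            }
          }
        }
        if (timeCleared) onTimeCleared(time)   // executed without holding the lock
      }
    }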
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 57d494da83..1c4b22a898 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -20,8 +20,9 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => generateRDDs(new Time(longTime)))
+ longTime => generateJobs(new Time(longTime)))
val graph = ssc.graph
+ var latestTime: Time = null
def start() = synchronized {
if (ssc.isCheckpointPresent) {
@@ -35,6 +36,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
def stop() = synchronized {
timer.stop()
jobManager.stop()
+ if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop()
logInfo("Scheduler stopped")
}
@@ -73,35 +75,38 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
- graph.generateRDDs(time).foreach(jobManager.runJob)
+ graph.generateJobs(time).foreach(jobManager.runJob)
)
// Restart the timer
timer.start(restartTime.milliseconds)
- logInfo("Scheduler's timer restarted")
+ logInfo("Scheduler's timer restarted at " + restartTime)
}
- /** Generates the RDDs, clears old metadata and does checkpoint for the given time */
- def generateRDDs(time: Time) {
+ /** Generate jobs and perform checkpoint for the given `time`. */
+ def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
- graph.generateRDDs(time).foreach(jobManager.runJob)
+ graph.generateJobs(time).foreach(jobManager.runJob)
+ latestTime = time
doCheckpoint(time)
}
-
+ /**
+ * Clear old metadata, assuming the jobs of `time` have finished processing.
+ * It also performs a checkpoint.
+ */
def clearOldMetadata(time: Time) {
ssc.graph.clearOldMetadata(time)
+ doCheckpoint(time)
}
- def doCheckpoint(time: Time) {
+ /** Perform checkpoint for the give `time`. */
+ def doCheckpoint(time: Time) = synchronized {
if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
- val startTime = System.currentTimeMillis()
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
- val stopTime = System.currentTimeMillis()
- logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
}
}
}
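
The net effect of the scheduler change is an ordering guarantee: once a batch's jobs finish, old metadata is cleared and a checkpoint is written immediately, so a 2nd failure during recovery restarts from this newer checkpoint instead of the pre-failure one. A hedged sketch of that ordering (the function parameters are illustrative stand-ins):

    class RecoverySketch(clearMetadata: Long => Unit, writeCheckpoint: Long => Unit) {
      def onBatchCompleted(time: Long) {
        clearMetadata(time)     // corresponds to graph.clearOldMetadata(time)
        writeCheckpoint(time)   // corresponds to doCheckpoint(time)
      }
    }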
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 9be9d884be..d1407b7869 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -119,18 +119,15 @@ class StreamingContext private (
/**
* Set the context to periodically checkpoint the DStream operations for master
- * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
- * @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Duration = null) {
+ def checkpoint(directory: String) {
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
- checkpointDuration = interval
} else {
checkpointDir = null
- checkpointDuration = null
}
}
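
With the interval parameter gone, a driver program would now configure checkpointing roughly as in the sketch below. This is a hedged usage example: the master URL and directory are placeholders, and the three-argument StreamingContext constructor is assumed from this source tree:

    import spark.streaming.{Seconds, StreamingContext}

    object CheckpointUsageSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "CheckpointUsageSketch", Seconds(1))
        ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")   // no interval argument anymore
        // ... define input streams and output operations here ...
        ssc.start()
      }
    }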
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 8201e84a20..f14decf08b 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -38,15 +38,14 @@ case class Time(private val millis: Long) {
def max(that: Time): Time = if (this > that) this else that
def until(that: Time, interval: Duration): Seq[Time] = {
- assert(that > this, "Cannot create sequence as " + that + " not more than " + this)
- assert(
- (that - this).isMultipleOf(interval),
- "Cannot create sequence as gap between " + that + " and " +
- this + " is not multiple of " + interval
- )
(this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
}
+ def to(that: Time, interval: Duration): Seq[Time] = {
+ (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
+
override def toString: String = (millis.toString + " ms")
}
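
A worked example of the new inclusive `to` next to the exclusive `until`, using plain Longs with a 1000 ms step; this inclusivity is what lets slice return the RDD for the aligned toTime itself:

    object TimeRangeSketch extends App {
      val step = 1000L
      println(0L until 3000L by step)   // 0, 1000, 2000  (end point excluded)
      println(0L to 3000L by step)      // 0, 1000, 2000, 3000  (end point included)
    }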
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 5bbf2b084f..03933aae93 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -314,12 +314,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Sets the context to periodically checkpoint the DStream operations for master
- * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
- * @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Duration = null) {
- ssc.checkpoint(directory, interval)
+ def checkpoint(directory: String) {
+ ssc.checkpoint(directory)
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
index 980ca5177e..a4db44a608 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala
@@ -1,10 +1,42 @@
package spark.streaming.dstream
-import spark.streaming.{Duration, StreamingContext, DStream}
+import spark.streaming.{Time, Duration, StreamingContext, DStream}
+/**
+ * This is the abstract base class for all input streams. This class provides two methods,
+ * start() and stop(), which are called by the scheduler to start and stop receiving data.
+ * Input streams that can generate RDDs from new data just by running a service on
+ * the driver node (that is, without running a receiver on worker nodes) can be
+ * implemented by directly subclassing this InputDStream. For example,
+ * FileInputDStream, a subclass of InputDStream, monitors an HDFS directory for
+ * new files and generates RDDs from the new files. For implementing input streams
+ * that require running a receiver on the worker nodes, use NetworkInputDStream
+ * as the parent class.
+ */
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
+ var lastValidTime: Time = null
+
+ /**
+ * Checks whether the 'time' is valid wrt slideDuration for generating RDD.
+ * Additionally, it ensures that valid times are in strictly increasing order,
+ * so that InputDStream.compute() is called only on strictly increasing times.
+ */
+ override protected def isTimeValid(time: Time): Boolean = {
+ if (!super.isTimeValid(time)) {
+ false // Time not valid
+ } else {
+ // Time is valid, but check that it is not earlier than lastValidTime
+ if (lastValidTime != null && time < lastValidTime) {
+ logWarning("isTimeValid called with " + time + " whereas last valid time is " + lastValidTime)
+ }
+ lastValidTime = time
+ true
+ }
+ }
+
override def dependencies = List()
override def slideDuration: Duration = {
@@ -13,7 +45,9 @@ abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContex
ssc.graph.batchDuration
}
+ /** Method called to start receiving data. Subclasses must implement this method. */
def start()
+ /** Method called to stop receiving data. Subclasses must implement this method. */
def stop()
}
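
A minimal sketch of the monotonic-time guard added above, with `baseValid` standing in for super.isTimeValid (illustrative names):

    class MonotonicGuardSketch(baseValid: Long => Boolean) {
      private var lastValidTime: Option[Long] = None

      def isTimeValid(time: Long): Boolean = {
        if (!baseValid(time)) {
          false
        } else {
          if (lastValidTime.exists(time < _)) {
            println("warning: " + time + " is earlier than last valid time " + lastValidTime.get)
          }
          lastValidTime = Some(time)   // record the most recent accepted time
          true
        }
      }
    }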
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index d733254ddb..e70822e5c3 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -2,8 +2,8 @@ package spark.streaming.dstream
import spark._
import spark.streaming._
-import dstream.{NetworkReceiver, NetworkInputDStream}
import storage.StorageLevel
+
import twitter4j._
import twitter4j.auth.BasicAuthorization
@@ -19,7 +19,7 @@ class TwitterInputDStream(
password: String,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends NetworkInputDStream[Status](ssc_) {
override def createReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(username, password, filters, storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 776e676063..bdd9f4d753 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -315,7 +315,7 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
override def run() {
try {
// If it is the first killing, then allow the first checkpoint to be created
- var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000
+ var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
logInfo("Kill wait time = " + killWaitTime)
Thread.sleep(killWaitTime)