author     Tathagata Das <tathagata.das1565@gmail.com>    2012-11-05 16:11:50 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2012-11-05 16:11:50 -0800
commit     395167f2b2a1906cde23b1f3ddc2808514bce47b (patch)
tree       e6f47c12b03494226116042a27bbd9545bffc0b1 /streaming
parent     72b2303f99bd652fc4bdaa929f37731a7ba8f640 (diff)
download   spark-395167f2b2a1906cde23b1f3ddc2808514bce47b.tar.gz
           spark-395167f2b2a1906cde23b1f3ddc2808514bce47b.tar.bz2
           spark-395167f2b2a1906cde23b1f3ddc2808514bce47b.zip
Made more bug fixes for checkpointing.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala       |  1
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala          | 77
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala     |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala             |  2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala  | 77
6 files changed, 97 insertions, 66 deletions
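For context, the recovery pattern this commit hardens is the one exercised by CheckpointSuite further down: mark a stateful stream for periodic RDD checkpointing, then rebuild the StreamingContext from the checkpoint directory after a failure. The following is a minimal sketch of that pattern, not part of the patch; the object and method names are illustrative, the input DStream is assumed to exist, and updateFunc mirrors the one used in the test.

    import scala.runtime.RichInt
    import spark.streaming.{DStream, Seconds, StreamingContext}
    import spark.streaming.StreamingContext._

    object CheckpointUsageSketch {
      // Same state-update shape as in CheckpointSuite: add the new counts to the
      // running total carried by the RichInt state.
      val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
        Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
      }

      // Build the stateful operation and ask for the state stream's RDDs to be
      // checkpointed every 2 seconds, as the test does.
      def operation(st: DStream[String]): DStream[(String, Int)] = {
        st.map(x => (x, 1))
          .updateStateByKey[RichInt](updateFunc)
          .checkpoint(Seconds(2))
          .map(t => (t._1, t._2.self))
      }

      // After a failure, reconstructing the StreamingContext from the checkpoint
      // directory restores the DStream graph and its checkpointed RDDs.
      def recover(checkpointDir: String): StreamingContext = new StreamingContext(checkpointDir)
    }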
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 6b4b05103f..1643f45ffb 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -6,7 +6,6 @@ import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.conf.Configuration
import java.io.{InputStream, ObjectStreamClass, ObjectInputStream, ObjectOutputStream}
-import sys.process.processInternal
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 2fecbe0acf..922ff5088d 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -1,6 +1,7 @@
package spark.streaming
-import spark.streaming.StreamingContext._
+import StreamingContext._
+import Time._
import spark._
import spark.SparkContext._
@@ -12,8 +13,7 @@ import scala.collection.mutable.HashMap
import java.util.concurrent.ArrayBlockingQueue
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import scala.Some
-import collection.mutable
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
@@ -206,10 +206,10 @@ extends Serializable with Logging {
}
/**
- * This method either retrieves a precomputed RDD of this DStream,
- * or computes the RDD (if the time is valid)
+ * Retrieves a precomputed RDD of this DStream, or computes the RDD. This is an internal
+ * method that should not be called directly.
*/
- def getOrCompute(time: Time): Option[RDD[T]] = {
+ protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
@@ -245,10 +245,12 @@ extends Serializable with Logging {
}
/**
- * This method generates a SparkStreaming job for the given time
- * and may required to be overriden by subclasses
+ * Generates a Spark Streaming job for the given time. This is an internal method that
+ * should not be called directly. This default implementation creates a job
+ * that materializes the corresponding RDD. Subclasses of DStream may override this
+ * (e.g. PerRDDForEachDStream).
*/
- def generateJob(time: Time): Option[Job] = {
+ protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
@@ -261,6 +263,9 @@ extends Serializable with Logging {
}
}
+ /**
+ * Dereferences RDDs that are older than rememberDuration.
+ */
protected[streaming] def forgetOldRDDs(time: Time) {
val keys = generatedRDDs.keys
var numForgotten = 0
@@ -276,42 +281,46 @@ extends Serializable with Logging {
}
/**
- * Refreshes the list of checkpointed RDDs that will be saved along with checkpoint of this stream.
- * Along with that it forget old checkpoint files.
+ * Refreshes the list of checkpointed RDDs that will be saved along with the checkpoint of
+ * this stream. This is an internal method that should not be called directly. This is
+ * a default implementation that saves only the file names of the checkpointed RDDs to
+ * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
+ * this method to save custom checkpoint data.
*/
- protected[streaming] def updateCheckpointData() {
-
- // TODO (tdas): This code can be simplified. Its kept verbose to aid debugging.
- val checkpointedRDDs = generatedRDDs.filter(_._2.getCheckpointData() != null)
- val removedCheckpointData = checkpointData.filter(x => !generatedRDDs.contains(x._1))
-
- checkpointData.clear()
- checkpointedRDDs.foreach {
- case (time, rdd) => {
- val data = rdd.getCheckpointData()
- assert(data != null)
- checkpointData += ((time, data))
- logInfo("Added checkpointed RDD " + rdd + " for time " + time + " to stream checkpoint")
- }
+ protected[streaming] def updateCheckpointData(currentTime: Time) {
+ val newCheckpointData = generatedRDDs.filter(_._2.getCheckpointData() != null)
+ .map(x => (x._1, x._2.getCheckpointData()))
+ val oldCheckpointData = checkpointData.clone()
+ if (newCheckpointData.size > 0) {
+ checkpointData.clear()
+ checkpointData ++= newCheckpointData
+ }
+
+ dependencies.foreach(_.updateCheckpointData(currentTime))
+
+ newCheckpointData.foreach {
+ case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
}
- dependencies.foreach(_.updateCheckpointData())
- // If at least one checkpoint is present, then delete old checkpoints
- if (checkpointData.size > 0) {
- // Delete the checkpoint RDD files that are not needed any more
- removedCheckpointData.foreach {
- case (time: Time, file: String) => {
- val path = new Path(file)
+ if (newCheckpointData.size > 0) {
+ (oldCheckpointData -- newCheckpointData.keySet).foreach {
+ case (time, data) => {
+ val path = new Path(data.toString)
val fs = path.getFileSystem(new Configuration())
fs.delete(path, true)
- logInfo("Deleted checkpoint file '" + file + "' for time " + time)
+ logInfo("Deleted checkpoint file '" + path + "' for time " + time)
}
}
}
-
logInfo("Updated checkpoint data")
}
+ /**
+ * Restores the RDDs in generatedRDDs from the checkpointData. This is an internal method
+ * that should not be called directly. This is a default implementation that recreates RDDs
+ * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
+ * override the updateCheckpointData() method would also need to override this method.
+ */
protected[streaming] def restoreCheckpointData() {
logInfo("Restoring checkpoint data from " + checkpointData.size + " checkpointed RDDs")
checkpointData.foreach {
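The reworked updateCheckpointData(currentTime) above follows a snapshot / propagate / prune order. As a rough stand-alone illustration of that bookkeeping (this is not the real DStream class; plain Longs and file-name strings stand in for Time and RDD checkpoint data, and the prune step only prints instead of touching the file system):

    import scala.collection.mutable.HashMap

    class CheckpointBookkeepingSketch(dependencies: Seq[CheckpointBookkeepingSketch]) {
      // time -> checkpoint file name, standing in for DStream.checkpointData
      val checkpointData = new HashMap[Long, String]()
      // time -> checkpoint file of the generated RDD, or None if that RDD has not
      // been checkpointed yet (stands in for rdd.getCheckpointData())
      val generatedRDDs = new HashMap[Long, Option[String]]()

      def updateCheckpointData(currentTime: Long) {
        // 1. Snapshot: keep only the RDDs that have actually been checkpointed.
        val newCheckpointData = generatedRDDs.collect { case (t, Some(file)) => (t, file) }
        val oldCheckpointData = checkpointData.clone()
        if (newCheckpointData.nonEmpty) {
          checkpointData.clear()
          checkpointData ++= newCheckpointData
        }

        // 2. Propagate: let upstream streams refresh their own checkpoint data.
        dependencies.foreach(_.updateCheckpointData(currentTime))

        // 3. Prune: files referenced by the old snapshot but absent from the new one
        //    are no longer needed and can be deleted.
        if (newCheckpointData.nonEmpty) {
          (oldCheckpointData -- newCheckpointData.keySet).values.foreach { file =>
            println("would delete stale checkpoint file " + file)
          }
        }
      }
    }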
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 7437f4402d..246522838a 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -90,9 +90,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- private[streaming] def updateCheckpointData() {
+ private[streaming] def updateCheckpointData(time: Time) {
this.synchronized {
- outputStreams.foreach(_.updateCheckpointData())
+ outputStreams.foreach(_.updateCheckpointData(time))
}
}
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index fb36ab9dc9..25caaf7d39 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -227,7 +227,7 @@ class StreamingContext (
}
def doCheckpoint(currentTime: Time) {
- graph.updateCheckpointData()
+ graph.updateCheckpointData(currentTime)
new Checkpoint(this, currentTime).save(checkpointDir)
}
}
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 2ba6502971..480d292d7c 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -43,7 +43,7 @@ object Time {
implicit def toTime(long: Long) = Time(long)
- implicit def toLong(time: Time) = time.milliseconds
+ implicit def toLong(time: Time) = time.milliseconds
}
object Milliseconds {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index aa8ded513c..9fdfd50be2 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -6,7 +6,7 @@ import runtime.RichInt
import org.scalatest.BeforeAndAfter
import org.apache.commons.io.FileUtils
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import util.{Clock, ManualClock}
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
@@ -31,12 +31,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("basic stream+rdd recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 500 milliseconds")
+ assert(checkpointInterval === batchDuration, "checkpointInterval for this test must be the same as batchDuration")
+
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
- val checkpointingInterval = Seconds(2)
+ val stateStreamCheckpointInterval = Seconds(2)
// this ensures checkpointing occurs at least once
- val firstNumBatches = (checkpointingInterval.millis / batchDuration.millis) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
val secondNumBatches = firstNumBatches
// Setup the streams
@@ -47,7 +49,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
st.map(x => (x, 1))
.updateStateByKey[RichInt](updateFunc)
- .checkpoint(checkpointingInterval)
+ .checkpoint(stateStreamCheckpointInterval)
.map(t => (t._1, t._2.self))
}
val ssc = setupStreams(input, operation)
@@ -56,35 +58,22 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- logInfo("Manual clock before advancing = " + clock.time)
- for (i <- 1 to firstNumBatches.toInt) {
- clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(batchDuration.milliseconds)
- }
- logInfo("Manual clock after advancing = " + clock.time)
- Thread.sleep(batchDuration.milliseconds)
+ advanceClock(clock, firstNumBatches)
// Check whether some RDD has been checkpointed or not
logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.mkString(",\n") + "]")
- assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream")
+ assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.foreach {
case (time, data) => {
val file = new File(data.toString)
- assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " does not exist")
+ assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
}
}
- val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString))
// Run till a further time such that previous checkpoint files in the stream would be deleted
- logInfo("Manual clock before advancing = " + clock.time)
- for (i <- 1 to secondNumBatches.toInt) {
- clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(batchDuration.milliseconds)
- }
- logInfo("Manual clock after advancing = " + clock.time)
- Thread.sleep(batchDuration.milliseconds)
-
- // Check whether the earlier checkpoint files are deleted
+ // and check whether the earlier checkpoint files are deleted
+ val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString))
+ advanceClock(clock, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
// Restart stream computation using the checkpoint file and check whether
@@ -93,11 +82,35 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val sscNew = new StreamingContext(checkpointDir)
val stateStreamNew = sscNew.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStreamNew.generatedRDDs.mkString("\n") + "]")
- assert(!stateStreamNew.generatedRDDs.isEmpty, "No restored RDDs in state stream")
+ assert(!stateStreamNew.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
+
+
+ // Run one batch to generate a new checkpoint file
+ sscNew.start()
+ val clockNew = sscNew.scheduler.clock.asInstanceOf[ManualClock]
+ advanceClock(clockNew, 1)
+
+ // Check whether some RDD is present in the checkpoint data or not
+ assert(!stateStreamNew.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ stateStreamNew.checkpointData.foreach {
+ case (time, data) => {
+ val file = new File(data.toString)
+ assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
+ }
+ }
+
+ // Restart stream computation from the new checkpoint file to see whether that file has
+ // correct checkpoint data
sscNew.stop()
+ val sscNewNew = new StreamingContext(checkpointDir)
+ val stateStreamNewNew = sscNewNew.graph.getOutputStreams().head.dependencies.head.dependencies.head
+ logInfo("Restored data of state stream = \n[" + stateStreamNewNew.generatedRDDs.mkString("\n") + "]")
+ assert(!stateStreamNewNew.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+ sscNewNew.start()
+ advanceClock(sscNewNew.scheduler.clock.asInstanceOf[ManualClock], 1)
+ sscNewNew.stop()
}
-
test("map and reduceByKey") {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
@@ -163,11 +176,21 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Restart and complete the computation from checkpoint file
logInfo(
"\n-------------------------------------------\n" +
- " Restarting stream computation " +
- "\n-------------------------------------------\n"
+ " Restarting stream computation " +
+ "\n-------------------------------------------\n"
)
val sscNew = new StreamingContext(checkpointDir)
val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs)
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
}
+
+ def advanceClock(clock: ManualClock, numBatches: Long) {
+ logInfo("Manual clock before advancing = " + clock.time)
+ for (i <- 1 to numBatches.toInt) {
+ clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(batchDuration.milliseconds)
+ }
+ logInfo("Manual clock after advancing = " + clock.time)
+ Thread.sleep(batchDuration.milliseconds)
+ }
}
\ No newline at end of file
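For reference, the manual-clock pattern behind the new advanceClock helper works roughly as sketched below. This is not part of the patch: the context factory is hypothetical, and direct access to ssc.scheduler assumes the caller lives in the spark.streaming package, as the test suite does.

    package spark.streaming

    import spark.streaming.util.ManualClock

    // Sketch of how CheckpointSuite drives batches deterministically: install the
    // ManualClock before the StreamingContext starts, then push time forward one
    // batch interval at a time instead of waiting on the wall clock.
    object ManualClockDriverSketch {
      def drive(makeContext: () => StreamingContext, batchDuration: Time, numBatches: Int) {
        System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

        val ssc = makeContext()   // hypothetical factory, e.g. a test's setupStreams
        ssc.start()

        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
        for (i <- 1 to numBatches) {
          clock.addToTime(batchDuration.milliseconds)  // triggers the next batch
          Thread.sleep(batchDuration.milliseconds)     // give the generated job time to run
        }
        ssc.stop()
      }
    }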