author     Denny <dennybritz@gmail.com>    2012-11-09 12:26:17 -0800
committer  Denny <dennybritz@gmail.com>    2012-11-09 12:26:17 -0800
commit     2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89 (patch)
tree       ecd8e96a1e1760c75a220e90ccd422aaf8add7db /streaming/src/main
parent     e5a09367870be757a0abb3e2ad7a53e74110b033 (diff)
parent     cc2a65f54715ff0990d5873d50eec0dedf64d409 (diff)
Merge branch 'dev' of github.com:radlab/spark into kafka

Conflicts:
    streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala        |  6
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala           | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala         |  6
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala  | 17
4 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 1643f45ffb..a70fb8f73a 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -32,7 +32,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val file = new Path(path, "graph")
val conf = new Configuration()
val fs = file.getFileSystem(conf)
- logDebug("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
if (fs.exists(file)) {
val bkFile = new Path(file.getParent, file.getName + ".bk")
FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
@@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
oos.writeObject(this)
oos.close()
fs.close()
- logInfo("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'")
+ logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'")
}
def toBytes(): Array[Byte] = {
@@ -58,7 +58,6 @@ object Checkpoint extends Logging {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
- var detailedLog: String = ""
attempts.foreach(file => {
if (fs.exists(file)) {
@@ -76,6 +75,7 @@ object Checkpoint extends Logging {
fs.close()
cp.validate()
logInfo("Checkpoint successfully loaded from file '" + file + "'")
+ logInfo("Checkpoint was generated at time " + cp.checkpointTime)
return cp
} catch {
case e: Exception =>
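
Note: Checkpoint.load (partially shown above) tries several candidate files in order and returns the first one that deserializes and validates. Below is a condensed, standalone sketch of that fallback order, not the method itself; it assumes the surrounding Checkpoint class and its validate() method from this file.

    import java.io.ObjectInputStream
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    def loadFirstValidCheckpoint(path: String): Option[Checkpoint] = {
      val fs = new Path(path).getFileSystem(new Configuration())
      // Same candidate order as above: graph file, its backup, then the raw path and its backup
      val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
                         new Path(path), new Path(path + ".bk"))
      for (file <- attempts if fs.exists(file)) {
        try {
          val ois = new ObjectInputStream(fs.open(file))
          val cp = ois.readObject().asInstanceOf[Checkpoint]
          ois.close()
          cp.validate()
          return Some(cp)        // first readable, valid checkpoint wins
        } catch {
          case e: Exception =>   // unreadable candidate, fall through to the next one
        }
      }
      None
    }
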
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index f891730317..3219919a24 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -297,16 +297,20 @@ extends Serializable with Logging {
* this method to save custom checkpoint data.
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
+ // Get the checkpointed RDDs from the generated RDDs
val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null)
.map(x => (x._1, x._2.getCheckpointData()))
+ // Make a copy of the existing checkpoint data
val oldRdds = checkpointData.rdds.clone()
+ // If any RDDs were newly checkpointed, replace the existing checkpoint data with them
if (newRdds.size > 0) {
checkpointData.rdds.clear()
checkpointData.rdds ++= newRdds
}
-
+ // Make dependencies update their checkpoint data
dependencies.foreach(_.updateCheckpointData(currentTime))
+ // TODO: remove this, this is just for debugging
newRdds.foreach {
case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
}
@@ -321,7 +325,7 @@ extends Serializable with Logging {
}
}
}
- logInfo("Updated checkpoint data")
+ logInfo("Updated checkpoint data for time " + currentTime)
}
/**
@@ -331,6 +335,7 @@ extends Serializable with Logging {
* override the updateCheckpointData() method would also need to override this method.
*/
protected[streaming] def restoreCheckpointData() {
+ // Create RDDs from the checkpoint data
logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
checkpointData.rdds.foreach {
case(time, data) => {
@@ -339,6 +344,7 @@ extends Serializable with Logging {
}
}
dependencies.foreach(_.restoreCheckpointData())
+ logInfo("Restored checkpoint data")
}
@throws(classOf[IOException])
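
Note: updateCheckpointData and restoreCheckpointData both handle the current DStream first and then recurse into its dependencies, so one call on an output stream covers the whole graph. A conceptual sketch of that traversal follows; it is not code from this commit and assumes only that a DStream exposes its `dependencies` list.

    // Visit this stream, then every upstream dependency, applying the same action
    def traverse(stream: DStream[_])(action: DStream[_] => Unit): Unit = {
      action(stream)
      stream.dependencies.foreach(traverse(_)(action))
    }
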
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 2b3f5a4829..de0fb1f3ad 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -29,10 +29,12 @@ extends Logging {
// on this first trigger time of the timer.
if (ssc.isCheckpointPresent) {
// If manual clock is being used for testing, then
- // set manual clock to the last checkpointed time
+ // set the manual clock to the last checkpointed time, jumping ahead by
+ // spark.streaming.manualClock.jump milliseconds if that property is set
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds
- clock.asInstanceOf[ManualClock].setTime(lastTime)
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
timer.restart(graph.zeroTime.milliseconds)
logInfo("Scheduler's timer restarted")
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 770fd61498..d68d2632e7 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -18,7 +18,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
-class StreamingContext (
+final class StreamingContext (
sc_ : SparkContext,
cp_ : Checkpoint
) extends Logging {
@@ -61,12 +61,12 @@ class StreamingContext (
}
}
- val nextNetworkInputStreamId = new AtomicInteger(0)
- var networkInputTracker: NetworkInputTracker = null
+ private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0)
+ private[streaming] var networkInputTracker: NetworkInputTracker = null
private[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(cp_.checkpointDir, true)
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
cp_.checkpointDir
} else {
null
@@ -87,7 +87,7 @@ class StreamingContext (
def checkpoint(dir: String, interval: Time) {
if (dir != null) {
- sc.setCheckpointDir(new Path(dir, "rdds-" + UUID.randomUUID.toString).toString)
+ sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
checkpointDir = dir
checkpointInterval = interval
} else {
@@ -240,8 +240,11 @@ class StreamingContext (
}
def doCheckpoint(currentTime: Time) {
+ val startTime = System.currentTimeMillis()
graph.updateCheckpointData(currentTime)
new Checkpoint(this, currentTime).save(checkpointDir)
+ val stopTime = System.currentTimeMillis()
+ logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms")
}
}
@@ -260,5 +263,9 @@ object StreamingContext {
prefix + "-" + time.milliseconds + "." + suffix
}
}
+
+ def getSparkCheckpointDir(sscCheckpointDir: String): String = {
+ new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
+ }
}
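
Note: both the recovery path (the checkpointDir initializer) and checkpoint(dir, interval) now go through StreamingContext.getSparkCheckpointDir, so RDD checkpoint files always land in a fresh UUID subdirectory of the streaming checkpoint directory. A small illustration of the resulting layout (the HDFS path below is hypothetical):

    import java.util.UUID
    import org.apache.hadoop.fs.Path

    val sscCheckpointDir   = "hdfs:///checkpoints/my-app"   // holds "graph" and "graph.bk"
    val sparkCheckpointDir = new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
    // e.g. "hdfs:///checkpoints/my-app/<random-uuid>" -- RDD checkpoint files go here
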