diff options
author: Denny <dennybritz@gmail.com>, 2012-11-09 12:26:17 -0800
committer: Denny <dennybritz@gmail.com>, 2012-11-09 12:26:17 -0800
commit: 2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89 (patch)
tree: ecd8e96a1e1760c75a220e90ccd422aaf8add7db /streaming/src/main
parent: e5a09367870be757a0abb3e2ad7a53e74110b033 (diff)
parent: cc2a65f54715ff0990d5873d50eec0dedf64d409 (diff)
download: spark-2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89.tar.gz spark-2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89.tar.bz2 spark-2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89.zip
Merge branch 'dev' of github.com:radlab/spark into kafka
Conflicts:
streaming/src/main/scala/spark/streaming/DStream.scala
Diffstat (limited to 'streaming/src/main')
4 files changed, 27 insertions, 12 deletions
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 1643f45ffb..a70fb8f73a 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -32,7 +32,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val file = new Path(path, "graph") val conf = new Configuration() val fs = file.getFileSystem(conf) - logDebug("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'") + logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") if (fs.exists(file)) { val bkFile = new Path(file.getParent, file.getName + ".bk") FileUtil.copy(fs, file, fs, bkFile, true, true, conf) @@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) oos.writeObject(this) oos.close() fs.close() - logInfo("Saved checkpoint for time " + checkpointTime + " to file '" + file + "'") + logInfo("Checkpoint of streaming context for time " + checkpointTime + " saved successfully to file '" + file + "'") } def toBytes(): Array[Byte] = { @@ -58,7 +58,6 @@ object Checkpoint extends Logging { val fs = new Path(path).getFileSystem(new Configuration()) val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) - var detailedLog: String = "" attempts.foreach(file => { if (fs.exists(file)) { @@ -76,6 +75,7 @@ object Checkpoint extends Logging { fs.close() cp.validate() logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint was generated at time " + cp.checkpointTime) return cp } catch { case e: Exception => diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index f891730317..3219919a24 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ 
b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -297,16 +297,20 @@ extends Serializable with Logging { * this method to save custom checkpoint data. */ protected[streaming] def updateCheckpointData(currentTime: Time) { + // Get the checkpointed RDDs from the generated RDDs val newRdds = generatedRDDs.filter(_._2.getCheckpointData() != null) .map(x => (x._1, x._2.getCheckpointData())) + // Make a copy of the existing checkpoint data val oldRdds = checkpointData.rdds.clone() + // If the new checkpoint has checkpoints then replace existing with the new one if (newRdds.size > 0) { checkpointData.rdds.clear() checkpointData.rdds ++= newRdds } - + // Make dependencies update their checkpoint data dependencies.foreach(_.updateCheckpointData(currentTime)) + // TODO: remove this, this is just for debugging newRdds.foreach { case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } } @@ -321,7 +325,7 @@ extends Serializable with Logging { } } } - logInfo("Updated checkpoint data") + logInfo("Updated checkpoint data for time " + currentTime) } /** @@ -331,6 +335,7 @@ extends Serializable with Logging { * override the updateCheckpointData() method would also need to override this method. 
*/ protected[streaming] def restoreCheckpointData() { + // Create RDDs from the checkpoint data logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs") checkpointData.rdds.foreach { case(time, data) => { @@ -339,6 +344,7 @@ extends Serializable with Logging { } } dependencies.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") } @throws(classOf[IOException]) diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 2b3f5a4829..de0fb1f3ad 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -29,10 +29,12 @@ extends Logging { // on this first trigger time of the timer. if (ssc.isCheckpointPresent) { // If manual clock is being used for testing, then - // set manual clock to the last checkpointed time + // either set the manual clock to the last checkpointed time, + // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { val lastTime = ssc.getInitialCheckpoint.checkpointTime.milliseconds - clock.asInstanceOf[ManualClock].setTime(lastTime) + val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 770fd61498..d68d2632e7 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -18,7 +18,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID -class StreamingContext ( +final class StreamingContext ( sc_ : SparkContext, cp_ : Checkpoint ) extends 
Logging { @@ -61,12 +61,12 @@ class StreamingContext ( } } - val nextNetworkInputStreamId = new AtomicInteger(0) - var networkInputTracker: NetworkInputTracker = null + private[streaming] val nextNetworkInputStreamId = new AtomicInteger(0) + private[streaming] var networkInputTracker: NetworkInputTracker = null private[streaming] var checkpointDir: String = { if (isCheckpointPresent) { - sc.setCheckpointDir(cp_.checkpointDir, true) + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true) cp_.checkpointDir } else { null @@ -87,7 +87,7 @@ class StreamingContext ( def checkpoint(dir: String, interval: Time) { if (dir != null) { - sc.setCheckpointDir(new Path(dir, "rdds-" + UUID.randomUUID.toString).toString) + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir)) checkpointDir = dir checkpointInterval = interval } else { @@ -240,8 +240,11 @@ class StreamingContext ( } def doCheckpoint(currentTime: Time) { + val startTime = System.currentTimeMillis() graph.updateCheckpointData(currentTime) new Checkpoint(this, currentTime).save(checkpointDir) + val stopTime = System.currentTimeMillis() + logInfo("Checkpointing the graph took " + (stopTime - startTime) + " ms") } } @@ -260,5 +263,9 @@ object StreamingContext { prefix + "-" + time.milliseconds + "." + suffix } } + + def getSparkCheckpointDir(sscCheckpointDir: String): String = { + new Path(sscCheckpointDir, UUID.randomUUID.toString).toString + } } |