diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 00:43:31 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-22 00:43:31 -0800 |
commit | 364cdb679cf2b0d5e6ed7ab89628f15594d7947f (patch) | |
tree | 4d9687703a2c6249c2bcee762f575779e9d1155b | |
parent | 86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff) | |
download | spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.gz spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.bz2 spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.zip |
Refactored DStreamCheckpointData.
4 files changed, 99 insertions, 64 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index b11ef443dc..3c1861a840 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap import java.io.{ObjectInputStream, IOException, ObjectOutputStream} -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration /** @@ -75,7 +75,7 @@ abstract class DStream[T: ClassManifest] ( // Checkpoint details protected[streaming] val mustCheckpoint = false protected[streaming] var checkpointDuration: Duration = null - protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]()) + protected[streaming] val checkpointData = new DStreamCheckpointData(this) // Reference to whole DStream graph protected[streaming] var graph: DStreamGraph = null @@ -85,10 +85,10 @@ abstract class DStream[T: ClassManifest] ( // Duration for which the DStream requires its parent DStream to remember each RDD created protected[streaming] def parentRememberDuration = rememberDuration - /** Returns the StreamingContext associated with this DStream */ + /** Return the StreamingContext associated with this DStream */ def context() = ssc - /** Persists the RDDs of this DStream with the given storage level */ + /** Persist the RDDs of this DStream with the given storage level */ def persist(level: StorageLevel): DStream[T] = { if (this.isInitialized) { throw new UnsupportedOperationException( @@ -342,40 +342,10 @@ abstract class DStream[T: ClassManifest] ( */ protected[streaming] def updateCheckpointData(currentTime: Time) { logInfo("Updating checkpoint data for time " + currentTime) - - // Get the checkpointed RDDs from the generated RDDs - val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined) - .map(x => (x._1, x._2.getCheckpointFile.get)) - - // Make a copy of the existing checkpoint data (checkpointed RDDs) - val oldRdds = checkpointData.rdds.clone() - - // If the new checkpoint data has checkpoints then replace existing with the new one - if (newRdds.size > 0) { - checkpointData.rdds.clear() - checkpointData.rdds ++= newRdds - } - - // Make parent DStreams update their checkpoint data + checkpointData.update() dependencies.foreach(_.updateCheckpointData(currentTime)) - - // TODO: remove this, this is just for debugging - newRdds.foreach { - case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } - } - - if (newRdds.size > 0) { - (oldRdds -- newRdds.keySet).foreach { - case (time, data) => { - val path = new Path(data.toString) - val fs = path.getFileSystem(new Configuration()) - fs.delete(path, true) - logInfo("Deleted checkpoint file '" + path + "' for time " + time) - } - } - } - logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, " - + "[" + checkpointData.rdds.mkString(",") + "]") + checkpointData.cleanup() + logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) } /** @@ -386,14 +356,8 @@ abstract class DStream[T: ClassManifest] ( */ protected[streaming] def restoreCheckpointData() { // Create RDDs from the checkpoint data - logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs") - checkpointData.rdds.foreach { - case(time, data) => { - logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'") - val rdd = ssc.sc.checkpointFile[T](data.toString) - generatedRDDs += ((time, rdd)) - } - } + logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs") + checkpointData.restore() dependencies.foreach(_.restoreCheckpointData()) logInfo("Restored checkpoint data") } @@ -651,7 +615,3 @@ abstract class DStream[T: ClassManifest] ( ssc.registerOutputStream(this) } } - -private[streaming] -case class DStreamCheckpointData(rdds: HashMap[Time, Any]) - diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala new file mode 100644 index 0000000000..abf903293f --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala @@ -0,0 +1,84 @@ +package spark.streaming + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.conf.Configuration +import collection.mutable.HashMap +import spark.Logging + + + +private[streaming] +class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) + extends Serializable with Logging { + private[streaming] val checkpointFiles = new HashMap[Time, String]() + @transient private lazy val fileSystem = + new Path(dstream.context.checkpointDir).getFileSystem(new Configuration()) + @transient private var lastCheckpointFiles: HashMap[Time, String] = null + + /** + * Update the checkpoint data of the DStream. Default implementation records the checkpoint files to + * which the generate RDDs of the DStream has been saved. + */ + def update() { + + // Get the checkpointed RDDs from the generated RDDs + val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + + // Make a copy of the existing checkpoint data (checkpointed RDDs) + lastCheckpointFiles = checkpointFiles.clone() + + // If the new checkpoint data has checkpoints then replace existing with the new one + if (newCheckpointFiles.size > 0) { + checkpointFiles.clear() + checkpointFiles ++= newCheckpointFiles + } + + // TODO: remove this, this is just for debugging + newCheckpointFiles.foreach { + case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } + } + } + + /** + * Cleanup old checkpoint data. Default implementation, cleans up old checkpoint files. + */ + def cleanup() { + // If there is at least on checkpoint file in the current checkpoint files, + // then delete the old checkpoint files. + if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { + (lastCheckpointFiles -- checkpointFiles.keySet).foreach { + case (time, file) => { + try { + val path = new Path(file) + fileSystem.delete(path, true) + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + } + } + } + } + } + + /** + * Restore the checkpoint data. Default implementation restores the RDDs from their + * checkpoint files. + */ + def restore() { + // Create RDDs from the checkpoint data + checkpointFiles.foreach { + case(time, file) => { + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") + dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file))) + } + } + } + + override def toString() = { + "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]" + } +} + diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 2b4740bdf7..760d9b5cf3 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,15 +19,6 @@ import scala.collection.JavaConversions._ // Key for a specific Kafka Partition: (broker, topic, group, part) case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) -// NOT USED - Originally intended for fault-tolerance -// Metadata for a Kafka Stream that it sent to the Master -private[streaming] -case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) -// NOT USED - Originally intended for fault-tolerance -// Checkpoint data specific to a KafkaInputDstream -private[streaming] -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], - savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) /** * Input stream that pulls messages from a Kafka Broker. diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index d2f32c189b..58da4ee539 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -63,9 +63,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // then check whether some RDD has been checkpointed or not ssc.start() runStreamsWithRealDelay(ssc, firstNumBatches) - logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]") - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure") - stateStream.checkpointData.rdds.foreach { + logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData) + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist") @@ -74,7 +74,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Run till a further time such that previous checkpoint files in the stream would be deleted // and check whether the earlier checkpoint files are deleted - val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString)) + val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2)) runStreamsWithRealDelay(ssc, secondNumBatches) checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted")) ssc.stop() @@ -91,8 +91,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // is present in the checkpoint data or not ssc.start() runStreamsWithRealDelay(ssc, 1) - assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure") - stateStream.checkpointData.rdds.foreach { + assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure") + stateStream.checkpointData.checkpointFiles.foreach { case (time, data) => { val file = new File(data.toString) assert(file.exists(), |