aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-22 00:43:31 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-22 00:43:31 -0800
commit364cdb679cf2b0d5e6ed7ab89628f15594d7947f (patch)
tree4d9687703a2c6249c2bcee762f575779e9d1155b
parent86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff)
downloadspark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.gz
spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.tar.bz2
spark-364cdb679cf2b0d5e6ed7ab89628f15594d7947f.zip
Refactored DStreamCheckpointData.
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala58
-rw-r--r--streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala84
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala9
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala12
4 files changed, 99 insertions, 64 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index b11ef443dc..3c1861a840 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
/**
@@ -75,7 +75,7 @@ abstract class DStream[T: ClassManifest] (
// Checkpoint details
protected[streaming] val mustCheckpoint = false
protected[streaming] var checkpointDuration: Duration = null
- protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
+ protected[streaming] val checkpointData = new DStreamCheckpointData(this)
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
@@ -85,10 +85,10 @@ abstract class DStream[T: ClassManifest] (
// Duration for which the DStream requires its parent DStream to remember each RDD created
protected[streaming] def parentRememberDuration = rememberDuration
- /** Returns the StreamingContext associated with this DStream */
+ /** Return the StreamingContext associated with this DStream */
def context() = ssc
- /** Persists the RDDs of this DStream with the given storage level */
+ /** Persist the RDDs of this DStream with the given storage level */
def persist(level: StorageLevel): DStream[T] = {
if (this.isInitialized) {
throw new UnsupportedOperationException(
@@ -342,40 +342,10 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def updateCheckpointData(currentTime: Time) {
logInfo("Updating checkpoint data for time " + currentTime)
-
- // Get the checkpointed RDDs from the generated RDDs
- val newRdds = generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
- .map(x => (x._1, x._2.getCheckpointFile.get))
-
- // Make a copy of the existing checkpoint data (checkpointed RDDs)
- val oldRdds = checkpointData.rdds.clone()
-
- // If the new checkpoint data has checkpoints then replace existing with the new one
- if (newRdds.size > 0) {
- checkpointData.rdds.clear()
- checkpointData.rdds ++= newRdds
- }
-
- // Make parent DStreams update their checkpoint data
+ checkpointData.update()
dependencies.foreach(_.updateCheckpointData(currentTime))
-
- // TODO: remove this, this is just for debugging
- newRdds.foreach {
- case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
- }
-
- if (newRdds.size > 0) {
- (oldRdds -- newRdds.keySet).foreach {
- case (time, data) => {
- val path = new Path(data.toString)
- val fs = path.getFileSystem(new Configuration())
- fs.delete(path, true)
- logInfo("Deleted checkpoint file '" + path + "' for time " + time)
- }
- }
- }
- logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, "
- + "[" + checkpointData.rdds.mkString(",") + "]")
+ checkpointData.cleanup()
+ logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
/**
@@ -386,14 +356,8 @@ abstract class DStream[T: ClassManifest] (
*/
protected[streaming] def restoreCheckpointData() {
// Create RDDs from the checkpoint data
- logInfo("Restoring checkpoint data from " + checkpointData.rdds.size + " checkpointed RDDs")
- checkpointData.rdds.foreach {
- case(time, data) => {
- logInfo("Restoring checkpointed RDD for time " + time + " from file '" + data.toString + "'")
- val rdd = ssc.sc.checkpointFile[T](data.toString)
- generatedRDDs += ((time, rdd))
- }
- }
+ logInfo("Restoring checkpoint data from " + checkpointData.checkpointFiles.size + " checkpointed RDDs")
+ checkpointData.restore()
dependencies.foreach(_.restoreCheckpointData())
logInfo("Restored checkpoint data")
}
@@ -651,7 +615,3 @@ abstract class DStream[T: ClassManifest] (
ssc.registerOutputStream(this)
}
}
-
-private[streaming]
-case class DStreamCheckpointData(rdds: HashMap[Time, Any])
-
diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
new file mode 100644
index 0000000000..abf903293f
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
@@ -0,0 +1,84 @@
+package spark.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.conf.Configuration
+import collection.mutable.HashMap
+import spark.Logging
+
+
+
+private[streaming]
+class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
+ extends Serializable with Logging {
+ private[streaming] val checkpointFiles = new HashMap[Time, String]()
+ @transient private lazy val fileSystem =
+ new Path(dstream.context.checkpointDir).getFileSystem(new Configuration())
+ @transient private var lastCheckpointFiles: HashMap[Time, String] = null
+
+ /**
+ * Update the checkpoint data of the DStream. Default implementation records the checkpoint files to
+ * which the generate RDDs of the DStream has been saved.
+ */
+ def update() {
+
+ // Get the checkpointed RDDs from the generated RDDs
+ val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
+ .map(x => (x._1, x._2.getCheckpointFile.get))
+
+ // Make a copy of the existing checkpoint data (checkpointed RDDs)
+ lastCheckpointFiles = checkpointFiles.clone()
+
+ // If the new checkpoint data has checkpoints then replace existing with the new one
+ if (newCheckpointFiles.size > 0) {
+ checkpointFiles.clear()
+ checkpointFiles ++= newCheckpointFiles
+ }
+
+ // TODO: remove this, this is just for debugging
+ newCheckpointFiles.foreach {
+ case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") }
+ }
+ }
+
+ /**
+ * Cleanup old checkpoint data. Default implementation, cleans up old checkpoint files.
+ */
+ def cleanup() {
+ // If there is at least on checkpoint file in the current checkpoint files,
+ // then delete the old checkpoint files.
+ if (checkpointFiles.size > 0 && lastCheckpointFiles != null) {
+ (lastCheckpointFiles -- checkpointFiles.keySet).foreach {
+ case (time, file) => {
+ try {
+ val path = new Path(file)
+ fileSystem.delete(path, true)
+ logInfo("Deleted checkpoint file '" + file + "' for time " + time)
+ } catch {
+ case e: Exception =>
+ logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Restore the checkpoint data. Default implementation restores the RDDs from their
+ * checkpoint files.
+ */
+ def restore() {
+ // Create RDDs from the checkpoint data
+ checkpointFiles.foreach {
+ case(time, file) => {
+ logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
+ dstream.generatedRDDs += ((time, dstream.context.sc.checkpointFile[T](file)))
+ }
+ }
+ }
+
+ override def toString() = {
+ "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]"
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 2b4740bdf7..760d9b5cf3 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -19,15 +19,6 @@ import scala.collection.JavaConversions._
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-// NOT USED - Originally intended for fault-tolerance
-// Metadata for a Kafka Stream that it sent to the Master
-private[streaming]
-case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
-// NOT USED - Originally intended for fault-tolerance
-// Checkpoint data specific to a KafkaInputDstream
-private[streaming]
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
- savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages from a Kafka Broker.
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index d2f32c189b..58da4ee539 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -63,9 +63,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// then check whether some RDD has been checkpointed or not
ssc.start()
runStreamsWithRealDelay(ssc, firstNumBatches)
- logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
- stateStream.checkpointData.rdds.foreach {
+ logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -74,7 +74,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
- val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
+ val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
runStreamsWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -91,8 +91,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// is present in the checkpoint data or not
ssc.start()
runStreamsWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
- stateStream.checkpointData.rdds.foreach {
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),