aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-11 14:01:36 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-11 14:01:36 -0800
commit5e9ce83d682d6198cda4631faf11cb53fcccf07f (patch)
treef88ed888af44976a4bf3479fc2cf6f5224fc4b6b /streaming
parent6169fe14a140146602fb07cfcd13eee6efad98f9 (diff)
downloadspark-5e9ce83d682d6198cda4631faf11cb53fcccf07f.tar.gz
spark-5e9ce83d682d6198cda4631faf11cb53fcccf07f.tar.bz2
spark-5e9ce83d682d6198cda4631faf11cb53fcccf07f.zip
Fixed multiple file stream and checkpointing bugs.
- Made file stream more robust to transient failures. - Changed Spark.setCheckpointDir API to not have the second 'useExisting' parameter. Spark will always create a unique directory for checkpointing underneath the directory provided to the function. - Fixed bug with respect to local relative paths as checkpoint directory. - Made DStream and RDD checkpointing use SparkContext.hadoopConfiguration, so that more HDFS-compatible filesystems are supported for checkpointing.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala15
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala130
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala38
5 files changed, 117 insertions, 74 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914eb5..bcf5e6b1e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.deploy.SparkHadoopUtil
private[streaming]
@@ -57,7 +58,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
* Convenience class to speed up the writing of graph checkpoint to file
*/
private[streaming]
-class CheckpointWriter(checkpointDir: String) extends Logging {
+class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
val file = new Path(checkpointDir, "graph")
// The file to which we actually write - and then "move" to file.
private val writeFile = new Path(file.getParent, file.getName + ".next")
@@ -65,8 +66,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
private var stopped = false
- val conf = new Configuration()
- var fs = file.getFileSystem(conf)
+ var fs = file.getFileSystem(hadoopConf)
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
index ed892e33e6..4cd8695df5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -29,7 +29,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
- new CheckpointWriter(ssc.checkpointDir)
+ new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 70bf902143..d6fc2a19f4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
+import org.apache.spark.deploy.SparkHadoopUtil
/**
@@ -85,7 +86,6 @@ class StreamingContext private (
null, batchDuration)
}
-
/**
* Re-create a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
@@ -139,7 +139,7 @@ class StreamingContext private (
protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+ sc.setCheckpointDir(cp_.checkpointDir)
cp_.checkpointDir
} else {
null
@@ -173,8 +173,12 @@ class StreamingContext private (
*/
def checkpoint(directory: String) {
if (directory != null) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
- checkpointDir = directory
+ val path = new Path(directory)
+ val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+ fs.mkdirs(path)
+ val fullPath = fs.getFileStatus(path).getPath().toString
+ sc.setCheckpointDir(fullPath)
+ checkpointDir = fullPath
} else {
checkpointDir = null
}
@@ -595,8 +599,9 @@ object StreamingContext {
prefix + "-" + time.milliseconds + "." + suffix
}
}
-
+ /*
protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
}
+ */
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fea0573b77..1a8db3ab59 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -39,8 +39,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Latest file mod time seen till any point of time
- private val lastModTimeFiles = new HashSet[String]()
- private var lastModTime = 0L
+ private val prevModTimeFiles = new HashSet[String]()
+ private var prevModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@@ -48,11 +48,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
override def start() {
if (newFilesOnly) {
- lastModTime = graph.zeroTime.milliseconds
+ prevModTime = graph.zeroTime.milliseconds
} else {
- lastModTime = 0
+ prevModTime = 0
}
- logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+ logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@@ -67,55 +67,22 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
- assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
-
- // Create the filter for selecting new files
- val newFilter = new PathFilter() {
- // Latest file mod time seen in this round of fetching files and its corresponding files
- var latestModTime = 0L
- val latestModTimeFiles = new HashSet[String]()
-
- def accept(path: Path): Boolean = {
- if (!filter(path)) { // Reject file if it does not satisfy filter
- logDebug("Rejected by filter " + path)
- return false
- } else { // Accept file only if
- val modTime = fs.getFileStatus(path).getModificationTime()
- logDebug("Mod time for " + path + " is " + modTime)
- if (modTime < lastModTime) {
- logDebug("Mod time less than last mod time")
- return false // If the file was created before the last time it was called
- } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
- logDebug("Mod time equal to last mod time, but file considered already")
- return false // If the file was created exactly as lastModTime but not reported yet
- } else if (modTime > validTime.milliseconds) {
- logDebug("Mod time more than valid time")
- return false // If the file was created after the time this function call requires
- }
- if (modTime > latestModTime) {
- latestModTime = modTime
- latestModTimeFiles.clear()
- logDebug("Latest mod time updated to " + latestModTime)
- }
- latestModTimeFiles += path.toString
- logDebug("Accepted " + path)
- return true
- }
- }
- }
- logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
- val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+ assert(validTime.milliseconds >= prevModTime,
+ "Trying to get new files for really old time [" + validTime + " < " + prevModTime)
+
+ // Find new files
+ val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
- if (lastModTime != newFilter.latestModTime) {
- lastModTime = newFilter.latestModTime
- lastModTimeFiles.clear()
+ if (prevModTime < latestModTime) {
+ prevModTime = latestModTime
+ prevModTimeFiles.clear()
}
- lastModTimeFiles ++= newFilter.latestModTimeFiles
- logDebug("Last mod time updated to " + lastModTime)
+ prevModTimeFiles ++= latestModTimeFiles
+ logDebug("Last mod time updated to " + prevModTime)
}
- files += ((validTime, newFiles))
+ files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
}
@@ -130,8 +97,30 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
+ /**
+ * Finds files which have modification timestamp <= current time. If some files are being
+ * deleted in the directory, then it can generate transient exceptions. Hence, multiple
+ * attempts are made to handle these transient exceptions. Returns 3-tuple
+ * (new files found, latest modification time among them, files with latest modification time)
+ */
+ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+ logDebug("Trying to get new files for time " + currentTime)
+ var attempts = 0
+ while (attempts < FileInputDStream.MAX_ATTEMPTS) {
+ attempts += 1
+ try {
+ val filter = new CustomPathFilter(currentTime)
+ val newFiles = fs.listStatus(path, filter)
+ return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
+ } catch {
+ case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe)
+ }
+ }
+ (Seq(), -1, Seq())
+ }
+
/** Generate one RDD from an array of files */
- protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
new UnionRDD(
context.sparkContext,
files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
@@ -189,10 +178,51 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
}
+
+ /**
+ * PathFilter to find new files that have modification timestamps <= current time, but have not
+ * been seen before (i.e. the file should not be in prevModTimeFiles)
+ * @param currentTime
+ */
+ private[streaming]
+ class CustomPathFilter(currentTime: Long) extends PathFilter() {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
+ var latestModTime = 0L
+ val latestModTimeFiles = new HashSet[String]()
+
+ def accept(path: Path): Boolean = {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
+ return false
+ } else { // Accept file only if
+ val modTime = fs.getFileStatus(path).getModificationTime()
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < prevModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last time it was called
+ } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file was created exactly as lastModTime but not reported yet
+ } else if (modTime > currentTime) {
+ logDebug("Mod time more than valid time")
+ return false // If the file was created after the time this function call requires
+ }
+ if (modTime > latestModTime) {
+ latestModTime = modTime
+ latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
+ }
+ latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
+ return true
+ }
+ }
+ }
}
private[streaming]
object FileInputDStream {
+ val MAX_ATTEMPTS = 10
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index beb20831bd..e51754977c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -25,8 +25,10 @@ import org.scalatest.BeforeAndAfter
import org.apache.commons.io.FileUtils
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.{Clock, ManualClock}
-import scala.util.Random
import com.google.common.io.Files
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+
/**
@@ -44,7 +46,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
after {
if (ssc != null) ssc.stop()
- FileUtils.deleteDirectory(new File(checkpointDir))
+ //FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
@@ -66,7 +68,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
-
+ val fs = FileSystem.getLocal(new Configuration())
// this ensures checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
@@ -90,11 +92,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
ssc.start()
advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
- assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+ "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
- case (time, data) => {
- val file = new File(data.toString)
- assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
+ case (time, file) => {
+ assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
+ " for state stream before first failure does not exist")
}
}
@@ -102,7 +105,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
advanceTimeWithRealDelay(ssc, secondNumBatches)
- checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
+ checkpointFiles.foreach(file =>
+ assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
// Restart stream computation using the checkpoint file and check whether
@@ -110,19 +114,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
- assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
+ assert(!stateStream.generatedRDDs.isEmpty,
+ "No restored RDDs in state stream after recovery from first failure")
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
advanceTimeWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+ "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
- case (time, data) => {
- val file = new File(data.toString)
- assert(file.exists(),
- "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
+ case (time, file) => {
+ assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
+ " for state stream before seconds failure does not exist")
}
}
ssc.stop()
@@ -132,7 +137,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
- assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+ assert(!stateStream.generatedRDDs.isEmpty,
+ "No restored RDDs in state stream after recovery from second failure")
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
@@ -143,6 +149,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
ssc = null
}
+
// This tests whether the system can recover from a master failure with simple
// non-stateful operations. This assumes a reliable, replayable input
// source - TestInputDStream.
@@ -191,6 +198,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
testCheckpointedOperation(input, operation, output, 7)
}
+
// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the