aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-22 18:49:36 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-22 18:49:36 -0800
commite7b62cbfbfdb8fda880548bce4249672c6a0a851 (patch)
tree2ec7da067fbcb5dcae1a3be696a6023a47f2f19e /streaming
parentd91ec6f8ea3ab6683af92eddfedd9ea6c0710f00 (diff)
downloadspark-e7b62cbfbfdb8fda880548bce4249672c6a0a851.tar.gz
spark-e7b62cbfbfdb8fda880548bce4249672c6a0a851.tar.bz2
spark-e7b62cbfbfdb8fda880548bce4249672c6a0a851.zip
Updated CheckpointWriter and FileInputDStream to be robust against failed FileSystem objects. Refactored JobGenerator to use actor so that all updating of DStream's metadata is single threaded.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala36
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala63
3 files changed, 78 insertions, 35 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 05760bfcd4..957c227996 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging
@@ -55,31 +55,29 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
- * Convenience class to speed up the writing of graph checkpoint to file
+ * Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
val file = new Path(checkpointDir, "graph")
- // The file to which we actually write - and then "move" to file.
- private val writeFile = new Path(file.getParent, file.getName + ".next")
- private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
- private var stopped = false
-
- var fs = file.getFileSystem(hadoopConf)
- val maxAttempts = 3
+ val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
+ val compressionCodec = CompressionCodec.createCodec()
+ // The file to which we actually write - and then "move" to file
+ val writeFile = new Path(file.getParent, file.getName + ".next")
+ // The file to which existing checkpoint is backed up (i.e. "moved")
+ val bakFile = new Path(file.getParent, file.getName + ".bk")
- private val compressionCodec = CompressionCodec.createCodec()
+ private var stopped = false
+ private var fs_ : FileSystem = _
- // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
+ // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
-
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
- while (attempts < maxAttempts) {
+ while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -101,6 +99,7 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
} catch {
case ioe: IOException =>
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+ reset()
}
}
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -135,6 +134,15 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
val endTime = System.currentTimeMillis()
logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
}
+
+ private def fs = synchronized {
+ if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+ fs_
+ }
+
+ private def reset() = synchronized {
+ fs_ = null
+ }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 79513548d2..a618a709a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -40,6 +40,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+ // Max attempts to try if listing files fail
+ val MAX_ATTEMPTS = 10
+
// Latest file mod time seen till any point of time
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
@@ -108,14 +111,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
logDebug("Trying to get new files for time " + currentTime)
var attempts = 0
- while (attempts < FileInputDStream.MAX_ATTEMPTS) {
+ while (attempts < MAX_ATTEMPTS) {
attempts += 1
try {
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter)
return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
} catch {
- case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe)
+ case ioe: IOException =>
+ logWarning("Attempt " + attempts + " to get new files failed", ioe)
+ reset()
}
}
(Seq(), -1, Seq())
@@ -139,6 +144,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
fs_
}
+ private def reset() {
+ fs_ = null
+ }
+
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
logDebug(this.getClass().getSimpleName + ".readObject used")
@@ -224,6 +233,5 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private[streaming]
object FileInputDStream {
- val MAX_ATTEMPTS = 10
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index cf3fc82a2a..2552d51654 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -21,6 +21,12 @@ import org.apache.spark.SparkEnv
import org.apache.spark.Logging
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
+import akka.actor.{Props, Actor}
+
+sealed trait JobGeneratorEvent
+case class GenerateJobs(time: Time) extends JobGeneratorEvent
+case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
+case class DoCheckpoint(time: Time) extends JobGeneratorEvent
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
@@ -32,43 +38,67 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
initLogging()
val ssc = jobScheduler.ssc
- val clockClass = System.getProperty(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => generateJobs(new Time(longTime)))
val graph = ssc.graph
+ val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case event: JobGeneratorEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+ processEvent(event)
+ }
+ }))
+ val clock = {
+ val clockClass = System.getProperty(
+ "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ }
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+ longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
- var latestTime: Time = null
-
def start() = synchronized {
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
- logInfo("JobGenerator started")
}
- def stop() = synchronized {
+ def stop() {
timer.stop()
if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop()
logInfo("JobGenerator stopped")
}
+ /**
+ * On batch completion, clear old metadata and checkpoint computation.
+ */
+ private[scheduler] def onBatchCompletion(time: Time) {
+ eventProcessorActor ! ClearOldMetadata(time)
+ }
+
+ /** Processes all events */
+ private def processEvent(event: JobGeneratorEvent) {
+ event match {
+ case GenerateJobs(time) => generateJobs(time)
+ case ClearOldMetadata(time) => clearOldMetadata(time)
+ case DoCheckpoint(time) => doCheckpoint(time)
+ }
+ }
+
+ /** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
- logInfo("JobGenerator's timer started at " + startTime)
+ logInfo("JobGenerator started at " + startTime)
}
+ /** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
@@ -100,7 +130,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Restart the timer
timer.start(restartTime.milliseconds)
- logInfo("JobGenerator's timer restarted at " + restartTime)
+ logInfo("JobGenerator restarted at " + restartTime)
}
/** Generate jobs and perform checkpoint for the given `time`. */
@@ -108,16 +138,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
jobScheduler.runJobs(time, graph.generateJobs(time))
- latestTime = time
- doCheckpoint(time)
+ eventProcessorActor ! DoCheckpoint(time)
}
- /**
- * On batch completion, clear old metadata and checkpoint computation.
- */
- private[streaming] def onBatchCompletion(time: Time) {
+ /** Clear DStream metadata for the given `time`. */
+ private def clearOldMetadata(time: Time) {
ssc.graph.clearOldMetadata(time)
- doCheckpoint(time)
+ eventProcessorActor ! DoCheckpoint(time)
}
/** Perform checkpoint for the give `time`. */