aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala36
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala63
3 files changed, 78 insertions, 35 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 05760bfcd4..957c227996 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,7 +21,7 @@ import java.io._
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.Logging
@@ -55,31 +55,29 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
- * Convenience class to speed up the writing of graph checkpoint to file
+ * Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
val file = new Path(checkpointDir, "graph")
- // The file to which we actually write - and then "move" to file.
- private val writeFile = new Path(file.getParent, file.getName + ".next")
- private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
- private var stopped = false
-
- var fs = file.getFileSystem(hadoopConf)
- val maxAttempts = 3
+ val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
+ val compressionCodec = CompressionCodec.createCodec()
+ // The file to which we actually write - and then "move" to file
+ val writeFile = new Path(file.getParent, file.getName + ".next")
+ // The file to which existing checkpoint is backed up (i.e. "moved")
+ val bakFile = new Path(file.getParent, file.getName + ".bk")
- private val compressionCodec = CompressionCodec.createCodec()
+ private var stopped = false
+ private var fs_ : FileSystem = _
- // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
+ // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
-
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
- while (attempts < maxAttempts) {
+ while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -101,6 +99,7 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
} catch {
case ioe: IOException =>
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+ reset()
}
}
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -135,6 +134,15 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
val endTime = System.currentTimeMillis()
logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
}
+
+ private def fs = synchronized {
+ if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+ fs_
+ }
+
+ private def reset() = synchronized {
+ fs_ = null
+ }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 79513548d2..a618a709a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -40,6 +40,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+ // Max attempts to try if listing files fail
+ val MAX_ATTEMPTS = 10
+
// Latest file mod time seen till any point of time
private val prevModTimeFiles = new HashSet[String]()
private var prevModTime = 0L
@@ -108,14 +111,16 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
logDebug("Trying to get new files for time " + currentTime)
var attempts = 0
- while (attempts < FileInputDStream.MAX_ATTEMPTS) {
+ while (attempts < MAX_ATTEMPTS) {
attempts += 1
try {
val filter = new CustomPathFilter(currentTime)
val newFiles = fs.listStatus(path, filter)
return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
} catch {
- case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe)
+ case ioe: IOException =>
+ logWarning("Attempt " + attempts + " to get new files failed", ioe)
+ reset()
}
}
(Seq(), -1, Seq())
@@ -139,6 +144,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
fs_
}
+ private def reset() {
+ fs_ = null
+ }
+
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
logDebug(this.getClass().getSimpleName + ".readObject used")
@@ -224,6 +233,5 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
private[streaming]
object FileInputDStream {
- val MAX_ATTEMPTS = 10
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index cf3fc82a2a..2552d51654 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -21,6 +21,12 @@ import org.apache.spark.SparkEnv
import org.apache.spark.Logging
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
+import akka.actor.{Props, Actor}
+
+sealed trait JobGeneratorEvent
+case class GenerateJobs(time: Time) extends JobGeneratorEvent
+case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
+case class DoCheckpoint(time: Time) extends JobGeneratorEvent
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
@@ -32,43 +38,67 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
initLogging()
val ssc = jobScheduler.ssc
- val clockClass = System.getProperty(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => generateJobs(new Time(longTime)))
val graph = ssc.graph
+ val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case event: JobGeneratorEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+ processEvent(event)
+ }
+ }))
+ val clock = {
+ val clockClass = System.getProperty(
+ "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ }
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+ longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
- var latestTime: Time = null
-
def start() = synchronized {
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
- logInfo("JobGenerator started")
}
- def stop() = synchronized {
+ def stop() {
timer.stop()
if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop()
logInfo("JobGenerator stopped")
}
+ /**
+ * On batch completion, clear old metadata and checkpoint computation.
+ */
+ private[scheduler] def onBatchCompletion(time: Time) {
+ eventProcessorActor ! ClearOldMetadata(time)
+ }
+
+ /** Processes all events */
+ private def processEvent(event: JobGeneratorEvent) {
+ event match {
+ case GenerateJobs(time) => generateJobs(time)
+ case ClearOldMetadata(time) => clearOldMetadata(time)
+ case DoCheckpoint(time) => doCheckpoint(time)
+ }
+ }
+
+ /** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
- logInfo("JobGenerator's timer started at " + startTime)
+ logInfo("JobGenerator started at " + startTime)
}
+ /** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
@@ -100,7 +130,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Restart the timer
timer.start(restartTime.milliseconds)
- logInfo("JobGenerator's timer restarted at " + restartTime)
+ logInfo("JobGenerator restarted at " + restartTime)
}
/** Generate jobs and perform checkpoint for the given `time`. */
@@ -108,16 +138,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
jobScheduler.runJobs(time, graph.generateJobs(time))
- latestTime = time
- doCheckpoint(time)
+ eventProcessorActor ! DoCheckpoint(time)
}
- /**
- * On batch completion, clear old metadata and checkpoint computation.
- */
- private[streaming] def onBatchCompletion(time: Time) {
+ /** Clear DStream metadata for the given `time`. */
+ private def clearOldMetadata(time: Time) {
ssc.graph.clearOldMetadata(time)
- doCheckpoint(time)
+ eventProcessorActor ! DoCheckpoint(time)
}
/** Perform checkpoint for the give `time`. */