aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-31 18:23:14 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-31 18:23:14 -0500
commitba9338f104ccc71d4f342a3f96624a9b36895f48 (patch)
tree5a5b2bf6b34565a594a68fff59b9dd2af234be7f /streaming
parent0fa5809768cf60ec62b4277f04e23a44dc1582e2 (diff)
parent63b411dd8664c27ac55586d8345733afad80961f (diff)
downloadspark-ba9338f104ccc71d4f342a3f96624a9b36895f48.tar.gz
spark-ba9338f104ccc71d4f342a3f96624a9b36895f48.tar.bz2
spark-ba9338f104ccc71d4f342a3f96624a9b36895f48.zip
Merge remote-tracking branch 'apache/master' into conf2
Conflicts: core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala53
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala31
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala153
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala66
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala44
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala2
7 files changed, 221 insertions, 141 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 35e23c1355..af443279a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -21,12 +21,13 @@ import java.io._
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.deploy.SparkHadoopUtil
private[streaming]
@@ -54,36 +55,36 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
/**
- * Convenience class to speed up the writing of graph checkpoint to file
+ * Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
-class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
+class CheckpointWriter(conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
+ extends Logging
+{
val file = new Path(checkpointDir, "graph")
- // The file to which we actually write - and then "move" to file.
- private val writeFile = new Path(file.getParent, file.getName + ".next")
- private val bakFile = new Path(file.getParent, file.getName + ".bk")
-
- private var stopped = false
-
- val hadoopConf = new Configuration()
- var fs = file.getFileSystem(hadoopConf)
- val maxAttempts = 3
+ val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
+ val compressionCodec = CompressionCodec.createCodec()
+ // The file to which we actually write - and then "move" to file
+ val writeFile = new Path(file.getParent, file.getName + ".next")
+ // The file to which existing checkpoint is backed up (i.e. "moved")
+ val bakFile = new Path(file.getParent, file.getName + ".bk")
- private val compressionCodec = CompressionCodec.createCodec(conf)
+ private var stopped = false
+ private var fs_ : FileSystem = _
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
-
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
- while (attempts < maxAttempts) {
+ while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast.
+ // This is inherently thread unsafe, so alleviating it by writing to '.new' and
+ // then moving it to the final file
val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
@@ -101,6 +102,7 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
} catch {
case ioe: IOException =>
logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe)
+ reset()
}
}
logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'")
@@ -133,7 +135,17 @@ class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
val startTime = System.currentTimeMillis()
val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
val endTime = System.currentTimeMillis()
- logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
+ logInfo("CheckpointWriter executor terminated ? " + terminated +
+ ", waited for " + (endTime - startTime) + " ms.")
+ }
+
+ private def fs = synchronized {
+ if (fs_ == null) fs_ = file.getFileSystem(hadoopConf)
+ fs_
+ }
+
+ private def reset() = synchronized {
+ fs_ = null
}
}
@@ -143,8 +155,8 @@ object CheckpointReader extends Logging {
def read(conf: SparkConf, path: String): Checkpoint = {
val fs = new Path(path).getFileSystem(new Configuration())
- val attempts = Seq(
- new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+ val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"),
+ new Path(path), new Path(path + ".bk"))
val compressionCodec = CompressionCodec.createCodec(conf)
@@ -159,7 +171,8 @@ object CheckpointReader extends Logging {
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
val zis = compressionCodec.compressedInputStream(fis)
- val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
+ val ois = new ObjectInputStreamWithLoader(zis,
+ Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
fs.close()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 339f6e64a2..0566704c94 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -20,17 +20,19 @@ package org.apache.spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
+import akka.util.ByteString
import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.receivers.ActorReceiver
import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
import org.apache.spark.streaming.receivers.ZeroMQReceiver
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.receivers.ActorReceiver
+import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker}
import scala.collection.mutable.Queue
import scala.collection.Map
@@ -38,17 +40,15 @@ import scala.reflect.ClassTag
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
-import java.util.UUID
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
+
import twitter4j.Status
import twitter4j.auth.Authorization
-import org.apache.spark.streaming.scheduler._
-import akka.util.ByteString
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -153,7 +153,7 @@ class StreamingContext private (
protected[streaming] var checkpointDir: String = {
if (isCheckpointPresent) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+ sc.setCheckpointDir(cp_.checkpointDir)
cp_.checkpointDir
} else {
null
@@ -188,8 +188,12 @@ class StreamingContext private (
*/
def checkpoint(directory: String) {
if (directory != null) {
- sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
- checkpointDir = directory
+ val path = new Path(directory)
+ val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+ fs.mkdirs(path)
+ val fullPath = fs.getFileStatus(path).getPath().toString
+ sc.setCheckpointDir(fullPath)
+ checkpointDir = fullPath
} else {
checkpointDir = null
}
@@ -380,7 +384,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
- * File names starting with . are ignored.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -399,6 +404,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
@@ -419,7 +426,9 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * as Text and input format as TextInputFormat). Files must be written to the
+ * monitored directory by "moving" them from another location within the same
+ * file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
@@ -623,8 +632,4 @@ object StreamingContext {
prefix + "-" + time.milliseconds + "." + suffix
}
}
-
- protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
- new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
- }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 29f673d8ae..b79173c6aa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -265,9 +265,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * as Text and input format as TextInputFormat). Files must be written to the
+ * monitored directory by "moving" them from another location within the same
+ * file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): JavaDStream[String] = {
@@ -309,9 +311,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
- * File names starting with . are ignored.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -340,7 +343,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
- * Creates a input stream from a Flume source.
+ * Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 39e25239bf..fb9eda8996 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,18 +17,17 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-
+import java.io.{ObjectInputStream, IOException}
+import scala.collection.mutable.{HashSet, HashMap}
+import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
-import scala.collection.mutable.{HashSet, HashMap}
-import scala.reflect.ClassTag
-
-import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@@ -41,8 +40,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
// Latest file mod time seen till any point of time
- private val lastModTimeFiles = new HashSet[String]()
- private var lastModTime = 0L
+ private val prevModTimeFiles = new HashSet[String]()
+ private var prevModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
@@ -50,11 +49,11 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
override def start() {
if (newFilesOnly) {
- lastModTime = graph.zeroTime.milliseconds
+ prevModTime = graph.zeroTime.milliseconds
} else {
- lastModTime = 0
+ prevModTime = 0
}
- logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+ logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@@ -69,55 +68,22 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
* the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
- assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+ assert(validTime.milliseconds >= prevModTime,
+ "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
- // Create the filter for selecting new files
- val newFilter = new PathFilter() {
- // Latest file mod time seen in this round of fetching files and its corresponding files
- var latestModTime = 0L
- val latestModTimeFiles = new HashSet[String]()
-
- def accept(path: Path): Boolean = {
- if (!filter(path)) { // Reject file if it does not satisfy filter
- logDebug("Rejected by filter " + path)
- return false
- } else { // Accept file only if
- val modTime = fs.getFileStatus(path).getModificationTime()
- logDebug("Mod time for " + path + " is " + modTime)
- if (modTime < lastModTime) {
- logDebug("Mod time less than last mod time")
- return false // If the file was created before the last time it was called
- } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
- logDebug("Mod time equal to last mod time, but file considered already")
- return false // If the file was created exactly as lastModTime but not reported yet
- } else if (modTime > validTime.milliseconds) {
- logDebug("Mod time more than valid time")
- return false // If the file was created after the time this function call requires
- }
- if (modTime > latestModTime) {
- latestModTime = modTime
- latestModTimeFiles.clear()
- logDebug("Latest mod time updated to " + latestModTime)
- }
- latestModTimeFiles += path.toString
- logDebug("Accepted " + path)
- return true
- }
- }
- }
- logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
- val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+ // Find new files
+ val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
- if (lastModTime != newFilter.latestModTime) {
- lastModTime = newFilter.latestModTime
- lastModTimeFiles.clear()
+ if (prevModTime < latestModTime) {
+ prevModTime = latestModTime
+ prevModTimeFiles.clear()
}
- lastModTimeFiles ++= newFilter.latestModTimeFiles
- logDebug("Last mod time updated to " + lastModTime)
+ prevModTimeFiles ++= latestModTimeFiles
+ logDebug("Last mod time updated to " + prevModTime)
}
- files += ((validTime, newFiles))
+ files += ((validTime, newFiles.toArray))
Some(filesToRDD(newFiles))
}
@@ -132,12 +98,28 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
+ /**
+ * Find files which have modification timestamp <= current time and return a 3-tuple of
+ * (new files found, latest modification time among them, files with latest modification time)
+ */
+ private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+ logDebug("Trying to get new files for time " + currentTime)
+ val filter = new CustomPathFilter(currentTime)
+ val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+ (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
+ }
+
/** Generate one RDD from an array of files */
- protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
- new UnionRDD(
- context.sparkContext,
- files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
- )
+ private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+ val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
+ files.zip(fileRDDs).foreach { case (file, rdd) => {
+ if (rdd.partitions.size == 0) {
+ logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
+ "files that have been \"moved\" to the directory assigned to the file stream. " +
+ "Refer to the streaming programming guide for more details.")
+ }
+ }}
+ new UnionRDD(context.sparkContext, fileRDDs)
}
private def path: Path = {
@@ -150,6 +132,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
fs_
}
+ private def reset() {
+ fs_ = null
+ }
+
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
logDebug(this.getClass().getSimpleName + ".readObject used")
@@ -191,6 +177,51 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
}
}
+
+ /**
+ * Custom PathFilter class to find new files that have modification timestamps <= current time,
+ * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
+ */
+ private[streaming]
+ class CustomPathFilter(maxModTime: Long) extends PathFilter {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
+ var latestModTime = 0L
+ val latestModTimeFiles = new HashSet[String]()
+
+ def accept(path: Path): Boolean = {
+ try {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
+ return false
+ }
+ val modTime = fs.getFileStatus(path).getModificationTime()
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < prevModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last time it was called
+ } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file was created exactly as lastModTime but not reported yet
+ } else if (modTime > maxModTime) {
+ logDebug("Mod time more than ")
+ return false // If the file is too new that considering it may give errors
+ }
+ if (modTime > latestModTime) {
+ latestModTime = modTime
+ latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
+ }
+ latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
+ } catch {
+ case fnfe: java.io.FileNotFoundException =>
+ logWarning("Error finding new files", fnfe)
+ reset()
+ return false
+ }
+ return true
+ }
+ }
}
private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index dbd08415a1..e448211732 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,11 +17,18 @@
package org.apache.spark.streaming.scheduler
+import akka.actor.{Props, Actor}
import org.apache.spark.SparkEnv
import org.apache.spark.Logging
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
+/** Event classes for JobGenerator */
+private[scheduler] sealed trait JobGeneratorEvent
+private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
+private[scheduler] case class ClearOldMetadata(time: Time) extends JobGeneratorEvent
+private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
+
/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
@@ -32,43 +39,67 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
initLogging()
val ssc = jobScheduler.ssc
- val clockClass = ssc.sc.conf.getOrElse(
- "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
- val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => generateJobs(new Time(longTime)))
val graph = ssc.graph
+ val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case event: JobGeneratorEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+ processEvent(event)
+ }
+ }))
+ val clock = {
+ val clockClass = ssc.sc.conf.getOrElse(
+ "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
+ val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
+ }
+ val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
+ longTime => eventProcessorActor ! GenerateJobs(new Time(longTime)))
lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
- new CheckpointWriter(ssc.conf, ssc.checkpointDir)
+ new CheckpointWriter(ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
null
}
- var latestTime: Time = null
-
def start() = synchronized {
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
- logInfo("JobGenerator started")
}
- def stop() = synchronized {
+ def stop() {
timer.stop()
if (checkpointWriter != null) checkpointWriter.stop()
ssc.graph.stop()
logInfo("JobGenerator stopped")
}
+ /**
+ * On batch completion, clear old metadata and checkpoint computation.
+ */
+ private[scheduler] def onBatchCompletion(time: Time) {
+ eventProcessorActor ! ClearOldMetadata(time)
+ }
+
+ /** Processes all events */
+ private def processEvent(event: JobGeneratorEvent) {
+ event match {
+ case GenerateJobs(time) => generateJobs(time)
+ case ClearOldMetadata(time) => clearOldMetadata(time)
+ case DoCheckpoint(time) => doCheckpoint(time)
+ }
+ }
+
+ /** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
- logInfo("JobGenerator's timer started at " + startTime)
+ logInfo("JobGenerator started at " + startTime)
}
+ /** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
@@ -100,7 +131,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// Restart the timer
timer.start(restartTime.milliseconds)
- logInfo("JobGenerator's timer restarted at " + restartTime)
+ logInfo("JobGenerator restarted at " + restartTime)
}
/** Generate jobs and perform checkpoint for the given `time`. */
@@ -108,16 +139,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
jobScheduler.runJobs(time, graph.generateJobs(time))
- latestTime = time
- doCheckpoint(time)
+ eventProcessorActor ! DoCheckpoint(time)
}
- /**
- * On batch completion, clear old metadata and checkpoint computation.
- */
- private[streaming] def onBatchCompletion(time: Time) {
+ /** Clear DStream metadata for the given `time`. */
+ private def clearOldMetadata(time: Time) {
ssc.graph.clearOldMetadata(time)
- doCheckpoint(time)
+ eventProcessorActor ! DoCheckpoint(time)
}
/** Perform checkpoint for the give `time`. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c60a3f5390..8dc80ac2ed 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,24 +17,18 @@
package org.apache.spark.streaming
-import dstream.FileInputDStream
-import org.apache.spark.streaming.StreamingContext._
import java.io.File
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-
import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-
import com.google.common.io.Files
-
-import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.FileInputDStream
import org.apache.spark.streaming.util.ManualClock
-
-
/**
* This test suites tests the checkpointing functionality of DStreams -
* the checkpointing of a DStream's RDDs as well as the checkpointing of
@@ -66,7 +60,7 @@ class CheckpointSuite extends TestSuiteBase {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
-
+ val fs = FileSystem.getLocal(new Configuration())
// this ensure checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
@@ -90,11 +84,12 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
- assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+ "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
- case (time, data) => {
- val file = new File(data.toString)
- assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
+ case (time, file) => {
+ assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
+ " for state stream before first failure does not exist")
}
}
@@ -102,7 +97,8 @@ class CheckpointSuite extends TestSuiteBase {
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
advanceTimeWithRealDelay(ssc, secondNumBatches)
- checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
+ checkpointFiles.foreach(file =>
+ assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
// Restart stream computation using the checkpoint file and check whether
@@ -110,19 +106,20 @@ class CheckpointSuite extends TestSuiteBase {
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
- assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
+ assert(!stateStream.generatedRDDs.isEmpty,
+ "No restored RDDs in state stream after recovery from first failure")
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
advanceTimeWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+ "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
- case (time, data) => {
- val file = new File(data.toString)
- assert(file.exists(),
- "Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
+ case (time, file) => {
+ assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
+ " for state stream before seconds failure does not exist")
}
}
ssc.stop()
@@ -132,7 +129,8 @@ class CheckpointSuite extends TestSuiteBase {
ssc = new StreamingContext(checkpointDir)
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
- assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+ assert(!stateStream.generatedRDDs.isEmpty,
+ "No restored RDDs in state stream after recovery from second failure")
// Adjust manual clock time as if it is being restarted after a delay; this is a hack because
// we modify the conf object, but it works for this one property
@@ -144,6 +142,7 @@ class CheckpointSuite extends TestSuiteBase {
ssc = null
}
+
// This tests whether the systm can recover from a master failure with simple
// non-stateful operations. This assumes as reliable, replayable input
// source - TestInputDStream.
@@ -192,6 +191,7 @@ class CheckpointSuite extends TestSuiteBase {
testCheckpointedOperation(input, operation, output, 7)
}
+
// This tests whether file input stream remembers what files were seen before
// the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 52381c10b0..5185954521 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}