path: root/streaming
author    Tathagata Das <tathagata.das1565@gmail.com>  2013-02-13 12:17:45 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-02-13 12:17:45 -0800
commit    39addd380363c0371e935fae50983fe87158c1ac (patch)
tree      0b428024e5ed5faac707ccb5b5573b0a2400d3b2 /streaming
parent    fd90daf850a922fe33c3638b18304d827953e2cb (diff)
download  spark-39addd380363c0371e935fae50983fe87158c1ac.tar.gz
          spark-39addd380363c0371e935fae50983fe87158c1ac.tar.bz2
          spark-39addd380363c0371e935fae50983fe87158c1ac.zip
Changed scheduler and file input stream to fix bugs in the driver fault tolerance. Added MasterFailureTest to rigorously test master fault tolerance with file input stream.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                      23
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala         2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala                 49
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala                   10
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala                    92
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala                         10
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala     59
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala  11
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala      375
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala          30
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                   21
-rw-r--r--  streaming/src/test/resources/log4j.properties                                7
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala          2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala             107
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala                304
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala            29
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala                12
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala         2
18 files changed, 693 insertions, 452 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0eb6aad187..0c1b667c0a 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -292,7 +292,7 @@ abstract class DStream[T: ClassManifest] (
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. ForEachDStream).
+ * to generate their own jobs.
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -308,19 +308,18 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereference RDDs that are older than rememberDuration.
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
*/
- protected[streaming] def forgetOldMetadata(time: Time) {
+ protected[streaming] def clearOldMetadata(time: Time) {
var numForgotten = 0
- generatedRDDs.keys.foreach(t => {
- if (t <= (time - rememberDuration)) {
- generatedRDDs.remove(t)
- numForgotten += 1
- logInfo("Forgot RDD of time " + t + " from " + this)
- }
- })
- logInfo("Forgot " + numForgotten + " RDDs from " + this)
- dependencies.foreach(_.forgetOldMetadata(time))
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
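The new clearOldMetadata drops every generated RDD whose batch time is at or before (time - rememberDuration) and then recurses into the dependency DStreams. A minimal standalone sketch of that time-indexed cleanup, using plain Long millisecond keys in place of Time (all names here are illustrative, not part of the patch):

import scala.collection.mutable.HashMap

// Illustrative sketch of the cleanup rule in DStream.clearOldMetadata.
object ClearOldMetadataSketch {
  val rememberDurationMs = 60000L                          // keep one minute of batches
  val generatedRDDs = new HashMap[Long, String]()          // batch time (ms) -> RDD placeholder

  def clearOldMetadata(timeMs: Long) {
    val threshold = timeMs - rememberDurationMs
    val oldRDDs = generatedRDDs.filter(_._1 <= threshold)  // same predicate as the patch
    generatedRDDs --= oldRDDs.keys
    println("Cleared " + oldRDDs.size + " RDDs older than " + threshold)
  }
}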
diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
index a375980b84..6b0fade7c6 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
@@ -87,7 +87,7 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
}
override def toString() = {
- "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]"
+ "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 7aa9d20004..22d9e24f05 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
- private[streaming] var zeroTime: Time = null
- private[streaming] var batchDuration: Duration = null
- private[streaming] var rememberDuration: Duration = null
- private[streaming] var checkpointInProgress = false
+ var rememberDuration: Duration = null
+ var checkpointInProgress = false
- private[streaming] def start(time: Time) {
+ var zeroTime: Time = null
+ var startTime: Time = null
+ var batchDuration: Duration = null
+
+ def start(time: Time) {
this.synchronized {
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
}
zeroTime = time
+ startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
@@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- private[streaming] def stop() {
+ def restart(time: Time) {
+ this.synchronized { startTime = time }
+ }
+
+ def stop() {
this.synchronized {
inputStreams.par.foreach(_.stop())
}
}
- private[streaming] def setContext(ssc: StreamingContext) {
+ def setContext(ssc: StreamingContext) {
this.synchronized {
outputStreams.foreach(_.setContext(ssc))
}
}
- private[streaming] def setBatchDuration(duration: Duration) {
+ def setBatchDuration(duration: Duration) {
this.synchronized {
if (batchDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
@@ -51,61 +58,61 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def remember(duration: Duration) {
+ def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.")
}
+ rememberDuration = duration
}
- rememberDuration = duration
}
- private[streaming] def addInputStream(inputStream: InputDStream[_]) {
+ def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
- private[streaming] def addOutputStream(outputStream: DStream[_]) {
+ def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
- private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray }
+ def getInputStreams() = this.synchronized { inputStreams.toArray }
- private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray }
+ def getOutputStreams() = this.synchronized { outputStreams.toArray }
- private[streaming] def generateRDDs(time: Time): Seq[Job] = {
+ def generateRDDs(time: Time): Seq[Job] = {
this.synchronized {
logInfo("Generating RDDs for time " + time)
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
}
- private[streaming] def forgetOldRDDs(time: Time) {
+ def clearOldMetadata(time: Time) {
this.synchronized {
- logInfo("Forgetting old RDDs for time " + time)
- outputStreams.foreach(_.forgetOldMetadata(time))
+ logInfo("Clearing old metadata for time " + time)
+ outputStreams.foreach(_.clearOldMetadata(time))
}
}
- private[streaming] def updateCheckpointData(time: Time) {
+ def updateCheckpointData(time: Time) {
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
}
- private[streaming] def restoreCheckpointData() {
+ def restoreCheckpointData() {
this.synchronized {
outputStreams.foreach(_.restoreCheckpointData())
}
}
- private[streaming] def validate() {
+ def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
//assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 8b18c7bc6a..649494ff4a 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -38,13 +38,19 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
logInfo("Added " + job + " to queue")
}
+ def stop() {
+ jobExecutor.shutdown()
+ }
+
private def clearJob(job: Job) {
jobs.synchronized {
- val jobsOfTime = jobs.get(job.time)
+ val time = job.time
+ val jobsOfTime = jobs.get(time)
if (jobsOfTime.isDefined) {
jobsOfTime.get -= job
if (jobsOfTime.get.isEmpty) {
- jobs -= job.time
+ ssc.scheduler.clearOldMetadata(time)
+ jobs -= time
}
} else {
throw new Exception("Job finished for time " + job.time +
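clearJob now calls ssc.scheduler.clearOldMetadata(time) only when the last job for a batch time finishes, so metadata survives for as long as any job of that batch is still running. A hedged sketch of that bookkeeping, with a plain callback standing in for the scheduler (names are illustrative):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Illustrative sketch: metadata for a batch time is cleared only after the last
// job of that time completes, mirroring the change to JobManager.clearJob.
class JobBookkeepingSketch(clearMetadata: Long => Unit) {
  private val jobs = new HashMap[Long, ArrayBuffer[String]]()

  def addJob(time: Long, job: String) = jobs.synchronized {
    jobs.getOrElseUpdate(time, new ArrayBuffer[String]()) += job
  }

  def jobFinished(time: Long, job: String) = jobs.synchronized {
    jobs.get(time).foreach { jobsOfTime =>
      jobsOfTime -= job
      if (jobsOfTime.isEmpty) {
        clearMetadata(time)   // safe: no remaining job of this batch needs the old RDDs
        jobs -= time
      }
    }
  }
}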
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 23a0f0974d..57d494da83 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
- val graph = ssc.graph
-
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
-
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
} else {
@@ -24,53 +21,80 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateRDDs(new Time(longTime)))
+ val graph = ssc.graph
- def start() {
- // If context was started from checkpoint, then restart timer such that
- // this timer's triggers occur at the same time as the original timer.
- // Otherwise just start the timer from scratch, and initialize graph based
- // on this first trigger time of the timer.
+ def start() = synchronized {
if (ssc.isCheckpointPresent) {
- // If manual clock is being used for testing, then
- // either set the manual clock to the last checkpointed time,
- // or if the property is defined set it to that time
- if (clock.isInstanceOf[ManualClock]) {
- val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
- clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
- }
- // Reschedule the batches that were received but not processed before failure
- //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
- val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
- println(pendingTimes.mkString(", "))
- pendingTimes.foreach(time =>
- graph.generateRDDs(time).foreach(jobManager.runJob))
- // Restart the timer
- timer.restart(graph.zeroTime.milliseconds)
- logInfo("Scheduler's timer restarted")
+ restart()
} else {
- val firstTime = new Time(timer.start())
- graph.start(firstTime - ssc.graph.batchDuration)
- logInfo("Scheduler's timer started")
+ startFirstTime()
}
logInfo("Scheduler started")
}
- def stop() {
+ def stop() = synchronized {
timer.stop()
- graph.stop()
+ jobManager.stop()
+ ssc.graph.stop()
logInfo("Scheduler stopped")
}
-
- private def generateRDDs(time: Time) {
+
+ private def startFirstTime() {
+ val startTime = new Time(timer.getStartTime())
+ graph.start(startTime - graph.batchDuration)
+ timer.start(startTime.milliseconds)
+ logInfo("Scheduler's timer started at " + startTime)
+ }
+
+ private def restart() {
+
+ // If manual clock is being used for testing, then
+ // either set the manual clock to the last checkpointed time,
+ // or if the property is defined set it to that time
+ if (clock.isInstanceOf[ManualClock]) {
+ val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
+ }
+
+ val batchDuration = ssc.graph.batchDuration
+
+ // Batches when the master was down, that is,
+ // between the checkpoint and current restart time
+ val checkpointTime = ssc.initialCheckpoint.checkpointTime
+ val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
+ val downTimes = checkpointTime.until(restartTime, batchDuration)
+ logInfo("Batches during down time: " + downTimes.mkString(", "))
+
+ // Batches that were unprocessed before failure
+ val pendingTimes = ssc.initialCheckpoint.pendingTimes
+ logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+ // Reschedule jobs for these times
+ val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+ logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+ timesToReschedule.foreach(time =>
+ graph.generateRDDs(time).foreach(jobManager.runJob)
+ )
+
+ // Restart the timer
+ timer.start(restartTime.milliseconds)
+ logInfo("Scheduler's timer restarted")
+ }
+
+ /** Generates the RDDs, clears old metadata and does checkpoint for the given time */
+ def generateRDDs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
graph.generateRDDs(time).foreach(jobManager.runJob)
- graph.forgetOldRDDs(time)
doCheckpoint(time)
}
- private def doCheckpoint(time: Time) {
+
+ def clearOldMetadata(time: Time) {
+ ssc.graph.clearOldMetadata(time)
+ }
+
+ def doCheckpoint(time: Time) {
if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
val startTime = System.currentTimeMillis()
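On restart, the scheduler reschedules both the batches that were pending at checkpoint time and the batches whose trigger times fell while the master was down. A small sketch of just that computation, with millisecond Longs standing in for Time/Duration (all values illustrative):

// Sketch of the reschedule-time computation in Scheduler.restart().
def timesToReschedule(
    checkpointTime: Long,     // time recorded in the checkpoint
    restartTime: Long,        // first aligned batch time after restart
    batchDuration: Long,      // batch interval in ms
    pendingTimes: Seq[Long]   // batches generated but unprocessed before the failure
): Seq[Long] = {
  val downTimes = checkpointTime until restartTime by batchDuration  // batches during down time
  (pendingTimes ++ downTimes).distinct.sorted
}

// e.g. checkpoint at 4000 ms, restart at 7000 ms, 1000 ms batches, batch 4000 still pending
// => Seq(4000, 5000, 6000)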
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 8a6c9a5cb5..8201e84a20 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -37,6 +37,16 @@ case class Time(private val millis: Long) {
def max(that: Time): Time = if (this > that) this else that
+ def until(that: Time, interval: Duration): Seq[Time] = {
+ assert(that > this, "Cannot create sequence as " + that + " not more than " + this)
+ assert(
+ (that - this).isMultipleOf(interval),
+ "Cannot create sequence as gap between " + that + " and " +
+ this + " is not multiple of " + interval
+ )
+ (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
override def toString: String = (millis.toString + " ms")
}
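A brief usage sketch of the new Time.until, with illustrative values (the end time itself is excluded):

// Illustrative values: 1000 ms batches between a checkpoint at t = 4000 ms
// and a restart at t = 7000 ms.
val checkpointTime = new Time(4000)
val restartTime    = new Time(7000)
val downTimes      = checkpointTime.until(restartTime, Milliseconds(1000))
// => Seq(Time(4000), Time(5000), Time(6000)); restartTime itself is excluded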
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 10ccb4318d..41b9bd9461 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -21,19 +21,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+ // Latest file modification time seen so far
private val lastModTimeFiles = new HashSet[String]()
private var lastModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- @transient private var files = new HashMap[Time, Array[String]]
+ @transient private[streaming] var files = new HashMap[Time, Array[String]]
override def start() {
if (newFilesOnly) {
- lastModTime = System.currentTimeMillis()
+ lastModTime = graph.zeroTime.milliseconds
} else {
lastModTime = 0
}
+ logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@@ -43,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
- * granularity of seconds. Hence, new files may have the same modification time as the
- * latest modification time in the previous call to this method and the list of files
- * maintained is used to filter the one that have been processed.
+ * granularity of seconds. Hence, new files may have the same modification time as the
+ * latest modification time seen in the previous call to this method and yet not have
+ * been reported by that call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime + "]")
+
// Create the filter for selecting new files
val newFilter = new PathFilter() {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter(path)) {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
return false
- } else {
+ } else { // Accept file only if
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime < lastModTime){
- return false
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < lastModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last time it was called
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
- return false
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file was created exactly as lastModTime but not reported yet
+ } else if (modTime > validTime.milliseconds) {
+ logDebug("Mod time more than valid time")
+ return false // If the file was created after the time this function call requires
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
return true
}
}
}
-
+ logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
- logInfo("New files: " + newFiles.mkString(", "))
+ logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
@@ -82,17 +96,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
lastModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
+ logDebug("Last mod time updated to " + lastModTime)
}
files += ((validTime, newFiles))
Some(filesToRDD(newFiles))
}
- /** Forget the old time-to-files mappings along with old RDDs */
- protected[streaming] override def forgetOldMetadata(time: Time) {
- super.forgetOldMetadata(time)
- val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration))
- files --= filesToBeRemoved.keys
- logInfo("Forgot " + filesToBeRemoved.size + " files from " + this)
+ /** Clear the old time-to-files mappings along with old RDDs */
+ protected[streaming] override def clearOldMetadata(time: Time) {
+ super.clearOldMetadata(time)
+ val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ files --= oldFiles.keys
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
/** Generate one RDD from an array of files */
@@ -148,6 +166,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
}
+
+ override def toString() = {
+ "[\n" + hadoopFiles.size + " file sets\n" +
+ hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
+ }
}
}
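The new PathFilter rejects a file in three cases: its modification time is before lastModTime, it is exactly at lastModTime but was already reported, or it is newer than the batch time being computed. That decision can be summarized as a pure predicate; a hedged sketch with plain arguments in place of the Hadoop Path/FileStatus types (names are illustrative):

// Illustrative sketch of the acceptance rule in FileInputDStream.compute.
def acceptFile(
    modTime: Long,                  // file modification time
    lastModTime: Long,              // latest mod time seen by the previous call
    lastModTimeFiles: Set[String],  // files already reported at exactly lastModTime
    validTime: Long,                // batch time being computed (ms)
    path: String
): Boolean = {
  if (modTime < lastModTime) false                                  // already covered by an earlier batch
  else if (modTime == lastModTime && lastModTimeFiles(path)) false  // same second, already reported
  else if (modTime > validTime) false                               // belongs to a later batch
  else true
}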
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 8c322dd698..ecc75ec913 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
- val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
- Some(new BlockRDD[T](ssc.sc, blockIds))
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure.
+ if (validTime >= graph.startTime) {
+ val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[String]()))
+ }
}
}
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
new file mode 100644
index 0000000000..3ffe4b64d0
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -0,0 +1,375 @@
+package spark.streaming.util
+
+import spark.{Logging, RDD}
+import spark.streaming._
+import spark.streaming.dstream.ForEachDStream
+import StreamingContext._
+
+import scala.util.Random
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import java.io.{File, ObjectInputStream, IOException}
+import java.util.UUID
+
+import com.google.common.io.Files
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+private[streaming]
+object MasterFailureTest extends Logging {
+ initLogging()
+
+ @volatile var killed = false
+ @volatile var killCount = 0
+
+ def main(args: Array[String]) {
+ if (args.size < 2) {
+ println(
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ System.exit(1)
+ }
+ val directory = args(0)
+ val numBatches = args(1).toInt
+ val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1)
+
+ println("\n\n========================= MAP TEST =========================\n\n")
+ testMap(directory, numBatches, batchDuration)
+
+ println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
+ testUpdateStateByKey(directory, numBatches, batchDuration)
+ }
+
+ def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val input = (1 to numBatches).map(_.toString).toSeq
+ // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val expectedOutput = (1 to numBatches)
+
+ val operation = (st: DStream[String]) => st.map(_.toInt)
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size)
+ logInfo(expectedOutput.mkString("[", ",", "]"))
+ logInfo("Output, size = " + output.size)
+ logInfo(output.mkString("[", ",", "]"))
+
+ // Verify whether all the values of the expected output is present
+ // in the output
+ assert(output.distinct.toSet == expectedOutput.toSet)
+ }
+
+
+ def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+ Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
+ }
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1L))
+ .updateStateByKey[Long](updateFunc)
+ .checkpoint(batchDuration * 5)
+ }
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput)
+ logInfo("Output, size = " + output.size + "\n" + output)
+
+ // Verify whether all the values in the output are among the expected output values
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+
+ // Verify whether the last expected output value has been generated, thereby
+ // confirming that none of the inputs have been missed
+ assert(output.last == expectedOutput.last)
+ }
+
+ /**
+ * Tests stream operation with multiple master failures, and verifies whether the
+ * final set of output values is as expected or not.
+ */
+ def testOperation[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ input: Seq[String],
+ operation: DStream[String] => DStream[T],
+ expectedOutput: Seq[T]
+ ): Seq[T] = {
+
+ // Just making sure that the expected output does not have duplicates
+ assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
+
+ // Setup the stream computation with the given operation
+ val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation)
+
+ // Start generating files in the a different thread
+ val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds)
+ fileGeneratingThread.start()
+
+ // Run the streams and repeatedly kill it until the last expected output
+ // has been generated, or until it has run for twice the expected time
+ val lastExpectedOutput = expectedOutput.last
+ val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
+ val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
+
+ // Delete directories
+ fileGeneratingThread.join()
+ val fs = checkpointDir.getFileSystem(new Configuration())
+ fs.delete(checkpointDir, true)
+ fs.delete(testDir, true)
+ logInfo("Finished test after " + killCount + " failures")
+ mergedOutput
+ }
+
+ /**
+ * Sets up the stream computation with the given operation, directory (local or HDFS),
+ * and batch duration. Returns the streaming context and the directory to which
+ * files should be written for testing.
+ */
+ private def setupStreams[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ operation: DStream[String] => DStream[T]
+ ): (StreamingContext, Path, Path) = {
+ // Reset all state
+ reset()
+
+ // Create the directories for this test
+ val uuid = UUID.randomUUID().toString
+ val rootDir = new Path(directory, uuid)
+ val fs = rootDir.getFileSystem(new Configuration())
+ val checkpointDir = new Path(rootDir, "checkpoint")
+ val testDir = new Path(rootDir, "test")
+ fs.mkdirs(checkpointDir)
+ fs.mkdirs(testDir)
+
+ // Setup the streaming computation with the given operation
+ System.clearProperty("spark.driver.port")
+ var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration)
+ ssc.checkpoint(checkpointDir.toString)
+ val inputStream = ssc.textFileStream(testDir.toString)
+ val operatedStream = operation(inputStream)
+ val outputStream = new TestOutputStream(operatedStream)
+ ssc.registerOutputStream(outputStream)
+ (ssc, checkpointDir, testDir)
+ }
+
+
+ /**
+ * Repeatedly starts and kills the streaming context until timed out or
+ * the last expected output is generated. Finally, returns the merged output of all the runs.
+ */
+ private def runStreams[T: ClassManifest](
+ ssc_ : StreamingContext,
+ lastExpectedOutput: T,
+ maxTimeToRun: Long
+ ): Seq[T] = {
+
+ var ssc = ssc_
+ var totalTimeRan = 0L
+ var isLastOutputGenerated = false
+ var isTimedOut = false
+ val mergedOutput = new ArrayBuffer[T]()
+ val checkpointDir = ssc.checkpointDir
+ var batchDuration = ssc.graph.batchDuration
+
+ while(!isLastOutputGenerated && !isTimedOut) {
+ // Get the output buffer
+ val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ def output = outputBuffer.flatMap(x => x)
+
+ // Start the thread to kill the streaming after some time
+ killed = false
+ val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+ killingThread.start()
+
+ var timeRan = 0L
+ try {
+ // Start the streaming computation and let it run while ...
+ // (i) StreamingContext has not been shut down yet
+ // (ii) The last expected output has not been generated yet
+ // (iii) Its not timed out yet
+ System.clearProperty("spark.streaming.clock")
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val startTime = System.currentTimeMillis()
+ while (!killed && !isLastOutputGenerated && !isTimedOut) {
+ Thread.sleep(100)
+ timeRan = System.currentTimeMillis() - startTime
+ isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
+ }
+ } catch {
+ case e: Exception => logError("Error running streaming context", e)
+ }
+ if (killingThread.isAlive) killingThread.interrupt()
+ ssc.stop()
+
+ logInfo("Has been killed = " + killed)
+ logInfo("Is last output generated = " + isLastOutputGenerated)
+ logInfo("Is timed out = " + isTimedOut)
+
+ // Verify whether the output of each batch has only one element or no element
+ // and then merge the new output with all the earlier output
+ mergedOutput ++= output
+ totalTimeRan += timeRan
+ logInfo("New output = " + output)
+ logInfo("Merged output = " + mergedOutput)
+ logInfo("Time ran = " + timeRan)
+ logInfo("Total time ran = " + totalTimeRan)
+
+ if (!isLastOutputGenerated && !isTimedOut) {
+ val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ // Recreate the streaming context from checkpoint
+ ssc = new StreamingContext(checkpointDir)
+ }
+ }
+ mergedOutput
+ }
+
+ /**
+ * Verifies that the output values are the same as expected. Since failures can lead to
+ * a batch being processed twice, a batch's output may appear more than once
+ * consecutively. To avoid getting confused by those, consecutive duplicate batch
+ * outputs are eliminated from the `output`. As a result, the expected output should
+ * not have consecutive batches with the same values as output.
+ */
+ private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ // Verify that the expected outputs do not have consecutive batches with the same output
+ for (i <- 0 until expectedOutput.size - 1) {
+ assert(expectedOutput(i) != expectedOutput(i+1),
+ "Expected output has consecutive duplicate sequence of values")
+ }
+
+ // Log the output
+ println("Expected output, size = " + expectedOutput.size)
+ println(expectedOutput.mkString("[", ",", "]"))
+ println("Output, size = " + output.size)
+ println(output.mkString("[", ",", "]"))
+
+ // Match the output with the expected output
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ }
+
+ /** Resets counter to prepare for the test */
+ private def reset() {
+ killed = false
+ killCount = 0
+ }
+}
+
+/**
+ * This is an output stream just for testing. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean on being restored from a checkpoint.
+ */
+private[streaming]
+class TestOutputStream[T: ClassManifest](
+ parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](
+ parent,
+ (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output += collected
+ println(t + ": " + collected.mkString("[", ",", "]"))
+ }
+ ) {
+
+ // This is to clear the output buffer every time it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+
+
+/**
+ * Thread to kill streaming context after a random period of time.
+ */
+private[streaming]
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ try {
+ // If it is the first killing, then allow the first checkpoint to be created
+ var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000
+ val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) {
+ ssc.stop()
+ MasterFailureTest.killed = true
+ MasterFailureTest.killCount += 1
+ }
+ logInfo("Killing thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("Killing thread interrupted")
+ case e: Exception => logWarning("Exception in killing thread", e)
+ }
+
+ }
+}
+
+
+/**
+ * Thread to generate input files periodically with the desired text.
+ */
+private[streaming]
+class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
+ extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ val localTestDir = Files.createTempDir()
+ val fs = testDir.getFileSystem(new Configuration())
+ try {
+ Thread.sleep(5000) // To make sure that the streaming context has been fully set up
+ for (i <- 0 until input.size) {
+ // Write the data to a local file and then move it to the target test directory
+ val localFile = new File(localTestDir, (i+1).toString)
+ val hadoopFile = new Path(testDir, (i+1).toString)
+ FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString))
+ fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ Thread.sleep(interval)
+ localFile.delete()
+ }
+ logInfo("File generating thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("File generating thread interrupted")
+ case e: Exception => logWarning("File generating in killing thread", e)
+ } finally {
+ fs.close()
+ }
+ }
+}
+
+
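MasterFailureTest is package-private, so it is driven either through its main method (usage string above) or from code inside the spark.streaming package, as the rewritten FailureSuite below does. A hypothetical direct invocation, with an illustrative scratch directory and the same batch settings FailureSuite uses:

package spark.streaming   // needed because MasterFailureTest is private[streaming]

import spark.streaming.util.MasterFailureTest

object RunMasterFailureTestSketch {
  def main(args: Array[String]) {
    val directory = "/tmp/master-failure-test"   // hypothetical local or HDFS scratch directory
    val numBatches = 30
    val batchDuration = Milliseconds(1000)
    MasterFailureTest.testMap(directory, numBatches, batchDuration)
    MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
  }
}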
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index db715cc295..8e10276deb 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -3,9 +3,9 @@ package spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
- val minPollTime = 25L
+ private val minPollTime = 25L
- val pollTime = {
+ private val pollTime = {
if (period / 10.0 > minPollTime) {
(period / 10.0).toLong
} else {
@@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
- val thread = new Thread() {
+ private val thread = new Thread() {
override def run() { loop }
}
- var nextTime = 0L
+ private var nextTime = 0L
+
+ def getStartTime(): Long = {
+ (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ }
+
+ def getRestartTime(originalStartTime: Long): Long = {
+ val gap = clock.currentTime - originalStartTime
+ (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
+ }
def start(startTime: Long): Long = {
nextTime = startTime
@@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
def start(): Long = {
- val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
- start(startTime)
+ start(getStartTime())
}
- def restart(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
- val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
- start(newStartTime)
- }
-
- def stop() {
+ def stop() {
thread.interrupt()
}
- def loop() {
+ private def loop() {
try {
while (true) {
clock.waitTillTime(nextTime)
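getStartTime rounds the current clock time up to the next multiple of the period, and getRestartTime rounds up relative to the original start time so that restarted triggers stay aligned with the original schedule. A worked example with illustrative numbers standing in for the Clock:

val period = 1000L                 // timer period in ms
val now    = 12345L                // pretend clock.currentTime

// getStartTime(): next multiple of period after now
val startTime = (math.floor(now.toDouble / period) + 1).toLong * period                        // 13000

// getRestartTime(originalStartTime): next tick aligned with the original schedule
val originalStartTime = 4000L
val gap = now - originalStartTime                                                              // 8345
val restartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime  // 13000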
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index fbe4af4597..783a393a8f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,8 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint", new Duration(1000));
}
@@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
}
- /*
+
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -434,7 +435,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result);
}
- */
+
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
@@ -450,7 +451,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, actual);
}
- /*
+
// PairDStream Functions
@Test
public void testPairFilter() {
@@ -897,7 +898,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
- */
+
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -964,7 +965,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result1);
}
*/
- /*
+
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@@ -972,9 +973,9 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@@ -1026,5 +1027,5 @@ public class JavaAPISuite implements Serializable {
public void testFileStream() {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
- }*/
+ }
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index edfa1243fa..5652596e1e 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,6 +1,7 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.rootCategory=WARN, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.ConsoleAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
@@ -8,4 +9,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.spark.streaming=INFO
+log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index c031949dd1..12388b8887 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,6 +6,8 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework() = "BasicOperationsSuite"
after {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 7126af62d9..c89c4a8d43 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@@ -10,8 +11,16 @@ import util.{Clock, ManualClock}
import scala.util.Random
import com.google.common.io.Files
+
+/**
+ * This test suite tests the checkpointing functionality of DStreams -
+ * the checkpointing of a DStream's RDDs as well as the checkpointing of
+ * the whole DStream graph.
+ */
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@@ -64,7 +73,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- runStreamsWithRealDelay(ssc, firstNumBatches)
+ advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
@@ -77,7 +86,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
- runStreamsWithRealDelay(ssc, secondNumBatches)
+ advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -92,7 +101,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
- runStreamsWithRealDelay(ssc, 1)
+ advanceTimeWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
@@ -113,7 +122,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
- runStreamsWithRealDelay(ssc, 4)
+ advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
@@ -168,74 +177,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
// This tests whether file input stream remembers what files were seen before
- // the master failure and uses them again to process a large window operatoin.
+ // the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ val clockProperty = System.getProperty("spark.streaming.clock")
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- var ssc = new StreamingContext(master, framework, batchDuration)
+ var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir, checkpointInterval)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
- if (i == 3) Thread.sleep(1000)
+ if (i == 3) Thread.sleep(2000)
i
})
+
// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files and advance manual clock to process them
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
// wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- // wait to make sure that FileInputDStream picks up this file only and not any other file
- Thread.sleep(500)
+ Thread.sleep(1000)
}
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
+ // Verify whether files created have been recorded correctly or not
+ var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(1000)
}
- // Restart stream computation from checkpoint and create more files to see whether
- // they are being processed
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
+ fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Restart stream computation
ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(1000)
}
Thread.sleep(1000)
- logInfo("Output = " + outputStream.output.mkString(","))
+ logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
+ // Verify whether files created while the driver was down have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+
+ // Verify whether new files created after recover have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+
// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
outputBuffer ++= outputStream.output
- // Verify whether data received by Spark Streaming was as expected
- val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -244,11 +274,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
- }
+ val output = outputBuffer.flatMap(x => x)
+ assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+ output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ // To ensure that all the inputs were received correctly
+ assert(expectedOutput.last === output.last)
+
+ // Enable manual clock back again for other tests
+ if (clockProperty != null)
+ System.setProperty("spark.streaming.clock", clockProperty)
}
@@ -278,7 +314,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
- val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
+ ssc.start()
+ val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
+ ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -289,17 +327,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc.stop()
ssc = null
}
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
- * It also wait for the expected amount of time for each batch.
+ * It also waits for the expected amount of time for each batch.
*/
- def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@@ -308,6 +349,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- }
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ outputStream.output
+ }
}
\ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index efaa098d2e..a5fa7ab92d 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,14 +1,15 @@
package spark.streaming
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import org.apache.commons.io.FileUtils
-import java.io.File
-import scala.runtime.RichInt
-import scala.util.Random
-import spark.streaming.StreamingContext._
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import spark.Logging
+import spark.streaming.util.MasterFailureTest
+import StreamingContext._
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
import com.google.common.io.Files
+import java.io.File
+import org.apache.commons.io.FileUtils
+import collection.mutable.ArrayBuffer
+
/**
* This testsuite tests master failures at random times while the stream is running using
@@ -16,295 +17,24 @@ import com.google.common.io.Files
*/
class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
- var testDir: File = null
- var checkpointDir: File = null
- val batchDuration = Milliseconds(500)
+ var directory = "FailureSuite"
+ val numBatches = 30
+ val batchDuration = Milliseconds(1000)
before {
- testDir = Files.createTempDir()
- checkpointDir = Files.createTempDir()
+ FileUtils.deleteDirectory(new File(directory))
}
after {
- FailureSuite.reset()
- FileUtils.deleteDirectory(checkpointDir)
- FileUtils.deleteDirectory(testDir)
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- }
-
- test("multiple failures with updateStateByKey") {
- val n = 30
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
- // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
-
- val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
- }
- st.flatMap(_.split(" "))
- .map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
- }
-
- testOperationWithMultipleFailures(input, operation, expectedOutput)
- }
-
- test("multiple failures with reduceByKeyAndWindow") {
- val n = 30
- val w = 100
- assert(w > n, "Window should be much larger than the number of input sets in this test")
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
- // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
-
- val operation = (st: DStream[String]) => {
- st.flatMap(_.split(" "))
- .map(x => (x, 1))
- .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
- }
-
- testOperationWithMultipleFailures(input, operation, expectedOutput)
+ FileUtils.deleteDirectory(new File(directory))
}
-
- /**
- * Tests stream operation with multiple master failures, and verifies whether the
- * final set of output values is as expected or not. Checking the final value is
- * proof that no intermediate data was lost due to master failures.
- */
- def testOperationWithMultipleFailures(
- input: Seq[String],
- operation: DStream[String] => DStream[(String, Int)],
- expectedOutput: Seq[(String, Int)]
- ) {
- var ssc = setupStreamsWithFileStream(operation)
-
- val mergedOutput = new ArrayBuffer[(String, Int)]()
- val lastExpectedOutput = expectedOutput.last
-
- val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
- var totalTimeRan = 0L
-
- // Start generating files in the a different thread
- val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds)
- fileGeneratingThread.start()
-
- // Repeatedly start and kill the streaming context until timed out or
- // all expected output is generated
- while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
-
- // Start the thread to kill the streaming after some time
- FailureSuite.failed = false
- val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
- killingThread.start()
-
- // Run the streams with real clock until last expected output is seen or timed out
- val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan)
- if (killingThread.isAlive) killingThread.interrupt()
-
- // Merge output and time ran and see whether already timed out or not
- mergedOutput ++= output
- totalTimeRan += timeRan
- logInfo("New output = " + output)
- logInfo("Merged output = " + mergedOutput)
- logInfo("Total time spent = " + totalTimeRan)
- if (totalTimeRan > maxTimeToRun) {
- FailureSuite.timedOut = true
- }
-
- if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
- val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- }
-
- // Recreate the streaming context from checkpoint
- ssc = new StreamingContext(checkpointDir.getPath)
- }
- ssc.stop()
- ssc = null
- logInfo("Finished test after " + FailureSuite.failureCount + " failures")
-
- if (FailureSuite.timedOut) {
- logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " +
- expectedOutput.size + " batches of " + batchDuration)
- }
-
- // Verify whether the output is as expected
- verifyOutput(mergedOutput, expectedOutput)
- if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt()
+ test("multiple failures with map") {
+ MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
- /** Sets up the stream operations with file input stream */
- def setupStreamsWithFileStream(
- operation: DStream[String] => DStream[(String, Int)]
- ): StreamingContext = {
- val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration)
- ssc.checkpoint(checkpointDir.getPath)
- val inputStream = ssc.textFileStream(testDir.getPath)
- val operatedStream = operation(inputStream)
- val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]]
- val outputStream = new TestOutputStream(operatedStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
- ssc
- }
-
- /**
- * Runs the streams set up in `ssc` on real clock.
- */
- def runStreamsWithRealClock(
- ssc: StreamingContext,
- lastExpectedOutput: (String, Int),
- timeout: Long
- ): (Seq[(String, Int)], Long) = {
-
- System.clearProperty("spark.streaming.clock")
-
- // Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]]
- val output = outputStream.output
- val startTime = System.currentTimeMillis()
-
- // Functions to detect various conditions
- def hasFailed = FailureSuite.failed
- def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput
- def isTimedOut = System.currentTimeMillis() - startTime > timeout
-
- // Start the streaming computation and let it run while ...
- // (i) StreamingContext has not been shut down yet
- // (ii) The last expected output has not been generated yet
- // (iii) Its not timed out yet
- try {
- ssc.start()
- while (!hasFailed && !isLastOutputGenerated && !isTimedOut) {
- Thread.sleep(100)
- }
- logInfo("Has failed = " + hasFailed)
- logInfo("Is last output generated = " + isLastOutputGenerated)
- logInfo("Is timed out = " + isTimedOut)
- } catch {
- case e: Exception => logInfo("Exception while running streams: " + e)
- } finally {
- ssc.stop()
- }
-
- // Verify whether the output of each batch has only one element
- assert(output.forall(_.size <= 1), "output of each batch should have only one element")
-
- // Set appropriate flags is timed out or output has been generated
- if (isTimedOut) FailureSuite.timedOut = true
- if (isLastOutputGenerated) FailureSuite.outputGenerated = true
-
- val timeTaken = System.currentTimeMillis() - startTime
- logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output.flatMap(_.headOption), timeTaken)
- }
-
- /**
- * Verifies the output value are the same as expected. Since failures can lead to
- * a batch being processed twice, a batches output may appear more than once
- * consecutively. To avoid getting confused with those, we eliminate consecutive
- * duplicate batch outputs of values from the `output`. As a result, the
- * expected output should not have consecutive batches with the same values as output.
- */
- def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) {
- // Verify whether expected outputs do not consecutive batches with same output
- for (i <- 0 until expectedOutput.size - 1) {
- assert(expectedOutput(i) != expectedOutput(i+1),
- "Expected output has consecutive duplicate sequence of values")
- }
-
- // Match the output with the expected output
- logInfo(
- "\n-------------------------------------------\n" +
- " Verifying output " +
- "\n-------------------------------------------\n"
- )
- logInfo("Expected output, size = " + expectedOutput.size)
- logInfo(expectedOutput.mkString("[", ",", "]"))
- logInfo("Output, size = " + output.size)
- logInfo(output.mkString("[", ",", "]"))
- output.foreach(o =>
- assert(expectedOutput.contains(o), "Expected value " + o + " not found")
- )
- }
-}
-
-object FailureSuite {
- var failed = false
- var outputGenerated = false
- var timedOut = false
- var failureCount = 0
-
- def reset() {
- failed = false
- outputGenerated = false
- timedOut = false
- failureCount = 0
- }
-}
-
-/**
- * Thread to kill streaming context after some time.
- */
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
- initLogging()
-
- override def run() {
- try {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) {
- ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
- }
- logInfo("Killing thread exited")
- } catch {
- case ie: InterruptedException => logInfo("Killing thread interrupted")
- case e: Exception => logWarning("Exception in killing thread", e)
- }
- }
-}
-
-/**
- * Thread to generate input files periodically with the desired text
- */
-class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long)
- extends Thread with Logging {
- initLogging()
-
- override def run() {
- try {
- Thread.sleep(5000) // To make sure that all the streaming context has been set up
- for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(interval)
- }
- logInfo("File generating thread exited")
- } catch {
- case ie: InterruptedException => logInfo("File generating thread interrupted")
- case e: Exception => logWarning("File generating in killing thread", e)
- }
+ test("multiple failures with updateStateByKey") {
+ MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}
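The ad-hoc start/kill/restart logic deleted above now lives in spark.streaming.util.MasterFailureTest. Based on the removed code, the overall shape of that loop is roughly as follows (a sketch with simplified, hypothetical helper names, not the utility's actual implementation):

    var ssc = setupStreams()                                 // streams checkpointed to checkpointDir
    while (!allOutputGenerated && !timedOut) {
      val killer = new KillingThread(ssc, maxKillWaitTime)   // stops ssc after a random delay
      killer.start()
      mergedOutput ++= runUntilKilledOrDone(ssc)             // run on the real clock, collect output
      if (killer.isAlive) killer.interrupt()
      ssc = new StreamingContext(checkpointDir)              // recover the computation from checkpoint
    }
    verifyOutput(mergedOutput, expectedOutput)               // final values show no data was lost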
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0eb9c7b81e..7c1c2e1040 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -133,26 +133,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
- val filestream = ssc.textFileStream(testDir.toString)
+ val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(filestream, outputBuffer)
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- //Thread.sleep(100)
+ val file = new File(testDir, i.toString)
+ FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ logInfo("Created file " + file)
+ Thread.sleep(batchDuration.milliseconds)
+ Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
Thread.sleep(1000)
@@ -171,16 +174,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i).size === 1)
- assert(output(i).head.toString === expectedOutput(i))
- }
+ assert(output.toList === expectedOutput.toList)
+
FileUtils.deleteDirectory(testDir)
+
+ // Re-enable the manual clock for other tests
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
}
-
+/** This is a server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
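The file input stream test now runs against the real system clock: the manual-clock property is cleared before the test and restored at the end so the remaining suites stay deterministic. The same idea as a reusable helper would look roughly like this (the `withRealClock` name is hypothetical):

    def withRealClock[T](body: => T): T = {
      System.clearProperty("spark.streaming.clock")          // let FileInputDStream see the system clock
      try {
        body
      } finally {
        // restore the manual clock so other tests remain deterministic
        System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
      }
    }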
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c2733831b2..2cc31d6137 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -63,20 +63,28 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
*/
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ // Name of the framework for Spark context
def framework = "TestSuiteBase"
+ // Master for Spark context
def master = "local[2]"
+ // Batch duration
def batchDuration = Seconds(1)
+ // Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
+ // Duration after which the graph is checkpointed
def checkpointInterval = batchDuration
+ // Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
+ // Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@@ -140,9 +148,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@@ -186,7 +191,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop()
}
-
output
}
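The settings documented above are plain defs, so individual suites tune them by overriding, for example (hypothetical suite name):

    class MyOperationsSuite extends TestSuiteBase {
      override def framework = "MyOperationsSuite"
      override def batchDuration = Milliseconds(500)
      override def maxWaitTimeMillis = 20000
    }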
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index cd9608df53..1080790147 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000
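With runStreams no longer forcing the manual clock, suites that depend on it set the property themselves, as above. Once set, a test can drive batches deterministically instead of sleeping, roughly as follows (a sketch following the pattern used elsewhere in these suites):

    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
    val ssc = new StreamingContext(master, framework, batchDuration)
    // ... set up streams and start ssc ...
    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    clock.addToTime(batchDuration.milliseconds)   // advance by one batch to trigger the next job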