Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 23
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStreamGraph.scala | 49
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 92
-rw-r--r--  streaming/src/main/scala/spark/streaming/Time.scala | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala | 59
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala | 375
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala | 30
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java | 21
-rw-r--r--  streaming/src/test/resources/log4j.properties | 7
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala | 107
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala | 304
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 29
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala | 12
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala | 2
18 files changed, 693 insertions(+), 452 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 0eb6aad187..0c1b667c0a 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -292,7 +292,7 @@ abstract class DStream[T: ClassManifest] (
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
- * (eg. ForEachDStream).
+ * to generate their own jobs.
*/
protected[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -308,19 +308,18 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereference RDDs that are older than rememberDuration.
+ * Clear metadata that are older than `rememberDuration` of this DStream.
+ * This is an internal method that should not be called directly. This default
+ * implementation clears the old generated RDDs. Subclasses of DStream may override
+ * this to clear their own metadata along with the generated RDDs.
*/
- protected[streaming] def forgetOldMetadata(time: Time) {
+ protected[streaming] def clearOldMetadata(time: Time) {
var numForgotten = 0
- generatedRDDs.keys.foreach(t => {
- if (t <= (time - rememberDuration)) {
- generatedRDDs.remove(t)
- numForgotten += 1
- logInfo("Forgot RDD of time " + t + " from " + this)
- }
- })
- logInfo("Forgot " + numForgotten + " RDDs from " + this)
- dependencies.foreach(_.forgetOldMetadata(time))
+ val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+ generatedRDDs --= oldRDDs.keys
+ logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+ (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
+ dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
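
The new clearOldMetadata follows a simple pattern: select every generated RDD whose batch time is at or before time - rememberDuration, drop those keys, and recurse into the dependencies. A standalone sketch of that filter-and-remove step (not part of this patch), using Long millisecond keys in place of spark.streaming.Time:

    import scala.collection.mutable.HashMap

    object ClearOldMetadataSketch {
      def main(args: Array[String]) {
        val rememberDuration = 3000L
        val time = 6000L
        // batch time (ms) -> generated RDD, standing in for DStream.generatedRDDs
        val generatedRDDs = HashMap(1000L -> "rdd@1000", 2000L -> "rdd@2000", 5000L -> "rdd@5000")
        val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
        generatedRDDs --= oldRDDs.keys
        println("Cleared " + oldRDDs.size + " RDDs older than " + (time - rememberDuration) +
          ": " + oldRDDs.keys.mkString(", "))   // drops the entries at 1000 and 2000
      }
    }
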
diff --git a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
index a375980b84..6b0fade7c6 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamCheckpointData.scala
@@ -87,7 +87,7 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
}
override def toString() = {
- "[\n" + checkpointFiles.size + "\n" + checkpointFiles.mkString("\n") + "\n]"
+ "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index 7aa9d20004..22d9e24f05 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -11,17 +11,20 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
- private[streaming] var zeroTime: Time = null
- private[streaming] var batchDuration: Duration = null
- private[streaming] var rememberDuration: Duration = null
- private[streaming] var checkpointInProgress = false
+ var rememberDuration: Duration = null
+ var checkpointInProgress = false
- private[streaming] def start(time: Time) {
+ var zeroTime: Time = null
+ var startTime: Time = null
+ var batchDuration: Duration = null
+
+ def start(time: Time) {
this.synchronized {
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
}
zeroTime = time
+ startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validate)
@@ -29,19 +32,23 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
}
- private[streaming] def stop() {
+ def restart(time: Time) {
+ this.synchronized { startTime = time }
+ }
+
+ def stop() {
this.synchronized {
inputStreams.par.foreach(_.stop())
}
}
- private[streaming] def setContext(ssc: StreamingContext) {
+ def setContext(ssc: StreamingContext) {
this.synchronized {
outputStreams.foreach(_.setContext(ssc))
}
}
- private[streaming] def setBatchDuration(duration: Duration) {
+ def setBatchDuration(duration: Duration) {
this.synchronized {
if (batchDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
@@ -51,61 +58,61 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
batchDuration = duration
}
- private[streaming] def remember(duration: Duration) {
+ def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
throw new Exception("Batch duration already set as " + batchDuration +
". cannot set it again.")
}
+ rememberDuration = duration
}
- rememberDuration = duration
}
- private[streaming] def addInputStream(inputStream: InputDStream[_]) {
+ def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
- private[streaming] def addOutputStream(outputStream: DStream[_]) {
+ def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
- private[streaming] def getInputStreams() = this.synchronized { inputStreams.toArray }
+ def getInputStreams() = this.synchronized { inputStreams.toArray }
- private[streaming] def getOutputStreams() = this.synchronized { outputStreams.toArray }
+ def getOutputStreams() = this.synchronized { outputStreams.toArray }
- private[streaming] def generateRDDs(time: Time): Seq[Job] = {
+ def generateRDDs(time: Time): Seq[Job] = {
this.synchronized {
logInfo("Generating RDDs for time " + time)
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
}
- private[streaming] def forgetOldRDDs(time: Time) {
+ def clearOldMetadata(time: Time) {
this.synchronized {
- logInfo("Forgetting old RDDs for time " + time)
- outputStreams.foreach(_.forgetOldMetadata(time))
+ logInfo("Clearing old metadata for time " + time)
+ outputStreams.foreach(_.clearOldMetadata(time))
}
}
- private[streaming] def updateCheckpointData(time: Time) {
+ def updateCheckpointData(time: Time) {
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
}
- private[streaming] def restoreCheckpointData() {
+ def restoreCheckpointData() {
this.synchronized {
outputStreams.foreach(_.restoreCheckpointData())
}
}
- private[streaming] def validate() {
+ def validate() {
this.synchronized {
assert(batchDuration != null, "Batch duration has not been set")
//assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low")
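
The graph now tracks two notions of start: zeroTime, fixed when the computation first starts and used to align batch boundaries, and startTime, reset by restart() on recovery so input streams can tell which batches predate the restarted context. A toy illustration of that contract (not the Spark class):

    class ToyGraph {
      var zeroTime: Long = -1L    // set once, at the very first start
      var startTime: Long = -1L   // reset on every restart from a checkpoint

      def start(time: Long) = synchronized {
        require(zeroTime == -1L, "graph computation already started")
        zeroTime = time
        startTime = time
      }

      def restart(time: Long) = synchronized { startTime = time }
    }

    object ToyGraphDemo {
      def main(args: Array[String]) {
        val g = new ToyGraph
        g.start(0L)         // first start: zeroTime = startTime = 0
        g.restart(10000L)   // recovery: startTime moves, zeroTime stays put
        println("zeroTime=" + g.zeroTime + ", startTime=" + g.startTime)
      }
    }
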
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 8b18c7bc6a..649494ff4a 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -38,13 +38,19 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
logInfo("Added " + job + " to queue")
}
+ def stop() {
+ jobExecutor.shutdown()
+ }
+
private def clearJob(job: Job) {
jobs.synchronized {
- val jobsOfTime = jobs.get(job.time)
+ val time = job.time
+ val jobsOfTime = jobs.get(time)
if (jobsOfTime.isDefined) {
jobsOfTime.get -= job
if (jobsOfTime.get.isEmpty) {
- jobs -= job.time
+ ssc.scheduler.clearOldMetadata(time)
+ jobs -= time
}
} else {
throw new Exception("Job finished for time " + job.time +
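
With this change the JobManager drives metadata cleanup: only when the last queued job of a batch time completes does it call back into the scheduler's clearOldMetadata. A simplified, self-contained sketch of that bookkeeping (toy types, not the Spark classes):

    import scala.collection.mutable.{HashMap, HashSet}

    class ToyJobManager(onAllJobsOfTimeDone: Long => Unit) {
      private val jobs = new HashMap[Long, HashSet[String]]

      def addJob(time: Long, job: String) = jobs.synchronized {
        jobs.getOrElseUpdate(time, new HashSet[String]) += job
      }

      def clearJob(time: Long, job: String) = jobs.synchronized {
        jobs.get(time) match {
          case Some(jobsOfTime) =>
            jobsOfTime -= job
            if (jobsOfTime.isEmpty) {
              onAllJobsOfTimeDone(time)   // plays the role of ssc.scheduler.clearOldMetadata(time)
              jobs -= time
            }
          case None =>
            throw new Exception("Job finished for time " + time + " but no such time exists")
        }
      }
    }

    object ToyJobManagerDemo {
      def main(args: Array[String]) {
        val jm = new ToyJobManager(t => println("All jobs of time " + t + " done, clearing old metadata"))
        jm.addJob(1000L, "job-A")
        jm.addJob(1000L, "job-B")
        jm.clearJob(1000L, "job-A")   // other jobs of this batch still pending, nothing cleared
        jm.clearJob(1000L, "job-B")   // last job of the batch: the cleanup hook fires
      }
    }
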
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index 23a0f0974d..57d494da83 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -9,11 +9,8 @@ class Scheduler(ssc: StreamingContext) extends Logging {
initLogging()
- val graph = ssc.graph
-
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val jobManager = new JobManager(ssc, concurrentJobs)
-
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(ssc.checkpointDir)
} else {
@@ -24,53 +21,80 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateRDDs(new Time(longTime)))
+ val graph = ssc.graph
- def start() {
- // If context was started from checkpoint, then restart timer such that
- // this timer's triggers occur at the same time as the original timer.
- // Otherwise just start the timer from scratch, and initialize graph based
- // on this first trigger time of the timer.
+ def start() = synchronized {
if (ssc.isCheckpointPresent) {
- // If manual clock is being used for testing, then
- // either set the manual clock to the last checkpointed time,
- // or if the property is defined set it to that time
- if (clock.isInstanceOf[ManualClock]) {
- val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
- clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
- }
- // Reschedule the batches that were received but not processed before failure
- //ssc.initialCheckpoint.pendingTimes.foreach(time => generateRDDs(time))
- val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
- println(pendingTimes.mkString(", "))
- pendingTimes.foreach(time =>
- graph.generateRDDs(time).foreach(jobManager.runJob))
- // Restart the timer
- timer.restart(graph.zeroTime.milliseconds)
- logInfo("Scheduler's timer restarted")
+ restart()
} else {
- val firstTime = new Time(timer.start())
- graph.start(firstTime - ssc.graph.batchDuration)
- logInfo("Scheduler's timer started")
+ startFirstTime()
}
logInfo("Scheduler started")
}
- def stop() {
+ def stop() = synchronized {
timer.stop()
- graph.stop()
+ jobManager.stop()
+ ssc.graph.stop()
logInfo("Scheduler stopped")
}
-
- private def generateRDDs(time: Time) {
+
+ private def startFirstTime() {
+ val startTime = new Time(timer.getStartTime())
+ graph.start(startTime - graph.batchDuration)
+ timer.start(startTime.milliseconds)
+ logInfo("Scheduler's timer started at " + startTime)
+ }
+
+ private def restart() {
+
+ // If manual clock is being used for testing, then
+ // either set the manual clock to the last checkpointed time,
+ // or if the property is defined set it to that time
+ if (clock.isInstanceOf[ManualClock]) {
+ val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
+ val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong
+ clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
+ }
+
+ val batchDuration = ssc.graph.batchDuration
+
+ // Batches when the master was down, that is,
+ // between the checkpoint and current restart time
+ val checkpointTime = ssc.initialCheckpoint.checkpointTime
+ val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
+ val downTimes = checkpointTime.until(restartTime, batchDuration)
+ logInfo("Batches during down time: " + downTimes.mkString(", "))
+
+ // Batches that were unprocessed before failure
+ val pendingTimes = ssc.initialCheckpoint.pendingTimes
+ logInfo("Batches pending processing: " + pendingTimes.mkString(", "))
+ // Reschedule jobs for these times
+ val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
+ logInfo("Batches to reschedule: " + timesToReschedule.mkString(", "))
+ timesToReschedule.foreach(time =>
+ graph.generateRDDs(time).foreach(jobManager.runJob)
+ )
+
+ // Restart the timer
+ timer.start(restartTime.milliseconds)
+ logInfo("Scheduler's timer restarted")
+ }
+
+ /** Generates the RDDs, clears old metadata and does checkpoint for the given time */
+ def generateRDDs(time: Time) {
SparkEnv.set(ssc.env)
logInfo("\n-----------------------------------------------------\n")
graph.generateRDDs(time).foreach(jobManager.runJob)
- graph.forgetOldRDDs(time)
doCheckpoint(time)
}
- private def doCheckpoint(time: Time) {
+
+ def clearOldMetadata(time: Time) {
+ ssc.graph.clearOldMetadata(time)
+ }
+
+ def doCheckpoint(time: Time) {
if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
val startTime = System.currentTimeMillis()
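
The rescheduling logic in restart() boils down to a range-and-union computation over batch times. A standalone sketch with Long milliseconds standing in for Time and Duration, using made-up checkpoint and restart times:

    object RestartSketch {
      def main(args: Array[String]) {
        val batchDuration = 1000L
        val checkpointTime = 5000L      // last checkpoint before the failure
        val restartTime = 9000L         // first timer trigger after recovery
        // Batch times that fell while the driver was down: [checkpointTime, restartTime)
        val downTimes = checkpointTime until restartTime by batchDuration
        // Batch times that were generated but not fully processed before the failure
        val pendingTimes = Seq(4000L, 5000L)
        val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted
        println("Batches to reschedule: " + timesToReschedule.mkString(", "))
        // prints: 4000, 5000, 6000, 7000, 8000
      }
    }
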
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 8a6c9a5cb5..8201e84a20 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -37,6 +37,16 @@ case class Time(private val millis: Long) {
def max(that: Time): Time = if (this > that) this else that
+ def until(that: Time, interval: Duration): Seq[Time] = {
+ assert(that > this, "Cannot create sequence as " + that + " not more than " + this)
+ assert(
+ (that - this).isMultipleOf(interval),
+ "Cannot create sequence as gap between " + that + " and " +
+ this + " is not multiple of " + interval
+ )
+ (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
+ }
+
override def toString: String = (millis.toString + " ms")
}
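
Time.until enumerates every interval-aligned instant in [this, that), after checking that the gap is a whole number of intervals. The same computation on raw milliseconds, runnable on its own:

    object TimeUntilSketch {
      // Same checks and range as Time.until, expressed on raw milliseconds
      def until(thisMs: Long, thatMs: Long, intervalMs: Long): Seq[Long] = {
        assert(thatMs > thisMs, "Cannot create sequence as " + thatMs + " is not more than " + thisMs)
        assert((thatMs - thisMs) % intervalMs == 0,
          "Gap between " + thatMs + " and " + thisMs + " is not a multiple of " + intervalMs)
        thisMs until thatMs by intervalMs
      }

      def main(args: Array[String]) {
        println(until(1000L, 5000L, 1000L).mkString(", "))   // 1000, 2000, 3000, 4000
      }
    }
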
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 10ccb4318d..41b9bd9461 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -21,19 +21,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
+ // Latest file mod time seen till any point of time
private val lastModTimeFiles = new HashSet[String]()
private var lastModTime = 0L
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- @transient private var files = new HashMap[Time, Array[String]]
+ @transient private[streaming] var files = new HashMap[Time, Array[String]]
override def start() {
if (newFilesOnly) {
- lastModTime = System.currentTimeMillis()
+ lastModTime = graph.zeroTime.milliseconds
} else {
lastModTime = 0
}
+ logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
@@ -43,38 +45,50 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
- * granularity of seconds. Hence, new files may have the same modification time as the
- * latest modification time in the previous call to this method and the list of files
- * maintained is used to filter the one that have been processed.
+ * granularity of seconds. And new files may have the same modification time as the
+ * latest modification time in the previous call to this method yet was not reported in
+ * the previous call.
*/
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+
// Create the filter for selecting new files
val newFilter = new PathFilter() {
+ // Latest file mod time seen in this round of fetching files and its corresponding files
var latestModTime = 0L
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter(path)) {
+ if (!filter(path)) { // Reject file if it does not satisfy filter
+ logDebug("Rejected by filter " + path)
return false
- } else {
+ } else { // Accept file only if
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime < lastModTime){
- return false
+ logDebug("Mod time for " + path + " is " + modTime)
+ if (modTime < lastModTime) {
+ logDebug("Mod time less than last mod time")
+ return false // If the file was created before the last time it was called
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
- return false
+ logDebug("Mod time equal to last mod time, but file considered already")
+ return false // If the file was created exactly as lastModTime but not reported yet
+ } else if (modTime > validTime.milliseconds) {
+ logDebug("Mod time more than valid time")
+ return false // If the file was created after the time this function call requires
}
if (modTime > latestModTime) {
latestModTime = modTime
latestModTimeFiles.clear()
+ logDebug("Latest mod time updated to " + latestModTime)
}
latestModTimeFiles += path.toString
+ logDebug("Accepted " + path)
return true
}
}
}
-
+ logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
- logInfo("New files: " + newFiles.mkString(", "))
+ logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
if (newFiles.length > 0) {
// Update the modification time and the files processed for that modification time
if (lastModTime != newFilter.latestModTime) {
@@ -82,17 +96,21 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
lastModTimeFiles.clear()
}
lastModTimeFiles ++= newFilter.latestModTimeFiles
+ logDebug("Last mod time updated to " + lastModTime)
}
files += ((validTime, newFiles))
Some(filesToRDD(newFiles))
}
- /** Forget the old time-to-files mappings along with old RDDs */
- protected[streaming] override def forgetOldMetadata(time: Time) {
- super.forgetOldMetadata(time)
- val filesToBeRemoved = files.filter(_._1 <= (time - rememberDuration))
- files --= filesToBeRemoved.keys
- logInfo("Forgot " + filesToBeRemoved.size + " files from " + this)
+ /** Clear the old time-to-files mappings along with old RDDs */
+ protected[streaming] override def clearOldMetadata(time: Time) {
+ super.clearOldMetadata(time)
+ val oldFiles = files.filter(_._1 <= (time - rememberDuration))
+ files --= oldFiles.keys
+ logInfo("Cleared " + oldFiles.size + " old files that were older than " +
+ (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
+ logDebug("Cleared files are:\n" +
+ oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
/** Generate one RDD from an array of files */
@@ -148,6 +166,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
}
+
+ override def toString() = {
+ "[\n" + hadoopFiles.size + " file sets\n" +
+ hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
+ }
}
}
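
The file selection rule added above can be summarized as: accept a file only if its modification time is newer than the last recorded mod time (or equal to it but not yet reported) and no newer than the batch time being computed. A standalone sketch of that rule over (path, modTime) pairs, which stand in for FileStatus results:

    import scala.collection.mutable.HashSet

    object NewFileFilterSketch {
      def main(args: Array[String]) {
        val lastModTime = 2000L                 // latest mod time seen in the previous batch
        val lastModTimeFiles = HashSet("f2")    // files already reported at that mod time
        val validTime = 4000L                   // batch time currently being computed
        // (path, modification time) pairs from a hypothetical directory listing
        val listing = Seq(("f1", 1000L), ("f2", 2000L), ("f3", 2000L), ("f4", 3000L), ("f5", 5000L))

        val newFiles = listing.filter { case (path, modTime) =>
          if (modTime < lastModTime) false                                          // created before the last batch
          else if (modTime == lastModTime && lastModTimeFiles.contains(path)) false // same mod time, already reported
          else if (modTime > validTime) false                                       // belongs to a later batch
          else true
        }
        println("New files: " + newFiles.map(_._1).mkString(", "))   // f3, f4
      }
    }
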
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 8c322dd698..ecc75ec913 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -46,8 +46,15 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
- val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
- Some(new BlockRDD[T](ssc.sc, blockIds))
+ // If this is called for any time before the start time of the context,
+ // then this returns an empty RDD. This may happen when recovering from a
+ // master failure.
+ if (validTime >= graph.startTime) {
+ val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
+ Some(new BlockRDD[T](ssc.sc, blockIds))
+ } else {
+ Some(new BlockRDD[T](ssc.sc, Array[String]()))
+ }
}
}
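
The guard added to compute() keeps a recovered context from asking the network input tracker for blocks of batches that predate the restart. A minimal sketch of that decision, with a plain function standing in for the tracker lookup:

    object NetworkComputeSketch {
      // tracker stands in for ssc.networkInputTracker.getBlockIds(id, validTime)
      def blockIdsFor(validTime: Long, graphStartTime: Long,
                      tracker: Long => Array[String]): Array[String] = {
        if (validTime >= graphStartTime) tracker(validTime)  // normal case: ask the tracker
        else Array[String]()                                 // batch predates the restart: empty result
      }

      def main(args: Array[String]) {
        val tracker = (t: Long) => Array("block-" + t)
        println(blockIdsFor(5000L, 3000L, tracker).mkString(", "))   // block-5000
        println(blockIdsFor(2000L, 3000L, tracker).length)           // 0
      }
    }
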
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
new file mode 100644
index 0000000000..3ffe4b64d0
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -0,0 +1,375 @@
+package spark.streaming.util
+
+import spark.{Logging, RDD}
+import spark.streaming._
+import spark.streaming.dstream.ForEachDStream
+import StreamingContext._
+
+import scala.util.Random
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import java.io.{File, ObjectInputStream, IOException}
+import java.util.UUID
+
+import com.google.common.io.Files
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+private[streaming]
+object MasterFailureTest extends Logging {
+ initLogging()
+
+ @volatile var killed = false
+ @volatile var killCount = 0
+
+ def main(args: Array[String]) {
+ if (args.size < 2) {
+ println(
+ "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]")
+ System.exit(1)
+ }
+ val directory = args(0)
+ val numBatches = args(1).toInt
+ val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1)
+
+ println("\n\n========================= MAP TEST =========================\n\n")
+ testMap(directory, numBatches, batchDuration)
+
+ println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n")
+ testUpdateStateByKey(directory, numBatches, batchDuration)
+ }
+
+ def testMap(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val input = (1 to numBatches).map(_.toString).toSeq
+ // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ...
+ val expectedOutput = (1 to numBatches)
+
+ val operation = (st: DStream[String]) => st.map(_.toInt)
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size)
+ logInfo(expectedOutput.mkString("[", ",", "]"))
+ logInfo("Output, size = " + output.size)
+ logInfo(output.mkString("[", ",", "]"))
+
+ // Verify whether all the values of the expected output is present
+ // in the output
+ assert(output.distinct.toSet == expectedOutput.toSet)
+ }
+
+
+ def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) {
+ // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
+ val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+
+ val operation = (st: DStream[String]) => {
+ val updateFunc = (values: Seq[Long], state: Option[Long]) => {
+ Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L))
+ }
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1L))
+ .updateStateByKey[Long](updateFunc)
+ .checkpoint(batchDuration * 5)
+ }
+
+ // Run streaming operation with multiple master failures
+ val output = testOperation(directory, batchDuration, input, operation, expectedOutput)
+
+ logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput)
+ logInfo("Output, size = " + output.size + "\n" + output)
+
+ // Verify whether all the values in the output are among the expected output values
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+
+ // Verify whether the last expected output value has been generated, thereby
+ // confirming that none of the inputs have been missed
+ assert(output.last == expectedOutput.last)
+ }
+
+ /**
+ * Tests stream operation with multiple master failures, and verifies whether the
+ * final set of output values is as expected or not.
+ */
+ def testOperation[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ input: Seq[String],
+ operation: DStream[String] => DStream[T],
+ expectedOutput: Seq[T]
+ ): Seq[T] = {
+
+ // Just making sure that the expected output does not have duplicates
+ assert(expectedOutput.distinct.toSet == expectedOutput.toSet)
+
+ // Setup the stream computation with the given operation
+ val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation)
+
+ // Start generating files in a different thread
+ val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds)
+ fileGeneratingThread.start()
+
+ // Run the streams and repeatedly kill it until the last expected output
+ // has been generated, or until it has run for twice the expected time
+ val lastExpectedOutput = expectedOutput.last
+ val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
+ val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
+
+ // Delete directories
+ fileGeneratingThread.join()
+ val fs = checkpointDir.getFileSystem(new Configuration())
+ fs.delete(checkpointDir, true)
+ fs.delete(testDir, true)
+ logInfo("Finished test after " + killCount + " failures")
+ mergedOutput
+ }
+
+ /**
+ * Sets up the stream computation with the given operation, directory (local or HDFS),
+ * and batch duration. Returns the streaming context and the directory to which
+ * files should be written for testing.
+ */
+ private def setupStreams[T: ClassManifest](
+ directory: String,
+ batchDuration: Duration,
+ operation: DStream[String] => DStream[T]
+ ): (StreamingContext, Path, Path) = {
+ // Reset all state
+ reset()
+
+ // Create the directories for this test
+ val uuid = UUID.randomUUID().toString
+ val rootDir = new Path(directory, uuid)
+ val fs = rootDir.getFileSystem(new Configuration())
+ val checkpointDir = new Path(rootDir, "checkpoint")
+ val testDir = new Path(rootDir, "test")
+ fs.mkdirs(checkpointDir)
+ fs.mkdirs(testDir)
+
+ // Setup the streaming computation with the given operation
+ System.clearProperty("spark.driver.port")
+ var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration)
+ ssc.checkpoint(checkpointDir.toString)
+ val inputStream = ssc.textFileStream(testDir.toString)
+ val operatedStream = operation(inputStream)
+ val outputStream = new TestOutputStream(operatedStream)
+ ssc.registerOutputStream(outputStream)
+ (ssc, checkpointDir, testDir)
+ }
+
+
+ /**
+ * Repeatedly starts and kills the streaming context until timed out or
+ * the last expected output is generated. Finally, returns the merged output of all runs.
+ */
+ private def runStreams[T: ClassManifest](
+ ssc_ : StreamingContext,
+ lastExpectedOutput: T,
+ maxTimeToRun: Long
+ ): Seq[T] = {
+
+ var ssc = ssc_
+ var totalTimeRan = 0L
+ var isLastOutputGenerated = false
+ var isTimedOut = false
+ val mergedOutput = new ArrayBuffer[T]()
+ val checkpointDir = ssc.checkpointDir
+ var batchDuration = ssc.graph.batchDuration
+
+ while(!isLastOutputGenerated && !isTimedOut) {
+ // Get the output buffer
+ val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ def output = outputBuffer.flatMap(x => x)
+
+ // Start the thread to kill the streaming after some time
+ killed = false
+ val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+ killingThread.start()
+
+ var timeRan = 0L
+ try {
+ // Start the streaming computation and let it run while ...
+ // (i) StreamingContext has not been shut down yet
+ // (ii) The last expected output has not been generated yet
+ // (iii) Its not timed out yet
+ System.clearProperty("spark.streaming.clock")
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val startTime = System.currentTimeMillis()
+ while (!killed && !isLastOutputGenerated && !isTimedOut) {
+ Thread.sleep(100)
+ timeRan = System.currentTimeMillis() - startTime
+ isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
+ }
+ } catch {
+ case e: Exception => logError("Error running streaming context", e)
+ }
+ if (killingThread.isAlive) killingThread.interrupt()
+ ssc.stop()
+
+ logInfo("Has been killed = " + killed)
+ logInfo("Is last output generated = " + isLastOutputGenerated)
+ logInfo("Is timed out = " + isTimedOut)
+
+ // Verify whether the output of each batch has only one element or no element
+ // and then merge the new output with all the earlier output
+ mergedOutput ++= output
+ totalTimeRan += timeRan
+ logInfo("New output = " + output)
+ logInfo("Merged output = " + mergedOutput)
+ logInfo("Time ran = " + timeRan)
+ logInfo("Total time ran = " + totalTimeRan)
+
+ if (!isLastOutputGenerated && !isTimedOut) {
+ val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ // Recreate the streaming context from checkpoint
+ ssc = new StreamingContext(checkpointDir)
+ }
+ }
+ mergedOutput
+ }
+
+ /**
+ * Verifies the output value are the same as expected. Since failures can lead to
+ * a batch being processed twice, a batch's output may appear more than once
+ * consecutively. To avoid getting confused with those, we eliminate consecutive
+ * duplicate batch outputs of values from the `output`. As a result, the
+ * expected output should not have consecutive batches with the same values as output.
+ */
+ private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
+ // Verify that the expected outputs do not have consecutive batches with the same output
+ for (i <- 0 until expectedOutput.size - 1) {
+ assert(expectedOutput(i) != expectedOutput(i+1),
+ "Expected output has consecutive duplicate sequence of values")
+ }
+
+ // Log the output
+ println("Expected output, size = " + expectedOutput.size)
+ println(expectedOutput.mkString("[", ",", "]"))
+ println("Output, size = " + output.size)
+ println(output.mkString("[", ",", "]"))
+
+ // Match the output with the expected output
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ }
+
+ /** Resets counter to prepare for the test */
+ private def reset() {
+ killed = false
+ killCount = 0
+ }
+}
+
+/**
+ * This is an output stream just for testing. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ */
+private[streaming]
+class TestOutputStream[T: ClassManifest](
+ parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](
+ parent,
+ (rdd: RDD[T], t: Time) => {
+ val collected = rdd.collect()
+ output += collected
+ println(t + ": " + collected.mkString("[", ",", "]"))
+ }
+ ) {
+
+ // This is to clear the output buffer every time it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+
+
+/**
+ * Thread to kill streaming context after a random period of time.
+ */
+private[streaming]
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ try {
+ // If it is the first killing, then allow the first checkpoint to be created
+ var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 1000
+ val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) {
+ ssc.stop()
+ MasterFailureTest.killed = true
+ MasterFailureTest.killCount += 1
+ }
+ logInfo("Killing thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("Killing thread interrupted")
+ case e: Exception => logWarning("Exception in killing thread", e)
+ }
+
+ }
+}
+
+
+/**
+ * Thread to generate input files periodically with the desired text.
+ */
+private[streaming]
+class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
+ extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ val localTestDir = Files.createTempDir()
+ val fs = testDir.getFileSystem(new Configuration())
+ try {
+ Thread.sleep(5000) // To make sure that all the streaming context has been set up
+ for (i <- 0 until input.size) {
+ // Write the data to a local file and then move it to the target test directory
+ val localFile = new File(localTestDir, (i+1).toString)
+ val hadoopFile = new Path(testDir, (i+1).toString)
+ FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ //fs.moveFromLocalFile(new Path(localFile.toString), new Path(testDir, i.toString))
+ fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+ logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+ Thread.sleep(interval)
+ localFile.delete()
+ }
+ logInfo("File generating thread finished normally")
+ } catch {
+ case ie: InterruptedException => logInfo("File generating thread interrupted")
+ case e: Exception => logWarning("File generating in killing thread", e)
+ } finally {
+ fs.close()
+ }
+ }
+}
+
+
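
MasterFailureTest can be driven either through its main() (as documented in its usage string) or programmatically, which is what the rewritten FailureSuite below does. A minimal driver along those lines; the directory is a placeholder, and the object has to sit in package spark.streaming because the helper is private[streaming]:

    package spark.streaming

    import spark.streaming.util.MasterFailureTest

    object RunMasterFailureTest {
      def main(args: Array[String]) {
        val directory = "/tmp/MasterFailureTest"   // placeholder: any local or HDFS directory
        val numBatches = 30
        val batchDuration = Milliseconds(1000)
        MasterFailureTest.testMap(directory, numBatches, batchDuration)
        MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
      }
    }
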
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index db715cc295..8e10276deb 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -3,9 +3,9 @@ package spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
- val minPollTime = 25L
+ private val minPollTime = 25L
- val pollTime = {
+ private val pollTime = {
if (period / 10.0 > minPollTime) {
(period / 10.0).toLong
} else {
@@ -13,11 +13,20 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
}
- val thread = new Thread() {
+ private val thread = new Thread() {
override def run() { loop }
}
- var nextTime = 0L
+ private var nextTime = 0L
+
+ def getStartTime(): Long = {
+ (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
+ }
+
+ def getRestartTime(originalStartTime: Long): Long = {
+ val gap = clock.currentTime - originalStartTime
+ (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
+ }
def start(startTime: Long): Long = {
nextTime = startTime
@@ -26,21 +35,14 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
}
def start(): Long = {
- val startTime = (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period
- start(startTime)
+ start(getStartTime())
}
- def restart(originalStartTime: Long): Long = {
- val gap = clock.currentTime - originalStartTime
- val newStartTime = (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
- start(newStartTime)
- }
-
- def stop() {
+ def stop() {
thread.interrupt()
}
- def loop() {
+ private def loop() {
try {
while (true) {
clock.waitTillTime(nextTime)
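
getStartTime rounds the clock's current time up to the next period boundary, while getRestartTime rounds up to the next boundary that stays aligned with the original start time. A standalone check of that arithmetic with example values:

    object TimerMathSketch {
      def getStartTime(currentTime: Long, period: Long): Long =
        (math.floor(currentTime.toDouble / period) + 1).toLong * period

      def getRestartTime(currentTime: Long, originalStartTime: Long, period: Long): Long = {
        val gap = currentTime - originalStartTime
        (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
      }

      def main(args: Array[String]) {
        println(getStartTime(2300L, 1000L))            // 3000: next period boundary
        println(getRestartTime(7400L, 500L, 1000L))    // 7500: next boundary aligned with the original start
      }
    }
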
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index fbe4af4597..783a393a8f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,8 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint", new Duration(1000));
}
@@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
}
- /*
+
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -434,7 +435,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result);
}
- */
+
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
@@ -450,7 +451,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, actual);
}
- /*
+
// PairDStream Functions
@Test
public void testPairFilter() {
@@ -897,7 +898,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
- */
+
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -964,7 +965,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result1);
}
*/
- /*
+
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@@ -972,9 +973,9 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@@ -1026,5 +1027,5 @@ public class JavaAPISuite implements Serializable {
public void testFileStream() {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
- }*/
+ }
}
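
Switching tests onto the deterministic clock is done purely through a system property, which the streaming scheduler reads when instantiating its clock; the BasicOperationsSuite and CheckpointSuite changes below do the same from Scala. A minimal sketch:

    object ManualClockPropertySketch {
      def main(args: Array[String]) {
        // Set before creating a StreamingContext so batch times are fully deterministic
        System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
        // ... create the StreamingContext and advance the ManualClock per batch ...
        System.clearProperty("spark.streaming.clock")
      }
    }
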
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index edfa1243fa..5652596e1e 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,6 +1,7 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.rootCategory=WARN, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.ConsoleAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
@@ -8,4 +9,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.spark.streaming=INFO
+log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index c031949dd1..12388b8887 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,6 +6,8 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework() = "BasicOperationsSuite"
after {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 7126af62d9..c89c4a8d43 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@@ -10,8 +11,16 @@ import util.{Clock, ManualClock}
import scala.util.Random
import com.google.common.io.Files
+
+/**
+ * This test suite tests the checkpointing functionality of DStreams -
+ * the checkpointing of a DStream's RDDs as well as the checkpointing of
+ * the whole DStream graph.
+ */
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@@ -64,7 +73,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- runStreamsWithRealDelay(ssc, firstNumBatches)
+ advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
@@ -77,7 +86,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
- runStreamsWithRealDelay(ssc, secondNumBatches)
+ advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -92,7 +101,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
- runStreamsWithRealDelay(ssc, 1)
+ advanceTimeWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
@@ -113,7 +122,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
- runStreamsWithRealDelay(ssc, 4)
+ advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
@@ -168,74 +177,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
// This tests whether file input stream remembers what files were seen before
- // the master failure and uses them again to process a large window operatoin.
+ // the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ val clockProperty = System.getProperty("spark.streaming.clock")
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- var ssc = new StreamingContext(master, framework, batchDuration)
+ var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir, checkpointInterval)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
- if (i == 3) Thread.sleep(1000)
+ if (i == 3) Thread.sleep(2000)
i
})
+
// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files and advance manual clock to process them
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
// wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- // wait to make sure that FileInputDStream picks up this file only and not any other file
- Thread.sleep(500)
+ Thread.sleep(1000)
}
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
+ // Verify whether files created have been recorded correctly or not
+ var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(1000)
}
- // Restart stream computation from checkpoint and create more files to see whether
- // they are being processed
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
+ fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Restart stream computation
ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(1000)
}
Thread.sleep(1000)
- logInfo("Output = " + outputStream.output.mkString(","))
+ logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
+ // Verify whether files created while the driver was down have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+
+ // Verify whether new files created after recover have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+
// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
outputBuffer ++= outputStream.output
- // Verify whether data received by Spark Streaming was as expected
- val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -244,11 +274,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
- }
+ val output = outputBuffer.flatMap(x => x)
+ assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+ output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ // To ensure that all the inputs were received correctly
+ assert(expectedOutput.last === output.last)
+
+ // Enable manual clock back again for other tests
+ if (clockProperty != null)
+ System.setProperty("spark.streaming.clock", clockProperty)
}
@@ -278,7 +314,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
- val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
+ ssc.start()
+ val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
+ ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -289,17 +327,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc.stop()
ssc = null
}
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
- * It also wait for the expected amount of time for each batch.
+ * It also waits for the expected amount of time for each batch.
*/
- def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@@ -308,6 +349,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- }
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ outputStream.output
+ }
} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index efaa098d2e..a5fa7ab92d 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,14 +1,15 @@
package spark.streaming
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import org.apache.commons.io.FileUtils
-import java.io.File
-import scala.runtime.RichInt
-import scala.util.Random
-import spark.streaming.StreamingContext._
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import spark.Logging
+import spark.streaming.util.MasterFailureTest
+import StreamingContext._
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
import com.google.common.io.Files
+import java.io.File
+import org.apache.commons.io.FileUtils
+import collection.mutable.ArrayBuffer
+
/**
* This testsuite tests master failures at random times while the stream is running using
@@ -16,295 +17,24 @@ import com.google.common.io.Files
*/
class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
- var testDir: File = null
- var checkpointDir: File = null
- val batchDuration = Milliseconds(500)
+ var directory = "FailureSuite"
+ val numBatches = 30
+ val batchDuration = Milliseconds(1000)
before {
- testDir = Files.createTempDir()
- checkpointDir = Files.createTempDir()
+ FileUtils.deleteDirectory(new File(directory))
}
after {
- FailureSuite.reset()
- FileUtils.deleteDirectory(checkpointDir)
- FileUtils.deleteDirectory(testDir)
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- }
-
- test("multiple failures with updateStateByKey") {
- val n = 30
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
- // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
-
- val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
- }
- st.flatMap(_.split(" "))
- .map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
- }
-
- testOperationWithMultipleFailures(input, operation, expectedOutput)
- }
-
- test("multiple failures with reduceByKeyAndWindow") {
- val n = 30
- val w = 100
- assert(w > n, "Window should be much larger than the number of input sets in this test")
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
- // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
-
- val operation = (st: DStream[String]) => {
- st.flatMap(_.split(" "))
- .map(x => (x, 1))
- .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
- }
-
- testOperationWithMultipleFailures(input, operation, expectedOutput)
+ FileUtils.deleteDirectory(new File(directory))
}
-
- /**
- * Tests stream operation with multiple master failures, and verifies whether the
- * final set of output values is as expected or not. Checking the final value is
- * proof that no intermediate data was lost due to master failures.
- */
- def testOperationWithMultipleFailures(
- input: Seq[String],
- operation: DStream[String] => DStream[(String, Int)],
- expectedOutput: Seq[(String, Int)]
- ) {
- var ssc = setupStreamsWithFileStream(operation)
-
- val mergedOutput = new ArrayBuffer[(String, Int)]()
- val lastExpectedOutput = expectedOutput.last
-
- val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
- var totalTimeRan = 0L
-
- // Start generating files in the a different thread
- val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds)
- fileGeneratingThread.start()
-
- // Repeatedly start and kill the streaming context until timed out or
- // all expected output is generated
- while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
-
- // Start the thread to kill the streaming after some time
- FailureSuite.failed = false
- val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
- killingThread.start()
-
- // Run the streams with real clock until last expected output is seen or timed out
- val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan)
- if (killingThread.isAlive) killingThread.interrupt()
-
-    // Merge the new output, add up the time ran, and check whether we have already timed out
- mergedOutput ++= output
- totalTimeRan += timeRan
- logInfo("New output = " + output)
- logInfo("Merged output = " + mergedOutput)
- logInfo("Total time spent = " + totalTimeRan)
- if (totalTimeRan > maxTimeToRun) {
- FailureSuite.timedOut = true
- }
-
- if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
- val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- }
-
- // Recreate the streaming context from checkpoint
- ssc = new StreamingContext(checkpointDir.getPath)
- }
- ssc.stop()
- ssc = null
- logInfo("Finished test after " + FailureSuite.failureCount + " failures")
-
- if (FailureSuite.timedOut) {
-      logWarning("Timed out with run time of " + maxTimeToRun + " ms for " +
- expectedOutput.size + " batches of " + batchDuration)
- }
-
- // Verify whether the output is as expected
- verifyOutput(mergedOutput, expectedOutput)
- if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt()
+ test("multiple failures with map") {
+ MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
-  /** Sets up the stream operations with a file input stream */
- def setupStreamsWithFileStream(
- operation: DStream[String] => DStream[(String, Int)]
- ): StreamingContext = {
- val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration)
- ssc.checkpoint(checkpointDir.getPath)
- val inputStream = ssc.textFileStream(testDir.getPath)
- val operatedStream = operation(inputStream)
- val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]]
- val outputStream = new TestOutputStream(operatedStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
- ssc
- }
-
- /**
- * Runs the streams set up in `ssc` on real clock.
- */
- def runStreamsWithRealClock(
- ssc: StreamingContext,
- lastExpectedOutput: (String, Int),
- timeout: Long
- ): (Seq[(String, Int)], Long) = {
-
- System.clearProperty("spark.streaming.clock")
-
- // Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]]
- val output = outputStream.output
- val startTime = System.currentTimeMillis()
-
- // Functions to detect various conditions
- def hasFailed = FailureSuite.failed
- def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput
- def isTimedOut = System.currentTimeMillis() - startTime > timeout
-
- // Start the streaming computation and let it run while ...
- // (i) StreamingContext has not been shut down yet
- // (ii) The last expected output has not been generated yet
-    // (iii) It has not timed out yet
- try {
- ssc.start()
- while (!hasFailed && !isLastOutputGenerated && !isTimedOut) {
- Thread.sleep(100)
- }
- logInfo("Has failed = " + hasFailed)
- logInfo("Is last output generated = " + isLastOutputGenerated)
- logInfo("Is timed out = " + isTimedOut)
- } catch {
- case e: Exception => logInfo("Exception while running streams: " + e)
- } finally {
- ssc.stop()
- }
-
-    // Verify whether the output of each batch has at most one element
-    assert(output.forall(_.size <= 1), "output of each batch should have at most one element")
-
-    // Set appropriate flags if timed out or output has been generated
- if (isTimedOut) FailureSuite.timedOut = true
- if (isLastOutputGenerated) FailureSuite.outputGenerated = true
-
- val timeTaken = System.currentTimeMillis() - startTime
- logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output.flatMap(_.headOption), timeTaken)
- }
-
- /**
-   * Verifies that the output values are the same as expected. Since failures can lead
-   * to a batch being processed twice, a batch's output may appear more than once
-   * consecutively. To avoid getting confused by those, we eliminate consecutive
-   * duplicate batch outputs from `output`. As a result, the expected output should
-   * not have consecutive batches with the same values as output.
- */
- def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) {
-    // Verify that the expected output does not have consecutive batches with the same values
- for (i <- 0 until expectedOutput.size - 1) {
- assert(expectedOutput(i) != expectedOutput(i+1),
- "Expected output has consecutive duplicate sequence of values")
- }
-
- // Match the output with the expected output
- logInfo(
- "\n-------------------------------------------\n" +
- " Verifying output " +
- "\n-------------------------------------------\n"
- )
- logInfo("Expected output, size = " + expectedOutput.size)
- logInfo(expectedOutput.mkString("[", ",", "]"))
- logInfo("Output, size = " + output.size)
- logInfo(output.mkString("[", ",", "]"))
- output.foreach(o =>
- assert(expectedOutput.contains(o), "Expected value " + o + " not found")
- )
- }
-}
-
-object FailureSuite {
- var failed = false
- var outputGenerated = false
- var timedOut = false
- var failureCount = 0
-
- def reset() {
- failed = false
- outputGenerated = false
- timedOut = false
- failureCount = 0
- }
-}
-
-/**
- * Thread to kill streaming context after some time.
- */
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
- initLogging()
-
- override def run() {
- try {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) {
- ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
- }
- logInfo("Killing thread exited")
- } catch {
- case ie: InterruptedException => logInfo("Killing thread interrupted")
- case e: Exception => logWarning("Exception in killing thread", e)
- }
- }
-}
-
-/**
- * Thread to generate input files periodically with the desired text
- */
-class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long)
- extends Thread with Logging {
- initLogging()
-
- override def run() {
- try {
-      Thread.sleep(5000) // To make sure that the streaming context has been fully set up
- for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(interval)
- }
- logInfo("File generating thread exited")
- } catch {
- case ie: InterruptedException => logInfo("File generating thread interrupted")
-      case e: Exception => logWarning("Exception in file generating thread", e)
- }
+ test("multiple failures with updateStateByKey") {
+ MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}
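Note: the verification strategy described in the removed doc comment above (a batch may be
processed twice after a master failure, so consecutive duplicate batch outputs are collapsed
before comparing against the expected values) is presumed to now live inside the new
MasterFailureTest helper rather than in this suite. A minimal sketch of that de-duplication
step, with hypothetical names that are not part of the actual helper:

object OutputVerificationSketch {
  // Collapse runs of identical consecutive batch outputs, so that a batch which was
  // re-processed after a master failure is only counted once.
  def eliminateConsecutiveDuplicates[T](output: Seq[T]): Seq[T] = {
    output.foldLeft(List.empty[T]) { (acc, o) =>
      if (acc.headOption == Some(o)) acc else o :: acc
    }.reverse
  }

  // Every surviving value must appear somewhere in the expected output.
  def verify[T](output: Seq[T], expected: Seq[T]) {
    eliminateConsecutiveDuplicates(output).foreach { o =>
      assert(expected.contains(o), "Expected value " + o + " not found")
    }
  }
}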
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0eb9c7b81e..7c1c2e1040 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -133,26 +133,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
+    // Disable the manual clock, since FileInputDStream does not work with it
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
- val filestream = ssc.textFileStream(testDir.toString)
+ val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(filestream, outputBuffer)
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- //Thread.sleep(100)
+ val file = new File(testDir, i.toString)
+ FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ logInfo("Created file " + file)
+ Thread.sleep(batchDuration.milliseconds)
+ Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
Thread.sleep(1000)
@@ -171,16 +174,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i).size === 1)
- assert(output(i).head.toString === expectedOutput(i))
- }
+ assert(output.toList === expectedOutput.toList)
+
FileUtils.deleteDirectory(testDir)
+
+    // Re-enable the manual clock for the other tests
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
}
-
+/** This is a server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
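The rewritten file input stream test above switches from the manual clock to the real system
clock by clearing the spark.streaming.clock property, and restores the manual clock at the end.
A small sketch of how that toggle could be factored out; the helper name withRealClock is
hypothetical and not part of this change:

object ClockSwitchSketch {
  // Run a block with the real system clock, then restore the manual clock so that
  // the remaining tests keep their deterministic timing.
  def withRealClock[T](body: => T): T = {
    System.clearProperty("spark.streaming.clock")
    try {
      body
    } finally {
      System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
    }
  }
}

The test body would then be wrapped in ClockSwitchSketch.withRealClock { ... } instead of
clearing and resetting the property inline.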
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c2733831b2..2cc31d6137 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -63,20 +63,28 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
*/
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ // Name of the framework for Spark context
def framework = "TestSuiteBase"
+ // Master for Spark context
def master = "local[2]"
+ // Batch duration
def batchDuration = Seconds(1)
+ // Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
+ // Duration after which the graph is checkpointed
def checkpointInterval = batchDuration
+ // Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
+ // Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@@ -140,9 +148,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@@ -186,7 +191,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop()
}
-
output
}
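The defs added to TestSuiteBase above are meant to be overridden per suite, as
WindowOperationsSuite does below for framework and maxWaitTimeMillis. A minimal sketch of such
a suite; the class is hypothetical and assumes it sits in the spark.streaming package so that
TestSuiteBase and Seconds are in scope:

class SlowOperationsSuiteSketch extends TestSuiteBase {
  // Override only the knobs this suite cares about; everything else keeps the
  // TestSuiteBase defaults documented above.
  override def framework = "SlowOperationsSuiteSketch"
  override def batchDuration = Seconds(2)
  override def maxWaitTimeMillis = 20000
}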
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index cd9608df53..1080790147 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000