aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/spark/streaming/FailureSuite.scala281
1 files changed, 200 insertions, 81 deletions
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index c4cfffbfc1..efaa098d2e 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,58 +1,58 @@
package spark.streaming
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{FunSuite, BeforeAndAfter}
import org.apache.commons.io.FileUtils
import java.io.File
import scala.runtime.RichInt
import scala.util.Random
import spark.streaming.StreamingContext._
-import collection.mutable.ArrayBuffer
+import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import spark.Logging
+import com.google.common.io.Files
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
-class FailureSuite extends TestSuiteBase with BeforeAndAfter {
+class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ var testDir: File = null
+ var checkpointDir: File = null
+ val batchDuration = Milliseconds(500)
before {
- FileUtils.deleteDirectory(new File(checkpointDir))
+ testDir = Files.createTempDir()
+ checkpointDir = Files.createTempDir()
}
after {
FailureSuite.reset()
- FileUtils.deleteDirectory(new File(checkpointDir))
+ FileUtils.deleteDirectory(checkpointDir)
+ FileUtils.deleteDirectory(testDir)
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}
- override def framework = "CheckpointSuite"
-
- override def batchDuration = Milliseconds(500)
-
- override def checkpointDir = "checkpoint"
-
- override def checkpointInterval = batchDuration
-
test("multiple failures with updateStateByKey") {
val n = 30
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ] for n=30
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
+ val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
}
- st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1))
+ .updateStateByKey[RichInt](updateFunc)
+ .checkpoint(Seconds(2))
+ .map(t => (t._1, t._2.self))
}
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+ testOperationWithMultipleFailures(input, operation, expectedOutput)
}
test("multiple failures with reduceByKeyAndWindow") {
@@ -60,17 +60,18 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
val w = 100
assert(w > n, "Window should be much larger than the number of input sets in this test")
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ]
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
+ val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
+ // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
+ val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
val operation = (st: DStream[String]) => {
- st.map(x => (x, 1))
+ st.flatMap(_.split(" "))
+ .map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
.checkpoint(Seconds(2))
}
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
+ testOperationWithMultipleFailures(input, operation, expectedOutput)
}
@@ -79,113 +80,231 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
* final set of output values is as expected or not. Checking the final value is
* proof that no intermediate data was lost due to master failures.
*/
- def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest](
- input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V],
- lastExpectedOutput: Seq[V],
- numBatches: Int,
- numExpectedOutput: Int
+ def testOperationWithMultipleFailures(
+ input: Seq[String],
+ operation: DStream[String] => DStream[(String, Int)],
+ expectedOutput: Seq[(String, Int)]
) {
- var ssc = setupStreams[U, V](input, operation)
- val mergedOutput = new ArrayBuffer[Seq[V]]()
+ var ssc = setupStreamsWithFileStream(operation)
+
+ val mergedOutput = new ArrayBuffer[(String, Int)]()
+ val lastExpectedOutput = expectedOutput.last
+ val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
var totalTimeRan = 0L
- while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
- new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start()
- val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput)
+ // Start generating files in the a different thread
+ val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds)
+ fileGeneratingThread.start()
+
+ // Repeatedly start and kill the streaming context until timed out or
+ // all expected output is generated
+ while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
+
+ // Start the thread to kill the streaming after some time
+ FailureSuite.failed = false
+ val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
+ killingThread.start()
+
+ // Run the streams with real clock until last expected output is seen or timed out
+ val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan)
+ if (killingThread.isAlive) killingThread.interrupt()
+
+ // Merge output and time ran and see whether already timed out or not
mergedOutput ++= output
totalTimeRan += timeRan
logInfo("New output = " + output)
logInfo("Merged output = " + mergedOutput)
logInfo("Total time spent = " + totalTimeRan)
- val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- FailureSuite.failed = false
- ssc = new StreamingContext(checkpointDir)
+ if (totalTimeRan > maxTimeToRun) {
+ FailureSuite.timedOut = true
+ }
+
+ if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
+ val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2)
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Restarting stream computation in " + sleepTime + " ms " +
+ "\n-------------------------------------------\n"
+ )
+ Thread.sleep(sleepTime)
+ }
+
+ // Recreate the streaming context from checkpoint
+ ssc = new StreamingContext(checkpointDir.getPath)
}
ssc.stop()
ssc = null
+ logInfo("Finished test after " + FailureSuite.failureCount + " failures")
+
+ if (FailureSuite.timedOut) {
+ logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " +
+ expectedOutput.size + " batches of " + batchDuration)
+ }
+
+ // Verify whether the output is as expected
+ verifyOutput(mergedOutput, expectedOutput)
+ if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt()
+ }
- // Verify whether the last output is the expected one
- val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty))
- assert(lastOutput.toSet === lastExpectedOutput.toSet)
- logInfo("Finished computation after " + FailureSuite.failureCount + " failures")
+ /** Sets up the stream operations with file input stream */
+ def setupStreamsWithFileStream(
+ operation: DStream[String] => DStream[(String, Int)]
+ ): StreamingContext = {
+ val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration)
+ ssc.checkpoint(checkpointDir.getPath)
+ val inputStream = ssc.textFileStream(testDir.getPath)
+ val operatedStream = operation(inputStream)
+ val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]]
+ val outputStream = new TestOutputStream(operatedStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc
}
/**
- * Runs the streams set up in `ssc` on real clock until the expected max number of
+ * Runs the streams set up in `ssc` on real clock.
*/
- def runStreamsWithRealClock[V: ClassManifest](
- ssc: StreamingContext,
- numBatches: Int,
- maxExpectedOutput: Int
- ): (Seq[Seq[V]], Long) = {
+ def runStreamsWithRealClock(
+ ssc: StreamingContext,
+ lastExpectedOutput: (String, Int),
+ timeout: Long
+ ): (Seq[(String, Int)], Long) = {
System.clearProperty("spark.streaming.clock")
- assert(numBatches > 0, "Number of batches to run stream computation is zero")
- assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero")
- logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput)
-
// Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]]
val output = outputStream.output
- val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
val startTime = System.currentTimeMillis()
+ // Functions to detect various conditions
+ def hasFailed = FailureSuite.failed
+ def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput
+ def isTimedOut = System.currentTimeMillis() - startTime > timeout
+
+ // Start the streaming computation and let it run while ...
+ // (i) StreamingContext has not been shut down yet
+ // (ii) The last expected output has not been generated yet
+ // (iii) Its not timed out yet
try {
- // Start computation
ssc.start()
-
- // Wait until expected number of output items have been generated
- while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) {
- logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput)
+ while (!hasFailed && !isLastOutputGenerated && !isTimedOut) {
Thread.sleep(100)
}
+ logInfo("Has failed = " + hasFailed)
+ logInfo("Is last output generated = " + isLastOutputGenerated)
+ logInfo("Is timed out = " + isTimedOut)
} catch {
case e: Exception => logInfo("Exception while running streams: " + e)
} finally {
ssc.stop()
}
+
+ // Verify whether the output of each batch has only one element
+ assert(output.forall(_.size <= 1), "output of each batch should have only one element")
+
+ // Set appropriate flags is timed out or output has been generated
+ if (isTimedOut) FailureSuite.timedOut = true
+ if (isLastOutputGenerated) FailureSuite.outputGenerated = true
+
val timeTaken = System.currentTimeMillis() - startTime
logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output, timeTaken)
+ (output.flatMap(_.headOption), timeTaken)
}
+ /**
+ * Verifies the output value are the same as expected. Since failures can lead to
+ * a batch being processed twice, a batches output may appear more than once
+ * consecutively. To avoid getting confused with those, we eliminate consecutive
+ * duplicate batch outputs of values from the `output`. As a result, the
+ * expected output should not have consecutive batches with the same values as output.
+ */
+ def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) {
+ // Verify whether expected outputs do not consecutive batches with same output
+ for (i <- 0 until expectedOutput.size - 1) {
+ assert(expectedOutput(i) != expectedOutput(i+1),
+ "Expected output has consecutive duplicate sequence of values")
+ }
+ // Match the output with the expected output
+ logInfo(
+ "\n-------------------------------------------\n" +
+ " Verifying output " +
+ "\n-------------------------------------------\n"
+ )
+ logInfo("Expected output, size = " + expectedOutput.size)
+ logInfo(expectedOutput.mkString("[", ",", "]"))
+ logInfo("Output, size = " + output.size)
+ logInfo(output.mkString("[", ",", "]"))
+ output.foreach(o =>
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ }
}
object FailureSuite {
var failed = false
+ var outputGenerated = false
+ var timedOut = false
var failureCount = 0
def reset() {
failed = false
+ outputGenerated = false
+ timedOut = false
failureCount = 0
}
}
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging {
+/**
+ * Thread to kill streaming context after some time.
+ */
+class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
initLogging()
override def run() {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime.toLong)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
+ try {
+ var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint
+ val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
+ logInfo("Kill wait time = " + killWaitTime)
+ Thread.sleep(killWaitTime)
+ logInfo(
+ "\n---------------------------------------\n" +
+ "Killing streaming context after " + killWaitTime + " ms" +
+ "\n---------------------------------------\n"
+ )
+ if (ssc != null) {
+ ssc.stop()
+ FailureSuite.failed = true
+ FailureSuite.failureCount += 1
+ }
+ logInfo("Killing thread exited")
+ } catch {
+ case ie: InterruptedException => logInfo("Killing thread interrupted")
+ case e: Exception => logWarning("Exception in killing thread", e)
+ }
}
}
+
+/**
+ * Thread to generate input files periodically with the desired text
+ */
+class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long)
+ extends Thread with Logging {
+ initLogging()
+
+ override def run() {
+ try {
+ Thread.sleep(5000) // To make sure that all the streaming context has been set up
+ for (i <- 0 until input.size) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
+ Thread.sleep(interval)
+ }
+ logInfo("File generating thread exited")
+ } catch {
+ case ie: InterruptedException => logInfo("File generating thread interrupted")
+ case e: Exception => logWarning("File generating in killing thread", e)
+ }
+ }
+}
+