author     Tathagata Das <tathagata.das1565@gmail.com>   2013-02-13 12:17:45 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-02-13 12:17:45 -0800
commit     39addd380363c0371e935fae50983fe87158c1ac (patch)
tree       0b428024e5ed5faac707ccb5b5573b0a2400d3b2 /streaming/src/test
parent     fd90daf850a922fe33c3638b18304d827953e2cb (diff)
Changed scheduler and file input stream to fix bugs in the driver fault tolerance. Added MasterFailureTest to rigorously test master fault tolerance with file input stream.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java              |  21
-rw-r--r--  streaming/src/test/resources/log4j.properties                          |   7
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala    |   2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala         | 107
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala            | 304
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala       |  29
-rw-r--r--  streaming/src/test/scala/spark/streaming/TestSuiteBase.scala           |  12
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala   |   2
8 files changed, 136 insertions, 348 deletions
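
A pattern that recurs throughout these test changes is selecting the clock through the spark.streaming.clock system property: deterministic suites install the manual clock before building their StreamingContext, while the file-input-stream tests clear the property so FileInputDStream runs on the real system clock. A minimal sketch of that pattern, using only calls that appear in this diff (the application name and the single advanced batch are illustrative):

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.util.ManualClock

    // Deterministic suites: install the manual clock before the context is created
    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
    val ssc = new StreamingContext("local[2]", "ClockExample", Seconds(1))
    // ... register input and output streams on ssc here ...
    ssc.start()

    // Advance the manual clock by one batch interval to trigger job generation
    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    clock.addToTime(Seconds(1).milliseconds)
    ssc.stop()

    // File-stream tests instead clear the property and run on the wall clock
    System.clearProperty("spark.streaming.clock")
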
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index fbe4af4597..783a393a8f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,8 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint", new Duration(1000));
}
@@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
}
- /*
+
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
@@ -434,7 +435,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result);
}
- */
+
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
@@ -450,7 +451,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, actual);
}
- /*
+
// PairDStream Functions
@Test
public void testPairFilter() {
@@ -897,7 +898,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
- */
+
@Test
public void testCheckpointMasterRecovery() throws InterruptedException {
List<List<String>> inputData = Arrays.asList(
@@ -964,7 +965,7 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expected, result1);
}
*/
- /*
+
// Input stream tests. These mostly just test that we can instantiate a given InputStream with
// Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
// InputStream functionality is deferred to the existing Scala tests.
@@ -972,9 +973,9 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@@ -1026,5 +1027,5 @@ public class JavaAPISuite implements Serializable {
public void testFileStream() {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
- }*/
+ }
}
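
The Java input-stream tests above only verify that each stream can be instantiated with Java-friendly arguments and assigned to a JavaDStream without type errors, with the kafkaStream overloads now taking a single "host:port" connect string instead of separate host and port arguments; functional coverage stays in the Scala suites. A comparable instantiation-only check in Scala, restricted to the textFileStream call that appears elsewhere in this diff (the directory path is illustrative):

    import spark.streaming.{DStream, Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "InstantiationCheck", Seconds(1))
    // The assignment compiles only if the stream carries the expected element type;
    // no data has to flow for this kind of check
    val lines: DStream[String] = ssc.textFileStream("/tmp/instantiation-check")
    ssc.stop()
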
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index edfa1243fa..5652596e1e 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,6 +1,7 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
+log4j.rootCategory=WARN, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.ConsoleAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
@@ -8,4 +9,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.spark.streaming=INFO
+log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index c031949dd1..12388b8887 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,6 +6,8 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework() = "BasicOperationsSuite"
after {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 7126af62d9..c89c4a8d43 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@@ -10,8 +11,16 @@ import util.{Clock, ManualClock}
import scala.util.Random
import com.google.common.io.Files
+
+/**
+ * This test suite tests the checkpointing functionality of DStreams -
+ * the checkpointing of a DStream's RDDs as well as the checkpointing of
+ * the whole DStream graph.
+ */
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@@ -64,7 +73,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- runStreamsWithRealDelay(ssc, firstNumBatches)
+ advanceTimeWithRealDelay(ssc, firstNumBatches)
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.checkpointFiles.foreach {
@@ -77,7 +86,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
- runStreamsWithRealDelay(ssc, secondNumBatches)
+ advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -92,7 +101,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
- runStreamsWithRealDelay(ssc, 1)
+ advanceTimeWithRealDelay(ssc, 1)
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
@@ -113,7 +122,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
- runStreamsWithRealDelay(ssc, 4)
+ advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
@@ -168,74 +177,95 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
// This tests whether file input stream remembers what files were seen before
- // the master failure and uses them again to process a large window operatoin.
+ // the master failure and uses them again to process a large window operation.
// It also tests whether batches, whose processing was incomplete due to the
// failure, are re-processed or not.
test("recovery with file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ val clockProperty = System.getProperty("spark.streaming.clock")
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
- var ssc = new StreamingContext(master, framework, batchDuration)
+ var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir, checkpointInterval)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
- if (i == 3) Thread.sleep(1000)
+ if (i == 3) Thread.sleep(2000)
i
})
+
// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
- val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
+ val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files and advance manual clock to process them
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
// wait to make sure that the file is written such that it gets shown in the file listings
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- // wait to make sure that FileInputDStream picks up this file only and not any other file
- Thread.sleep(500)
+ Thread.sleep(1000)
}
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()
+ // Verify whether files created have been recorded correctly or not
+ var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Thread.sleep(1000)
}
- // Restart stream computation from checkpoint and create more files to see whether
- // they are being processed
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
+ fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Restart stream computation
ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(1000)
}
Thread.sleep(1000)
- logInfo("Output = " + outputStream.output.mkString(","))
+ logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
+ // Verify whether files created while the driver was down have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+
+ // Verify whether new files created after recovery have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+
// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
outputBuffer ++= outputStream.output
- // Verify whether data received by Spark Streaming was as expected
- val expectedOutput = Seq(1, 3, 6, 28, 36, 45)
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -244,11 +274,17 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
- }
+ val output = outputBuffer.flatMap(x => x)
+ assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+ output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ // To ensure that all the inputs were received correctly
+ assert(expectedOutput.last === output.last)
+
+ // Enable manual clock back again for other tests
+ if (clockProperty != null)
+ System.setProperty("spark.streaming.clock", clockProperty)
}
@@ -278,7 +314,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
- val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
+ ssc.start()
+ val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
+ ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -289,17 +327,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc.stop()
ssc = null
}
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
- * It also wait for the expected amount of time for each batch.
+ * It also waits for the expected amount of time for each batch.
*/
- def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@@ -308,6 +349,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- }
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ outputStream.output
+ }
} \ No newline at end of file
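
With runStreamsWithRealDelay renamed to advanceTimeWithRealDelay and now returning the output collected so far, the checkpoint round-trip helper becomes an explicit start / advance / stop on each side of the restart. A condensed sketch of that flow using the helpers defined in this suite (input, operation, expectedOutput and the batch counts are placeholders, and the String/Int type parameters are only for illustration):

    // First run: compute a few batches and let the graph checkpoint itself
    var ssc = setupStreams[String, Int](input, operation)
    ssc.start()
    val firstOutput = advanceTimeWithRealDelay[Int](ssc, initialNumBatches)
    ssc.stop()
    verifyOutput[Int](firstOutput, expectedOutput.take(initialNumBatches), true)

    // Second run: recover the graph from the checkpoint file and keep going
    ssc = new StreamingContext(checkpointDir)
    System.clearProperty("spark.driver.port")   // avoid Akka rebinding to the stale port
    ssc.start()
    val outputAfterRestart = advanceTimeWithRealDelay[Int](ssc, nextNumBatches)
    verifyOutput[Int](outputAfterRestart, expectedOutput.takeRight(nextNumExpectedOutputs), true)
    ssc.stop()
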
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index efaa098d2e..a5fa7ab92d 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,14 +1,15 @@
package spark.streaming
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import org.apache.commons.io.FileUtils
-import java.io.File
-import scala.runtime.RichInt
-import scala.util.Random
-import spark.streaming.StreamingContext._
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import spark.Logging
+import spark.streaming.util.MasterFailureTest
+import StreamingContext._
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
import com.google.common.io.Files
+import java.io.File
+import org.apache.commons.io.FileUtils
+import collection.mutable.ArrayBuffer
+
/**
* This testsuite tests master failures at random times while the stream is running using
@@ -16,295 +17,24 @@ import com.google.common.io.Files
*/
class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
- var testDir: File = null
- var checkpointDir: File = null
- val batchDuration = Milliseconds(500)
+ var directory = "FailureSuite"
+ val numBatches = 30
+ val batchDuration = Milliseconds(1000)
before {
- testDir = Files.createTempDir()
- checkpointDir = Files.createTempDir()
+ FileUtils.deleteDirectory(new File(directory))
}
after {
- FailureSuite.reset()
- FileUtils.deleteDirectory(checkpointDir)
- FileUtils.deleteDirectory(testDir)
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.driver.port")
- }
-
- test("multiple failures with updateStateByKey") {
- val n = 30
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
- // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
-
- val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
- }
- st.flatMap(_.split(" "))
- .map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
- }
-
- testOperationWithMultipleFailures(input, operation, expectedOutput)
- }
-
- test("multiple failures with reduceByKeyAndWindow") {
- val n = 30
- val w = 100
- assert(w > n, "Window should be much larger than the number of input sets in this test")
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
- // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1 to n).map(i => (1 to i).reduce(_ + _)).map(j => ("a", j))
-
- val operation = (st: DStream[String]) => {
- st.flatMap(_.split(" "))
- .map(x => (x, 1))
- .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
- }
-
- testOperationWithMultipleFailures(input, operation, expectedOutput)
+ FileUtils.deleteDirectory(new File(directory))
}
-
- /**
- * Tests stream operation with multiple master failures, and verifies whether the
- * final set of output values is as expected or not. Checking the final value is
- * proof that no intermediate data was lost due to master failures.
- */
- def testOperationWithMultipleFailures(
- input: Seq[String],
- operation: DStream[String] => DStream[(String, Int)],
- expectedOutput: Seq[(String, Int)]
- ) {
- var ssc = setupStreamsWithFileStream(operation)
-
- val mergedOutput = new ArrayBuffer[(String, Int)]()
- val lastExpectedOutput = expectedOutput.last
-
- val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2
- var totalTimeRan = 0L
-
- // Start generating files in the a different thread
- val fileGeneratingThread = new FileGeneratingThread(input, testDir.getPath, batchDuration.milliseconds)
- fileGeneratingThread.start()
-
- // Repeatedly start and kill the streaming context until timed out or
- // all expected output is generated
- while(!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
-
- // Start the thread to kill the streaming after some time
- FailureSuite.failed = false
- val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10)
- killingThread.start()
-
- // Run the streams with real clock until last expected output is seen or timed out
- val (output, timeRan) = runStreamsWithRealClock(ssc, lastExpectedOutput, maxTimeToRun - totalTimeRan)
- if (killingThread.isAlive) killingThread.interrupt()
-
- // Merge output and time ran and see whether already timed out or not
- mergedOutput ++= output
- totalTimeRan += timeRan
- logInfo("New output = " + output)
- logInfo("Merged output = " + mergedOutput)
- logInfo("Total time spent = " + totalTimeRan)
- if (totalTimeRan > maxTimeToRun) {
- FailureSuite.timedOut = true
- }
-
- if (!FailureSuite.outputGenerated && !FailureSuite.timedOut) {
- val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 2)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- }
-
- // Recreate the streaming context from checkpoint
- ssc = new StreamingContext(checkpointDir.getPath)
- }
- ssc.stop()
- ssc = null
- logInfo("Finished test after " + FailureSuite.failureCount + " failures")
-
- if (FailureSuite.timedOut) {
- logWarning("Timed out with run time of "+ maxTimeToRun + " ms for " +
- expectedOutput.size + " batches of " + batchDuration)
- }
-
- // Verify whether the output is as expected
- verifyOutput(mergedOutput, expectedOutput)
- if (fileGeneratingThread.isAlive) fileGeneratingThread.interrupt()
+ test("multiple failures with map") {
+ MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
- /** Sets up the stream operations with file input stream */
- def setupStreamsWithFileStream(
- operation: DStream[String] => DStream[(String, Int)]
- ): StreamingContext = {
- val ssc = new StreamingContext("local[4]", "FailureSuite", batchDuration)
- ssc.checkpoint(checkpointDir.getPath)
- val inputStream = ssc.textFileStream(testDir.getPath)
- val operatedStream = operation(inputStream)
- val outputBuffer = new ArrayBuffer[Seq[(String, Int)]] with SynchronizedBuffer[Seq[(String, Int)]]
- val outputStream = new TestOutputStream(operatedStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
- ssc
- }
-
- /**
- * Runs the streams set up in `ssc` on real clock.
- */
- def runStreamsWithRealClock(
- ssc: StreamingContext,
- lastExpectedOutput: (String, Int),
- timeout: Long
- ): (Seq[(String, Int)], Long) = {
-
- System.clearProperty("spark.streaming.clock")
-
- // Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[(String, Int)]]
- val output = outputStream.output
- val startTime = System.currentTimeMillis()
-
- // Functions to detect various conditions
- def hasFailed = FailureSuite.failed
- def isLastOutputGenerated = !output.flatMap(x => x).isEmpty && output(output.lastIndexWhere(!_.isEmpty)).head == lastExpectedOutput
- def isTimedOut = System.currentTimeMillis() - startTime > timeout
-
- // Start the streaming computation and let it run while ...
- // (i) StreamingContext has not been shut down yet
- // (ii) The last expected output has not been generated yet
- // (iii) Its not timed out yet
- try {
- ssc.start()
- while (!hasFailed && !isLastOutputGenerated && !isTimedOut) {
- Thread.sleep(100)
- }
- logInfo("Has failed = " + hasFailed)
- logInfo("Is last output generated = " + isLastOutputGenerated)
- logInfo("Is timed out = " + isTimedOut)
- } catch {
- case e: Exception => logInfo("Exception while running streams: " + e)
- } finally {
- ssc.stop()
- }
-
- // Verify whether the output of each batch has only one element
- assert(output.forall(_.size <= 1), "output of each batch should have only one element")
-
- // Set appropriate flags is timed out or output has been generated
- if (isTimedOut) FailureSuite.timedOut = true
- if (isLastOutputGenerated) FailureSuite.outputGenerated = true
-
- val timeTaken = System.currentTimeMillis() - startTime
- logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output.flatMap(_.headOption), timeTaken)
- }
-
- /**
- * Verifies the output value are the same as expected. Since failures can lead to
- * a batch being processed twice, a batches output may appear more than once
- * consecutively. To avoid getting confused with those, we eliminate consecutive
- * duplicate batch outputs of values from the `output`. As a result, the
- * expected output should not have consecutive batches with the same values as output.
- */
- def verifyOutput(output: Seq[(String, Int)], expectedOutput: Seq[(String, Int)]) {
- // Verify whether expected outputs do not consecutive batches with same output
- for (i <- 0 until expectedOutput.size - 1) {
- assert(expectedOutput(i) != expectedOutput(i+1),
- "Expected output has consecutive duplicate sequence of values")
- }
-
- // Match the output with the expected output
- logInfo(
- "\n-------------------------------------------\n" +
- " Verifying output " +
- "\n-------------------------------------------\n"
- )
- logInfo("Expected output, size = " + expectedOutput.size)
- logInfo(expectedOutput.mkString("[", ",", "]"))
- logInfo("Output, size = " + output.size)
- logInfo(output.mkString("[", ",", "]"))
- output.foreach(o =>
- assert(expectedOutput.contains(o), "Expected value " + o + " not found")
- )
- }
-}
-
-object FailureSuite {
- var failed = false
- var outputGenerated = false
- var timedOut = false
- var failureCount = 0
-
- def reset() {
- failed = false
- outputGenerated = false
- timedOut = false
- failureCount = 0
- }
-}
-
-/**
- * Thread to kill streaming context after some time.
- */
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
- initLogging()
-
- override def run() {
- try {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 5000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) {
- ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
- }
- logInfo("Killing thread exited")
- } catch {
- case ie: InterruptedException => logInfo("Killing thread interrupted")
- case e: Exception => logWarning("Exception in killing thread", e)
- }
- }
-}
-
-/**
- * Thread to generate input files periodically with the desired text
- */
-class FileGeneratingThread(input: Seq[String], testDir: String, interval: Long)
- extends Thread with Logging {
- initLogging()
-
- override def run() {
- try {
- Thread.sleep(5000) // To make sure that all the streaming context has been set up
- for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(interval)
- }
- logInfo("File generating thread exited")
- } catch {
- case ie: InterruptedException => logInfo("File generating thread interrupted")
- case e: Exception => logWarning("File generating in killing thread", e)
- }
+ test("multiple failures with updateStateByKey") {
+ MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}
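
With the ad-hoc kill-and-restart machinery moved into spark.streaming.util.MasterFailureTest, a new driver-failure test is essentially one call per operation. A minimal sketch of a suite in the same style as the rewritten FailureSuite (the class name and working directory are illustrative):

    import java.io.File

    import org.apache.commons.io.FileUtils
    import org.scalatest.{BeforeAndAfter, FunSuite}

    import spark.streaming.Milliseconds
    import spark.streaming.util.MasterFailureTest

    class MapFailureSuite extends FunSuite with BeforeAndAfter {
      // Working directory for the generated input files
      val directory = "MapFailureSuite"
      val numBatches = 30
      val batchDuration = Milliseconds(1000)

      before { FileUtils.deleteDirectory(new File(directory)) }
      after  { FileUtils.deleteDirectory(new File(directory)) }

      test("multiple failures with map") {
        MasterFailureTest.testMap(directory, numBatches, batchDuration)
      }
    }
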
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0eb9c7b81e..7c1c2e1040 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -133,26 +133,29 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ System.clearProperty("spark.streaming.clock")
+
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
- val filestream = ssc.textFileStream(testDir.toString)
+ val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(filestream, outputBuffer)
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- //Thread.sleep(100)
+ val file = new File(testDir, i.toString)
+ FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ logInfo("Created file " + file)
+ Thread.sleep(batchDuration.milliseconds)
+ Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
Thread.sleep(1000)
@@ -171,16 +174,16 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i).size === 1)
- assert(output(i).head.toString === expectedOutput(i))
- }
+ assert(output.toList === expectedOutput.toList)
+
FileUtils.deleteDirectory(testDir)
+
+ // Enable manual clock back again for other tests
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
}
-
+/** This is a server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
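
Because FileInputDStream is driven by the wall clock, the rewritten file-stream test writes one file per batch and simply sleeps past the batch boundary instead of nudging a ManualClock. The core of that pattern, using only calls that appear in this diff (TestOutputStream comes from TestSuiteBase in the same spark.streaming test package, and the three-file loop is illustrative):

    import java.io.File

    import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

    import com.google.common.io.Files
    import org.apache.commons.io.FileUtils
    import spark.streaming.{Seconds, StreamingContext}

    val batchDuration = Seconds(1)
    val testDir = Files.createTempDir()
    val ssc = new StreamingContext("local[2]", "FileStreamExample", batchDuration)
    val fileStream = ssc.textFileStream(testDir.toString)
    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
    ssc.registerOutputStream(new TestOutputStream(fileStream, outputBuffer))
    ssc.start()

    for (i <- 1 to 3) {
      // One file per batch; sleep past the batch boundary so each file lands in its own interval
      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
      Thread.sleep(batchDuration.milliseconds + 1000)
    }
    ssc.stop()
    FileUtils.deleteDirectory(testDir)
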
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c2733831b2..2cc31d6137 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -63,20 +63,28 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
*/
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ // Name of the framework for Spark context
def framework = "TestSuiteBase"
+ // Master for Spark context
def master = "local[2]"
+ // Batch duration
def batchDuration = Seconds(1)
+ // Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
+ // Duration after which the graph is checkpointed
def checkpointInterval = batchDuration
+ // Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
+ // Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@@ -140,9 +148,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@@ -186,7 +191,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop()
}
-
output
}
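
Since runStreams no longer forces the ManualClock property itself, each deterministic suite now sets it at construction time and overrides whichever TestSuiteBase defaults it needs, as BasicOperationsSuite and WindowOperationsSuite do in this diff. A sketch of a suite in that style (the class name and the overridden values are illustrative):

    import spark.streaming.{Seconds, TestSuiteBase}

    class SlowWindowSuite extends TestSuiteBase {
      // Deterministic batch scheduling for this suite
      System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

      override def framework = "SlowWindowSuite"
      override def batchDuration = Seconds(2)
      override def maxWaitTimeMillis = 20000
    }
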
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index cd9608df53..1080790147 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000