aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-17 15:06:41 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-17 15:06:41 -0800
commitf98c7da23ef66812b8b4888230ee98c07f09af23 (patch)
tree28aa7c6757dcdfe0ee72e95f93634edd77c89265 /streaming/src/test
parentddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (diff)
downloadspark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.gz
spark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.bz2
spark-f98c7da23ef66812b8b4888230ee98c07f09af23.zip
Many changes to ensure better 2nd recovery if 2nd failure happens while
recovering from 1st failure - Made the scheduler to checkpoint after clearing old metadata which ensures that a new checkpoint is written as soon as at least one batch gets computed while recovering from a failure. This ensures that if there is a 2nd failure while recovering from 1st failure, the system start 2nd recovery from a newer checkpoint. - Modified Checkpoint writer to write checkpoint in a different thread. - Added a check to make sure that compute for InputDStreams gets called only for strictly increasing times. - Changed implementation of slice to call getOrCompute on parent DStream in time-increasing order. - Added testcase to test slice. - Fixed testGroupByKeyAndWindow testcase in JavaAPISuite to verify results with expected output in an order-independent manner.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java54
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala1
-rw-r--r--streaming/src/test/resources/log4j.properties4
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala20
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala5
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala7
-rw-r--r--streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala5
7 files changed, 67 insertions, 29 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 7bea0b1fc4..16bacffb92 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import java.io.*;
+import java.text.Collator;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -35,7 +36,7 @@ public class JavaAPISuite implements Serializable {
public void setUp() {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- ssc.checkpoint("checkpoint", new Duration(1000));
+ ssc.checkpoint("checkpoint");
}
@After
@@ -587,26 +588,47 @@ public class JavaAPISuite implements Serializable {
@Test
public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california",
- Arrays.asList("sharks", "ducks", "dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ )
+ );
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> groupWindowed =
+ JavaPairDStream<String, List<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
+ assert(result.size() == expected.size());
+ for (int i = 0; i < result.size(); i++) {
+ assert(convert(result.get(i)).equals(convert(expected.get(i))));
+ }
+ }
+
+ private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+ newListOfTuples.add(convert(tuple));
+ }
+ return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ }
+
+ private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
@Test
@@ -894,7 +916,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
- ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..52ea28732a 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase {
}
object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis = 20000
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index f0638e0e02..59c445e63f 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=WARN, file
+log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
@@ -9,6 +9,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
-log4j.logger.spark.streaming=INFO
-log4j.logger.spark.streaming.dstream.FileInputDStream=DEBUG
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index 1e86cf49bb..8fce91853c 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -229,6 +229,26 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData, updateStateOperation, outputData, true)
}
+ test("slice") {
+ val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+ val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+ val stream = new TestInputStream[Int](ssc, input, 2)
+ ssc.registerInputStream(stream)
+ stream.foreach(_ => {}) // Dummy output stream
+ ssc.start()
+ Thread.sleep(2000)
+ def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+ stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
+ }
+
+ assert(getInputFromSlice(0, 1000) == Set(1))
+ assert(getInputFromSlice(0, 2000) == Set(1, 2))
+ assert(getInputFromSlice(1000, 2000) == Set(1, 2))
+ assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4))
+ ssc.stop()
+ Thread.sleep(1000)
+ }
+
test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index c89c4a8d43..5250667bcb 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -39,14 +39,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def batchDuration = Milliseconds(500)
- override def checkpointInterval = batchDuration
-
override def actuallyWait = true
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
@@ -188,7 +185,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 2cc31d6137..ad6aa79d10 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -75,9 +75,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
- // Duration after which the graph is checkpointed
- def checkpointInterval = batchDuration
-
// Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
@@ -99,7 +96,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -124,7 +121,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index f8380af331..1b66f3bda2 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -273,6 +273,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
+ logInfo("reduceByKeyAndWindow - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
@@ -288,7 +289,8 @@ class WindowOperationsSuite extends TestSuiteBase {
windowDuration: Duration = Seconds(2),
slideDuration: Duration = Seconds(1)
) {
- test("ReduceByKeyAndWindow with inverse function - " + name) {
+ test("reduceByKeyAndWindow with inverse function - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse function - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
@@ -306,6 +308,7 @@ class WindowOperationsSuite extends TestSuiteBase {
slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val filterFunc = (p: (String, Int)) => p._2 != 0
val operation = (s: DStream[(String, Int)]) => {