aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-17 15:06:41 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-17 15:06:41 -0800
commitf98c7da23ef66812b8b4888230ee98c07f09af23 (patch)
tree28aa7c6757dcdfe0ee72e95f93634edd77c89265 /streaming/src/test/java
parentddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (diff)
downloadspark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.gz
spark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.bz2
spark-f98c7da23ef66812b8b4888230ee98c07f09af23.zip
Many changes to ensure better 2nd recovery if 2nd failure happens while
recovering from 1st failure - Made the scheduler to checkpoint after clearing old metadata which ensures that a new checkpoint is written as soon as at least one batch gets computed while recovering from a failure. This ensures that if there is a 2nd failure while recovering from 1st failure, the system start 2nd recovery from a newer checkpoint. - Modified Checkpoint writer to write checkpoint in a different thread. - Added a check to make sure that compute for InputDStreams gets called only for strictly increasing times. - Changed implementation of slice to call getOrCompute on parent DStream in time-increasing order. - Added testcase to test slice. - Fixed testGroupByKeyAndWindow testcase in JavaAPISuite to verify results with expected output in an order-independent manner.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java54
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala1
2 files changed, 39 insertions, 16 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 7bea0b1fc4..16bacffb92 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import java.io.*;
+import java.text.Collator;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -35,7 +36,7 @@ public class JavaAPISuite implements Serializable {
public void setUp() {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- ssc.checkpoint("checkpoint", new Duration(1000));
+ ssc.checkpoint("checkpoint");
}
@After
@@ -587,26 +588,47 @@ public class JavaAPISuite implements Serializable {
@Test
public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california",
- Arrays.asList("sharks", "ducks", "dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ )
+ );
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> groupWindowed =
+ JavaPairDStream<String, List<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
+ assert(result.size() == expected.size());
+ for (int i = 0; i < result.size(); i++) {
+ assert(convert(result.get(i)).equals(convert(expected.get(i))));
+ }
+ }
+
+ private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+ newListOfTuples.add(convert(tuple));
+ }
+ return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ }
+
+ private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
@Test
@@ -894,7 +916,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
- ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..52ea28732a 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase {
}
object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis = 20000
}