diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-17 15:06:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-17 15:06:41 -0800 |
commit | f98c7da23ef66812b8b4888230ee98c07f09af23 (patch) | |
tree | 28aa7c6757dcdfe0ee72e95f93634edd77c89265 /streaming/src/test/java | |
parent | ddcb976b0d7ce4a76168da33c0e947a5a6b5a255 (diff) | |
download | spark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.gz spark-f98c7da23ef66812b8b4888230ee98c07f09af23.tar.bz2 spark-f98c7da23ef66812b8b4888230ee98c07f09af23.zip |
Many changes to ensure better 2nd recovery if 2nd failure happens while
recovering from 1st failure
- Made the scheduler checkpoint after clearing old metadata, which
ensures that a new checkpoint is written as soon as at least one batch
gets computed while recovering from a failure. This ensures that if
there is a 2nd failure while recovering from the 1st failure, the system
starts the 2nd recovery from a newer checkpoint.
- Modified Checkpoint writer to write checkpoint in a different thread.
- Added a check to make sure that compute for InputDStreams gets called
only for strictly increasing times.
- Changed implementation of slice to call getOrCompute on parent DStream
in time-increasing order.
- Added testcase to test slice.
- Fixed testGroupByKeyAndWindow testcase in JavaAPISuite to verify
results with expected output in an order-independent manner.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 54 | ||||
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaTestUtils.scala | 1 |
2 files changed, 39 insertions, 16 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 7bea0b1fc4..16bacffb92 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; import java.io.*; +import java.text.Collator; import java.util.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -35,7 +36,7 @@ public class JavaAPISuite implements Serializable { public void setUp() { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint", new Duration(1000)); + ssc.checkpoint("checkpoint"); } @After @@ -587,26 +588,47 @@ public class JavaAPISuite implements Serializable { @Test public void testGroupByKeyAndWindow() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)), + new 
Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3)) + ) + ); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, List<String>> groupWindowed = + JavaPairDStream<String, List<Integer>> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); + assert(result.size() == expected.size()); + for (int i = 0; i < result.size(); i++) { + assert(convert(result.get(i)).equals(convert(expected.get(i)))); + } + } + + private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { + List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>(); + for (Tuple2<String, List<Integer>> tuple: listOfTuples) { + newListOfTuples.add(convert(tuple)); + } + return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples); + } + + private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { + return new Tuple2<String, HashSet<Integer>>(tuple._1(), new 
HashSet<Integer>(tuple._2())); } @Test @@ -894,7 +916,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(8,7)); File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5..52ea28732a 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase { } object JavaTestUtils extends JavaTestBase { + override def maxWaitTimeMillis = 20000 } |