From f98c7da23ef66812b8b4888230ee98c07f09af23 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 17 Feb 2013 15:06:41 -0800 Subject: Many changes to ensure better 2nd recovery if 2nd failure happens while recovering from 1st failure - Made the scheduler to checkpoint after clearing old metadata which ensures that a new checkpoint is written as soon as at least one batch gets computed while recovering from a failure. This ensures that if there is a 2nd failure while recovering from 1st failure, the system start 2nd recovery from a newer checkpoint. - Modified Checkpoint writer to write checkpoint in a different thread. - Added a check to make sure that compute for InputDStreams gets called only for strictly increasing times. - Changed implementation of slice to call getOrCompute on parent DStream in time-increasing order. - Added testcase to test slice. - Fixed testGroupByKeyAndWindow testcase in JavaAPISuite to verify results with expected output in an order-independent manner. --- .../test/java/spark/streaming/JavaAPISuite.java | 54 +++++++++++++++------- .../test/java/spark/streaming/JavaTestUtils.scala | 1 + 2 files changed, 39 insertions(+), 16 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 7bea0b1fc4..16bacffb92 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; import java.io.*; +import java.text.Collator; import java.util.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -35,7 +36,7 @@ public class JavaAPISuite implements Serializable { public void setUp() { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint", new Duration(1000)); + ssc.checkpoint("checkpoint"); } @After @@ -587,26 +588,47 @@ public class JavaAPISuite implements Serializable { @Test public void testGroupByKeyAndWindow() { - List>> inputData = stringStringKVStream; + List>> inputData = stringIntKVStream; - List>>> expected = Arrays.asList( - Arrays.asList(new Tuple2>("california", Arrays.asList("dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2>("california", Arrays.asList("sharks", "ducks")), - new Tuple2>("new york", Arrays.asList("rangers", "islanders")))); + List>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2>("california", Arrays.asList(1, 3)), + new Tuple2>("new york", Arrays.asList(1, 4)) + ), + Arrays.asList( + new Tuple2>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2>("new york", Arrays.asList(1, 1, 3, 4)) + ), + Arrays.asList( + new Tuple2>("california", Arrays.asList(5, 5)), + new Tuple2>("new york", Arrays.asList(1, 3)) + ) + ); - JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream> groupWindowed = + JavaPairDStream> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); - List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); + assert(result.size() == expected.size()); + for (int i = 0; i < result.size(); i++) { + assert(convert(result.get(i)).equals(convert(expected.get(i)))); + } + } + + private HashSet>> convert(List>> listOfTuples) { + List>> newListOfTuples = new ArrayList>>(); + for (Tuple2> tuple: listOfTuples) { + newListOfTuples.add(convert(tuple)); + } + return new HashSet>>(newListOfTuples); + } + + private Tuple2> convert(Tuple2> tuple) { + return new Tuple2>(tuple._1(), new HashSet(tuple._2())); } @Test @@ -894,7 +916,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(8,7)); File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function() { diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5..52ea28732a 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase { } object JavaTestUtils extends JavaTestBase { + override def maxWaitTimeMillis = 20000 } -- cgit v1.2.3