diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-13 12:17:45 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-13 12:17:45 -0800 |
commit | 39addd380363c0371e935fae50983fe87158c1ac (patch) | |
tree | 0b428024e5ed5faac707ccb5b5573b0a2400d3b2 /streaming/src/test/java | |
parent | fd90daf850a922fe33c3638b18304d827953e2cb (diff) | |
download | spark-39addd380363c0371e935fae50983fe87158c1ac.tar.gz spark-39addd380363c0371e935fae50983fe87158c1ac.tar.bz2 spark-39addd380363c0371e935fae50983fe87158c1ac.zip |
Changed scheduler and file input stream to fix bugs in the driver fault tolerance. Added MasterFailureTest to rigorously test master fault tolerance with file input stream.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index fbe4af4597..783a393a8f 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -33,7 +33,8 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint", new Duration(1000)); } @@ -45,7 +46,7 @@ public class JavaAPISuite implements Serializable { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port"); } - /* + @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -434,7 +435,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result); } - */ + /* * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. @@ -450,7 +451,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, actual); } - /* + // PairDStream Functions @Test public void testPairFilter() { @@ -897,7 +898,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } - */ + @Test public void testCheckpointMasterRecovery() throws InterruptedException { List<List<String>> inputData = Arrays.asList( @@ -964,7 +965,7 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expected, result1); } */ - /* + // Input stream tests. These mostly just test that we can instantiate a given InputStream with // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @@ -972,9 +973,9 @@ public class JavaAPISuite implements Serializable { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets, StorageLevel.MEMORY_AND_DISK()); } @@ -1026,5 +1027,5 @@ public class JavaAPISuite implements Serializable { public void testFileStream() { JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); - }*/ + } } |