diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
commit | 6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch) | |
tree | 3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /streaming/src/test/java | |
parent | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff) | |
parent | 8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff) | |
download | spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2 spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip |
Merge branch 'streaming' into ScrapCode-streaming
Conflicts:
streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/JavaAPISuite.java) | 155 | ||||
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaTestUtils.scala (renamed from streaming/src/test/java/JavaTestUtils.scala) | 1 |
2 files changed, 82 insertions, 74 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 8c94e13e65..16bacffb92 100644 --- a/streaming/src/test/java/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; import java.io.*; +import java.text.Collator; import java.util.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -33,15 +34,18 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); } @After public void tearDown() { ssc.stop(); ssc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); + System.clearProperty("spark.driver.port"); } @Test @@ -132,29 +136,6 @@ public class JavaAPISuite implements Serializable { } @Test - public void testTumble() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1,2,3), - Arrays.asList(4,5,6), - Arrays.asList(7,8,9), - Arrays.asList(10,11,12), - Arrays.asList(13,14,15), - Arrays.asList(16,17,18)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(1,2,3,4,5,6), - Arrays.asList(7,8,9,10,11,12), - Arrays.asList(13,14,15,16,17,18)); - - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.tumble(new Duration(2000)); - JavaTestUtils.attachTestOutputStream(windowed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test public void testFilter() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants", "dodgers"), @@ -581,50 +562,73 @@ public class JavaAPISuite implements Serializable { } @Test - public void testCountByKey() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + public void testCountByValue() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("hello", "moon"), + Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L)), - Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("moon", 1L)), + Arrays.asList( + new Tuple2<String, Long>("hello", 1L))); - JavaPairDStream<String, Long> counted = pairStream.countByKey(); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Long> counted = stream.countByValue(); JavaTestUtils.attachTestOutputStream(counted); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } @Test public void testGroupByKeyAndWindow() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - List<List<Tuple2<String, List<String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", - Arrays.asList("sharks", "ducks", "dodgers", "giants")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))), - Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")), - new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders")))); + List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4)) + ), + Arrays.asList( + new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)), + new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3)) + ) + ); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, List<String>> groupWindowed = + JavaPairDStream<String, List<Integer>> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - Assert.assertEquals(expected, result); + assert(result.size() == expected.size()); + for (int i = 0; i < result.size(); i++) { + assert(convert(result.get(i)).equals(convert(expected.get(i)))); + } + } + + private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) { + List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>(); + for (Tuple2<String, List<Integer>> tuple: listOfTuples) { + newListOfTuples.add(convert(tuple)); + } + return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples); + } + + private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) { + return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2())); } @Test @@ -709,26 +713,28 @@ public class JavaAPISuite implements Serializable { } @Test - public void testCountByKeyAndWindow() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + public void testCountByValueAndWindow() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("hello", "moon"), + Arrays.asList("hello")); List<List<Tuple2<String, Long>>> expected = Arrays.asList( Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L)), + new Tuple2<String, Long>("hello", 1L), + new Tuple2<String, Long>("world", 1L)), Arrays.asList( - new Tuple2<String, Long>("california", 4L), - new Tuple2<String, Long>("new york", 4L)), + new Tuple2<String, Long>("hello", 2L), + new Tuple2<String, Long>("world", 1L), + new Tuple2<String, Long>("moon", 1L)), Arrays.asList( - new Tuple2<String, Long>("california", 2L), - new Tuple2<String, Long>("new york", 2L))); + new Tuple2<String, Long>("hello", 2L), + new Tuple2<String, Long>("moon", 1L))); - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream( ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, Long> counted = - pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -909,9 +915,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(1,4), Arrays.asList(8,7)); - File tempDir = Files.createTempDir(); - ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream letterCount = stream.map(new Function<String, Integer>() { @@ -925,14 +930,16 @@ public class JavaAPISuite implements Serializable { assertOrderInvariantEquals(expectedInitial, initialResult); Thread.sleep(1000); - ssc.stop(); + ssc = new JavaStreamingContext(tempDir.getAbsolutePath()); - ssc.start(); - List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2); - assertOrderInvariantEquals(expectedFinal, finalResult); + // Tweak to take into consideration that the last batch before failure + // will be re-processed after recovery + List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); + assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); } + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD @Test public void testCheckpointofIndividualStream() throws InterruptedException { @@ -969,9 +976,9 @@ public class JavaAPISuite implements Serializable { public void testKafkaStream() { HashMap<String, Integer> topics = Maps.newHashMap(); HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets); - JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets, + JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets); + JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets, StorageLevel.MEMORY_AND_DISK()); } diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5..52ea28732a 100644 --- a/streaming/src/test/java/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase { } object JavaTestUtils extends JavaTestBase { + override def maxWaitTimeMillis = 20000 } |