aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
commit6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch)
tree3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /streaming/src/test/java
parent56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff)
parent8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff)
downloadspark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip
Merge branch 'streaming' into ScrapCode-streaming
Conflicts: streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/JavaAPISuite.java)155
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala (renamed from streaming/src/test/java/JavaTestUtils.scala)1
2 files changed, 82 insertions, 74 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 8c94e13e65..16bacffb92 100644
--- a/streaming/src/test/java/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import java.io.*;
+import java.text.Collator;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -33,15 +34,18 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
}
@After
public void tearDown() {
ssc.stop();
ssc = null;
+
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
@Test
@@ -132,29 +136,6 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testTumble() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9),
- Arrays.asList(10,11,12),
- Arrays.asList(13,14,15),
- Arrays.asList(16,17,18));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3,4,5,6),
- Arrays.asList(7,8,9,10,11,12),
- Arrays.asList(13,14,15,16,17,18));
-
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.tumble(new Duration(2000));
- JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
@@ -581,50 +562,73 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKey() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValue() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
- JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Long> counted = stream.countByValue();
JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california",
- Arrays.asList("sharks", "ducks", "dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ )
+ );
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> groupWindowed =
+ JavaPairDStream<String, List<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
+ assert(result.size() == expected.size());
+ for (int i = 0; i < result.size(); i++) {
+ assert(convert(result.get(i)).equals(convert(expected.get(i))));
+ }
+ }
+
+ private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+ newListOfTuples.add(convert(tuple));
+ }
+ return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ }
+
+ private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
@Test
@@ -709,26 +713,28 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValueAndWindow() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 4L),
- new Tuple2<String, Long>("new york", 4L)),
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("world", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("moon", 1L)));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
JavaPairDStream<String, Long> counted =
- pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+ stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -909,9 +915,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(1,4),
Arrays.asList(8,7));
-
File tempDir = Files.createTempDir();
- ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@@ -925,14 +930,16 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expectedInitial, initialResult);
Thread.sleep(1000);
-
ssc.stop();
+
ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
- ssc.start();
- List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2);
- assertOrderInvariantEquals(expectedFinal, finalResult);
+ // Tweak to take into consideration that the last batch before failure
+ // will be re-processed after recovery
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
+ assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
}
+
/** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
@@ -969,9 +976,9 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..52ea28732a 100644
--- a/streaming/src/test/java/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase {
}
object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis = 20000
}