diff options
author | Soumitra Kumar <kumar.soumitra@gmail.com> | 2014-11-12 12:25:31 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-11-12 12:25:31 -0800 |
commit | 36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a (patch) | |
tree | 08841ec5510053f2788c3f4597ffef25767455fe /streaming/src/test/java | |
parent | 4b736dbab3e177e5265439d37063bb501657d830 (diff) | |
download | spark-36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a.tar.gz spark-36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a.tar.bz2 spark-36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a.zip |
[SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
SPARK-3660 : Initial RDD for updateStateByKey transformation
I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount.
Please let me know if any changes are required.
Author: Soumitra Kumar <kumar.soumitra@gmail.com>
Closes #2665 from soumitrak/master and squashes the following commits:
ee8980b [Soumitra Kumar] Fixed copy/paste issue.
304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test.
9781135 [Soumitra Kumar] Fixed test, and renamed variable.
3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream.
2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
d4fdd18 [Soumitra Kumar] Renamed variable and moved method.
d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 53 |
1 files changed, 49 insertions, 4 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 4efeb8dfbe..ce645fccba 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -806,15 +806,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. */ - public static <T extends Comparable<T>> void assertOrderInvariantEquals( + public static <T> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { + List<Set<T>> expectedSets = new ArrayList<Set<T>>(); for (List<T> list: expected) { - Collections.sort(list); + expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list))); } + List<Set<T>> actualSets = new ArrayList<Set<T>>(); for (List<T> list: actual) { - Collections.sort(list); + actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list))); } - Assert.assertEquals(expected, actual); + Assert.assertEquals(expectedSets, actualSets); } @@ -1241,6 +1243,49 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @SuppressWarnings("unchecked") @Test + public void testUpdateStateByKeyWithInitial() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<Tuple2<String, Integer>> initial = Arrays.asList ( + new Tuple2<String, Integer> ("california", 1), + new Tuple2<String, Integer> ("new york", 2)); + + JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial); + JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD); + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 5), + new Tuple2<String, Integer>("new york", 7)), + Arrays.asList(new Tuple2<String, Integer>("california", 15), + new Tuple2<String, Integer>("new york", 11)), + Arrays.asList(new Tuple2<String, Integer>("california", 15), + new Tuple2<String, Integer>("new york", 11))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( + new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + int out = 0; + if (state.isPresent()) { + out = out + state.get(); + } + for (Integer v: values) { + out = out + v; + } + return Optional.of(out); + } + }, new HashPartitioner(1), initialRDD); + JavaTestUtils.attachTestOutputStream(updated); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @SuppressWarnings("unchecked") + @Test public void testReduceByKeyAndWindowWithInverse() { List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; |