aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorSoumitra Kumar <kumar.soumitra@gmail.com>2014-11-12 12:25:31 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-11-12 12:25:31 -0800
commit36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a (patch)
tree08841ec5510053f2788c3f4597ffef25767455fe /streaming/src/test/java
parent4b736dbab3e177e5265439d37063bb501657d830 (diff)
downloadspark-36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a.tar.gz
spark-36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a.tar.bz2
spark-36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a.zip
[SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
SPARK-3660 : Initial RDD for updateStateByKey transformation I have added a sample StatefulNetworkWordCountWithInitial inspired by StatefulNetworkWordCount. Please let me know if any changes are required. Author: Soumitra Kumar <kumar.soumitra@gmail.com> Closes #2665 from soumitrak/master and squashes the following commits: ee8980b [Soumitra Kumar] Fixed copy/paste issue. 304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test. 9781135 [Soumitra Kumar] Fixed test, and renamed variable. 3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream. 2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' d4fdd18 [Soumitra Kumar] Renamed variable and moved method. d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' 4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation 8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master' fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java53
1 files changed, 49 insertions, 4 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4efeb8dfbe..ce645fccba 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -806,15 +806,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.
*/
- public static <T extends Comparable<T>> void assertOrderInvariantEquals(
+ public static <T> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
+ List<Set<T>> expectedSets = new ArrayList<Set<T>>();
for (List<T> list: expected) {
- Collections.sort(list);
+ expectedSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
}
+ List<Set<T>> actualSets = new ArrayList<Set<T>>();
for (List<T> list: actual) {
- Collections.sort(list);
+ actualSets.add(Collections.unmodifiableSet(new HashSet<T>(list)));
}
- Assert.assertEquals(expected, actual);
+ Assert.assertEquals(expectedSets, actualSets);
}
@@ -1241,6 +1243,49 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
+ public void testUpdateStateByKeyWithInitial() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<Tuple2<String, Integer>> initial = Arrays.asList (
+ new Tuple2<String, Integer> ("california", 1),
+ new Tuple2<String, Integer> ("new york", 2));
+
+ JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
+ JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD (tmpRDD);
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 7)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 15),
+ new Tuple2<String, Integer>("new york", 11)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 15),
+ new Tuple2<String, Integer>("new york", 11)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
+ new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
+ }
+ return Optional.of(out);
+ }
+ }, new HashPartitioner(1), initialRDD);
+ JavaTestUtils.attachTestOutputStream(updated);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testReduceByKeyAndWindowWithInverse() {
List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;