aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
committerSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
commit0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /streaming/src/test/java/org/apache
parent3871d94a695d47169720e877f77ff1e4bede43ee (diff)
downloadspark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.gz
spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.bz2
spark-0e2405490f2056728d1353abbac6f3ea177ae533.zip
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #16871 from srowen/SPARK-19493.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java2000
1 files changed, 0 insertions, 2000 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
deleted file mode 100644
index 648a5abe0b..0000000000
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ /dev/null
@@ -1,2000 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import scala.Tuple2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.io.Files;
-import com.google.common.collect.Sets;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.util.LongAccumulator;
-import org.apache.spark.util.Utils;
-
-// The test suite itself is Serializable so that anonymous Function implementations can be
-// serialized, as an alternative to converting these anonymous classes to static inner classes;
-// see http://stackoverflow.com/questions/758570/.
-public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
-
- public static void equalIterator(Iterator<?> a, Iterator<?> b) {
- while (a.hasNext() && b.hasNext()) {
- Assert.assertEquals(a.next(), b.next());
- }
- Assert.assertEquals(a.hasNext(), b.hasNext());
- }
-
- public static void equalIterable(Iterable<?> a, Iterable<?> b) {
- equalIterator(a.iterator(), b.iterator());
- }
-
- @Test
- public void testInitialization() {
- Assert.assertNotNull(ssc.sparkContext());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testContextState() {
- List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1, 2, 3, 4));
- Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaTestUtils.attachTestOutputStream(stream);
- Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState());
- ssc.start();
- Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState());
- ssc.stop();
- Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testCount() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3,4),
- Arrays.asList(3,4,5),
- Arrays.asList(3));
-
- List<List<Long>> expected = Arrays.asList(
- Arrays.asList(4L),
- Arrays.asList(3L),
- Arrays.asList(1L));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Long> count = stream.count();
- JavaTestUtils.attachTestOutputStream(count);
- List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("hello", "world"),
- Arrays.asList("goodnight", "moon"));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(5,5),
- Arrays.asList(9,4));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
- @Override
- public Integer call(String s) {
- return s.length();
- }
- });
- JavaTestUtils.attachTestOutputStream(letterCount);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testWindow() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6,1,2,3),
- Arrays.asList(7,8,9,4,5,6),
- Arrays.asList(7,8,9));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> windowed = stream.window(new Duration(2000));
- JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testWindowWithSlideDuration() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9),
- Arrays.asList(10,11,12),
- Arrays.asList(13,14,15),
- Arrays.asList(16,17,18));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3,4,5,6),
- Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
- Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
- Arrays.asList(13,14,15,16,17,18));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
- JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testFilter() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("giants"),
- Arrays.asList("yankees"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
- @Override
- public Boolean call(String s) {
- return s.contains("a");
- }
- });
- JavaTestUtils.attachTestOutputStream(filtered);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testRepartitionMorePartitions() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
- Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream<Integer> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
- JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
- stream.repartition(4);
- JavaTestUtils.attachTestOutputStream(repartitioned);
- List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
- Assert.assertEquals(2, result.size());
- for (List<List<Integer>> rdd : result) {
- Assert.assertEquals(4, rdd.size());
- Assert.assertEquals(
- 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testRepartitionFewerPartitions() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
- Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- JavaDStream<Integer> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
- JavaDStreamLike<Integer,JavaDStream<Integer>,JavaRDD<Integer>> repartitioned =
- stream.repartition(2);
- JavaTestUtils.attachTestOutputStream(repartitioned);
- List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
- Assert.assertEquals(2, result.size());
- for (List<List<Integer>> rdd : result) {
- Assert.assertEquals(2, rdd.size());
- Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testGlom() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<List<String>>> expected = Arrays.asList(
- Arrays.asList(Arrays.asList("giants", "dodgers")),
- Arrays.asList(Arrays.asList("yankees", "red sox")));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<List<String>> glommed = stream.glom();
- JavaTestUtils.attachTestOutputStream(glommed);
- List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMapPartitions() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("GIANTSDODGERS"),
- Arrays.asList("YANKEESRED SOX"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> mapped = stream.mapPartitions(
- new FlatMapFunction<Iterator<String>, String>() {
- @Override
- public Iterator<String> call(Iterator<String> in) {
- StringBuilder out = new StringBuilder();
- while (in.hasNext()) {
- out.append(in.next().toUpperCase(Locale.ENGLISH));
- }
- return Arrays.asList(out.toString()).iterator();
- }
- });
- JavaTestUtils.attachTestOutputStream(mapped);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- private static class IntegerSum implements Function2<Integer, Integer, Integer> {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- }
-
- private static class IntegerDifference implements Function2<Integer, Integer, Integer> {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 - i2;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testReduce() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(6),
- Arrays.asList(15),
- Arrays.asList(24));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
- JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testReduceByWindowWithInverse() {
- testReduceByWindow(true);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testReduceByWindowWithoutInverse() {
- testReduceByWindow(false);
- }
-
- @SuppressWarnings("unchecked")
- private void testReduceByWindow(boolean withInverse) {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(6),
- Arrays.asList(21),
- Arrays.asList(39),
- Arrays.asList(24));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed;
- if (withInverse) {
- reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new IntegerDifference(),
- new Duration(2000),
- new Duration(1000));
- } else {
- reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new Duration(2000), new Duration(1000));
- }
- JavaTestUtils.attachTestOutputStream(reducedWindowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testQueueStream() {
- ssc.stop();
- // Create a new JavaStreamingContext without checkpointing
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9));
-
- JavaSparkContext jsc = new JavaSparkContext(ssc.ssc().sc());
- JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3));
- JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4, 5, 6));
- JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));
-
- Queue<JavaRDD<Integer>> rdds = new LinkedList<>();
- rdds.add(rdd1);
- rdds.add(rdd2);
- rdds.add(rdd3);
-
- JavaDStream<Integer> stream = ssc.queueStream(rdds);
- JavaTestUtils.attachTestOutputStream(stream);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testTransform() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(3,4,5),
- Arrays.asList(6,7,8),
- Arrays.asList(9,10,11));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed = stream.transform(
- new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) {
- return in.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) {
- return i + 2;
- }
- });
- }
- });
-
- JavaTestUtils.attachTestOutputStream(transformed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testVariousTransform() {
- // tests whether all variations of transform can be called from Java
-
- List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
-
- List<List<Tuple2<String, Integer>>> pairInputData =
- Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
-
- stream.transform(
- new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) {
- return null;
- }
- }
- );
-
- stream.transform(
- new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) {
- return null;
- }
- }
- );
-
- stream.transformToPair(
- new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) {
- return null;
- }
- }
- );
-
- stream.transformToPair(
- new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) {
- return null;
- }
- }
- );
-
- pairStream.transform(
- new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) {
- return null;
- }
- }
- );
-
- pairStream.transform(
- new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
- @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) {
- return null;
- }
- }
- );
-
- pairStream.transformToPair(
- new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) {
- return null;
- }
- }
- );
-
- pairStream.transformToPair(
- new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
- @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in,
- Time time) {
- return null;
- }
- }
- );
-
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testTransformWith() {
- List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", "dodgers"),
- new Tuple2<>("new york", "yankees")),
- Arrays.asList(
- new Tuple2<>("california", "sharks"),
- new Tuple2<>("new york", "rangers")));
-
- List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(
- new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "islanders")));
-
-
- List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Sets.newHashSet(
- new Tuple2<>("california",
- new Tuple2<>("dodgers", "giants")),
- new Tuple2<>("new york",
- new Tuple2<>("yankees", "mets"))),
- Sets.newHashSet(
- new Tuple2<>("california",
- new Tuple2<>("sharks", "ducks")),
- new Tuple2<>("new york",
- new Tuple2<>("rangers", "islanders"))));
-
- JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
- JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
- JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
- JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
- JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
- pairStream2,
- new Function3<
- JavaPairRDD<String, String>,
- JavaPairRDD<String, String>,
- Time,
- JavaPairRDD<String, Tuple2<String, String>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<String, String>> call(
- JavaPairRDD<String, String> rdd1,
- JavaPairRDD<String, String> rdd2,
- Time time) {
- return rdd1.join(rdd2);
- }
- }
- );
-
- JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
- List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = new ArrayList<>();
- for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
- unorderedResult.add(Sets.newHashSet(res));
- }
-
- Assert.assertEquals(expected, unorderedResult);
- }
-
-
- @SuppressWarnings("unchecked")
- @Test
- public void testVariousTransformWith() {
- // tests whether all variations of transformWith can be called from Java
-
- List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
- List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
- JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
-
- List<List<Tuple2<String, Integer>>> pairInputData1 =
- Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
- List<List<Tuple2<Double, Character>>> pairInputData2 =
- Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
- JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
- JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
-
- stream1.transformWith(
- stream2,
- new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) {
- return null;
- }
- }
- );
-
- stream1.transformWith(
- pairStream1,
- new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2,
- Time time) {
- return null;
- }
- }
- );
-
- stream1.transformWithToPair(
- stream2,
- new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2,
- Time time) {
- return null;
- }
- }
- );
-
- stream1.transformWithToPair(
- pairStream1,
- new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time,
- JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1,
- JavaPairRDD<String, Integer> rdd2,
- Time time) {
- return null;
- }
- }
- );
-
- pairStream1.transformWith(
- stream2,
- new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2,
- Time time) {
- return null;
- }
- }
- );
-
- pairStream1.transformWith(
- pairStream1,
- new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time,
- JavaRDD<Double>>() {
- @Override
- public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1,
- JavaPairRDD<String, Integer> rdd2,
- Time time) {
- return null;
- }
- }
- );
-
- pairStream1.transformWithToPair(
- stream2,
- new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time,
- JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
- JavaRDD<String> rdd2,
- Time time) {
- return null;
- }
- }
- );
-
- pairStream1.transformWithToPair(
- pairStream2,
- new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time,
- JavaPairRDD<Double, Double>>() {
- @Override
- public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1,
- JavaPairRDD<Double, Character> rdd2,
- Time time) {
- return null;
- }
- }
- );
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testStreamingContextTransform(){
- List<List<Integer>> stream1input = Arrays.asList(
- Arrays.asList(1),
- Arrays.asList(2)
- );
-
- List<List<Integer>> stream2input = Arrays.asList(
- Arrays.asList(3),
- Arrays.asList(4)
- );
-
- List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
- Arrays.asList(new Tuple2<>(1, "x")),
- Arrays.asList(new Tuple2<>(2, "y"))
- );
-
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
- Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
- );
-
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
- JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
- JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
- JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
-
- List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
-
- // This is just to test whether this transform to JavaStream compiles
- ssc.transform(
- listOfDStreams1,
- new Function2<List<JavaRDD<?>>, Time, JavaRDD<Long>>() {
- @Override
- public JavaRDD<Long> call(List<JavaRDD<?>> listOfRDDs, Time time) {
- Assert.assertEquals(2, listOfRDDs.size());
- return null;
- }
- }
- );
-
- List<JavaDStream<?>> listOfDStreams2 =
- Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
-
- JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
- listOfDStreams2,
- new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
- @Override
- public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs,
- Time time) {
- Assert.assertEquals(3, listOfRDDs.size());
- JavaRDD<Integer> rdd1 = (JavaRDD<Integer>)listOfRDDs.get(0);
- JavaRDD<Integer> rdd2 = (JavaRDD<Integer>)listOfRDDs.get(1);
- JavaRDD<Tuple2<Integer, String>> rdd3 =
- (JavaRDD<Tuple2<Integer, String>>)listOfRDDs.get(2);
- JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
- PairFunction<Integer, Integer, Integer> mapToTuple =
- new PairFunction<Integer, Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> call(Integer i) {
- return new Tuple2<>(i, i);
- }
- };
- return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
- }
- }
- );
- JavaTestUtils.attachTestOutputStream(transformed2);
- List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
- JavaTestUtils.runStreams(ssc, 2, 2);
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testFlatMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("go", "giants"),
- Arrays.asList("boo", "dodgers"),
- Arrays.asList("athletics"));
-
- List<List<String>> expected = Arrays.asList(
- Arrays.asList("g","o","g","i","a","n","t","s"),
- Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
- Arrays.asList("a","t","h","l","e","t","i","c","s"));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split("(?!^)")).iterator();
- }
- });
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testForeachRDD() {
- final LongAccumulator accumRdd = ssc.sparkContext().sc().longAccumulator();
- final LongAccumulator accumEle = ssc.sparkContext().sc().longAccumulator();
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,1,1),
- Arrays.asList(1,1,1));
-
- JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
-
- stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
- @Override
- public void call(JavaRDD<Integer> rdd) {
- accumRdd.add(1);
- rdd.foreach(new VoidFunction<Integer>() {
- @Override
- public void call(Integer i) {
- accumEle.add(1);
- }
- });
- }
- });
-
- // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
- stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
- @Override
- public void call(JavaRDD<Integer> rdd, Time time) {
- }
- });
-
- JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(2, accumRdd.value().intValue());
- Assert.assertEquals(6, accumEle.value().intValue());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairFlatMap() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants"),
- Arrays.asList("dodgers"),
- Arrays.asList("athletics"));
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(6, "g"),
- new Tuple2<>(6, "i"),
- new Tuple2<>(6, "a"),
- new Tuple2<>(6, "n"),
- new Tuple2<>(6, "t"),
- new Tuple2<>(6, "s")),
- Arrays.asList(
- new Tuple2<>(7, "d"),
- new Tuple2<>(7, "o"),
- new Tuple2<>(7, "d"),
- new Tuple2<>(7, "g"),
- new Tuple2<>(7, "e"),
- new Tuple2<>(7, "r"),
- new Tuple2<>(7, "s")),
- Arrays.asList(
- new Tuple2<>(9, "a"),
- new Tuple2<>(9, "t"),
- new Tuple2<>(9, "h"),
- new Tuple2<>(9, "l"),
- new Tuple2<>(9, "e"),
- new Tuple2<>(9, "t"),
- new Tuple2<>(9, "i"),
- new Tuple2<>(9, "c"),
- new Tuple2<>(9, "s")));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
- new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterator<Tuple2<Integer, String>> call(String in) {
- List<Tuple2<Integer, String>> out = new ArrayList<>();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<>(in.length(), letter));
- }
- return out.iterator();
- }
- });
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testUnion() {
- List<List<Integer>> inputData1 = Arrays.asList(
- Arrays.asList(1,1),
- Arrays.asList(2,2),
- Arrays.asList(3,3));
-
- List<List<Integer>> inputData2 = Arrays.asList(
- Arrays.asList(4,4),
- Arrays.asList(5,5),
- Arrays.asList(6,6));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,1,4,4),
- Arrays.asList(2,2,5,5),
- Arrays.asList(3,3,6,6));
-
- JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
- JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
-
- JavaDStream<Integer> unioned = stream1.union(stream2);
- JavaTestUtils.attachTestOutputStream(unioned);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- /*
- * Performs an order-invariant comparison of lists representing two RDD streams. This allows
- * us to account for ordering variation within individual RDD's which occurs during windowing.
- */
- public static <T> void assertOrderInvariantEquals(
- List<List<T>> expected, List<List<T>> actual) {
- List<Set<T>> expectedSets = new ArrayList<>();
- for (List<T> list: expected) {
- expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
- }
- List<Set<T>> actualSets = new ArrayList<>();
- for (List<T> list: actual) {
- actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
- }
- Assert.assertEquals(expectedSets, actualSets);
- }
-
-
- // PairDStream Functions
- @SuppressWarnings("unchecked")
- @Test
- public void testPairFilter() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("giants", "dodgers"),
- Arrays.asList("yankees", "red sox"));
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("giants", 6)),
- Arrays.asList(new Tuple2<>("yankees", 7)));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String in) {
- return new Tuple2<>(in, in.length());
- }
- });
-
- JavaPairDStream<String, Integer> filtered = pairStream.filter(
- new Function<Tuple2<String, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, Integer> in) {
- return in._1().contains("a");
- }
- });
- JavaTestUtils.attachTestOutputStream(filtered);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- private final List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers"),
- new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "yankees"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(new Tuple2<>("california", "sharks"),
- new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "rangers"),
- new Tuple2<>("new york", "islanders")));
-
- @SuppressWarnings("unchecked")
- private final List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 1),
- new Tuple2<>("california", 3),
- new Tuple2<>("new york", 4),
- new Tuple2<>("new york", 1)),
- Arrays.asList(
- new Tuple2<>("california", 5),
- new Tuple2<>("california", 5),
- new Tuple2<>("new york", 3),
- new Tuple2<>("new york", 1)));
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairMap() { // Maps pair -> pair of different type
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "california"),
- new Tuple2<>(3, "california"),
- new Tuple2<>(4, "new york"),
- new Tuple2<>(1, "new york")),
- Arrays.asList(
- new Tuple2<>(5, "california"),
- new Tuple2<>(5, "california"),
- new Tuple2<>(3, "new york"),
- new Tuple2<>(1, "new york")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
- new PairFunction<Tuple2<String, Integer>, Integer, String>() {
- @Override
- public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
- return in.swap();
- }
- });
-
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairMapPartitions() { // Maps pair -> pair of different type
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "california"),
- new Tuple2<>(3, "california"),
- new Tuple2<>(4, "new york"),
- new Tuple2<>(1, "new york")),
- Arrays.asList(
- new Tuple2<>(5, "california"),
- new Tuple2<>(5, "california"),
- new Tuple2<>(3, "new york"),
- new Tuple2<>(1, "new york")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
- new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
- @Override
- public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
- List<Tuple2<Integer, String>> out = new LinkedList<>();
- while (in.hasNext()) {
- Tuple2<String, Integer> next = in.next();
- out.add(next.swap());
- }
- return out.iterator();
- }
- });
-
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairMap2() { // Maps pair -> single
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1, 3, 4, 1),
- Arrays.asList(5, 5, 3, 1));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaDStream<Integer> reversed = pairStream.map(
- new Function<Tuple2<String, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<String, Integer> in) {
- return in._2();
- }
- });
-
- JavaTestUtils.attachTestOutputStream(reversed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
- List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("hi", 1),
- new Tuple2<>("ho", 2)),
- Arrays.asList(
- new Tuple2<>("hi", 1),
- new Tuple2<>("ho", 2)));
-
- List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, "h"),
- new Tuple2<>(1, "i"),
- new Tuple2<>(2, "h"),
- new Tuple2<>(2, "o")),
- Arrays.asList(
- new Tuple2<>(1, "h"),
- new Tuple2<>(1, "i"),
- new Tuple2<>(2, "h"),
- new Tuple2<>(2, "o")));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
- new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
- @Override
- public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
- List<Tuple2<Integer, String>> out = new LinkedList<>();
- for (Character s : in._1().toCharArray()) {
- out.add(new Tuple2<>(in._2(), s.toString()));
- }
- return out.iterator();
- }
- });
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairGroupByKey() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(
- new Tuple2<>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<>("new york", Arrays.asList("rangers", "islanders"))));
-
- JavaDStream<Tuple2<String, String>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
- JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected.size(), result.size());
- Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator();
- Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
- while (resultItr.hasNext() && expectedItr.hasNext()) {
- Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator();
- Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
- while (resultElements.hasNext() && expectedElements.hasNext()) {
- Tuple2<String, Iterable<String>> resultElement = resultElements.next();
- Tuple2<String, List<String>> expectedElement = expectedElements.next();
- Assert.assertEquals(expectedElement._1(), resultElement._1());
- equalIterable(expectedElement._2(), resultElement._2());
- }
- Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairReduceByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(
- new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
-
- JavaTestUtils.attachTestOutputStream(reduced);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testCombineByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(
- new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
- new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) {
- return i;
- }
- }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
-
- JavaTestUtils.attachTestOutputStream(combined);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testCountByValue() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("hello", "world"),
- Arrays.asList("hello", "moon"),
- Arrays.asList("hello"));
-
- List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("hello", 1L),
- new Tuple2<>("world", 1L)),
- Arrays.asList(
- new Tuple2<>("hello", 1L),
- new Tuple2<>("moon", 1L)),
- Arrays.asList(
- new Tuple2<>("hello", 1L)));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Long> counted = stream.countByValue();
- JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california", Arrays.asList(1, 3)),
- new Tuple2<>("new york", Arrays.asList(1, 4))
- ),
- Arrays.asList(
- new Tuple2<>("california", Arrays.asList(1, 3, 5, 5)),
- new Tuple2<>("new york", Arrays.asList(1, 1, 3, 4))
- ),
- Arrays.asList(
- new Tuple2<>("california", Arrays.asList(5, 5)),
- new Tuple2<>("new york", Arrays.asList(1, 3))
- )
- );
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Iterable<Integer>> groupWindowed =
- pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected.size(), result.size());
- for (int i = 0; i < result.size(); i++) {
- Assert.assertEquals(convert(expected.get(i)), convert(result.get(i)));
- }
- }
-
- private static Set<Tuple2<String, HashSet<Integer>>>
- convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
- List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<>();
- for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
- newListOfTuples.add(convert(tuple));
- }
- return new HashSet<>(newListOfTuples);
- }
-
- private static Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
- return new Tuple2<>(tuple._1(), new HashSet<>(tuple._2()));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testReduceByKeyAndWindow() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testUpdateStateByKey() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out += state.get();
- }
- for (Integer v : values) {
- out += v;
- }
- return Optional.of(out);
- }
- });
- JavaTestUtils.attachTestOutputStream(updated);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testUpdateStateByKeyWithInitial() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<Tuple2<String, Integer>> initial = Arrays.asList(
- new Tuple2<>("california", 1),
- new Tuple2<>("new york", 2));
-
- JavaRDD<Tuple2<String, Integer>> tmpRDD = ssc.sparkContext().parallelize(initial);
- JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(tmpRDD);
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 5),
- new Tuple2<>("new york", 7)),
- Arrays.asList(new Tuple2<>("california", 15),
- new Tuple2<>("new york", 11)),
- Arrays.asList(new Tuple2<>("california", 15),
- new Tuple2<>("new york", 11)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- int out = 0;
- if (state.isPresent()) {
- out += state.get();
- }
- for (Integer v : values) {
- out += v;
- }
- return Optional.of(out);
- }
- }, new HashPartitioner(1), initialRDD);
- JavaTestUtils.attachTestOutputStream(updated);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testReduceByKeyAndWindowWithInverse() {
- List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
-
- List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", 4),
- new Tuple2<>("new york", 5)),
- Arrays.asList(new Tuple2<>("california", 14),
- new Tuple2<>("new york", 9)),
- Arrays.asList(new Tuple2<>("california", 10),
- new Tuple2<>("new york", 4)));
-
- JavaDStream<Tuple2<String, Integer>> stream =
- JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
- new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(reduceWindowed);
- List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testCountByValueAndWindow() {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("hello", "world"),
- Arrays.asList("hello", "moon"),
- Arrays.asList("hello"));
-
- List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
- Sets.newHashSet(
- new Tuple2<>("hello", 1L),
- new Tuple2<>("world", 1L)),
- Sets.newHashSet(
- new Tuple2<>("hello", 2L),
- new Tuple2<>("world", 1L),
- new Tuple2<>("moon", 1L)),
- Sets.newHashSet(
- new Tuple2<>("hello", 2L),
- new Tuple2<>("moon", 1L)));
-
- JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, Long> counted =
- stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
- JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- List<Set<Tuple2<String, Long>>> unorderedResult = new ArrayList<>();
- for (List<Tuple2<String, Long>> res: result) {
- unorderedResult.add(Sets.newHashSet(res));
- }
-
- Assert.assertEquals(expected, unorderedResult);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairTransform() {
- List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(3, 5),
- new Tuple2<>(1, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(2, 5)),
- Arrays.asList(
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(1, 5)));
-
- List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(1, 5),
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5)),
- Arrays.asList(
- new Tuple2<>(1, 5),
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5)));
-
- JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
- new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
- @Override
- public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) {
- return in.sortByKey();
- }
- });
-
- JavaTestUtils.attachTestOutputStream(sorted);
- List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPairToNormalRDDTransform() {
- List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
- Arrays.asList(
- new Tuple2<>(3, 5),
- new Tuple2<>(1, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(2, 5)),
- Arrays.asList(
- new Tuple2<>(2, 5),
- new Tuple2<>(3, 5),
- new Tuple2<>(4, 5),
- new Tuple2<>(1, 5)));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(3,1,4,2),
- Arrays.asList(2,3,4,1));
-
- JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaDStream<Integer> firstParts = pairStream.transform(
- new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) {
- return in.map(new Function<Tuple2<Integer, Integer>, Integer>() {
- @Override
- public Integer call(Tuple2<Integer, Integer> in2) {
- return in2._1();
- }
- });
- }
- });
-
- JavaTestUtils.attachTestOutputStream(firstParts);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testMapValues() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "DODGERS"),
- new Tuple2<>("california", "GIANTS"),
- new Tuple2<>("new york", "YANKEES"),
- new Tuple2<>("new york", "METS")),
- Arrays.asList(new Tuple2<>("california", "SHARKS"),
- new Tuple2<>("california", "DUCKS"),
- new Tuple2<>("new york", "RANGERS"),
- new Tuple2<>("new york", "ISLANDERS")));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
- JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
- @Override
- public String call(String s) {
- return s.toUpperCase(Locale.ENGLISH);
- }
- });
-
- JavaTestUtils.attachTestOutputStream(mapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testFlatMapValues() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
-
- List<List<Tuple2<String, String>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers1"),
- new Tuple2<>("california", "dodgers2"),
- new Tuple2<>("california", "giants1"),
- new Tuple2<>("california", "giants2"),
- new Tuple2<>("new york", "yankees1"),
- new Tuple2<>("new york", "yankees2"),
- new Tuple2<>("new york", "mets1"),
- new Tuple2<>("new york", "mets2")),
- Arrays.asList(new Tuple2<>("california", "sharks1"),
- new Tuple2<>("california", "sharks2"),
- new Tuple2<>("california", "ducks1"),
- new Tuple2<>("california", "ducks2"),
- new Tuple2<>("new york", "rangers1"),
- new Tuple2<>("new york", "rangers2"),
- new Tuple2<>("new york", "islanders1"),
- new Tuple2<>("new york", "islanders2")));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
-
- JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
- new Function<String, Iterable<String>>() {
- @Override
- public Iterable<String> call(String in) {
- List<String> out = new ArrayList<>();
- out.add(in + "1");
- out.add(in + "2");
- return out;
- }
- });
-
- JavaTestUtils.attachTestOutputStream(flatMapped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testCoGroup() {
- List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers"),
- new Tuple2<>("new york", "yankees")),
- Arrays.asList(new Tuple2<>("california", "sharks"),
- new Tuple2<>("new york", "rangers")));
-
- List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "islanders")));
-
-
- List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california",
- new Tuple2<>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
- new Tuple2<>("new york",
- new Tuple2<>(Arrays.asList("yankees"), Arrays.asList("mets")))),
- Arrays.asList(
- new Tuple2<>("california",
- new Tuple2<>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
- new Tuple2<>("new york",
- new Tuple2<>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
-
-
- JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
- JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
- JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
- JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
- JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped =
- pairStream1.cogroup(pairStream2);
- JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result =
- JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected.size(), result.size());
- Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr =
- result.iterator();
- Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr =
- expected.iterator();
- while (resultItr.hasNext() && expectedItr.hasNext()) {
- Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements =
- resultItr.next().iterator();
- Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements =
- expectedItr.next().iterator();
- while (resultElements.hasNext() && expectedElements.hasNext()) {
- Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement =
- resultElements.next();
- Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement =
- expectedElements.next();
- Assert.assertEquals(expectedElement._1(), resultElement._1());
- equalIterable(expectedElement._2()._1(), resultElement._2()._1());
- equalIterable(expectedElement._2()._2(), resultElement._2()._2());
- }
- Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testJoin() {
- List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers"),
- new Tuple2<>("new york", "yankees")),
- Arrays.asList(new Tuple2<>("california", "sharks"),
- new Tuple2<>("new york", "rangers")));
-
- List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "giants"),
- new Tuple2<>("new york", "mets")),
- Arrays.asList(new Tuple2<>("california", "ducks"),
- new Tuple2<>("new york", "islanders")));
-
-
- List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<>("california",
- new Tuple2<>("dodgers", "giants")),
- new Tuple2<>("new york",
- new Tuple2<>("yankees", "mets"))),
- Arrays.asList(
- new Tuple2<>("california",
- new Tuple2<>("sharks", "ducks")),
- new Tuple2<>("new york",
- new Tuple2<>("rangers", "islanders"))));
-
-
- JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
- JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
- JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
- JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
- JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
- JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testLeftOuterJoin() {
- List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "dodgers"),
- new Tuple2<>("new york", "yankees")),
- Arrays.asList(new Tuple2<>("california", "sharks") ));
-
- List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
- Arrays.asList(new Tuple2<>("california", "giants") ),
- Arrays.asList(new Tuple2<>("new york", "islanders") )
-
- );
-
- List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
-
- JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream1, 1);
- JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
-
- JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
- ssc, stringStringKVStream2, 1);
- JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
-
- JavaPairDStream<String, Tuple2<String, Optional<String>>> joined =
- pairStream1.leftOuterJoin(pairStream2);
- JavaDStream<Long> counted = joined.count();
- JavaTestUtils.attachTestOutputStream(counted);
- List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testCheckpointMasterRecovery() throws InterruptedException {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("this", "is"),
- Arrays.asList("a", "test"),
- Arrays.asList("counting", "letters"));
-
- List<List<Integer>> expectedInitial = Arrays.asList(
- Arrays.asList(4,2));
- List<List<Integer>> expectedFinal = Arrays.asList(
- Arrays.asList(1,4),
- Arrays.asList(8,7));
-
- File tempDir = Files.createTempDir();
- tempDir.deleteOnExit();
- ssc.checkpoint(tempDir.getAbsolutePath());
-
- JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
- @Override
- public Integer call(String s) {
- return s.length();
- }
- });
- JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
- List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
-
- assertOrderInvariantEquals(expectedInitial, initialResult);
- Thread.sleep(1000);
- ssc.stop();
-
- ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
- // Tweak to take into consideration that the last batch before failure
- // will be re-processed after recovery
- List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
- assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
- ssc.stop();
- Utils.deleteRecursively(tempDir);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testContextGetOrCreate() throws InterruptedException {
- ssc.stop();
-
- final SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("newContext", "true");
-
- File emptyDir = Files.createTempDir();
- emptyDir.deleteOnExit();
- StreamingContextSuite contextSuite = new StreamingContextSuite();
- String corruptedCheckpointDir = contextSuite.createCorruptedCheckpoint();
- String checkpointDir = contextSuite.createValidCheckpoint();
-
- // Function to create JavaStreamingContext without any output operations
- // (used to detect the new context)
- final AtomicBoolean newContextCreated = new AtomicBoolean(false);
- Function0<JavaStreamingContext> creatingFunc = new Function0<JavaStreamingContext>() {
- @Override
- public JavaStreamingContext call() {
- newContextCreated.set(true);
- return new JavaStreamingContext(conf, Seconds.apply(1));
- }
- };
-
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(emptyDir.getAbsolutePath(), creatingFunc);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop();
-
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(corruptedCheckpointDir, creatingFunc,
- new Configuration(), true);
- Assert.assertTrue("new context not created", newContextCreated.get());
- ssc.stop();
-
- newContextCreated.set(false);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new Configuration());
- Assert.assertTrue("old context not recovered", !newContextCreated.get());
- ssc.stop();
-
- newContextCreated.set(false);
- JavaSparkContext sc = new JavaSparkContext(conf);
- ssc = JavaStreamingContext.getOrCreate(checkpointDir, creatingFunc,
- new Configuration());
- Assert.assertTrue("old context not recovered", !newContextCreated.get());
- ssc.stop();
- }
-
- /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
- @SuppressWarnings("unchecked")
- @Test
- public void testCheckpointofIndividualStream() throws InterruptedException {
- List<List<String>> inputData = Arrays.asList(
- Arrays.asList("this", "is"),
- Arrays.asList("a", "test"),
- Arrays.asList("counting", "letters"));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(4,2),
- Arrays.asList(1,4),
- Arrays.asList(8,7));
-
- JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
- @Override
- public Integer call(String s) {
- return s.length();
- }
- });
- JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
-
- letterCount.checkpoint(new Duration(1000));
-
- List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3);
- assertOrderInvariantEquals(expected, result1);
- }
- */
-
- // Input stream tests. These mostly just test that we can instantiate a given InputStream with
- // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
- // InputStream functionality is deferred to the existing Scala tests.
- @Test
- public void testSocketTextStream() {
- ssc.socketTextStream("localhost", 12345);
- }
-
- @Test
- public void testSocketString() {
- ssc.socketStream(
- "localhost",
- 12345,
- new Function<InputStream, Iterable<String>>() {
- @Override
- public Iterable<String> call(InputStream in) throws IOException {
- List<String> out = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(
- new InputStreamReader(in, StandardCharsets.UTF_8))) {
- for (String line; (line = reader.readLine()) != null;) {
- out.add(line);
- }
- }
- return out;
- }
- },
- StorageLevel.MEMORY_ONLY());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testTextFileStream() throws IOException {
- File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
- List<List<String>> expected = fileTestPrepare(testDir);
-
- JavaDStream<String> input = ssc.textFileStream(testDir.toString());
- JavaTestUtils.attachTestOutputStream(input);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testFileStream() throws IOException {
- File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
- List<List<String>> expected = fileTestPrepare(testDir);
-
- JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
- testDir.toString(),
- LongWritable.class,
- Text.class,
- TextInputFormat.class,
- new Function<Path, Boolean>() {
- @Override
- public Boolean call(Path v1) {
- return Boolean.TRUE;
- }
- },
- true);
-
- JavaDStream<String> test = inputStream.map(
- new Function<Tuple2<LongWritable, Text>, String>() {
- @Override
- public String call(Tuple2<LongWritable, Text> v1) {
- return v1._2().toString();
- }
- });
-
- JavaTestUtils.attachTestOutputStream(test);
- List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
- public void testRawSocketStream() {
- ssc.rawSocketStream("localhost", 12345);
- }
-
- private static List<List<String>> fileTestPrepare(File testDir) throws IOException {
- File existingFile = new File(testDir, "0");
- Files.write("0\n", existingFile, StandardCharsets.UTF_8);
- Assert.assertTrue(existingFile.setLastModified(1000));
- Assert.assertEquals(1000, existingFile.lastModified());
- return Arrays.asList(Arrays.asList("0"));
- }
-
- @SuppressWarnings("unchecked")
- // SPARK-5795: no logic assertions, just testing that intended API invocations compile
- private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
- pds.saveAsNewAPIHadoopFiles(
- "", "", LongWritable.class, Text.class,
- org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
- pds.saveAsHadoopFiles(
- "", "", LongWritable.class, Text.class,
- org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
- // Checks that a previous common workaround for this API still compiles
- pds.saveAsNewAPIHadoopFiles(
- "", "", LongWritable.class, Text.class,
- (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
- pds.saveAsHadoopFiles(
- "", "", LongWritable.class, Text.class,
- (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
- }
-
-}