diff options
author | Sean Owen <sowen@cloudera.com> | 2016-03-09 18:27:44 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-09 18:27:44 +0000 |
commit | 256704c771d301700af9ebf0d180c1ba7c4116c0 (patch) | |
tree | f9be79919b5c6ec4847c24a086fa844555e2cd12 /extras | |
parent | 7791d0c3a9bdfe73e071266846f9ab1491fce50c (diff) | |
download | spark-256704c771d301700af9ebf0d180c1ba7c4116c0.tar.gz spark-256704c771d301700af9ebf0d180c1ba7c4116c0.tar.bz2 spark-256704c771d301700af9ebf0d180c1ba7c4116c0.zip |
[SPARK-13595][BUILD] Move docker, extras modules into external
## What changes were proposed in this pull request?
Move `docker` dirs out of top level into `external/`; move `extras/*` into `external/`
## How was this patch tested?
This is tested with Jenkins tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #11523 from srowen/SPARK-13595.
Diffstat (limited to 'extras')
30 files changed, 0 insertions, 5511 deletions
diff --git a/extras/README.md b/extras/README.md deleted file mode 100644 index 1b4174b7d5..0000000000 --- a/extras/README.md +++ /dev/null @@ -1 +0,0 @@ -This directory contains build components not included by default in Spark's build. diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md deleted file mode 100644 index dc9e87f2ee..0000000000 --- a/extras/java8-tests/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# Java 8 Test Suites - -These tests require having Java 8 installed and are isolated from the main Spark build. -If Java 8 is not your system's default Java version, you will need to point Spark's build -to your Java location. The set-up depends a bit on the build system: - -* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass - `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically - include the Java 8 test project. - - `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"` - -* For Maven users, - - Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not - automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests` - must be used. - - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests` - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite` - - Note that the above command can only be run from project root directory since this module - depends on core and the test-jars of core and streaming. This means an install step is - required to make the test dependencies visible to the Java 8 sub-project. diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml deleted file mode 100644 index 0ad9c5303a..0000000000 --- a/extras/java8-tests/pom.xml +++ /dev/null @@ -1,161 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Licensed to the Apache Software Foundation (ASF) under one or more -~ contributor license agreements. See the NOTICE file distributed with -~ this work for additional information regarding copyright ownership. -~ The ASF licenses this file to You under the Apache License, Version 2.0 -~ (the "License"); you may not use this file except in compliance with -~ the License. You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>java8-tests_2.11</artifactId> - <packaging>pom</packaging> - <name>Spark Project Java8 Tests POM</name> - - <properties> - <sbt.project.name>java8-tests</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-test-tags_${scala.binary.version}</artifactId> - </dependency> - </dependencies> - - <profiles> - <profile> - <id>java8-tests</id> - </profile> - </profiles> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-install-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - <configuration> - <systemPropertyVariables> - <!-- For some reason surefire isn't setting this log4j file on the - test classpath automatically. So we add it manually. --> - <log4j.configuration> - file:src/test/resources/log4j.properties - </log4j.configuration> - </systemPropertyVariables> - <skipTests>false</skipTests> - <includes> - <include>**/Suite*.java</include> - <include>**/*Suite.java</include> - </includes> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <executions> - <execution> - <id>test-compile-first</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <fork>true</fork> - <verbose>true</verbose> - <forceJavacCompilerUse>true</forceJavacCompilerUse> - <source>1.8</source> - <compilerVersion>1.8</compilerVersion> - <target>1.8</target> - <encoding>UTF-8</encoding> - <maxmem>1024m</maxmem> - </configuration> - </plugin> - <plugin> - <!-- disabled --> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <execution> - <phase>none</phase> - </execution> - <execution> - <id>scala-compile-first</id> - <phase>none</phase> - </execution> - <execution> - <id>scala-test-compile-first</id> - <phase>none</phase> - </execution> - <execution> - <id>attach-scaladocs</id> - <phase>none</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java deleted file mode 100644 index c0b58e713f..0000000000 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark; - -import java.io.File; -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Iterables; -import com.google.common.io.Files; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaDoubleRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.util.Utils; - -/** - * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 - * lambda syntax. - */ -public class Java8APISuite implements Serializable { - static int foreachCalls = 0; - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaAPISuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - } - - @Test - public void foreachWithAnonymousClass() { - foreachCalls = 0; - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction<String>() { - @Override - public void call(String s) { - foreachCalls++; - } - }); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void foreach() { - foreachCalls = 0; - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(x -> foreachCalls++); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void groupBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Integer, Boolean> isOdd = x -> x % 2 == 0; - JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - - oddsAndEvens = rdd.groupBy(isOdd, 1); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - } - - @Test - public void leftOuterJoin() { - JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(1, 2), - new Tuple2<>(2, 1), - new Tuple2<>(3, 1) - )); - JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 'x'), - new Tuple2<>(2, 'y'), - new Tuple2<>(2, 'z'), - new Tuple2<>(4, 'w') - )); - List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = - rdd1.leftOuterJoin(rdd2).collect(); - Assert.assertEquals(5, joined.size()); - Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); - Assert.assertEquals(3, firstUnmatched._1().intValue()); - } - - @Test - public void foldReduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2<Integer, Integer, Integer> add = (a, b) -> a + b; - - int sum = rdd.fold(0, add); - Assert.assertEquals(33, sum); - - sum = rdd.reduce(add); - Assert.assertEquals(33, sum); - } - - @Test - public void foldByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); - Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); - Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); - Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); - } - - @Test - public void reduceByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); - Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); - Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); - Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); - - Map<Integer, Integer> localCounts = counts.collectAsMap(); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - - localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - } - - @Test - public void map() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); - doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) - .cache(); - pairs.collect(); - JavaRDD<String> strings = rdd.map(Object::toString).cache(); - strings.collect(); - } - - @Test - public void flatMap() { - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", - "The quick brown fox jumps over the lazy dog.")); - JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" "))); - - Assert.assertEquals("Hello", words.first()); - Assert.assertEquals(11, words.count()); - - JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { - List<Tuple2<String, String>> pairs2 = new LinkedList<>(); - for (String word : s.split(" ")) { - pairs2.add(new Tuple2<>(word, word)); - } - return pairs2; - }); - - Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); - Assert.assertEquals(11, pairs.count()); - - JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { - List<Double> lengths = new LinkedList<>(); - for (String word : s.split(" ")) { - lengths.add((double) word.length()); - } - return lengths; - }); - - Assert.assertEquals(5.0, doubles.first(), 0.01); - Assert.assertEquals(11, pairs.count()); - } - - @Test - public void mapsFromPairsToPairs() { - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); - - // Regression test for SPARK-668: - JavaPairRDD<String, Integer> swapped = - pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap())); - swapped.collect(); - - // There was never a bug here, but it's worth testing: - pairRDD.map(Tuple2::swap).collect(); - } - - @Test - public void mapPartitions() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum); - }); - - Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); - } - - @Test - public void sequenceFile() { - File tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - // Try reading the output back as an object file - JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) - .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); - Assert.assertEquals(pairs, readRDD.collect()); - Utils.deleteRecursively(tempDir); - } - - @Test - public void zip() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); - JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); - zipped.count(); - } - - @Test - public void zipPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); - JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); - FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = - (Iterator<Integer> i, Iterator<String> s) -> { - int sizeI = 0; - while (i.hasNext()) { - sizeI += 1; - i.next(); - } - int sizeS = 0; - while (s.hasNext()) { - sizeS += 1; - s.next(); - } - return Arrays.asList(sizeI, sizeS).iterator(); - }; - JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); - Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); - } - - @Test - public void accumulators() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - - Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(intAccum::add); - Assert.assertEquals((Integer) 25, intAccum.value()); - - Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(x -> doubleAccum.add((double) x)); - Assert.assertEquals((Double) 25.0, doubleAccum.value()); - - // Try a custom accumulator type - AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { - @Override - public Float addInPlace(Float r, Float t) { - return r + t; - } - @Override - public Float addAccumulator(Float r, Float t) { - return r + t; - } - @Override - public Float zero(Float initialValue) { - return 0.0f; - } - }; - - Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(x -> floatAccum.add((float) x)); - Assert.assertEquals((Float) 25.0f, floatAccum.value()); - - // Test the setValue method - floatAccum.setValue(5.0f); - Assert.assertEquals((Float) 5.0f, floatAccum.value()); - } - - @Test - public void keyBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); - List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); - Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); - } - - @Test - public void mapOnPairRDD() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - JavaPairRDD<Integer, Integer> rdd3 = - rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); - Assert.assertEquals(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(0, 2), - new Tuple2<>(1, 3), - new Tuple2<>(0, 4)), rdd3.collect()); - } - - @Test - public void collectPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); - - JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); - Assert.assertEquals(Arrays.asList(1, 2), parts[0]); - - parts = rdd1.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(3, 4), parts[0]); - Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - - Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), - rdd2.collectPartitions(new int[]{0})[0]); - - List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), - parts2[1]); - } - - @Test - public void collectAsMapWithIntArrayValues() { - // Regression test for SPARK-1040 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); - JavaPairRDD<Integer, int[]> pairRDD = - rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); - pairRDD.collect(); // Works fine - pairRDD.collectAsMap(); // Used to crash with ClassCastException - } -} diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java deleted file mode 100644 index 604d818ef1..0000000000 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ /dev/null @@ -1,905 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.junit.Assert; -import org.junit.Test; - -import org.apache.spark.Accumulator; -import org.apache.spark.HashPartitioner; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; - -/** - * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 - * lambda syntax. - */ -@SuppressWarnings("unchecked") -public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { - - @Test - public void testMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(5, 5), - Arrays.asList(9, 4)); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(String::length); - JavaTestUtils.attachTestOutputStream(letterCount); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFilter() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> filtered = stream.filter(s -> s.contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testMapPartitions() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOX")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> mapped = stream.mapPartitions(in -> { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - }); - JavaTestUtils.attachTestOutputStream(mapped); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduce() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y); - JavaTestUtils.attachTestOutputStream(reduced); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByWindow() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); - } - - @Test - public void testTransform() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(3, 4, 5), - Arrays.asList(6, 7, 8), - Arrays.asList(9, 10, 11)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2)); - - JavaTestUtils.attachTestOutputStream(transformed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testVariousTransform() { - // tests whether all variations of transform can be called from Java - - List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - - List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - - JavaDStream<Integer> transformed1 = stream.transform(in -> null); - JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null); - JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null); - JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null); - JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null); - JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null); - JavaPairDStream<String, String> pairTransformed4 = - pairStream.transformToPair((x, time) -> null); - - } - - @Test - public void testTransformWith() { - List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList( - new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList( - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("dodgers", "giants")), - new Tuple2<>("new york", - new Tuple2<>("yankees", "mets"))), - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("sharks", "ducks")), - new Tuple2<>("new york", - new Tuple2<>("rangers", "islanders")))); - - JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream<String, Tuple2<String, String>> joined = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y)); - - JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); - for (List<Tuple2<String, Tuple2<String, String>>> res : result) { - unorderedResult.add(Sets.newHashSet(res)); - } - - Assert.assertEquals(expected, unorderedResult); - } - - - @Test - public void testVariousTransformWith() { - // tests whether all variations of transformWith can be called from Java - - List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); - List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); - JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); - JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); - - List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); - JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); - JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - - JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null); - JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream<Double, Double> transformed3 = - stream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream<Double, Double> transformed4 = - stream1.transformWithToPair(pairStream1,(x, y, z) -> null); - - JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null); - - JavaDStream<Double> pairTransformed2_ = - pairStream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream<Double, Double> pairTransformed3 = - pairStream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream<Double, Double> pairTransformed4 = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null); - } - - @Test - public void testStreamingContextTransform() { - List<List<Integer>> stream1input = Arrays.asList( - Arrays.asList(1), - Arrays.asList(2) - ); - - List<List<Integer>> stream2input = Arrays.asList( - Arrays.asList(3), - Arrays.asList(4) - ); - - List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<>(1, "x")), - Arrays.asList(new Tuple2<>(2, "y")) - ); - - List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), - Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) - ); - - JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); - JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); - JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - - List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); - - // This is just to test whether this transform to JavaStream compiles - JavaDStream<Long> transformed1 = ssc.transform( - listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - }); - - List<JavaDStream<?>> listOfDStreams2 = - Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); - - JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( - listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0); - JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1); - JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2); - JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction<Integer, Integer, Integer> mapToTuple = - (Integer i) -> new Tuple2<>(i, i); - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - }); - JavaTestUtils.attachTestOutputStream(transformed2); - List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = - JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"), - Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"), - Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)"))); - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1,1,1), - Arrays.asList(1,1,1)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - - stream.foreachRDD(rdd -> { - accumRdd.add(1); - rdd.foreach(x -> accumEle.add(1)); - }); - - // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD((rdd, time) -> null); - - JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(2, accumRdd.value().intValue()); - Assert.assertEquals(6, accumEle.value().intValue()); - } - - @Test - public void testPairFlatMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(6, "g"), - new Tuple2<>(6, "i"), - new Tuple2<>(6, "a"), - new Tuple2<>(6, "n"), - new Tuple2<>(6, "t"), - new Tuple2<>(6, "s")), - Arrays.asList( - new Tuple2<>(7, "d"), - new Tuple2<>(7, "o"), - new Tuple2<>(7, "d"), - new Tuple2<>(7, "g"), - new Tuple2<>(7, "e"), - new Tuple2<>(7, "r"), - new Tuple2<>(7, "s")), - Arrays.asList( - new Tuple2<>(9, "a"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "h"), - new Tuple2<>(9, "l"), - new Tuple2<>(9, "e"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "i"), - new Tuple2<>(9, "c"), - new Tuple2<>(9, "s"))); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String letter : s.split("(?!^)")) { - out.add(new Tuple2<>(s.length(), letter)); - } - return out; - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static <T extends Comparable<T>> void assertOrderInvariantEquals( - List<List<T>> expected, List<List<T>> actual) { - expected.forEach(list -> Collections.sort(list)); - List<List<T>> sortedActual = new ArrayList<>(); - actual.forEach(list -> { - List<T> sortedList = new ArrayList<>(list); - Collections.sort(sortedList); - sortedActual.add(sortedList); - }); - Assert.assertEquals(expected, sortedActual); - } - - @Test - public void testPairFilter() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("giants", 6)), - Arrays.asList(new Tuple2<>("yankees", 7))); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = - stream.mapToPair(x -> new Tuple2<>(x, x.length())); - JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "yankees"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", "sharks"), - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "rangers"), - new Tuple2<>("new york", "islanders"))); - - List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 1), - new Tuple2<>("california", 3), - new Tuple2<>("new york", 4), - new Tuple2<>("new york", 1)), - Arrays.asList( - new Tuple2<>("california", 5), - new Tuple2<>("california", 5), - new Tuple2<>("new york", 3), - new Tuple2<>("new york", 1))); - - @Test - public void testPairMap() { // Maps pair -> pair of different type - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap()); - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMapPartitions() { // Maps pair -> pair of different type - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2<String, Integer> next = in.next(); - out.add(next.swap()); - } - return out; - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMap2() { // Maps pair -> single - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(1, 3, 4, 1), - Arrays.asList(5, 5, 3, 1)); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair - List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2)), - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2))); - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o")), - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { - List<Tuple2<Integer, String>> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out; - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairReduceByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y); - - JavaTestUtils.attachTestOutputStream(reduced); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCombineByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i, - (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindow() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUpdateStateByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); - }); - - JavaTestUtils.attachTestOutputStream(updated); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindowWithInverse() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000), - new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairTransform() { - List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5)), - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5))); - - JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey()); - - JavaTestUtils.attachTestOutputStream(sorted); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToNormalRDDTransform() { - List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(3, 1, 4, 2), - Arrays.asList(2, 3, 4, 1)); - - JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1())); - JavaTestUtils.attachTestOutputStream(firstParts); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapValues() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; - - List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "DODGERS"), - new Tuple2<>("california", "GIANTS"), - new Tuple2<>("new york", "YANKEES"), - new Tuple2<>("new york", "METS")), - Arrays.asList(new Tuple2<>("california", "SHARKS"), - new Tuple2<>("california", "DUCKS"), - new Tuple2<>("new york", "RANGERS"), - new Tuple2<>("new york", "ISLANDERS"))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); - JavaTestUtils.attachTestOutputStream(mapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMapValues() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; - - List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers1"), - new Tuple2<>("california", "dodgers2"), - new Tuple2<>("california", "giants1"), - new Tuple2<>("california", "giants2"), - new Tuple2<>("new york", "yankees1"), - new Tuple2<>("new york", "yankees2"), - new Tuple2<>("new york", "mets1"), - new Tuple2<>("new york", "mets2")), - Arrays.asList(new Tuple2<>("california", "sharks1"), - new Tuple2<>("california", "sharks2"), - new Tuple2<>("california", "ducks1"), - new Tuple2<>("california", "ducks2"), - new Tuple2<>("new york", "rangers1"), - new Tuple2<>("new york", "rangers2"), - new Tuple2<>("new york", "islanders1"), - new Tuple2<>("new york", "islanders2"))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> flatMapped = - pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - /** - * This test is only for testing the APIs. It's not necessary to run it. - */ - public void testMapWithStateAPI() { - JavaPairRDD<String, Boolean> initialRDD = null; - JavaPairDStream<String, Integer> wordsDstream = null; - - JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); - - JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); - } -} diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties deleted file mode 100644 index eb3b1999eb..0000000000 --- a/extras/java8-tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN -org.spark-project.jetty.LEVEL=WARN diff --git a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala deleted file mode 100644 index fa0681db41..0000000000 --- a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -/** - * Test cases where JDK8-compiled Scala user code is used with Spark. - */ -class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext { - test("basic RDD closure test (SPARK-6152)") { - sc.parallelize(1 to 1000).map(x => x * x).count() - } -} diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml deleted file mode 100644 index d1c38c7ca5..0000000000 --- a/extras/kinesis-asl-assembly/pom.xml +++ /dev/null @@ -1,181 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Project Kinesis Assembly</name> - <url>http://spark.apache.org/</url> - - <properties> - <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <!-- - Demote already included in the Spark assembly. - --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>net.java.dev.jets3t</groupId> - <artifactId>jets3t</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <classifier>${avro.mapred.classifier}</classifier> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <shadedArtifactAttached>false</shadedArtifactAttached> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>reference.conf</resource> - </transformer> - <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> - <resource>log4j.properties</resource> - </transformer> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> -</build> -</project> - diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml deleted file mode 100644 index 935155eb5d..0000000000 --- a/extras/kinesis-asl/pom.xml +++ /dev/null @@ -1,87 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Licensed to the Apache Software Foundation (ASF) under one or more -~ contributor license agreements. See the NOTICE file distributed with -~ this work for additional information regarding copyright ownership. -~ The ASF licenses this file to You under the Apache License, Version 2.0 -~ (the "License"); you may not use this file except in compliance with -~ the License. You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <!-- Kinesis integration is not included by default due to ASL-licensed code. --> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Kinesis Integration</name> - - <properties> - <sbt.project.name>streaming-kinesis-asl</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>amazon-kinesis-client</artifactId> - <version>${aws.kinesis.client.version}</version> - </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>amazon-kinesis-producer</artifactId> - <version>${aws.kinesis.producer.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-test-tags_${scala.binary.version}</artifactId> - </dependency> - </dependencies> - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - </build> -</project> diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java deleted file mode 100644 index 5dc825dfdc..0000000000 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.streaming; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.regex.Pattern; - -import com.amazonaws.regions.RegionUtils; -import org.apache.log4j.Logger; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kinesis.KinesisUtils; - -import scala.Tuple2; - -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - -/** - * Consumes messages from a Amazon Kinesis streams and does wordcount. - * - * This example spins up 1 Kinesis Receiver per shard for the given stream. - * It then starts pulling from the last checkpointed sequence number of the given stream. - * - * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name] - * [app-name] is the name of the consumer app, used to track the read data in DynamoDB - * [stream-name] name of the Kinesis stream (ie. mySparkStream) - * [endpoint-url] endpoint of the Kinesis service - * (e.g. https://kinesis.us-east-1.amazonaws.com) - * - * - * Example: - * # export AWS keys if necessary - * $ export AWS_ACCESS_KEY_ID=[your-access-key] - * $ export AWS_SECRET_KEY=<your-secret-key> - * - * # run the example - * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com - * - * There is a companion helper class called KinesisWordProducerASL which puts dummy data - * onto the Kinesis stream. - * - * This code uses the DefaultAWSCredentialsProviderChain to find credentials - * in the following order: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - * Instance profile credentials - delivered through the Amazon EC2 metadata service - * For more information, see - * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html - * - * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on - * the Kinesis Spark Streaming integration. - */ -public final class JavaKinesisWordCountASL { // needs to be public for access from run-example - private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); - private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); - - public static void main(String[] args) { - // Check that all required args were passed in. - if (args.length != 3) { - System.err.println( - "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" + - " <app-name> is the name of the app, used to track the read data in DynamoDB\n" + - " <stream-name> is the name of the Kinesis stream\n" + - " <endpoint-url> is the endpoint of the Kinesis service\n" + - " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" + - "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" + - "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" + - "details.\n" - ); - System.exit(1); - } - - // Set default log4j logging level to WARN to hide Spark logs - StreamingExamples.setStreamingLogLevels(); - - // Populate the appropriate variables from the given args - String kinesisAppName = args[0]; - String streamName = args[1]; - String endpointUrl = args[2]; - - // Create a Kinesis client in order to determine the number of shards for the given stream - AmazonKinesisClient kinesisClient = - new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()); - kinesisClient.setEndpoint(endpointUrl); - int numShards = - kinesisClient.describeStream(streamName).getStreamDescription().getShards().size(); - - - // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard. - // This is not a necessity; if there are less receivers/DStreams than the number of shards, - // then the shards will be automatically distributed among the receivers and each receiver - // will receive data from multiple shards. - int numStreams = numShards; - - // Spark Streaming batch interval - Duration batchInterval = new Duration(2000); - - // Kinesis checkpoint interval. Same as batchInterval for this example. - Duration kinesisCheckpointInterval = batchInterval; - - // Get the region name from the endpoint URL to save Kinesis Client Library metadata in - // DynamoDB of the same region as the Kinesis stream - String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName(); - - // Setup the Spark config and StreamingContext - SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL"); - JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval); - - // Create the Kinesis DStreams - List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams); - for (int i = 0; i < numStreams; i++) { - streamsList.add( - KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2()) - ); - } - - // Union all the streams if there is more than 1 stream - JavaDStream<byte[]> unionStreams; - if (streamsList.size() > 1) { - unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size())); - } else { - // Otherwise, just use the 1 stream - unionStreams = streamsList.get(0); - } - - // Convert each line of Array[Byte] to String, and split into words - JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() { - @Override - public Iterator<String> call(byte[] line) { - String s = new String(line, StandardCharsets.UTF_8); - return Arrays.asList(WORD_SEPARATOR.split(s)).iterator(); - } - }); - - // Map each word to a (word, 1) tuple so we can reduce by key to count the words - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<String, Integer>(s, 1); - } - } - ).reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - } - ); - - // Print the first 10 wordCounts - wordCounts.print(); - - // Start the streaming context and await termination - jssc.start(); - jssc.awaitTermination(); - } -} diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py deleted file mode 100644 index 51f8c5ca66..0000000000 --- a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py +++ /dev/null @@ -1,83 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" - Consumes messages from a Amazon Kinesis streams and does wordcount. - - This example spins up 1 Kinesis Receiver per shard for the given stream. - It then starts pulling from the last checkpointed sequence number of the given stream. - - Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name> - <app-name> is the name of the consumer app, used to track the read data in DynamoDB - <stream-name> name of the Kinesis stream (ie. mySparkStream) - <endpoint-url> endpoint of the Kinesis service - (e.g. https://kinesis.us-east-1.amazonaws.com) - - - Example: - # export AWS keys if necessary - $ export AWS_ACCESS_KEY_ID=<your-access-key> - $ export AWS_SECRET_KEY=<your-secret-key> - - # run the example - $ bin/spark-submit -jar extras/kinesis-asl/target/scala-*/\ - spark-streaming-kinesis-asl-assembly_*.jar \ - extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ - myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com - - There is a companion helper class called KinesisWordProducerASL which puts dummy data - onto the Kinesis stream. - - This code uses the DefaultAWSCredentialsProviderChain to find credentials - in the following order: - Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - Java System Properties - aws.accessKeyId and aws.secretKey - Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - Instance profile credentials - delivered through the Amazon EC2 metadata service - For more information, see - http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html - - See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on - the Kinesis Spark Streaming integration. -""" -from __future__ import print_function - -import sys - -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream - -if __name__ == "__main__": - if len(sys.argv) != 5: - print( - "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>", - file=sys.stderr) - sys.exit(-1) - - sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl") - ssc = StreamingContext(sc, 1) - appName, streamName, endpointUrl, regionName = sys.argv[1:] - lines = KinesisUtils.createStream( - ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) - counts.pprint() - - ssc.start() - ssc.awaitTermination() diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties deleted file mode 100644 index 6cdc9286c5..0000000000 --- a/extras/kinesis-asl/src/main/resources/log4j.properties +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -log4j.rootCategory=WARN, console - -# File appender -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=false -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n - -# Console appender -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.out -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.spark-project.jetty=WARN -log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala deleted file mode 100644 index 6a73bc0e30..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import java.nio.ByteBuffer - -import scala.util.Random - -import com.amazonaws.auth.{BasicAWSCredentials, DefaultAWSCredentialsProviderChain} -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.AmazonKinesisClient -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.PutRecordRequest -import org.apache.log4j.{Level, Logger} - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions -import org.apache.spark.streaming.kinesis.KinesisUtils - - -/** - * Consumes messages from a Amazon Kinesis streams and does wordcount. - * - * This example spins up 1 Kinesis Receiver per shard for the given stream. - * It then starts pulling from the last checkpointed sequence number of the given stream. - * - * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name> - * <app-name> is the name of the consumer app, used to track the read data in DynamoDB - * <stream-name> name of the Kinesis stream (ie. mySparkStream) - * <endpoint-url> endpoint of the Kinesis service - * (e.g. https://kinesis.us-east-1.amazonaws.com) - * - * - * Example: - * # export AWS keys if necessary - * $ export AWS_ACCESS_KEY_ID=<your-access-key> - * $ export AWS_SECRET_KEY=<your-secret-key> - * - * # run the example - * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com - * - * There is a companion helper class called KinesisWordProducerASL which puts dummy data - * onto the Kinesis stream. - * - * This code uses the DefaultAWSCredentialsProviderChain to find credentials - * in the following order: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - * Instance profile credentials - delivered through the Amazon EC2 metadata service - * For more information, see - * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html - * - * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on - * the Kinesis Spark Streaming integration. - */ -object KinesisWordCountASL extends Logging { - def main(args: Array[String]) { - // Check that all required args were passed in. - if (args.length != 3) { - System.err.println( - """ - |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name> - | - | <app-name> is the name of the consumer app, used to track the read data in DynamoDB - | <stream-name> is the name of the Kinesis stream - | <endpoint-url> is the endpoint of the Kinesis service - | (e.g. https://kinesis.us-east-1.amazonaws.com) - | - |Generate input data for Kinesis stream using the example KinesisWordProducerASL. - |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more - |details. - """.stripMargin) - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - // Populate the appropriate variables from the given args - val Array(appName, streamName, endpointUrl) = args - - - // Determine the number of shards from the stream using the low-level Kinesis Client - // from the AWS Java SDK. - val credentials = new DefaultAWSCredentialsProviderChain().getCredentials() - require(credentials != null, - "No AWS credentials found. Please specify credentials using one of the methods specified " + - "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html") - val kinesisClient = new AmazonKinesisClient(credentials) - kinesisClient.setEndpoint(endpointUrl) - val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size - - - // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard. - // This is not a necessity; if there are less receivers/DStreams than the number of shards, - // then the shards will be automatically distributed among the receivers and each receiver - // will receive data from multiple shards. - val numStreams = numShards - - // Spark Streaming batch interval - val batchInterval = Milliseconds(2000) - - // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information - // on sequence number of records that have been received. Same as batchInterval for this - // example. - val kinesisCheckpointInterval = batchInterval - - // Get the region name from the endpoint URL to save Kinesis Client Library metadata in - // DynamoDB of the same region as the Kinesis stream - val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() - - // Setup the SparkConfig and StreamingContext - val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL") - val ssc = new StreamingContext(sparkConfig, batchInterval) - - // Create the Kinesis DStreams - val kinesisStreams = (0 until numStreams).map { i => - KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, - InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) - } - - // Union all the streams - val unionStreams = ssc.union(kinesisStreams) - - // Convert each line of Array[Byte] to String, and split into words - val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) - - // Map each word to a (word, 1) tuple so we can reduce by key to count the words - val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) - - // Print the first 10 wordCounts - wordCounts.print() - - // Start the streaming context and await termination - ssc.start() - ssc.awaitTermination() - } -} - -/** - * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \ - * <records-per-sec> <words-per-record> - * - * <stream-name> is the name of the Kinesis stream (ie. mySparkStream) - * <endpoint-url> is the endpoint of the Kinesis service - * (ie. https://kinesis.us-east-1.amazonaws.com) - * <records-per-sec> is the rate of records per second to put onto the stream - * <words-per-record> is the rate of records per second to put onto the stream - * - * Example: - * $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com us-east-1 10 5 - */ -object KinesisWordProducerASL { - def main(args: Array[String]) { - if (args.length != 4) { - System.err.println( - """ - |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec> - <words-per-record> - | - | <stream-name> is the name of the Kinesis stream - | <endpoint-url> is the endpoint of the Kinesis service - | (e.g. https://kinesis.us-east-1.amazonaws.com) - | <records-per-sec> is the rate of records per second to put onto the stream - | <words-per-record> is the rate of records per second to put onto the stream - | - """.stripMargin) - - System.exit(1) - } - - // Set default log4j logging level to WARN to hide Spark logs - StreamingExamples.setStreamingLogLevels() - - // Populate the appropriate variables from the given args - val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args - - // Generate the records and return the totals - val totals = generate(stream, endpoint, recordsPerSecond.toInt, - wordsPerRecord.toInt) - - // Print the array of (word, total) tuples - println("Totals for the words sent") - totals.foreach(println(_)) - } - - def generate(stream: String, - endpoint: String, - recordsPerSecond: Int, - wordsPerRecord: Int): Seq[(String, Int)] = { - - val randomWords = List("spark", "you", "are", "my", "father") - val totals = scala.collection.mutable.Map[String, Int]() - - // Create the low-level Kinesis Client from the AWS Java SDK. - val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()) - kinesisClient.setEndpoint(endpoint) - - println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" + - s" $recordsPerSecond records per second and $wordsPerRecord words per record") - - // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord - for (i <- 1 to 10) { - // Generate recordsPerSec records to put onto the stream - val records = (1 to recordsPerSecond.toInt).foreach { recordNum => - // Randomly generate wordsPerRecord number of words - val data = (1 to wordsPerRecord.toInt).map(x => { - // Get a random index to a word - val randomWordIdx = Random.nextInt(randomWords.size) - val randomWord = randomWords(randomWordIdx) - - // Increment total count to compare to server counts later - totals(randomWord) = totals.getOrElse(randomWord, 0) + 1 - - randomWord - }).mkString(" ") - - // Create a partitionKey based on recordNum - val partitionKey = s"partitionKey-$recordNum" - - // Create a PutRecordRequest with an Array[Byte] version of the data - val putRecordRequest = new PutRecordRequest().withStreamName(stream) - .withPartitionKey(partitionKey) - .withData(ByteBuffer.wrap(data.getBytes())) - - // Put the record onto the stream and capture the PutRecordResult - val putRecordResult = kinesisClient.putRecord(putRecordRequest) - } - - // Sleep for a second - Thread.sleep(1000) - println("Sent " + recordsPerSecond + " records") - } - // Convert the totals to (index, total) tuple - totals.toSeq.sortBy(_._1) - } -} - -/** - * Utility functions for Spark Streaming examples. - * This has been lifted from the examples/ project to remove the circular dependency. - */ -private[streaming] object StreamingExamples extends Logging { - // Set reasonable logging levels for streaming if the user has not configured log4j. - def setStreamingLogLevels() { - val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements - if (!log4jInitialized) { - // We first log something to initialize Spark's default logging, then we override the - // logging level. - logInfo("Setting log level to [WARN] for streaming example." + - " To override add a custom log4j.properties to the classpath.") - Logger.getRootLogger.setLevel(Level.WARN) - } - } -} -// scalastyle:on println diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala deleted file mode 100644 index 3996f168e6..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag -import scala.util.control.NonFatal - -import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} -import com.amazonaws.services.kinesis.AmazonKinesisClient -import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord -import com.amazonaws.services.kinesis.model._ - -import org.apache.spark._ -import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition} -import org.apache.spark.storage.BlockId -import org.apache.spark.util.NextIterator - - -/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */ -private[kinesis] -case class SequenceNumberRange( - streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) - -/** Class representing an array of Kinesis sequence number ranges */ -private[kinesis] -case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) { - def isEmpty(): Boolean = ranges.isEmpty - - def nonEmpty(): Boolean = ranges.nonEmpty - - override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")") -} - -private[kinesis] -object SequenceNumberRanges { - def apply(range: SequenceNumberRange): SequenceNumberRanges = { - new SequenceNumberRanges(Seq(range)) - } -} - - -/** Partition storing the information of the ranges of Kinesis sequence numbers to read */ -private[kinesis] -class KinesisBackedBlockRDDPartition( - idx: Int, - blockId: BlockId, - val isBlockIdValid: Boolean, - val seqNumberRanges: SequenceNumberRanges - ) extends BlockRDDPartition(blockId, idx) - -/** - * A BlockRDD where the block data is backed by Kinesis, which can accessed using the - * sequence numbers of the corresponding blocks. - */ -private[kinesis] -class KinesisBackedBlockRDD[T: ClassTag]( - sc: SparkContext, - val regionName: String, - val endpointUrl: String, - @transient private val _blockIds: Array[BlockId], - @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], - @transient private val isBlockIdValid: Array[Boolean] = Array.empty, - val retryTimeoutMs: Int = 10000, - val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _, - val awsCredentialsOption: Option[SerializableAWSCredentials] = None - ) extends BlockRDD[T](sc, _blockIds) { - - require(_blockIds.length == arrayOfseqNumberRanges.length, - "Number of blockIds is not equal to the number of sequence number ranges") - - override def isValid(): Boolean = true - - override def getPartitions: Array[Partition] = { - Array.tabulate(_blockIds.length) { i => - val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i) - new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i)) - } - } - - override def compute(split: Partition, context: TaskContext): Iterator[T] = { - val blockManager = SparkEnv.get.blockManager - val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition] - val blockId = partition.blockId - - def getBlockFromBlockManager(): Option[Iterator[T]] = { - logDebug(s"Read partition data of $this from block manager, block $blockId") - blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]]) - } - - def getBlockFromKinesis(): Iterator[T] = { - val credentials = awsCredentialsOption.getOrElse { - new DefaultAWSCredentialsProviderChain().getCredentials() - } - partition.seqNumberRanges.ranges.iterator.flatMap { range => - new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName, - range, retryTimeoutMs).map(messageHandler) - } - } - if (partition.isBlockIdValid) { - getBlockFromBlockManager().getOrElse { getBlockFromKinesis() } - } else { - getBlockFromKinesis() - } - } -} - - -/** - * An iterator that return the Kinesis data based on the given range of sequence numbers. - * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber, - * until the endSequenceNumber is reached. - */ -private[kinesis] -class KinesisSequenceRangeIterator( - credentials: AWSCredentials, - endpointUrl: String, - regionId: String, - range: SequenceNumberRange, - retryTimeoutMs: Int) extends NextIterator[Record] with Logging { - - private val client = new AmazonKinesisClient(credentials) - private val streamName = range.streamName - private val shardId = range.shardId - - private var toSeqNumberReceived = false - private var lastSeqNumber: String = null - private var internalIterator: Iterator[Record] = null - - client.setEndpoint(endpointUrl, "kinesis", regionId) - - override protected def getNext(): Record = { - var nextRecord: Record = null - if (toSeqNumberReceived) { - finished = true - } else { - - if (internalIterator == null) { - - // If the internal iterator has not been initialized, - // then fetch records from starting sequence number - internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) - } else if (!internalIterator.hasNext) { - - // If the internal iterator does not have any more records, - // then fetch more records after the last consumed sequence number - internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) - } - - if (!internalIterator.hasNext) { - - // If the internal iterator still does not have any data, then throw exception - // and terminate this iterator - finished = true - throw new SparkException( - s"Could not read until the end sequence number of the range: $range") - } else { - - // Get the record, copy the data into a byte array and remember its sequence number - nextRecord = internalIterator.next() - lastSeqNumber = nextRecord.getSequenceNumber() - - // If the this record's sequence number matches the stopping sequence number, then make sure - // the iterator is marked finished next time getNext() is called - if (nextRecord.getSequenceNumber == range.toSeqNumber) { - toSeqNumberReceived = true - } - } - } - nextRecord - } - - override protected def close(): Unit = { - client.shutdown() - } - - /** - * Get records starting from or after the given sequence number. - */ - private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = { - val shardIterator = getKinesisIterator(iteratorType, seqNum) - val result = getRecordsAndNextKinesisIterator(shardIterator) - result._1 - } - - /** - * Get the records starting from using a Kinesis shard iterator (which is a progress handle - * to get records from Kinesis), and get the next shard iterator for next consumption. - */ - private def getRecordsAndNextKinesisIterator( - shardIterator: String): (Iterator[Record], String) = { - val getRecordsRequest = new GetRecordsRequest - getRecordsRequest.setRequestCredentials(credentials) - getRecordsRequest.setShardIterator(shardIterator) - val getRecordsResult = retryOrTimeout[GetRecordsResult]( - s"getting records using shard iterator") { - client.getRecords(getRecordsRequest) - } - // De-aggregate records, if KPL was used in producing the records. The KCL automatically - // handles de-aggregation during regular operation. This code path is used during recovery - val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords) - (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator) - } - - /** - * Get the Kinesis shard iterator for getting records starting from or after the given - * sequence number. - */ - private def getKinesisIterator( - iteratorType: ShardIteratorType, - sequenceNumber: String): String = { - val getShardIteratorRequest = new GetShardIteratorRequest - getShardIteratorRequest.setRequestCredentials(credentials) - getShardIteratorRequest.setStreamName(streamName) - getShardIteratorRequest.setShardId(shardId) - getShardIteratorRequest.setShardIteratorType(iteratorType.toString) - getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber) - val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult]( - s"getting shard iterator from sequence number $sequenceNumber") { - client.getShardIterator(getShardIteratorRequest) - } - getShardIteratorResult.getShardIterator - } - - /** Helper method to retry Kinesis API request with exponential backoff and timeouts */ - private def retryOrTimeout[T](message: String)(body: => T): T = { - import KinesisSequenceRangeIterator._ - - var startTimeMs = System.currentTimeMillis() - var retryCount = 0 - var waitTimeMs = MIN_RETRY_WAIT_TIME_MS - var result: Option[T] = None - var lastError: Throwable = null - - def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs - def isMaxRetryDone = retryCount >= MAX_RETRIES - - while (result.isEmpty && !isTimedOut && !isMaxRetryDone) { - if (retryCount > 0) { // wait only if this is a retry - Thread.sleep(waitTimeMs) - waitTimeMs *= 2 // if you have waited, then double wait time for next round - } - try { - result = Some(body) - } catch { - case NonFatal(t) => - lastError = t - t match { - case ptee: ProvisionedThroughputExceededException => - logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee) - case e: Throwable => - throw new SparkException(s"Error while $message", e) - } - } - retryCount += 1 - } - result.getOrElse { - if (isTimedOut) { - throw new SparkException( - s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError) - } else { - throw new SparkException( - s"Gave up after $retryCount retries while $message, last exception: ", lastError) - } - } - } -} - -private[streaming] -object KinesisSequenceRangeIterator { - val MAX_RETRIES = 3 - val MIN_RETRY_WAIT_TIME_MS = 100 -} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala deleted file mode 100644 index 1ca6d4302c..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import java.util.concurrent._ - -import scala.util.control.NonFatal - -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason - -import org.apache.spark.Logging -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.util.RecurringTimer -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} - -/** - * This is a helper class for managing Kinesis checkpointing. - * - * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint - * @param checkpointInterval How frequently we will checkpoint to DynamoDB - * @param workerId Worker Id of KCL worker for logging purposes - * @param clock In order to use ManualClocks for the purpose of testing - */ -private[kinesis] class KinesisCheckpointer( - receiver: KinesisReceiver[_], - checkpointInterval: Duration, - workerId: String, - clock: Clock = new SystemClock) extends Logging { - - // a map from shardId's to checkpointers - private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]() - - private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]() - - private val checkpointerThread: RecurringTimer = startCheckpointerThread() - - /** Update the checkpointer instance to the most recent one for the given shardId. */ - def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { - checkpointers.put(shardId, checkpointer) - } - - /** - * Stop tracking the specified shardId. - * - * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]], - * we will use that to make the final checkpoint. If `null` is provided, we will not make the - * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]]. - */ - def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { - synchronized { - checkpointers.remove(shardId) - checkpoint(shardId, checkpointer) - } - } - - /** Perform the checkpoint. */ - private def checkpoint(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { - try { - if (checkpointer != null) { - receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum => - val lastSeqNum = lastCheckpointedSeqNums.get(shardId) - // Kinesis sequence numbers are monotonically increasing strings, therefore we can do - // safely do the string comparison - if (lastSeqNum == null || latestSeqNum > lastSeqNum) { - /* Perform the checkpoint */ - KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100) - logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at sequence number" + - s" $latestSeqNum for shardId $shardId") - lastCheckpointedSeqNums.put(shardId, latestSeqNum) - } - } - } else { - logDebug(s"Checkpointing skipped for shardId $shardId. Checkpointer not set.") - } - } catch { - case NonFatal(e) => - logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e) - } - } - - /** Checkpoint the latest saved sequence numbers for all active shardId's. */ - private def checkpointAll(): Unit = synchronized { - // if this method throws an exception, then the scheduled task will not run again - try { - val shardIds = checkpointers.keys() - while (shardIds.hasMoreElements) { - val shardId = shardIds.nextElement() - checkpoint(shardId, checkpointers.get(shardId)) - } - } catch { - case NonFatal(e) => - logWarning("Failed to checkpoint to DynamoDB.", e) - } - } - - /** - * Start the checkpointer thread with the given checkpoint duration. - */ - private def startCheckpointerThread(): RecurringTimer = { - val period = checkpointInterval.milliseconds - val threadName = s"Kinesis Checkpointer - Worker $workerId" - val timer = new RecurringTimer(clock, period, _ => checkpointAll(), threadName) - timer.start() - logDebug(s"Started checkpointer thread: $threadName") - timer - } - - /** - * Shutdown the checkpointer. Should be called on the onStop of the Receiver. - */ - def shutdown(): Unit = { - // the recurring timer checkpoints for us one last time. - checkpointerThread.stop(interruptTimer = false) - checkpointers.clear() - lastCheckpointedSeqNums.clear() - logInfo("Successfully shutdown Kinesis Checkpointer.") - } -} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala deleted file mode 100644 index 5223c81a8e..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import scala.reflect.ClassTag - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.Record - -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{BlockId, StorageLevel} -import org.apache.spark.streaming.{Duration, StreamingContext, Time} -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.streaming.scheduler.ReceivedBlockInfo - -private[kinesis] class KinesisInputDStream[T: ClassTag]( - _ssc: StreamingContext, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointAppName: String, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T, - awsCredentialsOption: Option[SerializableAWSCredentials] - ) extends ReceiverInputDStream[T](_ssc) { - - private[streaming] - override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = { - - // This returns true even for when blockInfos is empty - val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty) - - if (allBlocksHaveRanges) { - // Create a KinesisBackedBlockRDD, even when there are no blocks - val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray - val seqNumRanges = blockInfos.map { - _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray - val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray - logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " + - s"seq number ranges: ${seqNumRanges.mkString(", ")} ") - new KinesisBackedBlockRDD( - context.sc, regionName, endpointUrl, blockIds, seqNumRanges, - isBlockIdValid = isBlockIdValid, - retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt, - messageHandler = messageHandler, - awsCredentialsOption = awsCredentialsOption) - } else { - logWarning("Kinesis sequence number information was not present with some block metadata," + - " it may not be possible to recover from failures") - super.createBlockRDD(time, blockInfos) - } - } - - override def getReceiver(): Receiver[T] = { - new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream, - checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption) - } -} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala deleted file mode 100644 index 48ee2a9597..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import java.util.UUID -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.util.control.NonFatal - -import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} -import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory} -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} -import com.amazonaws.services.kinesis.model.Record - -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} -import org.apache.spark.util.Utils -import org.apache.spark.Logging - -private[kinesis] -case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) - extends AWSCredentials { - override def getAWSAccessKeyId: String = accessKeyId - override def getAWSSecretKey: String = secretKey -} - -/** - * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. - * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: - * https://github.com/awslabs/amazon-kinesis-client - * - * The way this Receiver works is as follows: - * - * - The receiver starts a KCL Worker, which is essentially runs a threadpool of multiple - * KinesisRecordProcessor - * - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is - * inserted into a Block Generator, and the corresponding range of sequence numbers is recorded. - * - When the block generator defines a block, then the recorded sequence number ranges that were - * inserted into the block are recorded separately for being used later. - * - When the block is ready to be pushed, the block is pushed and the ranges are reported as - * metadata of the block. In addition, the ranges are used to find out the latest sequence - * number for each shard that can be checkpointed through the DynamoDB. - * - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence - * number for it own shard. - * - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Region name used by the Kinesis Client Library for - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams - * by the Kinesis Client Library. If you change the App name or Stream name, - * the KCL will throw errors. This usually requires deleting the backing - * DynamoDB table with the same name this Kinesis application. - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects - * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies - * the credentials - */ -private[kinesis] class KinesisReceiver[T]( - val streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointAppName: String, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T, - awsCredentialsOption: Option[SerializableAWSCredentials]) - extends Receiver[T](storageLevel) with Logging { receiver => - - /* - * ================================================================================= - * The following vars are initialize in the onStart() method which executes in the - * Spark worker after this Receiver is serialized and shipped to the worker. - * ================================================================================= - */ - - /** - * workerId is used by the KCL should be based on the ip address of the actual Spark Worker - * where this code runs (not the driver's IP address.) - */ - @volatile private var workerId: String = null - - /** - * Worker is the core client abstraction from the Kinesis Client Library (KCL). - * A worker can process more than one shards from the given stream. - * Each shard is assigned its own IRecordProcessor and the worker run multiple such - * processors. - */ - @volatile private var worker: Worker = null - @volatile private var workerThread: Thread = null - - /** BlockGenerator used to generates blocks out of Kinesis data */ - @volatile private var blockGenerator: BlockGenerator = null - - /** - * Sequence number ranges added to the current block being generated. - * Accessing and updating of this map is synchronized by locks in BlockGenerator. - */ - private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange] - - /** Sequence number ranges of data added to each generated block */ - private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges] - - /** - * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval. - */ - @volatile private var kinesisCheckpointer: KinesisCheckpointer = null - - /** - * Latest sequence number ranges that have been stored successfully. - * This is used for checkpointing through KCL */ - private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String] - - /** - * This is called when the KinesisReceiver starts and must be non-blocking. - * The KCL creates and manages the receiving/processing thread pool through Worker.run(). - */ - override def onStart() { - blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler) - - workerId = Utils.localHostName() + ":" + UUID.randomUUID() - - kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId) - // KCL config instance - val awsCredProvider = resolveAWSCredentialsProvider() - val kinesisClientLibConfiguration = - new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId) - .withKinesisEndpoint(endpointUrl) - .withInitialPositionInStream(initialPositionInStream) - .withTaskBackoffTimeMillis(500) - .withRegionName(regionName) - - /* - * RecordProcessorFactory creates impls of IRecordProcessor. - * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the - * IRecordProcessor.processRecords() method. - * We're using our custom KinesisRecordProcessor in this case. - */ - val recordProcessorFactory = new IRecordProcessorFactory { - override def createProcessor: IRecordProcessor = - new KinesisRecordProcessor(receiver, workerId) - } - - worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) - workerThread = new Thread() { - override def run(): Unit = { - try { - worker.run() - } catch { - case NonFatal(e) => - restart("Error running the KCL worker in Receiver", e) - } - } - } - - blockIdToSeqNumRanges.clear() - blockGenerator.start() - - workerThread.setName(s"Kinesis Receiver ${streamId}") - workerThread.setDaemon(true) - workerThread.start() - - logInfo(s"Started receiver with workerId $workerId") - } - - /** - * This is called when the KinesisReceiver stops. - * The KCL worker.shutdown() method stops the receiving/processing threads. - * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. - */ - override def onStop() { - if (workerThread != null) { - if (worker != null) { - worker.shutdown() - worker = null - } - workerThread.join() - workerThread = null - logInfo(s"Stopped receiver for workerId $workerId") - } - workerId = null - if (kinesisCheckpointer != null) { - kinesisCheckpointer.shutdown() - kinesisCheckpointer = null - } - } - - /** Add records of the given shard to the current block being generated */ - private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = { - if (records.size > 0) { - val dataIterator = records.iterator().asScala.map(messageHandler) - val metadata = SequenceNumberRange(streamName, shardId, - records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber()) - blockGenerator.addMultipleDataWithCallback(dataIterator, metadata) - } - } - - /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ - private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { - Option(shardIdToLatestStoredSeqNum.get(shardId)) - } - - /** - * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the - * given shardId. - */ - def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { - assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!") - kinesisCheckpointer.setCheckpointer(shardId, checkpointer) - } - - /** - * Remove the checkpointer for the given shardId. The provided checkpointer will be used to - * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not - * checkpoint. - */ - def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = { - assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!") - kinesisCheckpointer.removeCheckpointer(shardId, checkpointer) - } - - /** - * Remember the range of sequence numbers that was added to the currently active block. - * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`. - */ - private def rememberAddedRange(range: SequenceNumberRange): Unit = { - seqNumRangesInCurrentBlock += range - } - - /** - * Finalize the ranges added to the block that was active and prepare the ranges buffer - * for next block. Internally, this is synchronized with `rememberAddedRange()`. - */ - private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = { - blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)) - seqNumRangesInCurrentBlock.clear() - logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges") - } - - /** Store the block along with its associated ranges */ - private def storeBlockWithRanges( - blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = { - val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId)) - if (rangesToReportOption.isEmpty) { - stop("Error while storing block into Spark, could not find sequence number ranges " + - s"for block $blockId") - return - } - - val rangesToReport = rangesToReportOption.get - var attempt = 0 - var stored = false - var throwable: Throwable = null - while (!stored && attempt <= 3) { - try { - store(arrayBuffer, rangesToReport) - stored = true - } catch { - case NonFatal(th) => - attempt += 1 - throwable = th - } - } - if (!stored) { - stop("Error while storing block into Spark", throwable) - } - - // Update the latest sequence number that have been successfully stored for each shard - // Note that we are doing this sequentially because the array of sequence number ranges - // is assumed to be - rangesToReport.ranges.foreach { range => - shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber) - } - } - - /** - * If AWS credential is provided, return a AWSCredentialProvider returning that credential. - * Otherwise, return the DefaultAWSCredentialsProviderChain. - */ - private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = { - awsCredentialsOption match { - case Some(awsCredentials) => - logInfo("Using provided AWS credentials") - new AWSCredentialsProvider { - override def getCredentials: AWSCredentials = awsCredentials - override def refresh(): Unit = { } - } - case None => - logInfo("Using DefaultAWSCredentialsProviderChain") - new DefaultAWSCredentialsProviderChain() - } - } - - - /** - * Class to handle blocks generated by this receiver's block generator. Specifically, in - * the context of the Kinesis Receiver, this handler does the following. - * - * - When an array of records is added to the current active block in the block generator, - * this handler keeps track of the corresponding sequence number range. - * - When the currently active block is ready to sealed (not more records), this handler - * keep track of the list of ranges added into this block in another H - */ - private class GeneratedBlockHandler extends BlockGeneratorListener { - - /** - * Callback method called after a data item is added into the BlockGenerator. - * The data addition, block generation, and calls to onAddData and onGenerateBlock - * are all synchronized through the same lock. - */ - def onAddData(data: Any, metadata: Any): Unit = { - rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange]) - } - - /** - * Callback method called after a block has been generated. - * The data addition, block generation, and calls to onAddData and onGenerateBlock - * are all synchronized through the same lock. - */ - def onGenerateBlock(blockId: StreamBlockId): Unit = { - finalizeRangesForCurrentBlock(blockId) - } - - /** Callback method called when a block is ready to be pushed / stored. */ - def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { - storeBlockWithRanges(blockId, - arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]]) - } - - /** Callback called in case of any error in internal of the BlockGenerator */ - def onError(message: String, throwable: Throwable): Unit = { - reportError(message, throwable) - } - } -} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala deleted file mode 100644 index b5b76cb92d..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import java.util.List - -import scala.util.Random -import scala.util.control.NonFatal - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} -import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer} -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason -import com.amazonaws.services.kinesis.model.Record - -import org.apache.spark.Logging -import org.apache.spark.streaming.Duration - -/** - * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. - * This implementation operates on the Array[Byte] from the KinesisReceiver. - * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each - * shard in the Kinesis stream upon startup. This is normally done in separate threads, - * but the KCLs within the KinesisReceivers will balance themselves out if you create - * multiple Receivers. - * - * @param receiver Kinesis receiver - * @param workerId for logging purposes - */ -private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String) - extends IRecordProcessor with Logging { - - // shardId populated during initialize() - @volatile - private var shardId: String = _ - - /** - * The Kinesis Client Library calls this method during IRecordProcessor initialization. - * - * @param shardId assigned by the KCL to this particular RecordProcessor. - */ - override def initialize(shardId: String) { - this.shardId = shardId - logInfo(s"Initialized workerId $workerId with shardId $shardId") - } - - /** - * This method is called by the KCL when a batch of records is pulled from the Kinesis stream. - * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords() - * and Spark Streaming's Receiver.store(). - * - * @param batch list of records from the Kinesis stream shard - * @param checkpointer used to update Kinesis when this batch has been processed/stored - * in the DStream - */ - override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { - if (!receiver.isStopped()) { - try { - receiver.addRecords(shardId, batch) - logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") - receiver.setCheckpointer(shardId, checkpointer) - } catch { - case NonFatal(e) => { - /* - * If there is a failure within the batch, the batch will not be checkpointed. - * This will potentially cause records since the last checkpoint to be processed - * more than once. - */ - logError(s"Exception: WorkerId $workerId encountered and exception while storing " + - s" or checkpointing a batch for workerId $workerId and shardId $shardId.", e) - - /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */ - throw e - } - } - } else { - /* RecordProcessor has been stopped. */ - logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" + - s" and shardId $shardId. No more records will be processed.") - } - } - - /** - * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons: - * 1) the stream is resharding by splitting or merging adjacent shards - * (ShutdownReason.TERMINATE) - * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason - * (ShutdownReason.ZOMBIE) - * - * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE - * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE) - */ - override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) { - logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason") - reason match { - /* - * TERMINATE Use Case. Checkpoint. - * Checkpoint to indicate that all records from the shard have been drained and processed. - * It's now OK to read from the new shards that resulted from a resharding event. - */ - case ShutdownReason.TERMINATE => - receiver.removeCheckpointer(shardId, checkpointer) - - /* - * ZOMBIE Use Case or Unknown reason. NoOp. - * No checkpoint because other workers may have taken over and already started processing - * the same records. - * This may lead to records being processed more than once. - */ - case _ => - receiver.removeCheckpointer(shardId, null) // return null so that we don't checkpoint - } - - } -} - -private[kinesis] object KinesisRecordProcessor extends Logging { - /** - * Retry the given amount of times with a random backoff time (millis) less than the - * given maxBackOffMillis - * - * @param expression expression to evalute - * @param numRetriesLeft number of retries left - * @param maxBackOffMillis: max millis between retries - * - * @return evaluation of the given expression - * @throws Unretryable exception, unexpected exception, - * or any exception that persists after numRetriesLeft reaches 0 - */ - @annotation.tailrec - def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = { - util.Try { expression } match { - /* If the function succeeded, evaluate to x. */ - case util.Success(x) => x - /* If the function failed, either retry or throw the exception */ - case util.Failure(e) => e match { - /* Retry: Throttling or other Retryable exception has occurred */ - case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1 - => { - val backOffMillis = Random.nextInt(maxBackOffMillis) - Thread.sleep(backOffMillis) - logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e) - retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis) - } - /* Throw: Shutdown has been requested by the Kinesis Client Library. */ - case _: ShutdownException => { - logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e) - throw e - } - /* Throw: Non-retryable exception has occurred with the Kinesis Client Library */ - case _: InvalidStateException => { - logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" + - s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e) - throw e - } - /* Throw: Unexpected exception has occurred */ - case _ => { - logError(s"Unexpected, non-retryable exception.", e) - throw e - } - } - } - } -} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala deleted file mode 100644 index 0ace453ee9..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.util.{Failure, Random, Success, Try} - -import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient -import com.amazonaws.services.dynamodbv2.document.DynamoDB -import com.amazonaws.services.kinesis.AmazonKinesisClient -import com.amazonaws.services.kinesis.model._ - -import org.apache.spark.Logging - -/** - * Shared utility methods for performing Kinesis tests that actually transfer data. - * - * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE! - */ -private[kinesis] class KinesisTestUtils extends Logging { - - val endpointUrl = KinesisTestUtils.endpointUrl - val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() - val streamShardCount = 2 - - private val createStreamTimeoutSeconds = 300 - private val describeStreamPollTimeSeconds = 1 - - @volatile - private var streamCreated = false - - @volatile - private var _streamName: String = _ - - protected lazy val kinesisClient = { - val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials()) - client.setEndpoint(endpointUrl) - client - } - - private lazy val dynamoDB = { - val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain()) - dynamoDBClient.setRegion(RegionUtils.getRegion(regionName)) - new DynamoDB(dynamoDBClient) - } - - protected def getProducer(aggregate: Boolean): KinesisDataGenerator = { - if (!aggregate) { - new SimpleDataGenerator(kinesisClient) - } else { - throw new UnsupportedOperationException("Aggregation is not supported through this code path") - } - } - - def streamName: String = { - require(streamCreated, "Stream not yet created, call createStream() to create one") - _streamName - } - - def createStream(): Unit = { - require(!streamCreated, "Stream already created") - _streamName = findNonExistentStreamName() - - // Create a stream. The number of shards determines the provisioned throughput. - logInfo(s"Creating stream ${_streamName}") - val createStreamRequest = new CreateStreamRequest() - createStreamRequest.setStreamName(_streamName) - createStreamRequest.setShardCount(2) - kinesisClient.createStream(createStreamRequest) - - // The stream is now being created. Wait for it to become active. - waitForStreamToBeActive(_streamName) - streamCreated = true - logInfo(s"Created stream ${_streamName}") - } - - /** - * Push data to Kinesis stream and return a map of - * shardId -> seq of (data, seq number) pushed to corresponding shard - */ - def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = { - require(streamCreated, "Stream not yet created, call createStream() to create one") - val producer = getProducer(aggregate) - val shardIdToSeqNumbers = producer.sendData(streamName, testData) - logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") - shardIdToSeqNumbers.toMap - } - - /** - * Expose a Python friendly API. - */ - def pushData(testData: java.util.List[Int]): Unit = { - pushData(testData.asScala, aggregate = false) - } - - def deleteStream(): Unit = { - try { - if (streamCreated) { - kinesisClient.deleteStream(streamName) - } - } catch { - case e: Exception => - logWarning(s"Could not delete stream $streamName") - } - } - - def deleteDynamoDBTable(tableName: String): Unit = { - try { - val table = dynamoDB.getTable(tableName) - table.delete() - table.waitForDelete() - } catch { - case e: Exception => - logWarning(s"Could not delete DynamoDB table $tableName") - } - } - - private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = { - try { - val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe) - val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription() - Some(desc) - } catch { - case rnfe: ResourceNotFoundException => - None - } - } - - private def findNonExistentStreamName(): String = { - var testStreamName: String = null - do { - Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) - testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}" - } while (describeStream(testStreamName).nonEmpty) - testStreamName - } - - private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = { - val startTime = System.currentTimeMillis() - val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds) - while (System.currentTimeMillis() < endTime) { - Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) - describeStream(streamNameToWaitFor).foreach { description => - val streamStatus = description.getStreamStatus() - logDebug(s"\t- current state: $streamStatus\n") - if ("ACTIVE".equals(streamStatus)) { - return - } - } - } - require(false, s"Stream $streamName never became active") - } -} - -private[kinesis] object KinesisTestUtils { - - val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS" - val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL" - val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com" - - lazy val shouldRunTests = { - val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1") - if (isEnvSet) { - // scalastyle:off println - // Print this so that they are easily visible on the console and not hidden in the log4j logs. - println( - s""" - |Kinesis tests that actually send data has been enabled by setting the environment - |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and - |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs. - |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams. - |To change this endpoint URL to a different region, you can set the environment variable - |$endVarNameForEndpoint to the desired endpoint URL - |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com"). - """.stripMargin) - // scalastyle:on println - } - isEnvSet - } - - lazy val endpointUrl = { - val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl) - // scalastyle:off println - // Print this so that they are easily visible on the console and not hidden in the log4j logs. - println(s"Using endpoint URL $url for creating Kinesis streams for tests.") - // scalastyle:on println - url - } - - def isAWSCredentialsPresent: Boolean = { - Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess - } - - def getAWSCredentials(): AWSCredentials = { - assert(shouldRunTests, - "Kinesis test not enabled, should not attempt to get AWS credentials") - Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match { - case Success(cred) => cred - case Failure(e) => - throw new Exception( - s""" - |Kinesis tests enabled using environment variable $envVarNameForEnablingTests - |but could not find AWS credentials. Please follow instructions in AWS documentation - |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain - |can find the credentials. - """.stripMargin) - } - } -} - -/** A wrapper interface that will allow us to consolidate the code for synthetic data generation. */ -private[kinesis] trait KinesisDataGenerator { - /** Sends the data to Kinesis and returns the metadata for everything that has been sent. */ - def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] -} - -private[kinesis] class SimpleDataGenerator( - client: AmazonKinesisClient) extends KinesisDataGenerator { - override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = { - val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() - data.foreach { num => - val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) - val putRecordRequest = new PutRecordRequest().withStreamName(streamName) - .withData(data) - .withPartitionKey(str) - - val putRecordResult = client.putRecord(putRecordRequest) - val shardId = putRecordResult.getShardId - val seqNumber = putRecordResult.getSequenceNumber() - val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, - new ArrayBuffer[(Int, String)]()) - sentSeqNumbers += ((num, seqNumber)) - } - - shardIdToSeqNumbers.toMap - } -} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala deleted file mode 100644 index 15ac588b82..0000000000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ /dev/null @@ -1,560 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import scala.reflect.ClassTag - -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.Record - -import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, StreamingContext} -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.ReceiverInputDStream - -object KinesisUtils { - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - */ - def createStream[T: ClassTag]( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T): ReceiverInputDStream[T] = { - val cleanedHandler = ssc.sc.clean(messageHandler) - // Setting scope to override receiver stream's scope of "receiver stream" - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - // scalastyle:off - def createStream[T: ClassTag]( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: Record => T, - awsAccessKeyId: String, - awsSecretKey: String): ReceiverInputDStream[T] = { - // scalastyle:on - val cleanedHandler = ssc.sc.clean(messageHandler) - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - def createStream( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = { - // Setting scope to override receiver stream's scope of "receiver stream" - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - defaultMessageHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param ssc StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - def createStream( - ssc: StreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - awsAccessKeyId: String, - awsSecretKey: String): ReceiverInputDStream[Array[Byte]] = { - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, - defaultMessageHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * - * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets AWS credentials. - * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. - * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name - * in [[org.apache.spark.SparkConf]]. - * - * @param ssc StreamingContext object - * @param streamName Kinesis stream name - * @param endpointUrl Endpoint url of Kinesis service - * (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - @deprecated("use other forms of createStream", "1.4.0") - def createStream( - ssc: StreamingContext, - streamName: String, - endpointUrl: String, - checkpointInterval: Duration, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel - ): ReceiverInputDStream[Array[Byte]] = { - ssc.withNamedScope("kinesis stream") { - new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, - getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName, - checkpointInterval, storageLevel, defaultMessageHandler, None) - } - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - * @param recordClass Class of the records in DStream - */ - def createStream[T]( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: JFunction[Record, T], - recordClass: Class[T]): JavaReceiverInputDStream[T] = { - implicit val recordCmt: ClassTag[T] = ClassTag(recordClass) - val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_)) - createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param messageHandler A custom message handler that can generate a generic output from a - * Kinesis `Record`, which contains both message data, and metadata. - * @param recordClass Class of the records in DStream - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - // scalastyle:off - def createStream[T]( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - messageHandler: JFunction[Record, T], - recordClass: Class[T], - awsAccessKeyId: String, - awsSecretKey: String): JavaReceiverInputDStream[T] = { - // scalastyle:on - implicit val recordCmt: ClassTag[T] = ClassTag(recordClass) - val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_)) - createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler, - awsAccessKeyId, awsSecretKey) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets the AWS credentials. - * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - def createStream( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Array[Byte]] = { - createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, defaultMessageHandler(_)) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * The given AWS credentials will get saved in DStream checkpoints if checkpointing - * is enabled. Make sure that your checkpoint directory is secure. - * - * @param jssc Java StreamingContext object - * @param kinesisAppName Kinesis application name used by the Kinesis Client Library - * (KCL) to update DynamoDB - * @param streamName Kinesis stream name - * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param regionName Name of region used by the Kinesis Client Library (KCL) to update - * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param storageLevel Storage level to use for storing the received objects. - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain) - * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain) - */ - def createStream( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: InitialPositionInStream, - checkpointInterval: Duration, - storageLevel: StorageLevel, - awsAccessKeyId: String, - awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = { - createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName, - initialPositionInStream, checkpointInterval, storageLevel, - defaultMessageHandler(_), awsAccessKeyId, awsSecretKey) - } - - /** - * Create an input stream that pulls messages from a Kinesis stream. - * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis. - * - * Note: - * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain - * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain - * gets AWS credentials. - * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch. - * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in - * [[org.apache.spark.SparkConf]]. - * - * @param jssc Java StreamingContext object - * @param streamName Kinesis stream name - * @param endpointUrl Endpoint url of Kinesis service - * (e.g., https://kinesis.us-east-1.amazonaws.com) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. - * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the - * worker's initial starting position in the stream. - * The values are either the beginning of the stream - * per Kinesis' limit of 24 hours - * (InitialPositionInStream.TRIM_HORIZON) or - * the tip of the stream (InitialPositionInStream.LATEST). - * @param storageLevel Storage level to use for storing the received objects - * StorageLevel.MEMORY_AND_DISK_2 is recommended. - */ - @deprecated("use other forms of createStream", "1.4.0") - def createStream( - jssc: JavaStreamingContext, - streamName: String, - endpointUrl: String, - checkpointInterval: Duration, - initialPositionInStream: InitialPositionInStream, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[Array[Byte]] = { - createStream( - jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel) - } - - private def getRegionByEndpoint(endpointUrl: String): String = { - RegionUtils.getRegionByEndpoint(endpointUrl).getName() - } - - private def validateRegion(regionName: String): String = { - Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse { - throw new IllegalArgumentException(s"Region name '$regionName' is not valid") - } - } - - private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = { - if (record == null) return null - val byteBuffer = record.getData() - val byteArray = new Array[Byte](byteBuffer.remaining()) - byteBuffer.get(byteArray) - byteArray - } -} - -/** - * This is a helper class that wraps the methods in KinesisUtils into more Python-friendly class and - * function so that it can be easily instantiated and called from Python's KinesisUtils. - */ -private class KinesisUtilsPythonHelper { - - def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = { - initialPositionInStream match { - case 0 => InitialPositionInStream.LATEST - case 1 => InitialPositionInStream.TRIM_HORIZON - case _ => throw new IllegalArgumentException( - "Illegal InitialPositionInStream. Please use " + - "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON") - } - } - - def createStream( - jssc: JavaStreamingContext, - kinesisAppName: String, - streamName: String, - endpointUrl: String, - regionName: String, - initialPositionInStream: Int, - checkpointInterval: Duration, - storageLevel: StorageLevel, - awsAccessKeyId: String, - awsSecretKey: String - ): JavaReceiverInputDStream[Array[Byte]] = { - if (awsAccessKeyId == null && awsSecretKey != null) { - throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null") - } - if (awsAccessKeyId != null && awsSecretKey == null) { - throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null") - } - if (awsAccessKeyId == null && awsSecretKey == null) { - KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel) - } else { - KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel, - awsAccessKeyId, awsSecretKey) - } - } - -} diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java deleted file mode 100644 index 5c2371c543..0000000000 --- a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis; - -import com.amazonaws.services.kinesis.model.Record; -import org.junit.Test; - -import org.apache.spark.api.java.function.Function; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaDStream; - -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - -/** - * Demonstrate the use of the KinesisUtils Java API - */ -public class JavaKinesisStreamSuite extends LocalJavaStreamingContext { - @Test - public void testKinesisStream() { - // Tests the API, does not actually test data receiving - JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", new Duration(2000), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2()); - - ssc.stop(); - } - - - private static Function<Record, String> handler = new Function<Record, String>() { - @Override - public String call(Record record) { - return record.getPartitionKey() + "-" + record.getSequenceNumber(); - } - }; - - @Test - public void testCustomHandler() { - // Tests the API, does not actually test data receiving - JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST, - new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class); - - ssc.stop(); - } -} diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties deleted file mode 100644 index edbecdae92..0000000000 --- a/extras/kinesis-asl/src/test/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala deleted file mode 100644 index fdb270eaad..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import java.nio.ByteBuffer - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult} -import com.google.common.util.concurrent.{FutureCallback, Futures} - -private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils { - override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = { - if (!aggregate) { - new SimpleDataGenerator(kinesisClient) - } else { - new KPLDataGenerator(regionName) - } - } -} - -/** A wrapper for the KinesisProducer provided in the KPL. */ -private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator { - - private lazy val producer: KPLProducer = { - val conf = new KinesisProducerConfiguration() - .setRecordMaxBufferedTime(1000) - .setMaxConnections(1) - .setRegion(regionName) - .setMetricsLevel("none") - - new KPLProducer(conf) - } - - override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = { - val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() - data.foreach { num => - val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) - val future = producer.addUserRecord(streamName, str, data) - val kinesisCallBack = new FutureCallback[UserRecordResult]() { - override def onFailure(t: Throwable): Unit = {} // do nothing - - override def onSuccess(result: UserRecordResult): Unit = { - val shardId = result.getShardId - val seqNumber = result.getSequenceNumber() - val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId, - new ArrayBuffer[(Int, String)]()) - sentSeqNumbers += ((num, seqNumber)) - } - } - Futures.addCallback(future, kinesisCallBack) - } - producer.flushSync() - shardIdToSeqNumbers.toMap - } -} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala deleted file mode 100644 index 2555332d22..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException} -import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId} - -abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) - extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext { - - private val testData = 1 to 8 - - private var testUtils: KinesisTestUtils = null - private var shardIds: Seq[String] = null - private var shardIdToData: Map[String, Seq[Int]] = null - private var shardIdToSeqNumbers: Map[String, Seq[String]] = null - private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null - private var shardIdToRange: Map[String, SequenceNumberRange] = null - private var allRanges: Seq[SequenceNumberRange] = null - - private var blockManager: BlockManager = null - - override def beforeAll(): Unit = { - super.beforeAll() - runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KPLBasedKinesisTestUtils() - testUtils.createStream() - - shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData) - require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") - - shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq - shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }} - shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} - shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => - val seqNumRange = SequenceNumberRange( - testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) - (shardId, seqNumRange) - } - allRanges = shardIdToRange.values.toSeq - } - } - - override def beforeEach(): Unit = { - super.beforeEach() - val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite") - sc = new SparkContext(conf) - blockManager = sc.env.blockManager - } - - override def afterAll(): Unit = { - try { - if (testUtils != null) { - testUtils.deleteStream() - } - } finally { - super.afterAll() - } - } - - testIfEnabled("Basic reading from Kinesis") { - // Verify all data using multiple ranges in a single RDD partition - val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(1), - Array(SequenceNumberRanges(allRanges.toArray)) - ).map { bytes => new String(bytes).toInt }.collect() - assert(receivedData1.toSet === testData.toSet) - - // Verify all data using one range in each of the multiple RDD partitions - val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(allRanges.size), - allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray - ).map { bytes => new String(bytes).toInt }.collect() - assert(receivedData2.toSet === testData.toSet) - - // Verify ordering within each partition - val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName, - testUtils.endpointUrl, fakeBlockIds(allRanges.size), - allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray - ).map { bytes => new String(bytes).toInt }.collectPartitions() - assert(receivedData3.length === allRanges.size) - for (i <- 0 until allRanges.size) { - assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId)) - } - } - - testIfEnabled("Read data available in both block manager and Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2) - } - - testIfEnabled("Read data available only in block manager, not in Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0) - } - - testIfEnabled("Read data available only in Kinesis, not in block manager") { - testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2) - } - - testIfEnabled("Read data available partially in block manager, rest in Kinesis") { - testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1) - } - - testIfEnabled("Test isBlockValid skips block fetching from block manager") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0, - testIsBlockValid = true) - } - - testIfEnabled("Test whether RDD is valid after removing blocks from block anager") { - testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2, - testBlockRemove = true) - } - - /** - * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager - * and the rest to a write ahead log, and then reading reading it all back using the RDD. - * It can also test if the partitions that were read from the log were again stored in - * block manager. - * - * - * - * @param numPartitions Number of partitions in RDD - * @param numPartitionsInBM Number of partitions to write to the BlockManager. - * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager - * @param numPartitionsInKinesis Number of partitions to write to the Kinesis. - * Partitions (numPartitions - 1 - numPartitionsInKinesis) to - * (numPartitions - 1) will be written to Kinesis - * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching - * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with - * reads falling back to the WAL - * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4 - * - * numPartitionsInBM = 3 - * |------------------| - * | | - * 0 1 2 3 4 - * | | - * |-------------------------| - * numPartitionsInKinesis = 4 - */ - private def testRDD( - numPartitions: Int, - numPartitionsInBM: Int, - numPartitionsInKinesis: Int, - testIsBlockValid: Boolean = false, - testBlockRemove: Boolean = false - ): Unit = { - require(shardIds.size > 1, "Need at least 2 shards to test") - require(numPartitionsInBM <= shardIds.size, - "Number of partitions in BlockManager cannot be more than the Kinesis test shards available") - require(numPartitionsInKinesis <= shardIds.size, - "Number of partitions in Kinesis cannot be more than the Kinesis test shards available") - require(numPartitionsInBM <= numPartitions, - "Number of partitions in BlockManager cannot be more than that in RDD") - require(numPartitionsInKinesis <= numPartitions, - "Number of partitions in Kinesis cannot be more than that in RDD") - - // Put necessary blocks in the block manager - val blockIds = fakeBlockIds(numPartitions) - blockIds.foreach(blockManager.removeBlock(_)) - (0 until numPartitionsInBM).foreach { i => - val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() } - blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY) - } - - // Create the necessary ranges to use in the RDD - val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)( - SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))) - val realRanges = Array.tabulate(numPartitionsInKinesis) { i => - val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis))) - SequenceNumberRanges(Array(range)) - } - val ranges = (fakeRanges ++ realRanges) - - - // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not - require( - blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty), - "Expected blocks not in BlockManager" - ) - - require( - blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty), - "Unexpected blocks in BlockManager" - ) - - // Make sure that the right sequence `numPartitionsInKinesis` are configured, and others are not - require( - ranges.takeRight(numPartitionsInKinesis).forall { - _.ranges.forall { _.streamName == testUtils.streamName } - }, "Incorrect configuration of RDD, expected ranges not set: " - ) - - require( - ranges.dropRight(numPartitionsInKinesis).forall { - _.ranges.forall { _.streamName != testUtils.streamName } - }, "Incorrect configuration of RDD, unexpected ranges set" - ) - - val rdd = new KinesisBackedBlockRDD[Array[Byte]]( - sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges) - val collectedData = rdd.map { bytes => - new String(bytes).toInt - }.collect() - assert(collectedData.toSet === testData.toSet) - - // Verify that the block fetching is skipped when isBlockValid is set to false. - // This is done by using a RDD whose data is only in memory but is set to skip block fetching - // Using that RDD will throw exception, as it skips block fetching even if the blocks are in - // in BlockManager. - if (testIsBlockValid) { - require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager") - require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis") - val rdd2 = new KinesisBackedBlockRDD[Array[Byte]]( - sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges, - isBlockIdValid = Array.fill(blockIds.length)(false)) - intercept[SparkException] { - rdd2.collect() - } - } - - // Verify that the RDD is not invalid after the blocks are removed and can still read data - // from write ahead log - if (testBlockRemove) { - require(numPartitions === numPartitionsInKinesis, - "All partitions must be in WAL for this test") - require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test") - rdd.removeBlocks() - assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet) - } - } - - /** Generate fake block ids */ - private def fakeBlockIds(num: Int): Array[BlockId] = { - Array.tabulate(num) { i => new StreamBlockId(0, i) } - } -} - -class WithAggregationKinesisBackedBlockRDDSuite - extends KinesisBackedBlockRDDTests(aggregateTestData = true) - -class WithoutAggregationKinesisBackedBlockRDDSuite - extends KinesisBackedBlockRDDTests(aggregateTestData = false) diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala deleted file mode 100644 index e1499a8220..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import java.util.concurrent.{ExecutorService, TimeoutException} - -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration._ -import scala.language.postfixOps - -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.streaming.{Duration, TestSuiteBase} -import org.apache.spark.util.ManualClock - -class KinesisCheckpointerSuite extends TestSuiteBase - with MockitoSugar - with BeforeAndAfterEach - with PrivateMethodTester - with Eventually { - - private val workerId = "dummyWorkerId" - private val shardId = "dummyShardId" - private val seqNum = "123" - private val otherSeqNum = "245" - private val checkpointInterval = Duration(10) - private val someSeqNum = Some(seqNum) - private val someOtherSeqNum = Some(otherSeqNum) - - private var receiverMock: KinesisReceiver[Array[Byte]] = _ - private var checkpointerMock: IRecordProcessorCheckpointer = _ - private var kinesisCheckpointer: KinesisCheckpointer = _ - private var clock: ManualClock = _ - - private val checkpoint = PrivateMethod[Unit]('checkpoint) - - override def beforeEach(): Unit = { - receiverMock = mock[KinesisReceiver[Array[Byte]]] - checkpointerMock = mock[IRecordProcessorCheckpointer] - clock = new ManualClock() - kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock) - } - - test("checkpoint is not called twice for the same sequence number") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - - verify(checkpointerMock, times(1)).checkpoint(anyString()) - } - - test("checkpoint is called after sequence number increases") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) - .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock)) - - verify(checkpointerMock, times(1)).checkpoint(seqNum) - verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) - } - - test("should checkpoint if we have exceeded the checkpoint interval") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) - .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - clock.advance(5 * checkpointInterval.milliseconds) - - eventually(timeout(1 second)) { - verify(checkpointerMock, times(1)).checkpoint(seqNum) - verify(checkpointerMock, times(1)).checkpoint(otherSeqNum) - } - } - - test("shouldn't checkpoint if we have not exceeded the checkpoint interval") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - clock.advance(checkpointInterval.milliseconds / 2) - - verify(checkpointerMock, never()).checkpoint(anyString()) - } - - test("should not checkpoint for the same sequence number") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - - clock.advance(checkpointInterval.milliseconds * 5) - eventually(timeout(1 second)) { - verify(checkpointerMock, atMost(1)).checkpoint(anyString()) - } - } - - test("removing checkpointer checkpoints one last time") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock) - verify(checkpointerMock, times(1)).checkpoint(anyString()) - } - - test("if checkpointing is going on, wait until finished before removing and checkpointing") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)) - .thenReturn(someSeqNum).thenReturn(someOtherSeqNum) - when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] { - override def answer(invocations: InvocationOnMock): Unit = { - clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2) - } - }) - - kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock) - clock.advance(checkpointInterval.milliseconds) - eventually(timeout(1 second)) { - verify(checkpointerMock, times(1)).checkpoint(anyString()) - } - // don't block test thread - val f = Future(kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock))( - ExecutionContext.global) - - intercept[TimeoutException] { - Await.ready(f, 50 millis) - } - - clock.advance(checkpointInterval.milliseconds / 2) - eventually(timeout(1 second)) { - verify(checkpointerMock, times(2)).checkpoint(anyString()) - } - } -} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala deleted file mode 100644 index ee428f31d6..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import org.apache.spark.SparkFunSuite - -/** - * Helper class that runs Kinesis real data transfer tests or - * ignores them based on env variable is set or not. - */ -trait KinesisFunSuite extends SparkFunSuite { - import KinesisTestUtils._ - - /** Run the test if environment variable is set or ignore the test */ - def testIfEnabled(testName: String)(testBody: => Unit) { - if (shouldRunTests) { - test(testName)(testBody) - } else { - ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody) - } - } - - /** Run the give body of code only if Kinesis tests are enabled */ - def runIfTestsEnabled(message: String)(body: => Unit): Unit = { - if (shouldRunTests) { - body - } else { - ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")() - } - } -} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala deleted file mode 100644 index fd15b6ccdc..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.util.Arrays - -import com.amazonaws.services.kinesis.clientlibrary.exceptions._ -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason -import com.amazonaws.services.kinesis.model.Record -import org.mockito.Matchers._ -import org.mockito.Matchers.{eq => meq} -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfter, Matchers} -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.streaming.{Duration, TestSuiteBase} -import org.apache.spark.util.Utils - -/** - * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor - */ -class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter - with MockitoSugar { - - val app = "TestKinesisReceiver" - val stream = "mySparkStream" - val endpoint = "endpoint-url" - val workerId = "dummyWorkerId" - val shardId = "dummyShardId" - val seqNum = "dummySeqNum" - val checkpointInterval = Duration(10) - val someSeqNum = Some(seqNum) - - val record1 = new Record() - record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8))) - val record2 = new Record() - record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8))) - val batch = Arrays.asList(record1, record2) - - var receiverMock: KinesisReceiver[Array[Byte]] = _ - var checkpointerMock: IRecordProcessorCheckpointer = _ - - override def beforeFunction(): Unit = { - receiverMock = mock[KinesisReceiver[Array[Byte]]] - checkpointerMock = mock[IRecordProcessorCheckpointer] - } - - test("check serializability of SerializableAWSCredentials") { - Utils.deserialize[SerializableAWSCredentials]( - Utils.serialize(new SerializableAWSCredentials("x", "y"))) - } - - test("process records including store and set checkpointer") { - when(receiverMock.isStopped()).thenReturn(false) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.processRecords(batch, checkpointerMock) - - verify(receiverMock, times(1)).isStopped() - verify(receiverMock, times(1)).addRecords(shardId, batch) - verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock) - } - - test("shouldn't store and update checkpointer when receiver is stopped") { - when(receiverMock.isStopped()).thenReturn(true) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.processRecords(batch, checkpointerMock) - - verify(receiverMock, times(1)).isStopped() - verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record])) - verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock)) - } - - test("shouldn't update checkpointer when exception occurs during store") { - when(receiverMock.isStopped()).thenReturn(false) - when( - receiverMock.addRecords(anyString, anyListOf(classOf[Record])) - ).thenThrow(new RuntimeException()) - - intercept[RuntimeException] { - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.processRecords(batch, checkpointerMock) - } - - verify(receiverMock, times(1)).isStopped() - verify(receiverMock, times(1)).addRecords(shardId, batch) - verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock)) - } - - test("shutdown should checkpoint if the reason is TERMINATE") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE) - - verify(receiverMock, times(1)).removeCheckpointer(meq(shardId), meq(checkpointerMock)) - } - - - test("shutdown should not checkpoint if the reason is something other than TERMINATE") { - when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId) - recordProcessor.initialize(shardId) - recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE) - recordProcessor.shutdown(checkpointerMock, null) - - verify(receiverMock, times(2)).removeCheckpointer(meq(shardId), - meq[IRecordProcessorCheckpointer](null)) - } - - test("retry success on first attempt") { - val expectedIsStopped = false - when(receiverMock.isStopped()).thenReturn(expectedIsStopped) - - val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100) - assert(actualVal == expectedIsStopped) - - verify(receiverMock, times(1)).isStopped() - } - - test("retry success on second attempt after a Kinesis throttling exception") { - val expectedIsStopped = false - when(receiverMock.isStopped()) - .thenThrow(new ThrottlingException("error message")) - .thenReturn(expectedIsStopped) - - val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100) - assert(actualVal == expectedIsStopped) - - verify(receiverMock, times(2)).isStopped() - } - - test("retry success on second attempt after a Kinesis dependency exception") { - val expectedIsStopped = false - when(receiverMock.isStopped()) - .thenThrow(new KinesisClientLibDependencyException("error message")) - .thenReturn(expectedIsStopped) - - val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100) - assert(actualVal == expectedIsStopped) - - verify(receiverMock, times(2)).isStopped() - } - - test("retry failed after a shutdown exception") { - when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message")) - - intercept[ShutdownException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - - verify(checkpointerMock, times(1)).checkpoint() - } - - test("retry failed after an invalid state exception") { - when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message")) - - intercept[InvalidStateException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - - verify(checkpointerMock, times(1)).checkpoint() - } - - test("retry failed after unexpected exception") { - when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message")) - - intercept[RuntimeException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - - verify(checkpointerMock, times(1)).checkpoint() - } - - test("retry failed after exhausing all retries") { - val expectedErrorMessage = "final try error message" - when(checkpointerMock.checkpoint()) - .thenThrow(new ThrottlingException("error message")) - .thenThrow(new ThrottlingException(expectedErrorMessage)) - - val exception = intercept[RuntimeException] { - KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100) - } - exception.getMessage().shouldBe(expectedErrorMessage) - - verify(checkpointerMock, times(2)).checkpoint() - } -} diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala deleted file mode 100644 index ca5d13da46..0000000000 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kinesis - -import scala.collection.mutable -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.util.Random - -import com.amazonaws.regions.RegionUtils -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream -import com.amazonaws.services.kinesis.model.Record -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.Matchers._ -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.network.util.JavaUtils -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{StorageLevel, StreamBlockId} -import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.kinesis.KinesisTestUtils._ -import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler.ReceivedBlockInfo -import org.apache.spark.util.Utils - -abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite - with Eventually with BeforeAndAfter with BeforeAndAfterAll { - - // This is the name that KCL will use to save metadata to DynamoDB - private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" - private val batchDuration = Seconds(1) - - // Dummy parameters for API testing - private val dummyEndpointUrl = defaultEndpointUrl - private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName() - private val dummyAWSAccessKey = "dummyAccessKey" - private val dummyAWSSecretKey = "dummySecretKey" - - private var testUtils: KinesisTestUtils = null - private var ssc: StreamingContext = null - private var sc: SparkContext = null - - override def beforeAll(): Unit = { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name - sc = new SparkContext(conf) - - runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KPLBasedKinesisTestUtils() - testUtils.createStream() - } - } - - override def afterAll(): Unit = { - if (ssc != null) { - ssc.stop() - } - if (sc != null) { - sc.stop() - } - if (testUtils != null) { - // Delete the Kinesis stream as well as the DynamoDB table generated by - // Kinesis Client Library when consuming the stream - testUtils.deleteStream() - testUtils.deleteDynamoDBTable(appName) - } - } - - before { - ssc = new StreamingContext(sc, batchDuration) - } - - after { - if (ssc != null) { - ssc.stop(stopSparkContext = false) - ssc = null - } - if (testUtils != null) { - testUtils.deleteDynamoDBTable(appName) - } - } - - test("KinesisUtils API") { - // Tests the API, does not actually test data receiving - val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", - dummyEndpointUrl, Seconds(2), - InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - dummyEndpointUrl, dummyRegionName, - InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) - val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - dummyEndpointUrl, dummyRegionName, - InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2, - dummyAWSAccessKey, dummyAWSSecretKey) - } - - test("RDD generation") { - val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream", - dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), - StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey) - assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]]) - - val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]] - val time = Time(1000) - - // Generate block info data for testing - val seqNumRanges1 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")) - val blockId1 = StreamBlockId(kinesisStream.id, 123) - val blockInfo1 = ReceivedBlockInfo( - 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None)) - - val seqNumRanges2 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb")) - val blockId2 = StreamBlockId(kinesisStream.id, 345) - val blockInfo2 = ReceivedBlockInfo( - 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None)) - - // Verify that the generated KinesisBackedBlockRDD has the all the right information - val blockInfos = Seq(blockInfo1, blockInfo2) - val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos) - nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]] - val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]] - assert(kinesisRDD.regionName === dummyRegionName) - assert(kinesisRDD.endpointUrl === dummyEndpointUrl) - assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds) - assert(kinesisRDD.awsCredentialsOption === - Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey))) - assert(nonEmptyRDD.partitions.size === blockInfos.size) - nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] } - val partitions = nonEmptyRDD.partitions.map { - _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq - assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2)) - assert(partitions.map { _.blockId } === Seq(blockId1, blockId2)) - assert(partitions.forall { _.isBlockIdValid === true }) - - // Verify that KinesisBackedBlockRDD is generated even when there are no blocks - val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty) - emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]] - emptyRDD.partitions shouldBe empty - - // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid - blockInfos.foreach { _.setBlockIdInvalid() } - kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition => - assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false) - } - } - - - /** - * Test the stream by sending data to a Kinesis stream and receiving from it. - * This test is not run by default as it requires AWS credentials that the test - * environment may not have. Even if there is AWS credentials available, the user - * may not want to run these tests to avoid the Kinesis costs. To enable this test, - * you must have AWS credentials available through the default AWS provider chain, - * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . - */ - testIfEnabled("basic operation") { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) - } - ssc.start() - - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData, aggregateTestData) - assert(collected === testData.toSet, "\nData received does not match data sent") - } - ssc.stop(stopSparkContext = false) - } - - testIfEnabled("custom message handling") { - val awsCredentials = KinesisTestUtils.getAWSCredentials() - def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 - val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, addFive, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - - stream shouldBe a [ReceiverInputDStream[_]] - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + collected.mkString(", ")) - } - ssc.start() - - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData, aggregateTestData) - val modData = testData.map(_ + 5) - assert(collected === modData.toSet, "\nData received does not match data sent") - } - ssc.stop(stopSparkContext = false) - } - - testIfEnabled("failure recovery") { - val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) - val checkpointDir = Utils.createTempDir().getAbsolutePath - - ssc = new StreamingContext(sc, Milliseconds(1000)) - ssc.checkpoint(checkpointDir) - - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])] - - val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - - // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch - kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => { - val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] - val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq - collectedData.synchronized { - collectedData(time) = (kRdd.arrayOfseqNumberRanges, data) - } - }) - - ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint - ssc.start() - - def numBatchesWithData: Int = - collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) } - - def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty - - // Run until there are at least 10 batches with some data in them - // If this times out because numBatchesWithData is empty, then its likely that foreachRDD - // function failed with exceptions, and nothing got added to `collectedData` - eventually(timeout(2 minutes), interval(1 seconds)) { - testUtils.pushData(1 to 5, aggregateTestData) - assert(isCheckpointPresent && numBatchesWithData > 10) - } - ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused - - // Restart the context from checkpoint and verify whether the - logInfo("Restarting from checkpoint") - ssc = new StreamingContext(checkpointDir) - ssc.start() - val recoveredKinesisStream = ssc.graph.getInputStreams().head - - // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges - // and return the same data - collectedData.synchronized { - val times = collectedData.keySet - times.foreach { time => - val (arrayOfSeqNumRanges, data) = collectedData(time) - val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]] - rdd shouldBe a[KinesisBackedBlockRDD[_]] - - // Verify the recovered sequence ranges - val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]] - assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size) - arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) => - assert(expected.ranges.toSeq === found.ranges.toSeq) - } - - // Verify the recovered data - assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data) - } - } - ssc.stop() - } -} - -class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true) - -class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false) diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml deleted file mode 100644 index bfb92791de..0000000000 --- a/extras/spark-ganglia-lgpl/pom.xml +++ /dev/null @@ -1,49 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Licensed to the Apache Software Foundation (ASF) under one or more -~ contributor license agreements. See the NOTICE file distributed with -~ this work for additional information regarding copyright ownership. -~ The ASF licenses this file to You under the Apache License, Version 2.0 -~ (the "License"); you may not use this file except in compliance with -~ the License. You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <!-- Ganglia integration is not included by default due to LGPL-licensed code --> - <groupId>org.apache.spark</groupId> - <artifactId>spark-ganglia-lgpl_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Ganglia Integration</name> - - <properties> - <sbt.project.name>ganglia-lgpl</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-ganglia</artifactId> - </dependency> - </dependencies> -</project> diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala deleted file mode 100644 index 3b1880e143..0000000000 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.metrics.sink - -import java.util.Properties -import java.util.concurrent.TimeUnit - -import com.codahale.metrics.MetricRegistry -import com.codahale.metrics.ganglia.GangliaReporter -import info.ganglia.gmetric4j.gmetric.GMetric -import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode - -import org.apache.spark.SecurityManager -import org.apache.spark.metrics.MetricsSystem - -class GangliaSink(val property: Properties, val registry: MetricRegistry, - securityMgr: SecurityManager) extends Sink { - val GANGLIA_KEY_PERIOD = "period" - val GANGLIA_DEFAULT_PERIOD = 10 - - val GANGLIA_KEY_UNIT = "unit" - val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS - - val GANGLIA_KEY_MODE = "mode" - val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST - - // TTL for multicast messages. If listeners are X hops away in network, must be at least X. - val GANGLIA_KEY_TTL = "ttl" - val GANGLIA_DEFAULT_TTL = 1 - - val GANGLIA_KEY_HOST = "host" - val GANGLIA_KEY_PORT = "port" - - def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) - - if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { - throw new Exception("Ganglia sink requires 'host' property.") - } - - if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { - throw new Exception("Ganglia sink requires 'port' property.") - } - - val host = propertyToOption(GANGLIA_KEY_HOST).get - val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt - val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) - val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE) - .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) - val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) - .getOrElse(GANGLIA_DEFAULT_PERIOD) - val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT) - .map(u => TimeUnit.valueOf(u.toUpperCase)) - .getOrElse(GANGLIA_DEFAULT_UNIT) - - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - - val ganglia = new GMetric(host, port, mode, ttl) - val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .convertRatesTo(TimeUnit.SECONDS) - .build(ganglia) - - override def start() { - reporter.start(pollPeriod, pollUnit) - } - - override def stop() { - reporter.stop() - } - - override def report() { - reporter.report() - } -} - |